You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
shepherd-agent/shepherd/modules/picam.py

135 lines
4.8 KiB

import shepherd.config
import shepherd.module
import io
import os
from datetime import datetime
import time
from picamera import PiCamera
from PIL import Image, ImageDraw, ImageFont
overlayfont = "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf"
class PiCamConfDef(shepherd.config.ConfDefinition):
def __init__(self):
super().__init__()
self.add_def('upload_images', shepherd.config.BoolDef(default=False, optional=True))
self.add_def('upload_bucket', shepherd.config.StringDef(default="", optional=True))
self.add_def('save_directory', shepherd.config.StringDef(default="", optional=False))
self.add_def('append_text', shepherd.config.StringDef(default="", optional=True))
self.add_def('append_id', shepherd.config.BoolDef(default=True, optional=True))
array = self.add_def('trigger', shepherd.config.TableArrayDef())
array.add_def('hour', shepherd.config.StringDef())
array.add_def('minute', shepherd.config.StringDef())
array.add_def('second', shepherd.config.StringDef(default="0", optional=True))
# on server side, we want to be able to list commands that a module responds to
# without actually instantiating the module class. Add command templates into
# the conf_def, than attach to them in the interface? Was worried about having
# "two sources of truth", but you already need to match the conf_def to the
# name where you access the value in the module. Could have add_command, which
# you then add standard conf_def subclasses to, to reuse validation and server
# form generation logic...
class PiCamInterface(shepherd.module.Interface):
def __init__(self, module):
super().__init__(module)
self.hook_pre_cam = shepherd.module.Hook()
self.hook_post_cam = shepherd.module.Hook()
# self.add_command("trigger", self.module.camera_job)
# other module can then call, in init_interfaces, if self.modules.picam is not None:
# self.modules.picam.hooks.attach("pre_cam",self.myfunc)
# self.modules.picam.pre_cam.attach(self.my_func)
# self.modules.picam.trigger()
class PiCamModule(shepherd.module.SimpleModule):
conf_def = PiCamConfDef()
def setup(self):
self.interface = PiCamInterface(self)
# do some camera init stuff
print("Camera config:")
print(self.config)
if self.config["save_directory"] is "":
self.save_directory = os.path.join(self.shepherd.root_dir,
"camera")
else:
self.save_directory = self.config["save_directory"]
if not os.path.exists(self.save_directory):
os.makedirs(self.save_directory)
#global cam_led
#cam_led = LED(CAMERA_LED_PIN, active_high=False, initial_value=False)
for trigger in self.config["trigger"]:
self.shepherd.scheduler.add_job(self.camera_job, 'cron',
hour=trigger["hour"],
minute=trigger["minute"],
second=trigger["second"])
def setup_other_modules(self):
pass
def camera_job(self):
self.interface.hook_pre_cam()
print("Running camera...")
stream = io.BytesIO()
with PiCamera() as picam:
picam.resolution = (3280, 2464)
picam.start_preview()
time.sleep(2)
picam.capture(stream, format='jpeg')
# "Rewind" the stream to the beginning so we can read its content
image_time = datetime.now()
stream.seek(0)
newimage = Image.open(stream)
try:
fnt = ImageFont.truetype(overlayfont, 50)
except IOError:
fnt = ImageFont.load_default()
draw = ImageDraw.Draw(newimage)
image_text = image_time.strftime("%Y-%m-%d %H:%M:%S")
if self.config["append_id"]:
image_text = image_text + " " + self.shepherd.id
image_text = image_text + self.config["append_text"]
draw.text((50, newimage.height-100), image_text, font=fnt,
fill=(255, 255, 255, 200))
image_filename = image_time.strftime("%Y-%m-%d %H-%M-%S")
if self.config["append_id"]:
image_filename = image_filename + " " + self.shepherd.id
image_filename = image_filename + self.config["append_text"] + ".jpg"
image_filename = os.path.join(self.save_directory, image_filename)
newimage.save(image_filename+".writing", "JPEG")
os.rename(image_filename+".writing", image_filename)
if self.config["upload_images"]:
self.modules.uploader.move_to_bucket(image_filename,
self.config["upload_bucket"])
self.interface.hook_post_cam()
if __name__ == "__main__":
pass
# print("main")
# main(sys.argv[1:])