import io import os from datetime import datetime import time import shepherd.config as shconf import shepherd.plugin from picamera import PiCamera from PIL import Image, ImageDraw, ImageFont asset_dir = os.path.dirname(os.path.realpath(__file__)) overlayfont_filename = os.path.join(asset_dir, "DejaVuSansMono.ttf") logo_filename = os.path.join(asset_dir, "smallshepherd.png") # on server side, we want to be able to list commands that a module responds to # without actually instantiating the module class. Add command templates into # the conf_def, than attach to them in the interface? Was worried about having # "two sources of truth", but you already need to match the conf_def to the # name where you access the value in the module. Could have add_command, which # you then add standard conf_def subclasses to, to reuse validation and server # form generation logic... class PiCamPlugin(shepherd.plugin.Plugin): @staticmethod def define_config(confdef): confdef.add_def('upload_images', shconf.BoolDef(default=False, optional=True, helptext="If true, move to an Uploader bucket. Requires Uploader plugin")) confdef.add_def('upload_bucket', shconf.StringDef(default="", optional=True, helptext="Name of uploader bucket to shift images to.")) confdef.add_def('save_directory', shconf.StringDef(default="", optional=True, helptext="Name of directory path to save images. If empty, a 'usbcamera' directory under the Shepherd root dir will be used")) confdef.add_def('append_id', shconf.BoolDef(default=True, optional=True, helptext="If true, add the system ID to the end of image filenames")) confdef.add_def('show_overlay', shconf.BoolDef(default=True, optional=True, helptext="If true, add an overlay on each image with the system ID and date.")) confdef.add_def('overlay_desc', shconf.StringDef(default="", optional=True, helptext="Text to add to the overlay after the system ID and camera name")) confdef.add_def('jpeg_quality', shconf.IntDef(default=80, minval=60, maxval=95, optional=True, helptext="JPEG quality to save with. Max of 95, passed directly to Pillow")) array = confdef.add_def('trigger', shconf.DictListDef( helptext="Array of triggers that will use all cameras")) array.add_def('hour', shconf.StringDef()) array.add_def('minute', shconf.StringDef()) array.add_def('second', shconf.StringDef(default="0", optional=True)) def __init__(self, pluginInterface, config): super().__init__(pluginInterface, config) self.config = config self.interface = pluginInterface self.plugins = pluginInterface.other_plugins self.hooks = pluginInterface.hooks self.root_dir = os.path.expanduser(pluginInterface.coreconfig["root_dir"]) self.id = pluginInterface.coreconfig["name"] self.interface.register_hook("pre_cam") self.interface.register_hook("post_cam") self.interface.register_function(self.camera_job) # do some camera init stuff print("PiCamera config:") print(self.config) # Seconds to wait for exposure and white balance auto-adjust to stabilise self.stabilise_delay = 3 if self.config["save_directory"] == "": self.save_directory = os.path.join(self.root_dir, "picamera") else: self.save_directory = self.config["save_directory"] if not os.path.exists(self.save_directory): os.makedirs(self.save_directory) if self.config["show_overlay"]: # Load assets self.logo_im = Image.open(logo_filename) self.font_size_cache = {} self.logo_size_cache = {} #global cam_led #cam_led = LED(CAMERA_LED_PIN, active_high=False, initial_value=False) for trigger in self.config["trigger"]: trigger_id = trigger["hour"]+'-' + trigger["minute"]+'-'+trigger["second"] self.interface.add_job( self.camera_job, trigger["hour"], trigger["minute"], trigger["second"], job_name=trigger_id) def _generate_overlay(self, width, height, image_time): font_size = int(height/40) margin_size = int(font_size/5) if font_size not in self.font_size_cache: self.font_size_cache[font_size] = ImageFont.truetype( overlayfont_filename, int(font_size*0.9)) thisfont = self.font_size_cache[font_size] if font_size not in self.logo_size_cache: newsize = (int(self.logo_im.width*( font_size/self.logo_im.height)), font_size) self.logo_size_cache[font_size] = self.logo_im.resize( newsize, Image.BILINEAR) thislogo = self.logo_size_cache[font_size] desc_text = self.config["overlay_desc"] if self.config["append_id"]: desc_text = self.id + " " + desc_text time_text = image_time.strftime("%Y-%m-%d %H:%M:%S") overlay = Image.new('RGBA', (width, font_size+(2*margin_size)), (0, 0, 0)) overlay.paste(thislogo, (int((overlay.width-thislogo.width)/2), margin_size)) draw = ImageDraw.Draw(overlay) draw.text((margin_size*2, margin_size), desc_text, font=thisfont, fill=(255, 255, 255, 255)) datewidth, _ = draw.textsize(time_text, thisfont) draw.text((overlay.width-(margin_size*2)-datewidth, margin_size), time_text, font=thisfont, fill=(255, 255, 255, 255)) # make whole overlay half transparent overlay.putalpha(128) return overlay def camera_job(self): self.hooks.pre_cam() # Capture image print("Running camera...") stream = io.BytesIO() with PiCamera() as picam: picam.resolution = (3280, 2464) picam.start_preview() time.sleep(self.stabilise_delay) picam.capture(stream, format='jpeg') # "Rewind" the stream to the beginning so we can read its content stream.seek(0) img = Image.open(stream) # Process image image_time = datetime.now() if self.config["show_overlay"]: overlay = self._generate_overlay(img.width, img.height, image_time) img.paste(overlay, (0, img.height-overlay.height), overlay) image_filename = image_time.strftime("%Y-%m-%d %H-%M-%S") if self.config["append_id"]: image_filename = image_filename + " " + self.id image_filename = image_filename + ".jpg" image_filename = os.path.join(self.save_directory, image_filename) img.save(image_filename+".writing", "JPEG", quality=self.config["jpeg_quality"]) os.rename(image_filename+".writing", image_filename) if self.config["upload_images"]: self.plugins["uploader"].move_to_bucket(image_filename, self.config["upload_bucket"]) self.hooks.post_cam() if __name__ == "__main__": pass # print("main") # main(sys.argv[1:])