You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
shepherd-agent/shepherd/plugins/picam.py

174 lines
7.3 KiB

import io
import os
from datetime import datetime
import time
import shepherd.config as shconf
import shepherd.plugin
from picamera import PiCamera
from PIL import Image, ImageDraw, ImageFont
asset_dir = os.path.dirname(os.path.realpath(__file__))
overlayfont_filename = os.path.join(asset_dir, "DejaVuSansMono.ttf")
logo_filename = os.path.join(asset_dir, "smallshepherd.png")
# on server side, we want to be able to list commands that a module responds to
# without actually instantiating the module class. Add command templates into
# the conf_def, than attach to them in the interface? Was worried about having
# "two sources of truth", but you already need to match the conf_def to the
# name where you access the value in the module. Could have add_command, which
# you then add standard conf_def subclasses to, to reuse validation and server
# form generation logic...
class PiCamPlugin(shepherd.plugin.Plugin):
@staticmethod
def define_config(confdef):
confdef.add_def('upload_images', shconf.BoolDef(default=False, optional=True,
helptext="If true, move to an Uploader bucket. Requires Uploader plugin"))
confdef.add_def('upload_bucket', shconf.StringDef(default="", optional=True,
helptext="Name of uploader bucket to shift images to."))
confdef.add_def('save_directory', shconf.StringDef(default="", optional=True,
helptext="Name of directory path to save images. If empty, a 'usbcamera' directory under the Shepherd root dir will be used"))
confdef.add_def('append_id', shconf.BoolDef(default=True, optional=True,
helptext="If true, add the system ID to the end of image filenames"))
confdef.add_def('show_overlay', shconf.BoolDef(default=True, optional=True,
helptext="If true, add an overlay on each image with the system ID and date."))
confdef.add_def('overlay_desc', shconf.StringDef(default="", optional=True,
helptext="Text to add to the overlay after the system ID and camera name"))
confdef.add_def('jpeg_quality', shconf.IntDef(default=80, minval=60, maxval=95, optional=True,
helptext="JPEG quality to save with. Max of 95, passed directly to Pillow"))
array = confdef.add_def('trigger', shconf.DictListDef(
helptext="Array of triggers that will use all cameras"))
array.add_def('hour', shconf.StringDef())
array.add_def('minute', shconf.StringDef())
array.add_def('second', shconf.StringDef(default="0", optional=True))
def __init__(self, pluginInterface, config):
super().__init__(pluginInterface, config)
self.config = config
self.interface = pluginInterface
self.plugins = pluginInterface.other_plugins
self.hooks = pluginInterface.hooks
self.root_dir = os.path.expanduser(pluginInterface.coreconfig["root_dir"])
self.id = pluginInterface.coreconfig["id"]
self.interface.register_hook("pre_cam")
self.interface.register_hook("post_cam")
self.interface.register_function(self.camera_job)
# do some camera init stuff
print("PiCamera config:")
print(self.config)
# Seconds to wait for exposure and white balance auto-adjust to stabilise
self.stabilise_delay = 3
if self.config["save_directory"] == "":
self.save_directory = os.path.join(self.root_dir, "picamera")
else:
self.save_directory = self.config["save_directory"]
if not os.path.exists(self.save_directory):
os.makedirs(self.save_directory)
if self.config["show_overlay"]:
# Load assets
self.logo_im = Image.open(logo_filename)
self.font_size_cache = {}
self.logo_size_cache = {}
#global cam_led
#cam_led = LED(CAMERA_LED_PIN, active_high=False, initial_value=False)
for trigger in self.config["trigger"]:
trigger_id = trigger["hour"]+'-' + trigger["minute"]+'-'+trigger["second"]
self.interface.add_job(
self.camera_job, trigger["hour"], trigger["minute"], trigger["second"], job_name=trigger_id)
def _generate_overlay(self, width, height, image_time):
font_size = int(height/40)
margin_size = int(font_size/5)
if font_size not in self.font_size_cache:
self.font_size_cache[font_size] = ImageFont.truetype(
overlayfont_filename, int(font_size*0.9))
thisfont = self.font_size_cache[font_size]
if font_size not in self.logo_size_cache:
newsize = (int(self.logo_im.width*(
font_size/self.logo_im.height)), font_size)
self.logo_size_cache[font_size] = self.logo_im.resize(
newsize, Image.BILINEAR)
thislogo = self.logo_size_cache[font_size]
desc_text = self.config["overlay_desc"]
if self.config["append_id"]:
desc_text = self.id + " " + desc_text
time_text = image_time.strftime("%Y-%m-%d %H:%M:%S")
overlay = Image.new('RGBA', (width, font_size+(2*margin_size)), (0, 0, 0))
overlay.paste(thislogo, (int((overlay.width-thislogo.width)/2), margin_size))
draw = ImageDraw.Draw(overlay)
draw.text((margin_size*2, margin_size), desc_text,
font=thisfont, fill=(255, 255, 255, 255))
datewidth, _ = draw.textsize(time_text, thisfont)
draw.text((overlay.width-(margin_size*2)-datewidth, margin_size), time_text, font=thisfont,
fill=(255, 255, 255, 255))
# make whole overlay half transparent
overlay.putalpha(128)
return overlay
def camera_job(self):
self.hooks.pre_cam()
# Capture image
print("Running camera...")
stream = io.BytesIO()
with PiCamera() as picam:
picam.resolution = (3280, 2464)
picam.start_preview()
time.sleep(self.stabilise_delay)
picam.capture(stream, format='jpeg')
# "Rewind" the stream to the beginning so we can read its content
stream.seek(0)
img = Image.open(stream)
# Process image
image_time = datetime.now()
if self.config["show_overlay"]:
overlay = self._generate_overlay(img.width, img.height, image_time)
img.paste(overlay, (0, img.height-overlay.height), overlay)
image_filename = image_time.strftime("%Y-%m-%d %H-%M-%S")
if self.config["append_id"]:
image_filename = image_filename + " " + self.id
image_filename = image_filename + ".jpg"
image_filename = os.path.join(self.save_directory, image_filename)
img.save(image_filename+".writing", "JPEG", quality=self.config["jpeg_quality"])
os.rename(image_filename+".writing", image_filename)
if self.config["upload_images"]:
self.plugins["uploader"].move_to_bucket(image_filename, self.config["upload_bucket"])
self.hooks.post_cam()
if __name__ == "__main__":
pass
# print("main")
# main(sys.argv[1:])