You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
174 lines
7.3 KiB
174 lines
7.3 KiB
import io
|
|
import os
|
|
from datetime import datetime
|
|
import time
|
|
|
|
import shepherd.config as shconf
|
|
import shepherd.plugin
|
|
|
|
|
|
from picamera import PiCamera
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
|
|
|
|
asset_dir = os.path.dirname(os.path.realpath(__file__))
|
|
|
|
overlayfont_filename = os.path.join(asset_dir, "DejaVuSansMono.ttf")
|
|
logo_filename = os.path.join(asset_dir, "smallshepherd.png")
|
|
|
|
# on server side, we want to be able to list commands that a module responds to
|
|
# without actually instantiating the module class. Add command templates into
|
|
# the conf_def, than attach to them in the interface? Was worried about having
|
|
# "two sources of truth", but you already need to match the conf_def to the
|
|
# name where you access the value in the module. Could have add_command, which
|
|
# you then add standard conf_def subclasses to, to reuse validation and server
|
|
# form generation logic...
|
|
|
|
|
|
class PiCamPlugin(shepherd.plugin.Plugin):
|
|
@staticmethod
|
|
def define_config(confdef):
|
|
confdef.add_def('upload_images', shconf.BoolDef(default=False, optional=True,
|
|
helptext="If true, move to an Uploader bucket. Requires Uploader plugin"))
|
|
confdef.add_def('upload_bucket', shconf.StringDef(default="", optional=True,
|
|
helptext="Name of uploader bucket to shift images to."))
|
|
confdef.add_def('save_directory', shconf.StringDef(default="", optional=True,
|
|
helptext="Name of directory path to save images. If empty, a 'usbcamera' directory under the Shepherd root dir will be used"))
|
|
confdef.add_def('append_id', shconf.BoolDef(default=True, optional=True,
|
|
helptext="If true, add the system ID to the end of image filenames"))
|
|
confdef.add_def('show_overlay', shconf.BoolDef(default=True, optional=True,
|
|
helptext="If true, add an overlay on each image with the system ID and date."))
|
|
confdef.add_def('overlay_desc', shconf.StringDef(default="", optional=True,
|
|
helptext="Text to add to the overlay after the system ID and camera name"))
|
|
confdef.add_def('jpeg_quality', shconf.IntDef(default=80, minval=60, maxval=95, optional=True,
|
|
helptext="JPEG quality to save with. Max of 95, passed directly to Pillow"))
|
|
|
|
array = confdef.add_def('trigger', shconf.DictListDef(
|
|
helptext="Array of triggers that will use all cameras"))
|
|
array.add_def('hour', shconf.StringDef())
|
|
array.add_def('minute', shconf.StringDef())
|
|
array.add_def('second', shconf.StringDef(default="0", optional=True))
|
|
|
|
def __init__(self, pluginInterface, config):
|
|
super().__init__(pluginInterface, config)
|
|
self.config = config
|
|
self.interface = pluginInterface
|
|
self.plugins = pluginInterface.other_plugins
|
|
self.hooks = pluginInterface.hooks
|
|
|
|
self.root_dir = os.path.expanduser(pluginInterface.coreconfig["root_dir"])
|
|
self.id = pluginInterface.coreconfig["name"]
|
|
|
|
self.interface.register_hook("pre_cam")
|
|
self.interface.register_hook("post_cam")
|
|
self.interface.register_function(self.camera_job)
|
|
# do some camera init stuff
|
|
|
|
print("PiCamera config:")
|
|
print(self.config)
|
|
|
|
# Seconds to wait for exposure and white balance auto-adjust to stabilise
|
|
self.stabilise_delay = 3
|
|
|
|
if self.config["save_directory"] == "":
|
|
self.save_directory = os.path.join(self.root_dir, "picamera")
|
|
else:
|
|
self.save_directory = self.config["save_directory"]
|
|
|
|
if not os.path.exists(self.save_directory):
|
|
os.makedirs(self.save_directory)
|
|
|
|
if self.config["show_overlay"]:
|
|
# Load assets
|
|
self.logo_im = Image.open(logo_filename)
|
|
|
|
self.font_size_cache = {}
|
|
self.logo_size_cache = {}
|
|
|
|
#global cam_led
|
|
#cam_led = LED(CAMERA_LED_PIN, active_high=False, initial_value=False)
|
|
|
|
for trigger in self.config["trigger"]:
|
|
trigger_id = trigger["hour"]+'-' + trigger["minute"]+'-'+trigger["second"]
|
|
self.interface.add_job(
|
|
self.camera_job, trigger["hour"], trigger["minute"], trigger["second"], job_name=trigger_id)
|
|
|
|
def _generate_overlay(self, width, height, image_time):
|
|
font_size = int(height/40)
|
|
margin_size = int(font_size/5)
|
|
|
|
if font_size not in self.font_size_cache:
|
|
self.font_size_cache[font_size] = ImageFont.truetype(
|
|
overlayfont_filename, int(font_size*0.9))
|
|
thisfont = self.font_size_cache[font_size]
|
|
|
|
if font_size not in self.logo_size_cache:
|
|
newsize = (int(self.logo_im.width*(
|
|
font_size/self.logo_im.height)), font_size)
|
|
self.logo_size_cache[font_size] = self.logo_im.resize(
|
|
newsize, Image.BILINEAR)
|
|
thislogo = self.logo_size_cache[font_size]
|
|
|
|
desc_text = self.config["overlay_desc"]
|
|
if self.config["append_id"]:
|
|
desc_text = self.id + " " + desc_text
|
|
|
|
time_text = image_time.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
overlay = Image.new('RGBA', (width, font_size+(2*margin_size)), (0, 0, 0))
|
|
overlay.paste(thislogo, (int((overlay.width-thislogo.width)/2), margin_size))
|
|
|
|
draw = ImageDraw.Draw(overlay)
|
|
draw.text((margin_size*2, margin_size), desc_text,
|
|
font=thisfont, fill=(255, 255, 255, 255))
|
|
|
|
datewidth, _ = draw.textsize(time_text, thisfont)
|
|
draw.text((overlay.width-(margin_size*2)-datewidth, margin_size), time_text, font=thisfont,
|
|
fill=(255, 255, 255, 255))
|
|
|
|
# make whole overlay half transparent
|
|
overlay.putalpha(128)
|
|
return overlay
|
|
|
|
def camera_job(self):
|
|
self.hooks.pre_cam()
|
|
|
|
# Capture image
|
|
print("Running camera...")
|
|
stream = io.BytesIO()
|
|
with PiCamera() as picam:
|
|
picam.resolution = (3280, 2464)
|
|
picam.start_preview()
|
|
time.sleep(self.stabilise_delay)
|
|
picam.capture(stream, format='jpeg')
|
|
# "Rewind" the stream to the beginning so we can read its content
|
|
stream.seek(0)
|
|
img = Image.open(stream)
|
|
|
|
# Process image
|
|
image_time = datetime.now()
|
|
|
|
if self.config["show_overlay"]:
|
|
overlay = self._generate_overlay(img.width, img.height, image_time)
|
|
img.paste(overlay, (0, img.height-overlay.height), overlay)
|
|
|
|
image_filename = image_time.strftime("%Y-%m-%d %H-%M-%S")
|
|
if self.config["append_id"]:
|
|
image_filename = image_filename + " " + self.id
|
|
|
|
image_filename = image_filename + ".jpg"
|
|
image_filename = os.path.join(self.save_directory, image_filename)
|
|
img.save(image_filename+".writing", "JPEG", quality=self.config["jpeg_quality"])
|
|
os.rename(image_filename+".writing", image_filename)
|
|
|
|
if self.config["upload_images"]:
|
|
self.plugins["uploader"].move_to_bucket(image_filename, self.config["upload_bucket"])
|
|
|
|
self.hooks.post_cam()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pass
|
|
# print("main")
|
|
# main(sys.argv[1:])
|