#!/usr/bin/env python3

import shutil
import os
import threading
import paramiko
import shepherd.config as shconf
import shepherd.plugin
# configdef = shepherd.config.definition()

# Can either import shepherd.config here and call a function to build a config_def,
# or can leave a config_def entry point.
# Probably go with the entry point, to stay consistent with the module.


# On the server side, we want to be able to list commands that a module responds to
# without actually instantiating the module class. Add command templates into
# the conf_def, then attach to them in the interface? Was worried about having
# "two sources of truth", but you already need to match the conf_def to the
# name where you access the value in the module. Could have add_command, which
# you then add standard conf_def subclasses to, to reuse validation and server
# form generation logic...


# The uploader plugin allows the definition of upload "buckets" - essentially
# a way of grouping together upload, timing, and retention settings.
# Buckets ignore any filenames ending with ".writing", ".uploading" or ".uploaded".
# The "move_to_bucket" interface function is provided, but bucket directories will
# also work with any files moved into them externally.

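# An illustrative sketch of the config this plugin receives (shape inferred from
# define_config() below; the names and values here are made-up examples):
#
#   config = {
#       "destination": [{
#           "name": "fieldserver",          # referenced by buckets below
#           "protocol": "sftp",             # "sftp" selects SFTPDestination
#           "address": "example.com",
#           "port": 22,
#           "path": "/srv/uploads",
#           "username": "shepherd",
#           "password": "secret",
#           "add_id_to_path": True,         # append the node id to "path"
#       }],
#       "bucket": [{
#           "name": "images",               # bucket directory under root_dir
#           "open_link_on_new": False,
#           "opportunistic": True,
#           "keep_copy": False,             # True also creates an "images_old" dir
#           "destination": "fieldserver",   # must match a destination name
#       }],
#   }
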

class Destination():
    def __init__(self, config, node_id, root_dir):
        self.config = config
        self.node_id = node_id
        self.root_dir = root_dir
        self.sendlist_condition = threading.Condition()
        self.send_list = []

        self.thread = threading.Thread(target=self._send_files)
        self.thread.start()

    # Override this in subclasses, implementing the actual upload process.
    # Return True on success; raise an exception on failure (the sender thread
    # treats any exception as a failed upload).
    def upload(self, filepath, suffix):
        print("Dummy uploading "+filepath)
        return True

    def add_files_to_send(self, file_path_list):
        self.sendlist_condition.acquire()
        for file_path in file_path_list:
            if file_path not in self.send_list:
                self.send_list.append(file_path)
                self.sendlist_condition.notify()

        self.sendlist_condition.release()

    def _file_available(self):
        return len(self.send_list) > 0

    def _send_files(self):
        while True:
            self.sendlist_condition.acquire()
            # This drops through immediately if there is something to send,
            # otherwise it waits.
            self.sendlist_condition.wait_for(self._file_available)
            file_to_send = self.send_list.pop(0)
            os.rename(file_to_send, file_to_send+".uploading")
            self.sendlist_condition.release()

            # Rename the uploaded file to end with ".uploaded" on success, or back
            # to its original path on failure.
            try:
                self.upload(file_to_send, ".uploading")
                os.rename(file_to_send+".uploading", file_to_send+".uploaded")
            except Exception as e:
                print(f"Upload failed with exception {e}")
                os.rename(file_to_send+".uploading", file_to_send)
                # Re-queue under the condition lock so we don't race with
                # add_files_to_send() running on another thread.
                with self.sendlist_condition:
                    if file_to_send not in self.send_list:
                        self.send_list.append(file_to_send)


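# For illustration: a minimal Destination subclass that "uploads" by copying the
# file into a local directory. It uses the optional "path" key from the
# destination config and is only a sketch of how upload() can be overridden;
# it is not selected by any protocol in UploaderPlugin below.
class LocalCopyDestination(Destination):
    def upload(self, filepath, suffix):
        # The sender thread has already renamed the file to filepath + suffix.
        os.makedirs(self.config["path"], exist_ok=True)
        shutil.copy(filepath + suffix,
                    os.path.join(self.config["path"],
                                 os.path.basename(filepath)))
        return True

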
class SFTPDestination(Destination):
    def upload(self, filepath, suffix):
        print("Starting upload...")
        with paramiko.Transport((self.config["address"],
                                 self.config["port"])) as transport:
            transport.connect(username=self.config["username"],
                              password=self.config["password"])
            with paramiko.SFTPClient.from_transport(transport) as sftp:
                print("Uploading "+filepath+" to " +
                      self.config["address"]+" via SFTP")
                if self.config["add_id_to_path"]:
                    destdir = os.path.join(self.config["path"],
                                           self.node_id)
                else:
                    destdir = self.config["path"]

                try:
                    sftp.listdir(destdir)
                except IOError:
                    print("Creating remote dir: " + destdir)
                    sftp.mkdir(destdir)

                print("Target dir: "+destdir)
                sftp.put(filepath+suffix,
                         os.path.join(destdir, os.path.basename(filepath)))


class Bucket():
    def __init__(self, name, open_link_on_new, opportunistic, keep_copy,
                 destination, node_id, root_dir, path=None, old_path=None):
        self.newfile_event = threading.Event()
        self.newfile_event.set()

        self.node_id = node_id
        self.root_dir = root_dir

        self.destination = destination

        self.path = path
        if self.path is None:
            self.path = os.path.join(self.root_dir, name)
        if not os.path.exists(self.path):
            os.makedirs(self.path)

        if keep_copy:
            self.old_path = old_path
            if self.old_path is None:
                self.old_path = os.path.join(
                    self.root_dir, name + "_old")
            if not os.path.exists(self.old_path):
                os.makedirs(self.old_path)

        self.thread = threading.Thread(target=self._check_files)
        self.thread.start()

    def _check_files(self):

        while True:
            # NOTE: The reason we use an event here, rather than a lock or condition,
            # is that we're not sharing any internal state between the threads - just
            # the filesystem itself, relying on the atomicity of file operations. While
            # less clean in a pure Python sense, this makes it easier to accept files
            # arriving from other sources.
            self.newfile_event.wait(timeout=10)
            self.newfile_event.clear()
            bucket_files = []
            for item in os.listdir(self.path):
                item_path = os.path.join(self.path, item)
                if (os.path.isfile(item_path) and
                        (not item.endswith(".writing")) and
                        (not item.endswith(".uploading")) and
                        (not item.endswith(".uploaded"))):
                    bucket_files.append(item_path)
                # TODO check for .uploaded files and either delete or,
                # if keep_copy, move to self.old_path

            if bucket_files:
                self.destination.add_files_to_send(bucket_files)


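# File lifecycle in a bucket directory, as implemented by Bucket and Destination
# above (filename "photo.jpg" is just an example):
#   photo.jpg.writing    - being moved in by move_to_bucket(); ignored by the scan
#   photo.jpg            - picked up by Bucket._check_files() and queued for upload
#   photo.jpg.uploading  - claimed by a Destination sender thread
#   photo.jpg.uploaded   - upload succeeded (cleanup/archival is still a TODO)

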
class UploaderPlugin(shepherd.plugin.Plugin):
    @staticmethod
    def define_config(confdef):
        dests = confdef.add_def('destination', shconf.DictListDef())
        dests.add_def('name', shconf.StringDef())
        dests.add_def('protocol', shconf.StringDef())
        dests.add_def('address', shconf.StringDef(optional=True))
        dests.add_def('port', shconf.IntDef(optional=True))
        dests.add_def('path', shconf.StringDef(optional=True))
        dests.add_def('username', shconf.StringDef(optional=True))
        dests.add_def('password', shconf.StringDef(optional=True))
        dests.add_def('keyfile', shconf.StringDef(
            default="", optional=True))
        dests.add_def('add_id_to_path', shconf.BoolDef(
            default=True, optional=True))

        buckets = confdef.add_def('bucket', shconf.DictListDef())
        buckets.add_def('name', shconf.StringDef())
        buckets.add_def('open_link_on_new', shconf.BoolDef())
        buckets.add_def('opportunistic', shconf.BoolDef(
            default=True, optional=True))
        buckets.add_def('keep_copy', shconf.BoolDef())
        buckets.add_def('destination', shconf.StringDef())

    def __init__(self, pluginInterface, config):
        super().__init__(pluginInterface, config)
        self.config = config
        self.interface = pluginInterface
        self.plugins = pluginInterface.other_plugins
        self.hooks = pluginInterface.hooks

        self.root_dir = os.path.expanduser(pluginInterface.coreconfig["root_dir"])
        self.id = pluginInterface.coreconfig["name"]

        print("Uploader config:")
        print(self.config)

        self.interface.register_function(self.move_to_bucket)

        self.destinations = {}
        self.buckets = {}

        for dest_conf in self.config["destination"]:
            if dest_conf["protocol"] == "sftp":
                self.destinations[dest_conf["name"]] = SFTPDestination(
                    dest_conf, self.id, self.root_dir)
            else:
                self.destinations[dest_conf["name"]] = Destination(
                    dest_conf, self.id, self.root_dir)

        for bucketconf in self.config["bucket"]:
            # Replace the destination name with the actual Destination instance
            # before passing the bucket config through as keyword arguments.
            bucketconf["destination"] = self.destinations[bucketconf["destination"]]
            self.buckets[bucketconf["name"]] = Bucket(
                **bucketconf, node_id=self.id, root_dir=self.root_dir)

    def move_to_bucket(self, filepath, bucket_name):
        # Use an intermediate ".writing" name in case the source isn't on the
        # same filesystem, where the move might not be atomic. Once the file is
        # in the bucket directory, the final rename _is_ atomic.
        dest_path = os.path.join(self.buckets[bucket_name].path,
                                 os.path.basename(filepath))
        temp_dest_path = dest_path + ".writing"
        shutil.move(filepath, temp_dest_path)
        os.rename(temp_dest_path, dest_path)

        # Notify the bucket to check for new files.
        self.buckets[bucket_name].newfile_event.set()


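# For illustration, a hypothetical call from another shepherd plugin that wants
# to hand a finished file to the "images" bucket (the exact lookup depends on
# how the plugin interface exposes other plugins; the key used here is a guess):
#
#   uploader = self.plugins["uploader"]
#   uploader.move_to_bucket("/home/pi/captures/photo_0001.jpg", "images")
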
if __name__ == "__main__":
    pass
    # print("main")
    # main(sys.argv[1:])