You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
shepherd-agent/shepherd/control.py

140 lines
4.7 KiB

import os
import uuid
import subprocess
import requests
import threading
import json
from urllib.parse import urlparse, urlunparse, urljoin
from collections import namedtuple
import shepherd.plugin
# Check for a shepherd.new file in the edit conf dir. If it is there,
# or if no shepherd.id file can be found, generate a new client ID.
# For now, also attempt to delete /var/lib/zerotier-one/identity.public and identity.secret
# Once generated, if it was due to the shepherd.new file, delete that file.
# Start a new thread, and push the ID and core config to api.shepherd.distreon.net/client/update
class UpdateManager:
    """Placeholder class; it currently holds no state and defines no behaviour."""

    def __init__(self):
        """Create an instance; there is nothing to initialise yet."""
class SequenceUpdate():
    """Ordered list of update items, each tagged with a sequence number.

    Items are added through add_item(), which assigns the next sequence
    number (the first item gets 1) and sets the dirty flag.
    """

    # Pairs an item's content with the sequence number it was assigned.
    Item = namedtuple('Item', ['sequence_number', 'content'])

    def __init__(self):
        """Start with no items, a zeroed sequence counter and a clear dirty flag."""
        self.items = []
        self._sequence_count = 0
        self._dirty = False

    def _next_sequence_number(self):
        """Increment the sequence counter and return its new value."""
        # TODO: need to establish a max sequence number, so that it can be compared to half
        # that range and wrap around.
        self._sequence_count += 1
        return self._sequence_count

    def mark_as_dirty(self):
        """Set the dirty flag (nothing in this class clears or reads it yet)."""
        self._dirty = True

    def add_item(self, item):
        """Append *item* wrapped in an Item with a fresh sequence number.

        Args:
            item: The content to store; it is kept as-is in Item.content.
        """
        self.items.append(self.Item(self._next_sequence_number(), item))
        self.mark_as_dirty()

    def get_payload(self):
        """Not implemented yet; returns None.

        Takes ``self`` so that it can be called on an instance like the
        other methods of this class.
        """
        pass

    def process_ack(self):
        """Not implemented yet; returns None.

        Takes ``self`` so that it can be called on an instance like the
        other methods of this class.
        """
        pass
# Module-level connection state, all unset until init_control() runs:
#   client_id   - this client's ID string (generated by generate_new_id() or
#                 read from shepherd.id); also used as the HTTP auth username.
#   control_url - full URL that updates and logs are POSTed to.
#   api_key     - taken from core_config["api_key"]; used as the HTTP auth password.
client_id = control_url = api_key = None
def _update_job(core_config, plugin_config, timeout=30):
    """POST the client ID and both config dicts to the control server.

    Reads the module globals client_id, control_url and api_key, so
    init_control() must have populated them first. init_control() runs this
    function in its own thread.

    Args:
        core_config: Core configuration, sent under the "core_config" key.
        plugin_config: Plugin configuration, sent under the "plugin_config" key.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests can block forever on an unresponsive server
            and the thread would never finish.

    Raises:
        requests.exceptions.ConnectionError: If the server cannot be reached
            (propagated unchanged, as before).
        requests.exceptions.Timeout: If the server does not answer within
            *timeout* seconds.
    """
    payload = {"client_id": client_id, "core_config": core_config, "plugin_config": plugin_config}
    # Using the json arg rather than json.dumps ourselves automatically sets the Content-Type
    # header to application/json, which Flask expects to work correctly
    requests.post(control_url, json=payload, auth=(client_id, api_key), timeout=timeout)
def generate_new_zerotier_id():
    """Delete the ZeroTier identity files and restart the ZeroTier service.

    Each identity file is removed independently, so a missing
    identity.public no longer prevents identity.secret from being deleted.
    Removal stays best-effort: a file that is already gone is ignored, and
    any other OS error is reported but does not stop the restart.
    """
    print("Removing old Zerotier id files")
    for identity_file in ("identity.public", "identity.secret"):
        identity_path = os.path.join("/var/lib/zerotier-one", identity_file)
        try:
            os.remove(identity_path)
        except FileNotFoundError:
            # Nothing to remove; that is the state we want anyway.
            pass
        except OSError as err:
            print(F"Could not remove {identity_path}: {err}")
    print("Restarting Zerotier systemd service to regenerate ID")
    subprocess.run(["systemctl", "restart", "zerotier-one.service"])
def generate_new_id(root_dir):
    """Create a fresh client ID, persist it, and regenerate the ZeroTier ID.

    A new uuid1 string is stored in the module-level client_id and written to
    the file "shepherd.id" inside *root_dir* (the file is created or
    truncated). Afterwards generate_new_zerotier_id() is called.

    Args:
        root_dir: Directory in which the shepherd.id file is written.
    """
    global client_id
    id_path = os.path.join(root_dir, "shepherd.id")
    with open(id_path, 'w+') as id_file:
        # Only replace the in-memory ID once the file could be opened.
        client_id = str(uuid.uuid1())
        id_file.write(client_id)
    generate_new_zerotier_id()
def init_control(core_config, plugin_config):
    """Set up the control-server state and push the configs in a new thread.

    Populates the module globals control_url, api_key and client_id, then
    starts a thread running _update_job(core_config, plugin_config).

    The client ID is chosen as follows:
      * if a "shepherd.new" file exists next to the editable config, a new ID
        is generated, the marker file is deleted and, when a hostname is
        configured, the system hostname is changed through raspi-config;
      * otherwise, if "shepherd.id" exists in the root dir, the ID is read
        from its first line;
      * otherwise a new ID is generated.

    Args:
        core_config: Mapping providing "root_dir", "conf_edit_path",
            "control_server" and "api_key"; "hostname" is read only when the
            shepherd.new marker is present.
        plugin_config: Passed through unchanged to _update_job.
    """
    global client_id, control_url, api_key

    # On init, need to be able to quickly return the cached shepherd control layer if necessary.
    # Create the /update endpoint structure
    root_dir = os.path.expanduser(core_config["root_dir"])
    editconf_dir = os.path.dirname(os.path.expanduser(core_config["conf_edit_path"]))
    new_marker_path = os.path.join(editconf_dir, "shepherd.new")
    id_path = os.path.join(root_dir, "shepherd.id")

    # Some weirdness with URL parsing means that by default most urls (like www.google.com)
    # get treated as relative
    # https://stackoverflow.com/questions/53816559/python-3-netloc-value-in-urllib-parse-is-empty-if-url-doesnt-have
    control_url = core_config["control_server"]
    if "//" not in control_url:
        control_url = "//" + control_url
    # Force the https scheme, then point at the update endpoint.
    https_parts = urlparse(control_url)._replace(scheme="https")
    control_url = urljoin(urlunparse(https_parts), "/client/update")
    print(F"Control url is: {control_url}")

    api_key = core_config["api_key"]

    if os.path.isfile(new_marker_path):
        generate_new_id(root_dir)
        os.remove(new_marker_path)
        hostname = core_config["hostname"]
        print(F"Config hostname: {hostname}")
        if hostname != "":
            print("Attempting to change hostname")
            subprocess.run(["raspi-config", "nonint", "do_hostname", hostname])
    elif os.path.isfile(id_path):
        with open(id_path, 'r') as id_file:
            client_id = id_file.readline().strip()
    else:
        generate_new_id(root_dir)

    print(F"Client ID is: {client_id}")
    threading.Thread(target=_update_job, args=(core_config, plugin_config)).start()
def _post_logs_job(timeout=30):
    """POST the scout plugin's logs and measurements to the control server.

    Reads the module globals client_id, control_url and api_key. Posting is
    best-effort: if the server cannot be reached or does not answer in time,
    the failure is printed and the function returns normally.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests can block forever on an unresponsive server
            and the thread would never finish.
    """
    scout = shepherd.plugin.plugin_functions["scout"]
    logs = scout.get_logs()
    measurements = scout.get_measurements()
    payload = {"client_id": client_id, "logs": logs, "measurements": measurements}
    try:
        requests.post(control_url, json=payload, auth=(client_id, api_key), timeout=timeout)
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as err:
        # Keep this best-effort, but leave a trace instead of failing silently.
        print(F"Could not post logs to control server: {err}")
def post_logs():
    """Run _post_logs_job in a new thread and return without waiting for it."""
    threading.Thread(target=_post_logs_job).start()