chore: Finalize install scripts.
This commit is contained in:
parent
61ccbf61d6
commit
9006a8e002
7 changed files with 191 additions and 39 deletions
|
@ -1,28 +1,146 @@
|
|||
# TODO:
|
||||
# Install logging over HTTP *could* be implemented (see autoinstall documentation), however
|
||||
# it is not implemented here.
|
||||
from termcolor import colored
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from os import getcwd, environ
|
||||
from pathlib import Path
|
||||
import socketserver
|
||||
import http.server
|
||||
import socket
|
||||
import json
|
||||
import sys
|
||||
|
||||
requests = set()
|
||||
def json_to_bytes(str: str) -> bytearray:
|
||||
return bytearray(json.dumps(str), "utf-8")
|
||||
|
||||
class HTTPHandler(http.server.SimpleHTTPRequestHandler):
|
||||
# Who needs Flask, anyways?
|
||||
class HTTPHandler(http.server.BaseHTTPRequestHandler):
|
||||
def send_headers(self):
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.end_headers()
|
||||
|
||||
def do_POST(self):
|
||||
if self.path == "/api/installer_update_webhook":
|
||||
content_length = 0
|
||||
|
||||
try:
|
||||
content_length = int(self.headers.get('Content-Length'))
|
||||
except ValueError:
|
||||
self.send_response(400)
|
||||
self.send_headers()
|
||||
|
||||
self.wfile.write(json_to_bytes({
|
||||
"success": False,
|
||||
"error": "Failed to decode Content-Length to read body",
|
||||
}))
|
||||
|
||||
return
|
||||
|
||||
resp_data = self.rfile.read(content_length).decode("utf-8")
|
||||
resp_decoded_data: dict = {}
|
||||
|
||||
try:
|
||||
resp_decoded_data = json.loads(resp_data)
|
||||
|
||||
if type(resp_decoded_data) is not dict:
|
||||
self.send_response(400)
|
||||
self.send_headers()
|
||||
|
||||
self.wfile.write(json_to_bytes({
|
||||
"success": False,
|
||||
"error": "Recieved invalid type for JSON",
|
||||
}))
|
||||
|
||||
return
|
||||
except json.JSONDecodeError:
|
||||
self.send_response(400)
|
||||
self.send_headers()
|
||||
|
||||
self.wfile.write(json_to_bytes({
|
||||
"success": False,
|
||||
"error": "Failed to decode JSON",
|
||||
}))
|
||||
|
||||
return
|
||||
|
||||
date_time = datetime.fromtimestamp(resp_decoded_data["timestamp"], timezone.utc)
|
||||
str_formatted_time = date_time.strftime("%H:%M:%S")
|
||||
|
||||
result_is_safe = resp_decoded_data["result"] == "SUCCESS" if "result" in resp_decoded_data else True
|
||||
output_file = sys.stdout if result_is_safe else sys.stderr
|
||||
|
||||
output_coloring = "light_blue"
|
||||
|
||||
if "result" in resp_decoded_data:
|
||||
res = resp_decoded_data["result"]
|
||||
|
||||
if res == "SUCCESS":
|
||||
output_coloring = "light_green"
|
||||
elif res == "WARN":
|
||||
output_coloring = "light_yellow"
|
||||
elif res == "FAIL":
|
||||
output_coloring = "light_red"
|
||||
|
||||
result_text_component = f" {resp_decoded_data["result"]} " if "result" in resp_decoded_data else " "
|
||||
final_output_text = f"{str_formatted_time} {resp_decoded_data["event_type"].upper()} {resp_decoded_data["level"]}:{result_text_component}{resp_decoded_data["name"]} ({resp_decoded_data["description"]})"
|
||||
|
||||
print(colored(final_output_text, output_coloring), file=output_file)
|
||||
|
||||
self.send_response(200)
|
||||
self.send_headers()
|
||||
|
||||
self.wfile.write(json_to_bytes({
|
||||
"success": True,
|
||||
}))
|
||||
|
||||
if resp_decoded_data["event_type"] == "finish" and resp_decoded_data["name"] == "subiquity/Shutdown/shutdown":
|
||||
print("\nSuccessfully finished installing!")
|
||||
exit(0)
|
||||
else:
|
||||
self.send_response(404)
|
||||
self.send_headers()
|
||||
|
||||
self.wfile.write(json_to_bytes({
|
||||
"success": False,
|
||||
"error": "Unknown route"
|
||||
}))
|
||||
|
||||
def do_GET(self):
|
||||
http.server.SimpleHTTPRequestHandler.do_GET(self)
|
||||
requests.add(self.path)
|
||||
resolved_path = str(Path(self.path).resolve())
|
||||
file_path = getcwd() + resolved_path
|
||||
|
||||
found_meta_data = "/meta-data" in requests
|
||||
found_user_data = "/user-data" in requests
|
||||
found_vendor_data = "/vendor-data" in requests
|
||||
try:
|
||||
self.send_response(200)
|
||||
self.end_headers()
|
||||
|
||||
with open(file_path, "rb") as file:
|
||||
self.wfile.write(file.read())
|
||||
except (FileNotFoundError, IsADirectoryError):
|
||||
self.send_response(404)
|
||||
self.send_headers()
|
||||
|
||||
self.wfile.write(json_to_bytes({
|
||||
"success": False,
|
||||
"error": "file not found"
|
||||
}))
|
||||
except () as exception:
|
||||
exception.print_exception()
|
||||
|
||||
def log_message(self, format: str, *args):
|
||||
status_code = 0
|
||||
|
||||
try:
|
||||
status_code = int(args[1])
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Disable logging for the /api/ endpoint for POST requests unless the error code > 400
|
||||
if len(args) >= 1 and args[0].startswith("POST") and self.path.startswith("/api/") and status_code < 400:
|
||||
return
|
||||
|
||||
if found_meta_data and found_user_data and found_vendor_data:
|
||||
print("[x] sent all our data, exiting...")
|
||||
sys.exit(0)
|
||||
super().log_message(format, *args)
|
||||
|
||||
server = socketserver.TCPServer(("", int(sys.argv[1])), HTTPHandler)
|
||||
port = int(sys.argv[1]) if "SERVE_DEVELOP" not in environ else 10240
|
||||
server = socketserver.TCPServer(("", port), HTTPHandler)
|
||||
server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
||||
print("[x] started HTTP server.")
|
||||
|
|
Reference in a new issue