package backendruntime

import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"os/exec"
	"strings"
	"sync"
	"time"

	"git.terah.dev/imterah/hermes/backend/backendlauncher"
	"git.terah.dev/imterah/hermes/backend/commonbackend"
	"github.com/charmbracelet/log"
)

// handleCommand encodes command with commonbackend.Marshal, writes it to sock,
// reads one reply with commonbackend.Unmarshal, and delivers the outcome on
// rtcChan.
//
// Exactly one value is sent on rtcChan per call: the decoded reply on success,
// or an error on failure. The same failure is also returned so the caller can
// react to it. Error messages start with "failed to marshal message",
// "failed to write message" or "failed to unmarshal message"; the dispatch
// loop matches on the "failed to write message" prefix.
func handleCommand(commandType string, command interface{}, sock net.Conn, rtcChan chan interface{}) error {
	// fail logs the failure, hands it to the waiting caller, and returns it.
	fail := func(action string, cause error) error {
		log.Warnf("Failed to %s message: %s", action, cause.Error())

		err := fmt.Errorf("failed to %s message: %w", action, cause)
		rtcChan <- err

		return err
	}

	bytes, err := commonbackend.Marshal(commandType, command)
	if err != nil {
		return fail("marshal", err)
	}

	if _, err := sock.Write(bytes); err != nil {
		return fail("write", err)
	}

	_, data, err := commonbackend.Unmarshal(sock)
	if err != nil {
		return fail("unmarshal", err)
	}

	rtcChan <- data
	return nil
}

// commandTypeFor returns the command type string passed to
// commonbackend.Marshal for a supported command value. The second result is
// false when the value is not one of the supported command types.
func commandTypeFor(message interface{}) (string, bool) {
	switch message.(type) {
	case *commonbackend.AddProxy:
		return "addProxy", true
	case *commonbackend.BackendStatusRequest:
		return "backendStatusRequest", true
	case *commonbackend.CheckClientParameters:
		return "checkClientParameters", true
	case *commonbackend.CheckServerParameters:
		return "checkServerParameters", true
	case *commonbackend.ProxyConnectionsRequest:
		return "proxyConnectionsRequest", true
	case *commonbackend.ProxyInstanceRequest:
		return "proxyInstanceRequest", true
	case *commonbackend.ProxyStatusRequest:
		return "proxyStatusRequest", true
	case *commonbackend.RemoveProxy:
		return "removeProxy", true
	case *commonbackend.Start:
		return "start", true
	case *commonbackend.Stop:
		return "stop", true
	default:
		return "", false
	}
}

// handleRestartNotification waits up to 500ms for a process restart
// notification after a backend has connected on sock. If the notification
// reports a restart and OnCrashCallback is set, the callback is invoked with
// sock and all commands still waiting in the message buffer are cancelled.
func (runtime *Runtime) handleRestartNotification(sock net.Conn) {
	select {
	case <-time.After(500 * time.Millisecond):
		log.Debug("Timeout reached. Assuming backend is running.")
	case hasRestarted, ok := <-runtime.processRestartNotification:
		if !ok {
			log.Warnf("Failed to get the process restart notification state!")
		}

		if !hasRestarted {
			log.Debug("We have not restarted.")
			return
		}

		if runtime.OnCrashCallback == nil {
			log.Warn("The backend has restarted for some reason, but we could not run the on crash callback as the callback is not set!")
			return
		}

		log.Debug("We have restarted. Running the restart callback...")
		runtime.OnCrashCallback(sock)
		log.Debug("Finished running the restart callback. Clearing caches...")

		// Cancel pending jobs under the lock so a concurrent ProcessCommand() cannot
		// claim a slot while it is being cleared. The mutex itself must not be
		// replaced: another goroutine may be holding it, and unlocking a fresh
		// mutex is a fatal error.
		runtime.messageBufferLock.Lock()
		runtime.cleanUpPendingCommandProcessingJobs()
		runtime.messageBufferLock.Unlock()
	}
}

// runKeepAlive periodically sends a backend status request through
// ProcessCommand for as long as the runtime is running and connDone has not
// been closed. If the request fails, or the reply has an unexpected type, sock
// is closed.
func (runtime *Runtime) runKeepAlive(sock net.Conn, connDone <-chan struct{}) {
	// Pause between two successful status checks.
	const keepAliveInterval = 5 * time.Second

	log.Debugf("Setting up Hermes keepalive Goroutine")

	closeSock := func() {
		log.Debugf("Attempting to close socket...")

		if err := sock.Close(); err != nil {
			log.Debugf("Failed to close socket: %s", err.Error())
		}
	}

	hasFailedBackendRunningCheckAlready := false

	for {
		if !runtime.isRuntimeRunning {
			return
		}

		// Stop once the connection this keepalive belongs to has been torn down.
		// The next accepted connection starts its own keepalive.
		select {
		case <-connDone:
			return
		default:
		}

		// Asking for the backend status seems to be a "good-enough" keepalive system. Plus, it provides useful telemetry.
		// There isn't a ping command in the backend API, so we have to make do with what we have.
		//
		// To be safe here, we have to use the proper (yet annoying) facilities to prevent cross-talk, since we're in
		// a goroutine, and can't talk directly. This actually has benefits, as the dispatch loop should exit on its
		// own, if we encounter a critical error.
		statusResponse, err := runtime.ProcessCommand(&commonbackend.BackendStatusRequest{
			Type: "backendStatusRequest",
		})

		if err != nil {
			log.Warnf("Failed to get response for backend (in backend runtime keep alive): %s", err.Error())
			closeSock()

			// Retry right away: the next request fails on the closed socket, which
			// is what makes the dispatch loop give up on this connection.
			continue
		}

		switch responseMessage := statusResponse.(type) {
		case *commonbackend.BackendStatusResponse:
			if !responseMessage.IsRunning {
				// The first failed check is tolerated silently; later ones are logged.
				if hasFailedBackendRunningCheckAlready {
					if responseMessage.Message != "" {
						log.Warnf("Backend (in backend keepalive) is up but not active: %s", responseMessage.Message)
					} else {
						log.Warnf("Backend (in backend keepalive) is up but not active")
					}
				}

				hasFailedBackendRunningCheckAlready = true
			}
		default:
			log.Errorf("Got illegal response type for backend (in backend keepalive): %T", responseMessage)
			closeSock()
		}

		// time.Sleep takes a time.Duration (nanoseconds), so the interval has to be
		// spelled out in seconds. Waiting in a select lets the goroutine exit
		// promptly when the connection goes away.
		select {
		case <-connDone:
			return
		case <-time.After(keepAliveInterval):
		}
	}
}

// serveConnection forwards every command queued in the message buffer to the
// backend connected on sock, and returns once writing to sock fails.
func (runtime *Runtime) serveConnection(sock net.Conn) {
	// Pause taken when a full pass over the buffer found nothing to do, so the
	// loop does not spin on an idle connection.
	const idlePollInterval = time.Millisecond

	for {
		handledAny := false

		for chanIndex, messageData := range runtime.messageBuffer {
			if messageData == nil {
				continue
			}

			handledAny = true

			// We don't use the Mutex here as the Mutex is only to prevent cross-talk between multiple ProcessCommand() calls.
			commandType, known := commandTypeFor(messageData.Message)
			if !known {
				log.Warnf("Recieved unknown command type from channel: %q", messageData.Message)
				messageData.Channel <- fmt.Errorf("unknown command recieved")
				runtime.messageBuffer[chanIndex] = nil

				continue
			}

			err := handleCommand(commandType, messageData.Message, sock, messageData.Channel)

			// handleCommand has answered the caller on every path, so the slot must
			// be released even when we are about to give up on this connection.
			// Leaving it in place would hand an already-answered (and closed)
			// channel to the next connection.
			runtime.messageBuffer[chanIndex] = nil

			if err != nil {
				log.Warnf("failed to handle command in backend runtime instance: %s", err.Error())

				if strings.HasPrefix(err.Error(), "failed to write message") {
					return
				}
			}
		}

		if !handledAny {
			time.Sleep(idlePollInterval)
		}
	}
}

// goRoutineHandler is the main loop of a runtime. It acquires a Unix socket,
// starts a goroutine that accepts backend connections on it and serves queued
// commands over them, and then runs the backend process, restarting it five
// seconds after each exit until the runtime is stopped.
//
// It returns an error only if the Unix socket cannot be acquired, and nil once
// the process has exited while the runtime is no longer running.
func (runtime *Runtime) goRoutineHandler() error {
	log.Debug("Starting up backend runtime")
	log.Debug("Running socket acquisition")

	logLevel := os.Getenv("HERMES_LOG_LEVEL")

	sockPath, sockListener, err := backendlauncher.GetUnixSocket(TempDir)
	if err != nil {
		return err
	}

	runtime.currentListener = sockListener

	log.Debugf("Acquired unix socket at: %s", sockPath)

	go func() {
		log.Debug("Created new Goroutine for socket connection handling")

		for {
			log.Debug("Waiting for Unix socket connections...")
			sock, err := runtime.currentListener.Accept()
			if err != nil {
				log.Warnf("Failed to accept Unix socket connection in a backend runtime instance: %s", err.Error())
				return
			}

			log.Debug("Recieved connection. Attempting to figure out backend state...")
			runtime.handleRestartNotification(sock)

			// connDone ties the keepalive goroutine's lifetime to this connection.
			connDone := make(chan struct{})
			go runtime.runKeepAlive(sock, connDone)

			runtime.serveConnection(sock)

			close(connDone)
			sock.Close()
		}
	}()

	runtime.processRestartNotification <- false

	for {
		log.Debug("Starting process...")

		ctx := context.Background()

		runtime.currentProcess = exec.CommandContext(ctx, runtime.ProcessPath)
		// NOTE(review): Env is nil on a fresh Cmd, so after this append the backend
		// sees only these two variables instead of inheriting the parent
		// environment. Confirm that this isolation is intended.
		runtime.currentProcess.Env = append(runtime.currentProcess.Env, fmt.Sprintf("HERMES_API_SOCK=%s", sockPath), fmt.Sprintf("HERMES_LOG_LEVEL=%s", logLevel))

		runtime.currentProcess.Stdout = runtime.logger
		runtime.currentProcess.Stderr = runtime.logger

		err := runtime.currentProcess.Run()

		var exitErr *exec.ExitError

		switch {
		case err == nil:
			log.Debug("Process exited gracefully.")
		case errors.As(err, &exitErr):
			// Exit code -1 (no exit status available, e.g. killed by a signal) and 0
			// are not reported.
			if exitErr.ExitCode() != -1 && exitErr.ExitCode() != 0 {
				log.Warnf("A backend process died with exit code '%d' and with error '%s'", exitErr.ExitCode(), exitErr.Error())
			}
		default:
			log.Warnf("A backend process died with error: %s", err.Error())
		}

		if !runtime.isRuntimeRunning {
			return nil
		}

		log.Debug("Sleeping 5 seconds, and then restarting process")
		time.Sleep(5 * time.Second)

		// Stop() may have been called while we were asleep. Without this check the
		// backend would be launched again after the runtime was stopped.
		if !runtime.isRuntimeRunning {
			return nil
		}

		// NOTE(imterah): This could cause hangs if we're not careful. If the process dies so much that we can't keep up, it should deserve to be hung, really.
		// There's probably a better way to do this, but this works.
		//
		// If this does turn out to be a problem, just increase the Goroutine buffer size.
		runtime.processRestartNotification <- true
		log.Debug("Sent off notification.")
	}
}

// Start initializes the runtime's message buffer, restart notification channel
// and logger, then launches the runtime's main loop in a new goroutine.
//
// It returns an error if the runtime is already running. Failures inside the
// main loop are logged rather than returned.
func (runtime *Runtime) Start() error {
	if runtime.isRuntimeRunning {
		return fmt.Errorf("runtime already running")
	}

	runtime.messageBuffer = make([]*messageForBuf, 10)
	runtime.messageBufferLock = sync.Mutex{}
	runtime.processRestartNotification = make(chan bool, 1)

	runtime.logger = &writeLogger{
		Runtime: runtime,
	}

	// Mark the runtime as running before the goroutine exists, so the goroutine
	// can never observe the flag in its pre-start state.
	runtime.isRuntimeRunning = true

	go func() {
		err := runtime.goRoutineHandler()
		if err != nil {
			log.Errorf("Failed during execution of runtime: %s", err.Error())
		}
	}()

	return nil
}

// Stop marks the runtime as not running, cancels the current backend process
// and closes the socket listener.
//
// Both teardown steps are always attempted; their failures are combined into
// the returned error. A backend process that has already exited is not treated
// as a failure. Stop returns an error if the runtime is not running.
func (runtime *Runtime) Stop() error {
	if !runtime.isRuntimeRunning {
		return fmt.Errorf("runtime not running")
	}

	runtime.isRuntimeRunning = false

	var stopErrs []error

	if runtime.currentProcess != nil && runtime.currentProcess.Cancel != nil {
		// os.ErrProcessDone means the process is already gone, which is the state
		// we want.
		if err := runtime.currentProcess.Cancel(); err != nil && !errors.Is(err, os.ErrProcessDone) {
			stopErrs = append(stopErrs, fmt.Errorf("failed to stop process: %w", err))
		}
	} else {
		log.Warn("Failed to kill process (Stop recieved), currentProcess or currentProcess.Cancel is nil")
	}

	// Close the listener even when cancelling the process failed, so the socket
	// is not leaked.
	if runtime.currentListener != nil {
		if err := runtime.currentListener.Close(); err != nil {
			stopErrs = append(stopErrs, fmt.Errorf("failed to stop listener: %w", err))
		}
	} else {
		log.Warn("Failed to kill listener, as the listener is nil")
	}

	return errors.Join(stopErrs...)
}

// tryScheduleCommand places command into the first free slot of the message
// buffer and returns the channel its reply will arrive on. It returns nil when
// every slot is occupied.
func (runtime *Runtime) tryScheduleCommand(command interface{}) chan interface{} {
	runtime.messageBufferLock.Lock()
	defer runtime.messageBufferLock.Unlock()

	for i, message := range runtime.messageBuffer {
		if message != nil {
			continue
		}

		commandChannel := make(chan interface{})

		runtime.messageBuffer[i] = &messageForBuf{
			Channel: commandChannel,
			Message: command,
		}

		return commandChannel
	}

	return nil
}

// ProcessCommand queues command for transmission to the backend and blocks
// until a reply arrives.
//
// It returns the reply on success. It returns an error if no buffer slot
// became free after repeated attempts at 100ms intervals, if the reply itself
// is an error, or if the command was cancelled before a reply arrived.
func (runtime *Runtime) ProcessCommand(command interface{}) (interface{}, error) {
	var commandChannel chan interface{}

	for schedulingAttempts := 0; commandChannel == nil; schedulingAttempts++ {
		if !runtime.isRuntimeRunning {
			time.Sleep(10 * time.Millisecond)
		}

		if schedulingAttempts > 50 {
			return nil, fmt.Errorf("failed to schedule synchronous message transmission after 50 tries with 100ms intervals (REPORT THIS ISSUE)")
		}

		// Attempt to find spot in buffer to schedule message transmission
		commandChannel = runtime.tryScheduleCommand(command)

		if commandChannel == nil {
			time.Sleep(100 * time.Millisecond)
		}
	}

	// Fetch response and close Channel
	response, ok := <-commandChannel
	if !ok {
		// The channel was closed by cleanUpPendingCommandProcessingJobs(). Closing
		// it a second time would panic, and there is no reply to hand back.
		return nil, fmt.Errorf("command was cancelled before a response was received")
	}

	close(commandChannel)

	if err, isErr := response.(error); isErr {
		return nil, err
	}

	return response, nil
}

// cleanUpPendingCommandProcessingJobs closes the reply channel of every command
// still waiting in the message buffer and frees its slot.
//
// It does not lock the message buffer itself; callers are expected to hold
// messageBufferLock.
func (runtime *Runtime) cleanUpPendingCommandProcessingJobs() {
	for messageIndex, message := range runtime.messageBuffer {
		if message == nil {
			continue
		}

		close(message.Channel)
		runtime.messageBuffer[messageIndex] = nil
	}
}

// NewBackend returns a Runtime whose ProcessPath is set to path. The runtime
// is not started.
func NewBackend(path string) *Runtime {
	return &Runtime{
		ProcessPath: path,
	}
}

// Init creates a temporary directory with the prefix "hermes-sockets-", stores
// its path in TempDir, and records backends in AvailableBackends. It returns
// the error from creating the directory, if any.
func Init(backends []*Backend) error {
	var err error
	TempDir, err = os.MkdirTemp("", "hermes-sockets-")

	if err != nil {
		return err
	}

	AvailableBackends = backends
	return nil
}