package backendruntime

import (
	"context"
	"fmt"
	"net"
	"os"
	"os/exec"
	"strings"
	"sync"
	"time"

	"git.terah.dev/imterah/hermes/backend/backendlauncher"
	"git.terah.dev/imterah/hermes/backend/commonbackend"
	"github.com/charmbracelet/log"
)

// TODO TODO TODO(imterah):
// This code is a mess. This NEEDS to be rearchitected and refactored to work better. Or at the very least, this code needs to be documented heavily.

// handleCommand sends a single command to the backend over sock and relays the
// outcome on rtcChan.
//
// The command is encoded with commonbackend.Marshal, written to sock, and one
// response is read back with commonbackend.Unmarshal. On success the decoded
// response is sent on rtcChan and nil is returned. On failure the error is
// logged, sent on rtcChan, and returned.
//
// The error text always starts with the step that failed ("failed to marshal
// message", "failed to write message" or "failed to unmarshal message"). The
// connection loop matches on the write prefix, so these strings must not change.
func handleCommand(command interface{}, sock net.Conn, rtcChan chan interface{}) error {
	// fail logs the cause, hands the wrapped error to the waiting caller and
	// returns that same error, keeping all three failure paths identical.
	fail := func(step string, cause error) error {
		log.Warnf("Failed to %s message: %s", step, cause.Error())

		err := fmt.Errorf("failed to %s message: %w", step, cause)
		rtcChan <- err

		return err
	}

	bytes, err := commonbackend.Marshal(command)
	if err != nil {
		return fail("marshal", err)
	}

	if _, err := sock.Write(bytes); err != nil {
		return fail("write", err)
	}

	data, err := commonbackend.Unmarshal(sock)
	if err != nil {
		return fail("unmarshal", err)
	}

	rtcChan <- data

	return nil
}

// goRoutineHandler is the body of the runtime's main goroutine.
//
// It acquires a Unix socket inside TempDir, starts a goroutine that accepts
// backend connections on it, and then supervises the backend process until the
// runtime is stopped. It returns an error only if the socket cannot be acquired.
func (runtime *Runtime) goRoutineHandler() error {
	log.Debug("Starting up backend runtime")
	log.Debug("Running socket acquisition")

	logLevel := os.Getenv("HERMES_LOG_LEVEL")

	sockPath, sockListener, err := backendlauncher.GetUnixSocket(TempDir)
	if err != nil {
		return err
	}

	runtime.currentListener = sockListener
	log.Debugf("Acquired unix socket at: %s", sockPath)

	go runtime.acceptConnections()

	// Tell the first accepted connection that the process has not restarted.
	runtime.processRestartNotification <- false

	runtime.superviseProcess(sockPath, logLevel)

	return nil
}

// acceptConnections accepts backend connections one at a time and serves each
// until a write to it fails. It returns when Accept fails, which is what happens
// once Stop closes the listener.
func (runtime *Runtime) acceptConnections() {
	log.Debug("Created new Goroutine for socket connection handling")

	for {
		log.Debug("Waiting for Unix socket connections...")

		sock, err := runtime.currentListener.Accept()
		if err != nil {
			log.Warnf("Failed to accept Unix socket connection in a backend runtime instance: %s", err.Error())
			return
		}

		log.Debug("Recieved connection. Attempting to figure out backend state...")
		runtime.syncRestartState(sock)

		// connDone lets this connection's keepalive goroutine exit once the
		// connection is finished, instead of piling up one goroutine per connection.
		connDone := make(chan struct{})
		go runtime.keepAlive(sock, connDone)

		runtime.serveConnection(sock)
		close(connDone)

		// The keepalive may already have closed the socket, so an error here is
		// expected and deliberately ignored.
		_ = sock.Close()
	}
}

// syncRestartState waits up to 500ms for a restart notification from the
// process supervisor. If the backend has restarted, it runs OnCrashCallback
// (when set) with the new connection and discards all pending commands.
func (runtime *Runtime) syncRestartState(sock net.Conn) {
	select {
	case <-time.After(500 * time.Millisecond):
		log.Debug("Timeout reached. Assuming backend is running.")
	case hasRestarted, ok := <-runtime.processRestartNotification:
		if !ok {
			log.Warn("Failed to get the process restart notification state!")
		}

		if !hasRestarted {
			log.Debug("We have not restarted.")
			return
		}

		if runtime.OnCrashCallback == nil {
			log.Warn("The backend has restarted for some reason, but we could not run the on crash callback as the callback is not set!")
		} else {
			log.Debug("We have restarted. Running the restart callback...")
			runtime.OnCrashCallback(sock)
		}

		log.Debug("Clearing caches...")
		runtime.cleanUpPendingCommandProcessingJobs()

		// messageBufferLock is intentionally NOT reset here: ProcessCommand may be
		// holding it right now, and replacing a locked mutex makes the matching
		// Unlock crash with "unlock of unlocked mutex". Every Lock in this file is
		// paired with an Unlock, so there is nothing to reset.
	}
}

// keepAlive polls the backend status every five seconds for as long as the
// runtime is running and connDone is open. It closes sock whenever the status
// request fails or returns an unexpected response type.
func (runtime *Runtime) keepAlive(sock net.Conn, connDone <-chan struct{}) {
	log.Debug("Setting up Hermes keepalive Goroutine")

	closeSocket := func() {
		log.Debugf("Attempting to close socket...")

		if err := sock.Close(); err != nil {
			log.Debugf("Failed to close socket: %s", err.Error())
		}
	}

	hasFailedBackendRunningCheckAlready := false

	for {
		if !runtime.isRuntimeRunning {
			return
		}

		select {
		case <-connDone:
			return
		default:
		}

		// Asking for the backend status seems to be a "good-enough" keepalive system. Plus, it provides useful telemetry.
		// There isn't a ping command in the backend API, so we have to make do with what we have.
		//
		// To be safe here, we have to use the proper (yet annoying) facilities to prevent cross-talk, since we're in
		// a goroutine, and can't talk directly. This actually has benefits, as the connection loop should exit on its
		// own, if we encounter a critical error.
		statusResponse, err := runtime.ProcessCommand(&commonbackend.BackendStatusRequest{})
		if err != nil {
			log.Warnf("Failed to get response for backend (in backend runtime keep alive): %s", err.Error())
			closeSocket()

			// Retry immediately: the next command then fails on write, which is
			// what makes the connection loop give up on this socket.
			continue
		}

		switch responseMessage := statusResponse.(type) {
		case *commonbackend.BackendStatusResponse:
			if !responseMessage.IsRunning {
				// The first "not running" answer is tolerated silently; only
				// later ones are reported.
				if hasFailedBackendRunningCheckAlready {
					if responseMessage.Message != "" {
						log.Warnf("Backend (in backend keepalive) is up but not active: %s", responseMessage.Message)
					} else {
						log.Warnf("Backend (in backend keepalive) is up but not active")
					}
				}

				hasFailedBackendRunningCheckAlready = true
			}
		default:
			log.Errorf("Got illegal response type for backend (in backend keepalive): %T", responseMessage)
			closeSocket()
		}

		select {
		case <-connDone:
			return
		case <-time.After(5 * time.Second):
		}
	}
}

// serveConnection drains the message buffer over sock each time ProcessCommand
// signals that work is available. It returns when writing to sock fails.
//
// NOTE: messageBuffer is read and cleared here without holding messageBufferLock.
func (runtime *Runtime) serveConnection(sock net.Conn) {
	// However this loop ends, the flag must be cleared. ProcessCommand only sends
	// a wake-up while the flag is false, so a flag left at true would leave the
	// next connection waiting forever.
	defer func() {
		runtime.isRuntimeCurrentlyProcessing = false
	}()

	for {
		<-runtime.startProcessingNotification
		runtime.isRuntimeCurrentlyProcessing = true

		for chanIndex, messageData := range runtime.messageBuffer {
			if messageData == nil {
				continue
			}

			err := handleCommand(messageData.Message, sock, messageData.Channel)

			// handleCommand always delivers a result (response or error) to the
			// caller, who then closes the channel. Free the slot in every case so
			// the entry is never retried against an already-closed channel.
			runtime.messageBuffer[chanIndex] = nil

			if err != nil {
				log.Warnf("failed to handle command in backend runtime instance: %s", err.Error())

				// A write failure means the socket is unusable; give up on it.
				if strings.HasPrefix(err.Error(), "failed to write message") {
					return
				}
			}
		}

		runtime.isRuntimeCurrentlyProcessing = false
	}
}

// superviseProcess runs the backend executable and restarts it five seconds
// after every exit, until the runtime is no longer running. Before each restart
// it posts true on processRestartNotification so the connection handler knows
// the next connection comes from a fresh process.
func (runtime *Runtime) superviseProcess(sockPath, logLevel string) {
	for {
		log.Debug("Starting process...")

		ctx := context.Background()

		runtime.currentProcess = exec.CommandContext(ctx, runtime.ProcessPath)
		runtime.currentProcess.Env = append(
			runtime.currentProcess.Env,
			fmt.Sprintf("HERMES_API_SOCK=%s", sockPath),
			fmt.Sprintf("HERMES_LOG_LEVEL=%s", logLevel),
		)
		runtime.currentProcess.Stdout = runtime.logger
		runtime.currentProcess.Stderr = runtime.logger

		err := runtime.currentProcess.Run()

		switch runErr := err.(type) {
		case nil:
			log.Debug("Process exited gracefully.")
		case *exec.ExitError:
			// Exit code -1 is skipped on purpose; os/exec reports it for a
			// process that was terminated by a signal or has not exited.
			if code := runErr.ExitCode(); code != -1 && code != 0 {
				log.Warnf("A backend process died with exit code '%d' and with error '%s'", code, runErr.Error())
			}
		default:
			log.Warnf("A backend process died with error: %s", runErr.Error())
		}

		if !runtime.isRuntimeRunning {
			return
		}

		log.Debug("Sleeping 5 seconds, and then restarting process")
		time.Sleep(5 * time.Second)

		// Stop may have been called while we slept; do not bring the process back.
		if !runtime.isRuntimeRunning {
			return
		}

		// NOTE(imterah): This could cause hangs if we're not careful. If the process dies so much that we can't keep up, it should deserve to be hung, really.
		// There's probably a better way to do this, but this works.
		//
		// If this does turn out to be a problem, just increase the Goroutine buffer size.
		runtime.processRestartNotification <- true
		log.Debug("Sent off notification.")
	}
}

// Start initialises the runtime's buffers and channels and launches the
// background goroutine that runs the backend. It returns an error if the
// runtime is already running; failures inside the goroutine are only logged.
func (runtime *Runtime) Start() error {
	if runtime.isRuntimeRunning {
		return fmt.Errorf("runtime already running")
	}

	runtime.messageBuffer = make([]*messageForBuf, 10)
	runtime.messageBufferLock = sync.Mutex{}
	runtime.startProcessingNotification = make(chan bool)
	runtime.processRestartNotification = make(chan bool, 1)
	runtime.isRuntimeCurrentlyProcessing = false

	runtime.logger = &writeLogger{
		Runtime: runtime,
	}

	// Mark the runtime as running before the goroutine starts, so nothing it
	// spawns can observe a stale "not running" state.
	runtime.isRuntimeRunning = true

	go func() {
		if err := runtime.goRoutineHandler(); err != nil {
			log.Errorf("Failed during execution of runtime: %s", err.Error())
		}
	}()

	return nil
}

// Stop marks the runtime as stopped, kills the backend process and closes the
// listener. Both shutdown steps are always attempted; any failures are returned
// together. It returns an error if the runtime is not running.
func (runtime *Runtime) Stop() error {
	if !runtime.isRuntimeRunning {
		return fmt.Errorf("runtime not running")
	}

	runtime.isRuntimeRunning = false

	var processErr error

	proc := runtime.currentProcess

	switch {
	case proc == nil || proc.Cancel == nil:
		log.Warn("Failed to kill process (Stop recieved), currentProcess or currentProcess.Cancel is nil")
	case proc.Process == nil:
		// The Cancel installed by exec.CommandContext calls Process.Kill, which
		// must not be invoked before the command has been started.
		log.Warn("Failed to kill process (Stop recieved), the process has not been started yet")
	default:
		if err := proc.Cancel(); err != nil {
			processErr = fmt.Errorf("failed to stop process: %w", err)
		}
	}

	if runtime.currentListener == nil {
		log.Warn("Failed to kill listener, as the listener is nil")
		return processErr
	}

	// Close the listener even when killing the process failed, so the accept
	// goroutine is not left running.
	if err := runtime.currentListener.Close(); err != nil {
		listenerErr := fmt.Errorf("failed to stop listener: %w", err)

		if processErr != nil {
			return fmt.Errorf("%w; %w", processErr, listenerErr)
		}

		return listenerErr
	}

	return processErr
}

// ProcessCommand queues command for transmission to the backend and blocks
// until a response arrives.
//
// It looks for a free slot in the message buffer, retrying every 100ms, and
// gives up with an error once the attempt limit is exceeded. If the connection
// loop is not currently processing, it is woken up. The value received on the
// command's channel is returned as an error if it is one, otherwise as the
// response. An error is also returned if the channel is closed before a
// response is delivered.
func (runtime *Runtime) ProcessCommand(command interface{}) (interface{}, error) {
	schedulingAttempts := 0

	var commandChannel chan interface{}

SchedulingLoop:
	for {
		if !runtime.isRuntimeRunning {
			time.Sleep(10 * time.Millisecond)
		}

		if schedulingAttempts > 50 {
			return nil, fmt.Errorf("failed to schedule message transmission after 50 tries (REPORT THIS ISSUE)")
		}

		runtime.messageBufferLock.Lock()

		// Attempt to find spot in buffer to schedule message transmission
		for i, message := range runtime.messageBuffer {
			if message != nil {
				continue
			}

			commandChannel = make(chan interface{})

			runtime.messageBuffer[i] = &messageForBuf{
				Channel: commandChannel,
				Message: command,
			}

			runtime.messageBufferLock.Unlock()

			break SchedulingLoop
		}

		runtime.messageBufferLock.Unlock()

		time.Sleep(100 * time.Millisecond)
		schedulingAttempts++
	}

	if !runtime.isRuntimeCurrentlyProcessing {
		runtime.startProcessingNotification <- true
	}

	// Fetch response and close Channel
	response, ok := <-commandChannel
	if !ok {
		return nil, fmt.Errorf("failed to read from command channel: recieved signal that is not OK")
	}

	close(commandChannel)

	if err, isErr := response.(error); isErr {
		return nil, err
	}

	return response, nil
}

// cleanUpPendingCommandProcessingJobs empties the message buffer.
//
// For every occupied slot it waits up to 100ms on the slot's channel. If the
// wait times out, or a value is received, the channel is closed so the waiting
// ProcessCommand call returns an error. A channel that is already closed is left
// alone. The slot is cleared in every case.
func (runtime *Runtime) cleanUpPendingCommandProcessingJobs() {
	for messageIndex, message := range runtime.messageBuffer {
		if message == nil {
			continue
		}

		select {
		case <-time.After(100 * time.Millisecond):
			log.Warn("Message channel is likely running (timed out reading from it without an error)")
			close(message.Channel)
		case _, ok := <-message.Channel:
			if ok {
				log.Warn("Message channel is running, but should be stopped (since message is NOT nil!)")
				close(message.Channel)
			}
		}

		runtime.messageBuffer[messageIndex] = nil
	}
}

// NewBackend returns a Runtime that will launch the executable at path once
// Start is called.
func NewBackend(path string) *Runtime {
	return &Runtime{
		ProcessPath: path,
	}
}

// Init creates a temporary directory with the prefix "hermes-sockets-", stores
// its path in TempDir, and records backends in AvailableBackends. It returns the
// error from creating the directory, if any.
func Init(backends []*Backend) error {
	var err error

	TempDir, err = os.MkdirTemp("", "hermes-sockets-")
	if err != nil {
		return err
	}

	AvailableBackends = backends

	return nil
}