hermes/backend/api/backendruntime/runtime.go
imterah b93bf456b5
fix: Fixes 100% CPU usage in the backend runtime
This makes the backend runtime not constantly search for messages to be
processed. Instead, it only wakes up when it needs to be woken up via
goroutines.
2025-03-21 13:17:08 -04:00

396 lines
10 KiB
Go

package backendruntime
import (
"context"
"fmt"
"net"
"os"
"os/exec"
"strings"
"sync"
"time"
"git.terah.dev/imterah/hermes/backend/backendlauncher"
"git.terah.dev/imterah/hermes/backend/commonbackend"
"github.com/charmbracelet/log"
)
// TODO TODO TODO(imterah):
// This code is a mess. This NEEDS to be rearchitected and refactored to work better. Or at the very least, this code needs to be documented heavily.
func handleCommand(command interface{}, sock net.Conn, rtcChan chan interface{}) error {
bytes, err := commonbackend.Marshal(command)
if err != nil {
log.Warnf("Failed to marshal message: %s", err.Error())
rtcChan <- fmt.Errorf("failed to marshal message: %s", err.Error())
return fmt.Errorf("failed to marshal message: %s", err.Error())
}
if _, err := sock.Write(bytes); err != nil {
log.Warnf("Failed to write message: %s", err.Error())
rtcChan <- fmt.Errorf("failed to write message: %s", err.Error())
return fmt.Errorf("failed to write message: %s", err.Error())
}
data, err := commonbackend.Unmarshal(sock)
if err != nil {
log.Warnf("Failed to unmarshal message: %s", err.Error())
rtcChan <- fmt.Errorf("failed to unmarshal message: %s", err.Error())
return fmt.Errorf("failed to unmarshal message: %s", err.Error())
}
rtcChan <- data
return nil
}
// goRoutineHandler is the long-running body of a started runtime.
//
// It acquires a Unix socket through backendlauncher.GetUnixSocket, starts a
// goroutine that accepts connections on that socket, and then keeps launching
// the executable at runtime.ProcessPath (passing the socket path and log level
// through the environment). Whenever the process exits and the runtime is
// still marked as running, it waits 5 seconds, announces the restart on
// processRestartNotification and launches the process again.
//
// For every accepted connection the accept goroutine:
//   - waits up to 500ms for a restart notification; if the backend restarted
//     it runs OnCrashCallback (when set) and drops all pending commands,
//   - starts a keepalive goroutine that polls the backend status every 5
//     seconds through ProcessCommand, and
//   - pumps queued commands from messageBuffer over the connection each time
//     startProcessingNotification fires, until a write to the socket fails.
//
// It returns an error only when the socket cannot be acquired, and nil once
// the process has exited while the runtime is no longer marked as running.
func (runtime *Runtime) goRoutineHandler() error {
	log.Debug("Starting up backend runtime")
	log.Debug("Running socket acquisition")

	logLevel := os.Getenv("HERMES_LOG_LEVEL")

	sockPath, sockListener, err := backendlauncher.GetUnixSocket(TempDir)
	if err != nil {
		return err
	}

	runtime.currentListener = sockListener

	log.Debugf("Acquired unix socket at: %s", sockPath)

	go func() {
		log.Debug("Created new Goroutine for socket connection handling")

		for {
			log.Debug("Waiting for Unix socket connections...")

			sock, err := runtime.currentListener.Accept()
			if err != nil {
				log.Warnf("Failed to accept Unix socket connection in a backend runtime instance: %s", err.Error())
				return
			}

			log.Debug("Recieved connection. Attempting to figure out backend state...")

			timeoutChannel := time.After(500 * time.Millisecond)

			select {
			case <-timeoutChannel:
				log.Debug("Timeout reached. Assuming backend is running.")
			case hasRestarted, ok := <-runtime.processRestartNotification:
				if !ok {
					log.Warnf("Failed to get the process restart notification state!")
				}

				if hasRestarted {
					if runtime.OnCrashCallback == nil {
						log.Warn("The backend has restarted for some reason, but we could not run the on crash callback as the callback is not set!")
					} else {
						log.Debug("We have restarted. Running the restart callback...")
						runtime.OnCrashCallback(sock)
					}

					log.Debug("Clearing caches...")
					runtime.cleanUpPendingCommandProcessingJobs()
					runtime.messageBufferLock = sync.Mutex{}
				} else {
					log.Debug("We have not restarted.")
				}
			}

			// Closed once this connection's processing loop has ended, so the
			// keepalive goroutine below stops instead of piling up alongside
			// the keepalive goroutines of later connections.
			connectionDone := make(chan struct{})

			go func() {
				log.Debug("Setting up Hermes keepalive Goroutine")
				hasFailedBackendRunningCheckAlready := false

				for {
					if !runtime.isRuntimeRunning {
						return
					}

					select {
					case <-connectionDone:
						log.Debug("Connection is gone. Stopping Hermes keepalive Goroutine")
						return
					default:
					}

					// Asking for the backend status seems to be a "good-enough" keepalive system. Plus, it provides useful telemetry.
					// There isn't a ping command in the backend API, so we have to make do with what we have.
					//
					// To be safe here, we have to use the proper (yet annoying) facilities to prevent cross-talk, since we're in
					// a goroutine, and can't talk directly. This actually has benefits, as the OuterLoop should exit on its own, if we
					// encounter a critical error.
					statusResponse, err := runtime.ProcessCommand(&commonbackend.BackendStatusRequest{})
					if err != nil {
						log.Warnf("Failed to get response for backend (in backend runtime keep alive): %s", err.Error())
						log.Debugf("Attempting to close socket...")

						if err := sock.Close(); err != nil {
							log.Debugf("Failed to close socket: %s", err.Error())
						}

						// Retry straight away: with the socket closed, the next
						// command fails to write, which is what makes the
						// OuterLoop below give up on this connection.
						continue
					}

					switch responseMessage := statusResponse.(type) {
					case *commonbackend.BackendStatusResponse:
						if !responseMessage.IsRunning {
							// The first "not running" answer is tolerated silently;
							// warnings start from the second one.
							if hasFailedBackendRunningCheckAlready {
								if responseMessage.Message != "" {
									log.Warnf("Backend (in backend keepalive) is up but not active: %s", responseMessage.Message)
								} else {
									log.Warnf("Backend (in backend keepalive) is up but not active")
								}
							}

							hasFailedBackendRunningCheckAlready = true
						}
					default:
						log.Errorf("Got illegal response type for backend (in backend keepalive): %T", responseMessage)
						log.Debugf("Attempting to close socket...")

						if err := sock.Close(); err != nil {
							log.Debugf("Failed to close socket: %s", err.Error())
						}
					}

					time.Sleep(5 * time.Second)
				}
			}()

		OuterLoop:
			for {
				// Sleep until ProcessCommand signals that something was queued.
				<-runtime.startProcessingNotification
				runtime.isRuntimeCurrentlyProcessing = true

				for chanIndex, messageData := range runtime.messageBuffer {
					if messageData == nil {
						continue
					}

					err := handleCommand(messageData.Message, sock, messageData.Channel)

					// handleCommand has already delivered the outcome (reply or
					// error) on the message's channel, so the slot is finished
					// either way. Releasing it before the error check keeps a
					// failed message from being replayed on the next connection.
					runtime.messageBuffer[chanIndex] = nil

					if err != nil {
						log.Warnf("failed to handle command in backend runtime instance: %s", err.Error())

						if strings.HasPrefix(err.Error(), "failed to write message") {
							// The connection is unusable. Clear the processing flag
							// before leaving so that later ProcessCommand calls
							// still wake the loop of the next connection.
							runtime.isRuntimeCurrentlyProcessing = false
							break OuterLoop
						}
					}
				}

				runtime.isRuntimeCurrentlyProcessing = false
			}

			close(connectionDone)
			sock.Close()
		}
	}()

	// Seed the (buffered) notification channel so that the very first
	// connection is treated as a normal start rather than a restart.
	runtime.processRestartNotification <- false

	for {
		log.Debug("Starting process...")

		ctx := context.Background()

		runtime.currentProcess = exec.CommandContext(ctx, runtime.ProcessPath)
		runtime.currentProcess.Env = append(runtime.currentProcess.Env, fmt.Sprintf("HERMES_API_SOCK=%s", sockPath), fmt.Sprintf("HERMES_LOG_LEVEL=%s", logLevel))

		runtime.currentProcess.Stdout = runtime.logger
		runtime.currentProcess.Stderr = runtime.logger

		err := runtime.currentProcess.Run()

		if err != nil {
			if exitErr, ok := err.(*exec.ExitError); ok {
				// Exit codes 0 and -1 are not reported.
				if exitErr.ExitCode() != -1 && exitErr.ExitCode() != 0 {
					log.Warnf("A backend process died with exit code '%d' and with error '%s'", exitErr.ExitCode(), exitErr.Error())
				}
			} else {
				log.Warnf("A backend process died with error: %s", err.Error())
			}
		} else {
			log.Debug("Process exited gracefully.")
		}

		if !runtime.isRuntimeRunning {
			return nil
		}

		log.Debug("Sleeping 5 seconds, and then restarting process")
		time.Sleep(5 * time.Second)

		// NOTE(imterah): This could cause hangs if we're not careful. If the process dies so much that we can't keep up, it should deserve to be hung, really.
		// There's probably a better way to do this, but this works.
		//
		// If this does turn out to be a problem, just increase the Goroutine buffer size.
		runtime.processRestartNotification <- true

		log.Debug("Sent off notification.")
	}
}
// Start prepares the runtime's internal state (a 10-slot message buffer, its
// lock, the notification channels and the process logger) and launches
// goRoutineHandler in a new goroutine.
//
// It returns an error if the runtime is already marked as running. Failures
// inside goRoutineHandler happen asynchronously and are only logged; they are
// not reported through the return value.
func (runtime *Runtime) Start() error {
	if runtime.isRuntimeRunning {
		return fmt.Errorf("runtime already running")
	}

	runtime.messageBuffer = make([]*messageForBuf, 10)
	runtime.messageBufferLock = sync.Mutex{}
	runtime.startProcessingNotification = make(chan bool)
	runtime.processRestartNotification = make(chan bool, 1)

	runtime.logger = &writeLogger{
		Runtime: runtime,
	}

	// Mark the runtime as running before the handler goroutine exists. The
	// handler reads this flag to decide whether to restart the process, so
	// setting it afterwards would race with that read and could make the
	// handler quit if the process exits very quickly.
	runtime.isRuntimeRunning = true

	go func() {
		if err := runtime.goRoutineHandler(); err != nil {
			log.Errorf("Failed during execution of runtime: %s", err.Error())
		}
	}()

	return nil
}
// Stop marks the runtime as no longer running, cancels the current backend
// process and closes the socket listener.
//
// Both shutdown steps are always attempted, so a failure to stop the process
// no longer leaves the listener open. A process that has already exited
// (os.ErrProcessDone) is not treated as a failure.
//
// It returns an error if the runtime is not running, or if stopping the
// process and/or closing the listener failed.
func (runtime *Runtime) Stop() error {
	if !runtime.isRuntimeRunning {
		return fmt.Errorf("runtime not running")
	}

	runtime.isRuntimeRunning = false

	var processErr, listenerErr error

	if runtime.currentProcess != nil && runtime.currentProcess.Cancel != nil {
		// os.ErrProcessDone only says the process is already gone (for example
		// while it is waiting to be restarted), which is the state we want.
		if err := runtime.currentProcess.Cancel(); err != nil && err != os.ErrProcessDone {
			processErr = fmt.Errorf("failed to stop process: %w", err)
		}
	} else {
		log.Warn("Failed to kill process (Stop recieved), currentProcess or currentProcess.Cancel is nil")
	}

	if runtime.currentListener != nil {
		if err := runtime.currentListener.Close(); err != nil {
			listenerErr = fmt.Errorf("failed to stop listener: %w", err)
		}
	} else {
		log.Warn("Failed to kill listener, as the listener is nil")
	}

	switch {
	case processErr != nil && listenerErr != nil:
		return fmt.Errorf("%w; %w", processErr, listenerErr)
	case processErr != nil:
		return processErr
	default:
		return listenerErr
	}
}
// ProcessCommand queues command for transmission to the backend and blocks
// until a response for it is available.
//
// The command is placed in the first free slot of the runtime's message
// buffer. If every slot is taken, scheduling is retried every 100ms, up to 50
// attempts, before giving up with an error.
//
// It returns the backend's response, or an error when the command could not
// be scheduled, when the response channel was closed before a response
// arrived, or when the response itself is an error value.
func (runtime *Runtime) ProcessCommand(command interface{}) (interface{}, error) {
	const maxSchedulingAttempts = 50

	// reserveSlot claims the first free buffer slot for command and returns the
	// channel its response will arrive on, or nil when the buffer is full.
	reserveSlot := func() chan interface{} {
		runtime.messageBufferLock.Lock()
		defer runtime.messageBufferLock.Unlock()

		for i, message := range runtime.messageBuffer {
			if message != nil {
				continue
			}

			responseChannel := make(chan interface{})

			runtime.messageBuffer[i] = &messageForBuf{
				Channel: responseChannel,
				Message: command,
			}

			return responseChannel
		}

		return nil
	}

	var commandChannel chan interface{}

	for schedulingAttempts := 0; ; schedulingAttempts++ {
		if !runtime.isRuntimeRunning {
			time.Sleep(10 * time.Millisecond)
		}

		if schedulingAttempts >= maxSchedulingAttempts {
			return nil, fmt.Errorf("failed to schedule message transmission after 50 tries (REPORT THIS ISSUE)")
		}

		if commandChannel = reserveSlot(); commandChannel != nil {
			break
		}

		time.Sleep(100 * time.Millisecond)
	}

	// Wait for the response while offering a wake-up to the processing loop.
	// Doing both in one select means the loop is woken exactly when it is idle
	// and waiting, so a command queued while a processing pass is already under
	// way cannot be left behind without a wake-up.
	for {
		select {
		case response, ok := <-commandChannel:
			if !ok {
				return nil, fmt.Errorf("failed to read from command channel: recieved signal that is not OK")
			}

			close(commandChannel)

			if err, isErr := response.(error); isErr {
				return nil, err
			}

			return response, nil
		case runtime.startProcessingNotification <- true:
			// The processing loop is awake now; keep waiting for our response.
		}
	}
}
// cleanUpPendingCommandProcessingJobs empties the message buffer.
//
// Every occupied slot gets up to 100ms for something to show up on its
// channel. If a value is received, or nothing happens within that time, the
// channel is closed; if the channel turns out to be closed already it is left
// alone. The slot is then cleared in all cases.
func (runtime *Runtime) cleanUpPendingCommandProcessingJobs() {
	for slot, pending := range runtime.messageBuffer {
		if pending != nil {
			select {
			case _, stillOpen := <-pending.Channel:
				// A closed channel needs no further action.
				if stillOpen {
					log.Warn("Message channel is running, but should be stopped (since message is NOT nil!)")
					close(pending.Channel)
				}
			case <-time.After(100 * time.Millisecond):
				log.Warn("Message channel is likely running (timed out reading from it without an error)")
				close(pending.Channel)
			}

			runtime.messageBuffer[slot] = nil
		}
	}
}
// NewBackend returns a new Runtime whose ProcessPath is set to path. All other
// fields are left at their zero values; nothing is started until Start is
// called.
func NewBackend(path string) *Runtime {
	backendRuntime := new(Runtime)
	backendRuntime.ProcessPath = path

	return backendRuntime
}
// Init prepares the package-level state: it creates a fresh temporary
// directory (name prefix "hermes-sockets-") in the default temporary location,
// stores its path in TempDir, and records backends in AvailableBackends.
//
// If the directory cannot be created the error is returned and
// AvailableBackends is left untouched.
func Init(backends []*Backend) error {
	socketDir, mkdirErr := os.MkdirTemp("", "hermes-sockets-")

	// TempDir receives the result even when creation failed.
	TempDir = socketDir

	if mkdirErr != nil {
		return mkdirErr
	}

	AvailableBackends = backends

	return nil
}