From 1e1a330a4bf5e18fc4cfe93c96f384dbdff25bac Mon Sep 17 00:00:00 2001 From: imterah Date: Mon, 6 Jan 2025 01:24:11 -0500 Subject: [PATCH] feature: Refactors backend runtime's communication mechanism to be more stable. --- backend/api/backendruntime/core.go | 15 + backend/api/backendruntime/runtime.go | 261 +++++++++++------- backend/api/backendruntime/struct.go | 27 +- backend/api/controllers/v1/backends/create.go | 28 +- .../api/controllers/v1/proxies/connections.go | 14 +- backend/api/controllers/v1/proxies/create.go | 14 +- backend/api/controllers/v1/proxies/remove.go | 14 +- backend/api/controllers/v1/proxies/start.go | 6 +- backend/api/controllers/v1/proxies/stop.go | 14 +- backend/api/main.go | 18 +- 10 files changed, 254 insertions(+), 157 deletions(-) create mode 100644 backend/api/backendruntime/core.go diff --git a/backend/api/backendruntime/core.go b/backend/api/backendruntime/core.go new file mode 100644 index 0000000..e06a2a6 --- /dev/null +++ b/backend/api/backendruntime/core.go @@ -0,0 +1,15 @@ +package backendruntime + +import "os" + +var ( + AvailableBackends []*Backend + RunningBackends map[uint]*Runtime + TempDir string + isDevelopmentMode bool +) + +func init() { + RunningBackends = make(map[uint]*Runtime) + isDevelopmentMode = os.Getenv("HERMES_DEVELOPMENT_MODE") != "" +} diff --git a/backend/api/backendruntime/runtime.go b/backend/api/backendruntime/runtime.go index 683b7f8..e2359c2 100644 --- a/backend/api/backendruntime/runtime.go +++ b/backend/api/backendruntime/runtime.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "strings" + "sync" "time" "git.terah.dev/imterah/hermes/backend/backendlauncher" @@ -14,16 +15,6 @@ import ( "github.com/charmbracelet/log" ) -var ( - AvailableBackends []*Backend - RunningBackends map[uint]*Runtime - TempDir string -) - -func init() { - RunningBackends = make(map[uint]*Runtime) -} - func handleCommand(commandType string, command interface{}, sock net.Conn, rtcChan chan interface{}) error { bytes, err := commonbackend.Marshal(commandType, command) @@ -101,7 +92,9 @@ func (runtime *Runtime) goRoutineHandler() error { } else { log.Debug("We have restarted. Running the restart callback...") runtime.OnCrashCallback(sock) - log.Debug("Finished running the restart callback.") + log.Debug("Finished running the restart callback. Clearing caches...") + runtime.cleanUpPendingCommandProcessingJobs() + runtime.messageBufferLock = sync.Mutex{} } } else { log.Debug("We have not restarted.") @@ -112,8 +105,6 @@ func (runtime *Runtime) goRoutineHandler() error { log.Debugf("Setting up Hermes keepalive Goroutine") hasFailedBackendRunningCheckAlready := false - time.Sleep(time.Second * 1) - for { if !runtime.isRuntimeRunning { return @@ -125,21 +116,23 @@ func (runtime *Runtime) goRoutineHandler() error { // To be safe here, we have to use the proper (yet annoying) facilities to prevent cross-talk, since we're in // a goroutine, and can't talk directly. This actually has benefits, as the OuterLoop should exit on its own, if we // encounter a critical error. - runtime.RuntimeCommands <- &commonbackend.BackendStatusRequest{ + statusResponse, err := runtime.ProcessCommand(&commonbackend.BackendStatusRequest{ Type: "backendStatusRequest", - } + }) - statusResponse := <-runtime.RuntimeCommands - - switch responseMessage := statusResponse.(type) { - case error: - log.Warnf("Failed to get response for backend (in backend runtime keep alive): %s", responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend (in backend runtime keep alive): %s", err.Error()) log.Debugf("Attempting to close socket...") err := sock.Close() if err != nil { log.Debugf("Failed to close socket: %s", err.Error()) } + + continue + } + + switch responseMessage := statusResponse.(type) { case *commonbackend.BackendStatusResponse: if !responseMessage.IsRunning { if hasFailedBackendRunningCheckAlready { @@ -168,112 +161,119 @@ func (runtime *Runtime) goRoutineHandler() error { OuterLoop: for { - commandRaw := <-runtime.RuntimeCommands - - switch command := commandRaw.(type) { - case *commonbackend.AddProxy: - err := handleCommand("addProxy", command, sock, runtime.RuntimeCommands) - - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop - } + for chanIndex, messageData := range runtime.messageBuffer { + if messageData == nil { + continue } - case *commonbackend.BackendStatusRequest: - err := handleCommand("backendStatusRequest", command, sock, runtime.RuntimeCommands) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + // We don't use the Mutex here as the Mutex is only to prevent cross-talk between multiple ProcessCommand() calls. + switch command := messageData.Message.(type) { + case *commonbackend.AddProxy: + err := handleCommand("addProxy", command, sock, messageData.Channel) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.CheckClientParameters: - err := handleCommand("checkClientParameters", command, sock, runtime.RuntimeCommands) + case *commonbackend.BackendStatusRequest: + err := handleCommand("backendStatusRequest", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.CheckServerParameters: - err := handleCommand("checkServerParameters", command, sock, runtime.RuntimeCommands) + case *commonbackend.CheckClientParameters: + err := handleCommand("checkClientParameters", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.ProxyConnectionsRequest: - err := handleCommand("proxyConnectionsRequest", command, sock, runtime.RuntimeCommands) + case *commonbackend.CheckServerParameters: + err := handleCommand("checkServerParameters", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.ProxyInstanceRequest: - err := handleCommand("proxyInstanceRequest", command, sock, runtime.RuntimeCommands) + case *commonbackend.ProxyConnectionsRequest: + err := handleCommand("proxyConnectionsRequest", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.ProxyStatusRequest: - err := handleCommand("proxyStatusRequest", command, sock, runtime.RuntimeCommands) + case *commonbackend.ProxyInstanceRequest: + err := handleCommand("proxyInstanceRequest", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.RemoveProxy: - err := handleCommand("removeProxy", command, sock, runtime.RuntimeCommands) + case *commonbackend.ProxyStatusRequest: + err := handleCommand("proxyStatusRequest", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.Start: - err := handleCommand("start", command, sock, runtime.RuntimeCommands) + case *commonbackend.RemoveProxy: + err := handleCommand("removeProxy", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.Stop: - err := handleCommand("stop", command, sock, runtime.RuntimeCommands) + case *commonbackend.Start: + err := handleCommand("start", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } + case *commonbackend.Stop: + err := handleCommand("stop", command, sock, messageData.Channel) + + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } + } + default: + log.Warnf("Recieved unknown command type from channel: %q", command) + messageData.Channel <- fmt.Errorf("unknown command recieved") } - default: - log.Warnf("Recieved unknown command type from channel: %q", command) - runtime.RuntimeCommands <- fmt.Errorf("unknown command recieved") + + runtime.messageBuffer[chanIndex] = nil } } @@ -330,7 +330,9 @@ func (runtime *Runtime) Start() error { return fmt.Errorf("runtime already running") } - runtime.RuntimeCommands = make(chan interface{}) + runtime.messageBuffer = make([]*messageForBuf, 10) + runtime.messageBufferLock = sync.Mutex{} + runtime.processRestartNotification = make(chan bool, 1) runtime.logger = &writeLogger{ @@ -379,6 +381,69 @@ func (runtime *Runtime) Stop() error { return nil } +func (runtime *Runtime) ProcessCommand(command interface{}) (interface{}, error) { + schedulingAttempts := 0 + var commandChannel chan interface{} + +SchedulingLoop: + for { + if !runtime.isRuntimeRunning { + time.Sleep(10 * time.Millisecond) + } + + if schedulingAttempts > 50 { + return nil, fmt.Errorf("failed to schedule synchronous message transmission after 50 tries with 100ms intervals (REPORT THIS ISSUE)") + } + + runtime.messageBufferLock.Lock() + + // Attempt to find spot in buffer to schedule message transmission + for i, message := range runtime.messageBuffer { + if message != nil { + continue + } + + commandChannel = make(chan interface{}) + + runtime.messageBuffer[i] = &messageForBuf{ + Channel: commandChannel, + Message: command, + } + + runtime.messageBufferLock.Unlock() + break SchedulingLoop + } + + runtime.messageBufferLock.Unlock() + time.Sleep(100 * time.Millisecond) + + schedulingAttempts++ + } + + // Fetch response and close Channel + response := <-commandChannel + close(commandChannel) + + err, ok := response.(error) + + if ok { + return nil, err + } + + return response, nil +} + +func (runtime *Runtime) cleanUpPendingCommandProcessingJobs() { + for messageIndex, message := range runtime.messageBuffer { + if message == nil { + continue + } + + close(message.Channel) + runtime.messageBuffer[messageIndex] = nil + } +} + func NewBackend(path string) *Runtime { return &Runtime{ ProcessPath: path, diff --git a/backend/api/backendruntime/struct.go b/backend/api/backendruntime/struct.go index 28a74b9..9d057ad 100644 --- a/backend/api/backendruntime/struct.go +++ b/backend/api/backendruntime/struct.go @@ -4,6 +4,9 @@ import ( "net" "os/exec" "strings" + "sync" + + "github.com/charmbracelet/log" ) type Backend struct { @@ -11,6 +14,11 @@ type Backend struct { Path string `validate:"required"` } +type messageForBuf struct { + Channel chan interface{} + Message interface{} +} + type Runtime struct { isRuntimeRunning bool logger *writeLogger @@ -18,9 +26,11 @@ type Runtime struct { currentListener net.Listener processRestartNotification chan bool - ProcessPath string - Logs []string - RuntimeCommands chan interface{} + messageBufferLock sync.Mutex + messageBuffer []*messageForBuf + + ProcessPath string + Logs []string OnCrashCallback func(sock net.Conn) } @@ -31,6 +41,17 @@ type writeLogger struct { func (writer writeLogger) Write(p []byte) (n int, err error) { logSplit := strings.Split(string(p), "\n") + + if isDevelopmentMode { + for _, logLine := range logSplit { + if logLine == "" { + continue + } + + log.Debug("spawned backend logs: " + logLine) + } + } + writer.Runtime.Logs = append(writer.Runtime.Logs, logSplit...) return len(p), err diff --git a/backend/api/controllers/v1/backends/create.go b/backend/api/controllers/v1/backends/create.go index 03c3509..0d8c614 100644 --- a/backend/api/controllers/v1/backends/create.go +++ b/backend/api/controllers/v1/backends/create.go @@ -125,16 +125,13 @@ func CreateBackend(c *gin.Context) { return } - backend.RuntimeCommands <- &commonbackend.CheckServerParameters{ + backendParamCheckResponse, err := backend.ProcessCommand(&commonbackend.CheckServerParameters{ Type: "checkServerParameters", Arguments: backendParameters, - } + }) - backendParamCheckResponse := <-backend.RuntimeCommands - - switch responseMessage := backendParamCheckResponse.(type) { - case error: - log.Warnf("Failed to get response for backend: %s", responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend: %s", err.Error()) err = backend.Stop() @@ -147,6 +144,9 @@ func CreateBackend(c *gin.Context) { }) return + } + + switch responseMessage := backendParamCheckResponse.(type) { case *commonbackend.CheckParametersResponse: if responseMessage.InResponseTo != "checkServerParameters" { log.Errorf("Got illegal response to CheckServerParameters: %s", responseMessage.InResponseTo) @@ -215,16 +215,13 @@ func CreateBackend(c *gin.Context) { return } - backend.RuntimeCommands <- &commonbackend.Start{ + backendStartResponse, err := backend.ProcessCommand(&commonbackend.Start{ Type: "start", Arguments: backendParameters, - } + }) - backendStartResponse := <-backend.RuntimeCommands - - switch responseMessage := backendStartResponse.(type) { - case error: - log.Warnf("Failed to get response for backend: %s", responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend: %s", err.Error()) err = backend.Stop() @@ -237,6 +234,9 @@ func CreateBackend(c *gin.Context) { }) return + } + + switch responseMessage := backendStartResponse.(type) { case *commonbackend.BackendStatusResponse: if !responseMessage.IsRunning { err = backend.Stop() diff --git a/backend/api/controllers/v1/proxies/connections.go b/backend/api/controllers/v1/proxies/connections.go index 9f1affa..baceb4e 100644 --- a/backend/api/controllers/v1/proxies/connections.go +++ b/backend/api/controllers/v1/proxies/connections.go @@ -118,19 +118,19 @@ func GetConnections(c *gin.Context) { return } - backendRuntime.RuntimeCommands <- &commonbackend.ProxyConnectionsRequest{ + backendResponse, err := backendRuntime.ProcessCommand(&commonbackend.ProxyConnectionsRequest{ Type: "proxyConnectionsRequest", - } + }) - backendResponse := <-backendRuntime.RuntimeCommands - - switch responseMessage := backendResponse.(type) { - case error: - log.Warnf("Failed to get response for backend: %s", responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend: %s", err.Error()) c.JSON(http.StatusInternalServerError, gin.H{ "error": "Failed to get status response from backend", }) + } + + switch responseMessage := backendResponse.(type) { case *commonbackend.ProxyConnectionsResponse: sanitizedConnections := []*SanitizedConnection{} diff --git a/backend/api/controllers/v1/proxies/create.go b/backend/api/controllers/v1/proxies/create.go index 1ac0bbc..b8f2256 100644 --- a/backend/api/controllers/v1/proxies/create.go +++ b/backend/api/controllers/v1/proxies/create.go @@ -139,25 +139,25 @@ func CreateProxy(c *gin.Context) { return } - backend.RuntimeCommands <- &commonbackend.AddProxy{ + backendResponse, err := backend.ProcessCommand(&commonbackend.AddProxy{ Type: "addProxy", SourceIP: proxy.SourceIP, SourcePort: proxy.SourcePort, DestPort: proxy.DestinationPort, Protocol: proxy.Protocol, - } + }) - backendResponse := <-backend.RuntimeCommands - - switch responseMessage := backendResponse.(type) { - case error: - log.Warnf("Failed to get response for backend #%d: %s", proxy.BackendID, responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend #%d: %s", proxy.BackendID, err.Error()) c.JSON(http.StatusInternalServerError, gin.H{ "error": "failed to get response from backend", }) return + } + + switch responseMessage := backendResponse.(type) { case *commonbackend.ProxyStatusResponse: if !responseMessage.IsActive { log.Warnf("Failed to start proxy for backend #%d", proxy.BackendID) diff --git a/backend/api/controllers/v1/proxies/remove.go b/backend/api/controllers/v1/proxies/remove.go index 4e476dd..8087e29 100644 --- a/backend/api/controllers/v1/proxies/remove.go +++ b/backend/api/controllers/v1/proxies/remove.go @@ -110,25 +110,25 @@ func RemoveProxy(c *gin.Context) { return } - backend.RuntimeCommands <- &commonbackend.RemoveProxy{ + backendResponse, err := backend.ProcessCommand(&commonbackend.RemoveProxy{ Type: "removeProxy", SourceIP: proxy.SourceIP, SourcePort: proxy.SourcePort, DestPort: proxy.DestinationPort, Protocol: proxy.Protocol, - } + }) - backendResponse := <-backend.RuntimeCommands - - switch responseMessage := backendResponse.(type) { - case error: - log.Warnf("Failed to get response for backend #%d: %s", proxy.BackendID, responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend #%d: %s", proxy.BackendID, err.Error()) c.JSON(http.StatusInternalServerError, gin.H{ "error": "Failed to get response from backend. Proxy was still successfully deleted", }) return + } + + switch responseMessage := backendResponse.(type) { case *commonbackend.ProxyStatusResponse: if responseMessage.IsActive { c.JSON(http.StatusInternalServerError, gin.H{ diff --git a/backend/api/controllers/v1/proxies/start.go b/backend/api/controllers/v1/proxies/start.go index 2647277..d0cd5e0 100644 --- a/backend/api/controllers/v1/proxies/start.go +++ b/backend/api/controllers/v1/proxies/start.go @@ -100,15 +100,13 @@ func StartProxy(c *gin.Context) { return } - backend.RuntimeCommands <- &commonbackend.AddProxy{ + backendResponse, err := backend.ProcessCommand(&commonbackend.AddProxy{ Type: "addProxy", SourceIP: proxy.SourceIP, SourcePort: proxy.SourcePort, DestPort: proxy.DestinationPort, Protocol: proxy.Protocol, - } - - backendResponse := <-backend.RuntimeCommands + }) switch responseMessage := backendResponse.(type) { case error: diff --git a/backend/api/controllers/v1/proxies/stop.go b/backend/api/controllers/v1/proxies/stop.go index 474228a..1f5f525 100644 --- a/backend/api/controllers/v1/proxies/stop.go +++ b/backend/api/controllers/v1/proxies/stop.go @@ -100,25 +100,25 @@ func StopProxy(c *gin.Context) { return } - backend.RuntimeCommands <- &commonbackend.RemoveProxy{ + backendResponse, err := backend.ProcessCommand(&commonbackend.RemoveProxy{ Type: "removeProxy", SourceIP: proxy.SourceIP, SourcePort: proxy.SourcePort, DestPort: proxy.DestinationPort, Protocol: proxy.Protocol, - } + }) - backendResponse := <-backend.RuntimeCommands - - switch responseMessage := backendResponse.(type) { - case error: - log.Warnf("Failed to get response for backend #%d: %s", proxy.BackendID, responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend #%d: %s", proxy.BackendID, err.Error()) c.JSON(http.StatusInternalServerError, gin.H{ "error": "failed to get response from backend", }) return + } + + switch responseMessage := backendResponse.(type) { case *commonbackend.ProxyStatusResponse: if responseMessage.IsActive { c.JSON(http.StatusInternalServerError, gin.H{ diff --git a/backend/api/main.go b/backend/api/main.go index c233d06..81fc2ba 100644 --- a/backend/api/main.go +++ b/backend/api/main.go @@ -112,12 +112,10 @@ func apiEntrypoint(cCtx *cli.Context) error { continue } - backendInstance.RuntimeCommands <- &commonbackend.Start{ + backendStartResponse, err := backendInstance.ProcessCommand(&commonbackend.Start{ Type: "start", Arguments: backendParameters, - } - - backendStartResponse := <-backendInstance.RuntimeCommands + }) switch responseMessage := backendStartResponse.(type) { case error: @@ -165,20 +163,20 @@ func apiEntrypoint(cCtx *cli.Context) error { for _, proxy := range autoStartProxies { log.Infof("Starting up route #%d for backend #%d: %s", proxy.ID, backend.ID, proxy.Name) - backendInstance.RuntimeCommands <- &commonbackend.AddProxy{ + backendResponse, err := backendInstance.ProcessCommand(&commonbackend.AddProxy{ Type: "addProxy", SourceIP: proxy.SourceIP, SourcePort: proxy.SourcePort, DestPort: proxy.DestinationPort, Protocol: proxy.Protocol, + }) + + if err != nil { + log.Errorf("Failed to get response for backend #%d and route #%d: %s", proxy.BackendID, proxy.ID, err.Error()) + continue } - backendResponse := <-backendInstance.RuntimeCommands - switch responseMessage := backendResponse.(type) { - case error: - log.Errorf("Failed to get response for backend #%d and route #%d: %s", proxy.BackendID, proxy.ID, responseMessage.Error()) - continue case *commonbackend.ProxyStatusResponse: if !responseMessage.IsActive { log.Warnf("Failed to start proxy for backend #%d and route #%d", proxy.BackendID, proxy.ID)