From 93f2f9cbee9c0253763309e6a3ce69417c688ad2 Mon Sep 17 00:00:00 2001 From: imterah Date: Mon, 6 Jan 2025 00:09:14 -0500 Subject: [PATCH 1/3] fix: Adds missing backend status command implementations. --- backend/backendutil/application.go | 36 ++++++++++++++++++++++++++++++ backend/backendutil/structure.go | 1 + backend/dummybackend/main.go | 4 ++++ backend/sshbackend/main.go | 4 ++++ 4 files changed, 45 insertions(+) diff --git a/backend/backendutil/application.go b/backend/backendutil/application.go index 5b99b2d..ccb21f2 100644 --- a/backend/backendutil/application.go +++ b/backend/backendutil/application.go @@ -72,6 +72,42 @@ func (helper *BackendApplicationHelper) Start() error { continue } + helper.socket.Write(responseMarshalled) + case "backendStatusRequest": + _, ok := commandRaw.(*commonbackend.BackendStatusRequest) + + if !ok { + return fmt.Errorf("failed to typecast") + } + + ok, err := helper.Backend.GetBackendStatus() + + var ( + message string + statusCode int + ) + + if err != nil { + message = err.Error() + statusCode = commonbackend.StatusFailure + } else { + statusCode = commonbackend.StatusSuccess + } + + response := &commonbackend.BackendStatusResponse{ + Type: "backendStatusResponse", + IsRunning: ok, + StatusCode: statusCode, + Message: message, + } + + responseMarshalled, err := commonbackend.Marshal(response.Type, response) + + if err != nil { + log.Error("failed to marshal response: %s", err.Error()) + continue + } + helper.socket.Write(responseMarshalled) case "stop": _, ok := commandRaw.(*commonbackend.Stop) diff --git a/backend/backendutil/structure.go b/backend/backendutil/structure.go index 3b97466..0eb7116 100644 --- a/backend/backendutil/structure.go +++ b/backend/backendutil/structure.go @@ -5,6 +5,7 @@ import "git.terah.dev/imterah/hermes/backend/commonbackend" type BackendInterface interface { StartBackend(arguments []byte) (bool, error) StopBackend() (bool, error) + GetBackendStatus() (bool, error) StartProxy(command *commonbackend.AddProxy) (bool, error) StopProxy(command *commonbackend.RemoveProxy) (bool, error) GetAllClientConnections() []*commonbackend.ProxyClientConnection diff --git a/backend/dummybackend/main.go b/backend/dummybackend/main.go index 92d88f5..f28615c 100644 --- a/backend/dummybackend/main.go +++ b/backend/dummybackend/main.go @@ -19,6 +19,10 @@ func (backend *DummyBackend) StopBackend() (bool, error) { return true, nil } +func (backend *DummyBackend) GetBackendStatus() (bool, error) { + return true, nil +} + func (backend *DummyBackend) StartProxy(command *commonbackend.AddProxy) (bool, error) { return true, nil } diff --git a/backend/sshbackend/main.go b/backend/sshbackend/main.go index e96ed5e..1bf35e7 100644 --- a/backend/sshbackend/main.go +++ b/backend/sshbackend/main.go @@ -96,6 +96,10 @@ func (backend *SSHBackend) StopBackend() (bool, error) { return true, nil } +func (backend *SSHBackend) GetBackendStatus() (bool, error) { + return backend.conn != nil, nil +} + func (backend *SSHBackend) StartProxy(command *commonbackend.AddProxy) (bool, error) { listenerObject := &SSHListener{ SourceIP: command.SourceIP, From 1e1a330a4bf5e18fc4cfe93c96f384dbdff25bac Mon Sep 17 00:00:00 2001 From: imterah Date: Mon, 6 Jan 2025 01:24:11 -0500 Subject: [PATCH 2/3] feature: Refactors backend runtime's communication mechanism to be more stable. --- backend/api/backendruntime/core.go | 15 + backend/api/backendruntime/runtime.go | 261 +++++++++++------- backend/api/backendruntime/struct.go | 27 +- backend/api/controllers/v1/backends/create.go | 28 +- .../api/controllers/v1/proxies/connections.go | 14 +- backend/api/controllers/v1/proxies/create.go | 14 +- backend/api/controllers/v1/proxies/remove.go | 14 +- backend/api/controllers/v1/proxies/start.go | 6 +- backend/api/controllers/v1/proxies/stop.go | 14 +- backend/api/main.go | 18 +- 10 files changed, 254 insertions(+), 157 deletions(-) create mode 100644 backend/api/backendruntime/core.go diff --git a/backend/api/backendruntime/core.go b/backend/api/backendruntime/core.go new file mode 100644 index 0000000..e06a2a6 --- /dev/null +++ b/backend/api/backendruntime/core.go @@ -0,0 +1,15 @@ +package backendruntime + +import "os" + +var ( + AvailableBackends []*Backend + RunningBackends map[uint]*Runtime + TempDir string + isDevelopmentMode bool +) + +func init() { + RunningBackends = make(map[uint]*Runtime) + isDevelopmentMode = os.Getenv("HERMES_DEVELOPMENT_MODE") != "" +} diff --git a/backend/api/backendruntime/runtime.go b/backend/api/backendruntime/runtime.go index 683b7f8..e2359c2 100644 --- a/backend/api/backendruntime/runtime.go +++ b/backend/api/backendruntime/runtime.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "strings" + "sync" "time" "git.terah.dev/imterah/hermes/backend/backendlauncher" @@ -14,16 +15,6 @@ import ( "github.com/charmbracelet/log" ) -var ( - AvailableBackends []*Backend - RunningBackends map[uint]*Runtime - TempDir string -) - -func init() { - RunningBackends = make(map[uint]*Runtime) -} - func handleCommand(commandType string, command interface{}, sock net.Conn, rtcChan chan interface{}) error { bytes, err := commonbackend.Marshal(commandType, command) @@ -101,7 +92,9 @@ func (runtime *Runtime) goRoutineHandler() error { } else { log.Debug("We have restarted. Running the restart callback...") runtime.OnCrashCallback(sock) - log.Debug("Finished running the restart callback.") + log.Debug("Finished running the restart callback. Clearing caches...") + runtime.cleanUpPendingCommandProcessingJobs() + runtime.messageBufferLock = sync.Mutex{} } } else { log.Debug("We have not restarted.") @@ -112,8 +105,6 @@ func (runtime *Runtime) goRoutineHandler() error { log.Debugf("Setting up Hermes keepalive Goroutine") hasFailedBackendRunningCheckAlready := false - time.Sleep(time.Second * 1) - for { if !runtime.isRuntimeRunning { return @@ -125,21 +116,23 @@ func (runtime *Runtime) goRoutineHandler() error { // To be safe here, we have to use the proper (yet annoying) facilities to prevent cross-talk, since we're in // a goroutine, and can't talk directly. This actually has benefits, as the OuterLoop should exit on its own, if we // encounter a critical error. - runtime.RuntimeCommands <- &commonbackend.BackendStatusRequest{ + statusResponse, err := runtime.ProcessCommand(&commonbackend.BackendStatusRequest{ Type: "backendStatusRequest", - } + }) - statusResponse := <-runtime.RuntimeCommands - - switch responseMessage := statusResponse.(type) { - case error: - log.Warnf("Failed to get response for backend (in backend runtime keep alive): %s", responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend (in backend runtime keep alive): %s", err.Error()) log.Debugf("Attempting to close socket...") err := sock.Close() if err != nil { log.Debugf("Failed to close socket: %s", err.Error()) } + + continue + } + + switch responseMessage := statusResponse.(type) { case *commonbackend.BackendStatusResponse: if !responseMessage.IsRunning { if hasFailedBackendRunningCheckAlready { @@ -168,112 +161,119 @@ func (runtime *Runtime) goRoutineHandler() error { OuterLoop: for { - commandRaw := <-runtime.RuntimeCommands - - switch command := commandRaw.(type) { - case *commonbackend.AddProxy: - err := handleCommand("addProxy", command, sock, runtime.RuntimeCommands) - - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop - } + for chanIndex, messageData := range runtime.messageBuffer { + if messageData == nil { + continue } - case *commonbackend.BackendStatusRequest: - err := handleCommand("backendStatusRequest", command, sock, runtime.RuntimeCommands) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + // We don't use the Mutex here as the Mutex is only to prevent cross-talk between multiple ProcessCommand() calls. + switch command := messageData.Message.(type) { + case *commonbackend.AddProxy: + err := handleCommand("addProxy", command, sock, messageData.Channel) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.CheckClientParameters: - err := handleCommand("checkClientParameters", command, sock, runtime.RuntimeCommands) + case *commonbackend.BackendStatusRequest: + err := handleCommand("backendStatusRequest", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.CheckServerParameters: - err := handleCommand("checkServerParameters", command, sock, runtime.RuntimeCommands) + case *commonbackend.CheckClientParameters: + err := handleCommand("checkClientParameters", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.ProxyConnectionsRequest: - err := handleCommand("proxyConnectionsRequest", command, sock, runtime.RuntimeCommands) + case *commonbackend.CheckServerParameters: + err := handleCommand("checkServerParameters", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.ProxyInstanceRequest: - err := handleCommand("proxyInstanceRequest", command, sock, runtime.RuntimeCommands) + case *commonbackend.ProxyConnectionsRequest: + err := handleCommand("proxyConnectionsRequest", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.ProxyStatusRequest: - err := handleCommand("proxyStatusRequest", command, sock, runtime.RuntimeCommands) + case *commonbackend.ProxyInstanceRequest: + err := handleCommand("proxyInstanceRequest", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.RemoveProxy: - err := handleCommand("removeProxy", command, sock, runtime.RuntimeCommands) + case *commonbackend.ProxyStatusRequest: + err := handleCommand("proxyStatusRequest", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.Start: - err := handleCommand("start", command, sock, runtime.RuntimeCommands) + case *commonbackend.RemoveProxy: + err := handleCommand("removeProxy", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } - } - case *commonbackend.Stop: - err := handleCommand("stop", command, sock, runtime.RuntimeCommands) + case *commonbackend.Start: + err := handleCommand("start", command, sock, messageData.Channel) - if err != nil { - log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) - if strings.HasPrefix(err.Error(), "failed to write message") { - break OuterLoop + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } } + case *commonbackend.Stop: + err := handleCommand("stop", command, sock, messageData.Channel) + + if err != nil { + log.Warnf("failed to handle command in backend runtime instance: %s", err.Error()) + + if strings.HasPrefix(err.Error(), "failed to write message") { + break OuterLoop + } + } + default: + log.Warnf("Recieved unknown command type from channel: %q", command) + messageData.Channel <- fmt.Errorf("unknown command recieved") } - default: - log.Warnf("Recieved unknown command type from channel: %q", command) - runtime.RuntimeCommands <- fmt.Errorf("unknown command recieved") + + runtime.messageBuffer[chanIndex] = nil } } @@ -330,7 +330,9 @@ func (runtime *Runtime) Start() error { return fmt.Errorf("runtime already running") } - runtime.RuntimeCommands = make(chan interface{}) + runtime.messageBuffer = make([]*messageForBuf, 10) + runtime.messageBufferLock = sync.Mutex{} + runtime.processRestartNotification = make(chan bool, 1) runtime.logger = &writeLogger{ @@ -379,6 +381,69 @@ func (runtime *Runtime) Stop() error { return nil } +func (runtime *Runtime) ProcessCommand(command interface{}) (interface{}, error) { + schedulingAttempts := 0 + var commandChannel chan interface{} + +SchedulingLoop: + for { + if !runtime.isRuntimeRunning { + time.Sleep(10 * time.Millisecond) + } + + if schedulingAttempts > 50 { + return nil, fmt.Errorf("failed to schedule synchronous message transmission after 50 tries with 100ms intervals (REPORT THIS ISSUE)") + } + + runtime.messageBufferLock.Lock() + + // Attempt to find spot in buffer to schedule message transmission + for i, message := range runtime.messageBuffer { + if message != nil { + continue + } + + commandChannel = make(chan interface{}) + + runtime.messageBuffer[i] = &messageForBuf{ + Channel: commandChannel, + Message: command, + } + + runtime.messageBufferLock.Unlock() + break SchedulingLoop + } + + runtime.messageBufferLock.Unlock() + time.Sleep(100 * time.Millisecond) + + schedulingAttempts++ + } + + // Fetch response and close Channel + response := <-commandChannel + close(commandChannel) + + err, ok := response.(error) + + if ok { + return nil, err + } + + return response, nil +} + +func (runtime *Runtime) cleanUpPendingCommandProcessingJobs() { + for messageIndex, message := range runtime.messageBuffer { + if message == nil { + continue + } + + close(message.Channel) + runtime.messageBuffer[messageIndex] = nil + } +} + func NewBackend(path string) *Runtime { return &Runtime{ ProcessPath: path, diff --git a/backend/api/backendruntime/struct.go b/backend/api/backendruntime/struct.go index 28a74b9..9d057ad 100644 --- a/backend/api/backendruntime/struct.go +++ b/backend/api/backendruntime/struct.go @@ -4,6 +4,9 @@ import ( "net" "os/exec" "strings" + "sync" + + "github.com/charmbracelet/log" ) type Backend struct { @@ -11,6 +14,11 @@ type Backend struct { Path string `validate:"required"` } +type messageForBuf struct { + Channel chan interface{} + Message interface{} +} + type Runtime struct { isRuntimeRunning bool logger *writeLogger @@ -18,9 +26,11 @@ type Runtime struct { currentListener net.Listener processRestartNotification chan bool - ProcessPath string - Logs []string - RuntimeCommands chan interface{} + messageBufferLock sync.Mutex + messageBuffer []*messageForBuf + + ProcessPath string + Logs []string OnCrashCallback func(sock net.Conn) } @@ -31,6 +41,17 @@ type writeLogger struct { func (writer writeLogger) Write(p []byte) (n int, err error) { logSplit := strings.Split(string(p), "\n") + + if isDevelopmentMode { + for _, logLine := range logSplit { + if logLine == "" { + continue + } + + log.Debug("spawned backend logs: " + logLine) + } + } + writer.Runtime.Logs = append(writer.Runtime.Logs, logSplit...) return len(p), err diff --git a/backend/api/controllers/v1/backends/create.go b/backend/api/controllers/v1/backends/create.go index 03c3509..0d8c614 100644 --- a/backend/api/controllers/v1/backends/create.go +++ b/backend/api/controllers/v1/backends/create.go @@ -125,16 +125,13 @@ func CreateBackend(c *gin.Context) { return } - backend.RuntimeCommands <- &commonbackend.CheckServerParameters{ + backendParamCheckResponse, err := backend.ProcessCommand(&commonbackend.CheckServerParameters{ Type: "checkServerParameters", Arguments: backendParameters, - } + }) - backendParamCheckResponse := <-backend.RuntimeCommands - - switch responseMessage := backendParamCheckResponse.(type) { - case error: - log.Warnf("Failed to get response for backend: %s", responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend: %s", err.Error()) err = backend.Stop() @@ -147,6 +144,9 @@ func CreateBackend(c *gin.Context) { }) return + } + + switch responseMessage := backendParamCheckResponse.(type) { case *commonbackend.CheckParametersResponse: if responseMessage.InResponseTo != "checkServerParameters" { log.Errorf("Got illegal response to CheckServerParameters: %s", responseMessage.InResponseTo) @@ -215,16 +215,13 @@ func CreateBackend(c *gin.Context) { return } - backend.RuntimeCommands <- &commonbackend.Start{ + backendStartResponse, err := backend.ProcessCommand(&commonbackend.Start{ Type: "start", Arguments: backendParameters, - } + }) - backendStartResponse := <-backend.RuntimeCommands - - switch responseMessage := backendStartResponse.(type) { - case error: - log.Warnf("Failed to get response for backend: %s", responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend: %s", err.Error()) err = backend.Stop() @@ -237,6 +234,9 @@ func CreateBackend(c *gin.Context) { }) return + } + + switch responseMessage := backendStartResponse.(type) { case *commonbackend.BackendStatusResponse: if !responseMessage.IsRunning { err = backend.Stop() diff --git a/backend/api/controllers/v1/proxies/connections.go b/backend/api/controllers/v1/proxies/connections.go index 9f1affa..baceb4e 100644 --- a/backend/api/controllers/v1/proxies/connections.go +++ b/backend/api/controllers/v1/proxies/connections.go @@ -118,19 +118,19 @@ func GetConnections(c *gin.Context) { return } - backendRuntime.RuntimeCommands <- &commonbackend.ProxyConnectionsRequest{ + backendResponse, err := backendRuntime.ProcessCommand(&commonbackend.ProxyConnectionsRequest{ Type: "proxyConnectionsRequest", - } + }) - backendResponse := <-backendRuntime.RuntimeCommands - - switch responseMessage := backendResponse.(type) { - case error: - log.Warnf("Failed to get response for backend: %s", responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend: %s", err.Error()) c.JSON(http.StatusInternalServerError, gin.H{ "error": "Failed to get status response from backend", }) + } + + switch responseMessage := backendResponse.(type) { case *commonbackend.ProxyConnectionsResponse: sanitizedConnections := []*SanitizedConnection{} diff --git a/backend/api/controllers/v1/proxies/create.go b/backend/api/controllers/v1/proxies/create.go index 1ac0bbc..b8f2256 100644 --- a/backend/api/controllers/v1/proxies/create.go +++ b/backend/api/controllers/v1/proxies/create.go @@ -139,25 +139,25 @@ func CreateProxy(c *gin.Context) { return } - backend.RuntimeCommands <- &commonbackend.AddProxy{ + backendResponse, err := backend.ProcessCommand(&commonbackend.AddProxy{ Type: "addProxy", SourceIP: proxy.SourceIP, SourcePort: proxy.SourcePort, DestPort: proxy.DestinationPort, Protocol: proxy.Protocol, - } + }) - backendResponse := <-backend.RuntimeCommands - - switch responseMessage := backendResponse.(type) { - case error: - log.Warnf("Failed to get response for backend #%d: %s", proxy.BackendID, responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend #%d: %s", proxy.BackendID, err.Error()) c.JSON(http.StatusInternalServerError, gin.H{ "error": "failed to get response from backend", }) return + } + + switch responseMessage := backendResponse.(type) { case *commonbackend.ProxyStatusResponse: if !responseMessage.IsActive { log.Warnf("Failed to start proxy for backend #%d", proxy.BackendID) diff --git a/backend/api/controllers/v1/proxies/remove.go b/backend/api/controllers/v1/proxies/remove.go index 4e476dd..8087e29 100644 --- a/backend/api/controllers/v1/proxies/remove.go +++ b/backend/api/controllers/v1/proxies/remove.go @@ -110,25 +110,25 @@ func RemoveProxy(c *gin.Context) { return } - backend.RuntimeCommands <- &commonbackend.RemoveProxy{ + backendResponse, err := backend.ProcessCommand(&commonbackend.RemoveProxy{ Type: "removeProxy", SourceIP: proxy.SourceIP, SourcePort: proxy.SourcePort, DestPort: proxy.DestinationPort, Protocol: proxy.Protocol, - } + }) - backendResponse := <-backend.RuntimeCommands - - switch responseMessage := backendResponse.(type) { - case error: - log.Warnf("Failed to get response for backend #%d: %s", proxy.BackendID, responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend #%d: %s", proxy.BackendID, err.Error()) c.JSON(http.StatusInternalServerError, gin.H{ "error": "Failed to get response from backend. Proxy was still successfully deleted", }) return + } + + switch responseMessage := backendResponse.(type) { case *commonbackend.ProxyStatusResponse: if responseMessage.IsActive { c.JSON(http.StatusInternalServerError, gin.H{ diff --git a/backend/api/controllers/v1/proxies/start.go b/backend/api/controllers/v1/proxies/start.go index 2647277..d0cd5e0 100644 --- a/backend/api/controllers/v1/proxies/start.go +++ b/backend/api/controllers/v1/proxies/start.go @@ -100,15 +100,13 @@ func StartProxy(c *gin.Context) { return } - backend.RuntimeCommands <- &commonbackend.AddProxy{ + backendResponse, err := backend.ProcessCommand(&commonbackend.AddProxy{ Type: "addProxy", SourceIP: proxy.SourceIP, SourcePort: proxy.SourcePort, DestPort: proxy.DestinationPort, Protocol: proxy.Protocol, - } - - backendResponse := <-backend.RuntimeCommands + }) switch responseMessage := backendResponse.(type) { case error: diff --git a/backend/api/controllers/v1/proxies/stop.go b/backend/api/controllers/v1/proxies/stop.go index 474228a..1f5f525 100644 --- a/backend/api/controllers/v1/proxies/stop.go +++ b/backend/api/controllers/v1/proxies/stop.go @@ -100,25 +100,25 @@ func StopProxy(c *gin.Context) { return } - backend.RuntimeCommands <- &commonbackend.RemoveProxy{ + backendResponse, err := backend.ProcessCommand(&commonbackend.RemoveProxy{ Type: "removeProxy", SourceIP: proxy.SourceIP, SourcePort: proxy.SourcePort, DestPort: proxy.DestinationPort, Protocol: proxy.Protocol, - } + }) - backendResponse := <-backend.RuntimeCommands - - switch responseMessage := backendResponse.(type) { - case error: - log.Warnf("Failed to get response for backend #%d: %s", proxy.BackendID, responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend #%d: %s", proxy.BackendID, err.Error()) c.JSON(http.StatusInternalServerError, gin.H{ "error": "failed to get response from backend", }) return + } + + switch responseMessage := backendResponse.(type) { case *commonbackend.ProxyStatusResponse: if responseMessage.IsActive { c.JSON(http.StatusInternalServerError, gin.H{ diff --git a/backend/api/main.go b/backend/api/main.go index c233d06..81fc2ba 100644 --- a/backend/api/main.go +++ b/backend/api/main.go @@ -112,12 +112,10 @@ func apiEntrypoint(cCtx *cli.Context) error { continue } - backendInstance.RuntimeCommands <- &commonbackend.Start{ + backendStartResponse, err := backendInstance.ProcessCommand(&commonbackend.Start{ Type: "start", Arguments: backendParameters, - } - - backendStartResponse := <-backendInstance.RuntimeCommands + }) switch responseMessage := backendStartResponse.(type) { case error: @@ -165,20 +163,20 @@ func apiEntrypoint(cCtx *cli.Context) error { for _, proxy := range autoStartProxies { log.Infof("Starting up route #%d for backend #%d: %s", proxy.ID, backend.ID, proxy.Name) - backendInstance.RuntimeCommands <- &commonbackend.AddProxy{ + backendResponse, err := backendInstance.ProcessCommand(&commonbackend.AddProxy{ Type: "addProxy", SourceIP: proxy.SourceIP, SourcePort: proxy.SourcePort, DestPort: proxy.DestinationPort, Protocol: proxy.Protocol, + }) + + if err != nil { + log.Errorf("Failed to get response for backend #%d and route #%d: %s", proxy.BackendID, proxy.ID, err.Error()) + continue } - backendResponse := <-backendInstance.RuntimeCommands - switch responseMessage := backendResponse.(type) { - case error: - log.Errorf("Failed to get response for backend #%d and route #%d: %s", proxy.BackendID, proxy.ID, responseMessage.Error()) - continue case *commonbackend.ProxyStatusResponse: if !responseMessage.IsActive { log.Warnf("Failed to start proxy for backend #%d and route #%d", proxy.BackendID, proxy.ID) From f8d32fb1c6fd319446886a61bc9983ce8216c53a Mon Sep 17 00:00:00 2001 From: imterah Date: Wed, 8 Jan 2025 09:12:48 -0500 Subject: [PATCH 3/3] fix: Fixes more instability issues. :) --- .devcontainer/devcontainer.json | 18 ++-- backend/api/backendruntime/runtime.go | 30 ++++-- .../api/controllers/v1/proxies/connections.go | 2 + backend/api/controllers/v1/proxies/create.go | 2 + backend/api/controllers/v1/proxies/lookup.go | 2 + backend/api/main.go | 99 ++++++++++++++++++- backend/commonbackend/marshal.go | 2 +- routes/Hermes API/Backend/Lookup.bru | 4 +- 8 files changed, 132 insertions(+), 27 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 9abd7c6..5e6cec4 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -13,16 +13,14 @@ }, // run arguments passed to docker - "runArgs": [ - "--security-opt", "label=disable" - ], + "runArgs": ["--security-opt", "label=disable"], "containerEnv": { - // extensions to preload before other extensions + // extensions to preload before other extensions "PRELOAD_EXTENSIONS": "arrterian.nix-env-selector" }, - // disable command overriding and updating remote user ID + // disable command overriding and updating remote user ID "overrideCommand": false, "userEnvProbe": "loginShell", "updateRemoteUserUID": false, @@ -31,18 +29,14 @@ "onCreateCommand": "nix-shell --command 'echo done building nix dev environment'", // Use 'forwardPorts' to make a list of ports inside the container available locally. - "forwardPorts": [ - 3000 - ], + "forwardPorts": [8000], "customizations": { "vscode": { - "extensions": [ - "arrterian.nix-env-selector" - ] + "extensions": ["arrterian.nix-env-selector"] } } // Use 'postCreateCommand' to run commands after the container is created. // "postCreateCommand": "go version", -} \ No newline at end of file +} diff --git a/backend/api/backendruntime/runtime.go b/backend/api/backendruntime/runtime.go index e2359c2..1e79406 100644 --- a/backend/api/backendruntime/runtime.go +++ b/backend/api/backendruntime/runtime.go @@ -92,17 +92,18 @@ func (runtime *Runtime) goRoutineHandler() error { } else { log.Debug("We have restarted. Running the restart callback...") runtime.OnCrashCallback(sock) - log.Debug("Finished running the restart callback. Clearing caches...") - runtime.cleanUpPendingCommandProcessingJobs() - runtime.messageBufferLock = sync.Mutex{} } + + log.Debug("Clearing caches...") + runtime.cleanUpPendingCommandProcessingJobs() + runtime.messageBufferLock = sync.Mutex{} } else { log.Debug("We have not restarted.") } } go func() { - log.Debugf("Setting up Hermes keepalive Goroutine") + log.Debug("Setting up Hermes keepalive Goroutine") hasFailedBackendRunningCheckAlready := false for { @@ -155,7 +156,7 @@ func (runtime *Runtime) goRoutineHandler() error { } } - time.Sleep(5) + time.Sleep(5 * time.Second) } }() @@ -166,7 +167,6 @@ func (runtime *Runtime) goRoutineHandler() error { continue } - // We don't use the Mutex here as the Mutex is only to prevent cross-talk between multiple ProcessCommand() calls. switch command := messageData.Message.(type) { case *commonbackend.AddProxy: err := handleCommand("addProxy", command, sock, messageData.Channel) @@ -269,7 +269,7 @@ func (runtime *Runtime) goRoutineHandler() error { } } default: - log.Warnf("Recieved unknown command type from channel: %q", command) + log.Warnf("Recieved unknown command type from channel: %T", command) messageData.Channel <- fmt.Errorf("unknown command recieved") } @@ -392,7 +392,7 @@ SchedulingLoop: } if schedulingAttempts > 50 { - return nil, fmt.Errorf("failed to schedule synchronous message transmission after 50 tries with 100ms intervals (REPORT THIS ISSUE)") + return nil, fmt.Errorf("failed to schedule message transmission after 50 tries (REPORT THIS ISSUE)") } runtime.messageBufferLock.Lock() @@ -439,7 +439,19 @@ func (runtime *Runtime) cleanUpPendingCommandProcessingJobs() { continue } - close(message.Channel) + timeoutChannel := time.After(100 * time.Millisecond) + + select { + case <-timeoutChannel: + log.Fatal("Message channel is likely running (timed out reading from it without an error)") + close(message.Channel) + case _, ok := <-message.Channel: + if ok { + log.Fatal("Message channel is running, but should be stopped (since message is NOT nil!)") + close(message.Channel) + } + } + runtime.messageBuffer[messageIndex] = nil } } diff --git a/backend/api/controllers/v1/proxies/connections.go b/backend/api/controllers/v1/proxies/connections.go index baceb4e..f46c284 100644 --- a/backend/api/controllers/v1/proxies/connections.go +++ b/backend/api/controllers/v1/proxies/connections.go @@ -128,6 +128,8 @@ func GetConnections(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{ "error": "Failed to get status response from backend", }) + + return } switch responseMessage := backendResponse.(type) { diff --git a/backend/api/controllers/v1/proxies/create.go b/backend/api/controllers/v1/proxies/create.go index b8f2256..20ad144 100644 --- a/backend/api/controllers/v1/proxies/create.go +++ b/backend/api/controllers/v1/proxies/create.go @@ -123,6 +123,8 @@ func CreateProxy(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{ "error": "Failed to add forward rule to database", }) + + return } if autoStart { diff --git a/backend/api/controllers/v1/proxies/lookup.go b/backend/api/controllers/v1/proxies/lookup.go index 5f40698..3b0d4c3 100644 --- a/backend/api/controllers/v1/proxies/lookup.go +++ b/backend/api/controllers/v1/proxies/lookup.go @@ -95,6 +95,8 @@ func LookupProxy(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{ "error": "Protocol specified in body must either be 'tcp' or 'udp'", }) + + return } } diff --git a/backend/api/main.go b/backend/api/main.go index 81fc2ba..2969e9b 100644 --- a/backend/api/main.go +++ b/backend/api/main.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "net" "os" "path" "path/filepath" @@ -98,6 +99,96 @@ func apiEntrypoint(cCtx *cli.Context) error { } backendInstance := backendruntime.NewBackend(backendRuntimeFilePath) + + backendInstance.OnCrashCallback = func(conn net.Conn) { + backendParameters, err := base64.StdEncoding.DecodeString(backend.BackendParameters) + + if err != nil { + log.Errorf("Failed to decode backend parameters for backend #%d: %s", backend.ID, err.Error()) + return + } + + marshalledStartCommand, err := commonbackend.Marshal("start", &commonbackend.Start{ + Type: "start", + Arguments: backendParameters, + }) + + if err != nil { + log.Errorf("Failed to marshal start command for backend #%d: %s", backend.ID, err.Error()) + return + } + + if _, err := conn.Write(marshalledStartCommand); err != nil { + log.Errorf("Failed to send start command for backend #%d: %s", backend.ID, err.Error()) + return + } + + _, backendResponse, err := commonbackend.Unmarshal(conn) + + if err != nil { + log.Errorf("Failed to get start command response for backend #%d: %s", backend.ID, err.Error()) + return + } + + switch responseMessage := backendResponse.(type) { + case *commonbackend.BackendStatusResponse: + if !responseMessage.IsRunning { + log.Errorf("Failed to start backend #%d: %s", backend.ID, responseMessage.Message) + return + } + + log.Infof("Backend #%d has been reinitialized successfully", backend.ID) + } + + log.Warnf("Backend #%d has reinitialized! Starting up auto-starting proxies...", backend.ID) + + autoStartProxies := []dbcore.Proxy{} + + if err := dbcore.DB.Where("backend_id = ? AND auto_start = true", backend.ID).Find(&autoStartProxies).Error; err != nil { + log.Errorf("Failed to query proxies to autostart: %s", err.Error()) + return + } + + for _, proxy := range autoStartProxies { + log.Infof("Starting up route #%d for backend #%d: %s", proxy.ID, backend.ID, proxy.Name) + + marhalledCommand, err := commonbackend.Marshal("addProxy", &commonbackend.AddProxy{ + Type: "addProxy", + SourceIP: proxy.SourceIP, + SourcePort: proxy.SourcePort, + DestPort: proxy.DestinationPort, + Protocol: proxy.Protocol, + }) + + if err != nil { + log.Errorf("Failed to marshal proxy adding request for backend #%d and route #%d: %s", proxy.BackendID, proxy.ID, err.Error()) + continue + } + + if _, err := conn.Write(marhalledCommand); err != nil { + log.Errorf("Failed to send proxy adding request for backend #%d and route #%d: %s", proxy.BackendID, proxy.ID, err.Error()) + continue + } + + _, backendResponse, err := commonbackend.Unmarshal(conn) + + if err != nil { + log.Errorf("Failed to get response for backend #%d and route #%d: %s", proxy.BackendID, proxy.ID, err.Error()) + continue + } + + switch responseMessage := backendResponse.(type) { + case *commonbackend.ProxyStatusResponse: + if !responseMessage.IsActive { + log.Warnf("Failed to start proxy for backend #%d and route #%d", proxy.BackendID, proxy.ID) + } + default: + log.Errorf("Got illegal response type for backend #%d and proxy #%d: %T", proxy.BackendID, proxy.ID, responseMessage) + continue + } + } + } + err = backendInstance.Start() if err != nil { @@ -117,9 +208,8 @@ func apiEntrypoint(cCtx *cli.Context) error { Arguments: backendParameters, }) - switch responseMessage := backendStartResponse.(type) { - case error: - log.Warnf("Failed to get response for backend #%d: %s", backend.ID, responseMessage.Error()) + if err != nil { + log.Warnf("Failed to get response for backend #%d: %s", backend.ID, err.Error()) err = backendInstance.Stop() @@ -128,6 +218,9 @@ func apiEntrypoint(cCtx *cli.Context) error { } continue + } + + switch responseMessage := backendStartResponse.(type) { case *commonbackend.BackendStatusResponse: if !responseMessage.IsRunning { err = backendInstance.Stop() diff --git a/backend/commonbackend/marshal.go b/backend/commonbackend/marshal.go index 31895de..6baf02e 100644 --- a/backend/commonbackend/marshal.go +++ b/backend/commonbackend/marshal.go @@ -346,7 +346,7 @@ func Marshal(commandType string, command interface{}) ([]byte, error) { return nil, fmt.Errorf("failed to typecast") } - statusRequestBytes := make([]byte, 2) + statusRequestBytes := make([]byte, 1) statusRequestBytes[0] = BackendStatusRequestID return statusRequestBytes, nil diff --git a/routes/Hermes API/Backend/Lookup.bru b/routes/Hermes API/Backend/Lookup.bru index 0da7fa7..8c50b52 100644 --- a/routes/Hermes API/Backend/Lookup.bru +++ b/routes/Hermes API/Backend/Lookup.bru @@ -12,8 +12,8 @@ post { body:json { { - "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiMSJdLCJleHAiOjE3MzUwNzY0MTEsIm5iZiI6MTczNDk5MDAxMSwiaWF0IjoxNzM0OTkwMDExfQ.N9TLraX4peHt7FKv8tPcHuEzL0K7T2IBEw3piQS_4OY", + "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiMSJdLCJleHAiOjE3MzYyMzI2NjEsIm5iZiI6MTczNjE0NjI2MSwiaWF0IjoxNzM2MTQ2MjYxfQ.juoZ74xs-FBnbbT9Zlei1LmcNx7kTEfzymHlVbeMmtQ", "name": "SSH", - "id": 2 + "id": 1 } }