fix: Fixes more instability issues. :)
This commit is contained in:
parent
1e1a330a4b
commit
f8d32fb1c6
8 changed files with 132 additions and 27 deletions
|
@ -92,17 +92,18 @@ func (runtime *Runtime) goRoutineHandler() error {
|
|||
} else {
|
||||
log.Debug("We have restarted. Running the restart callback...")
|
||||
runtime.OnCrashCallback(sock)
|
||||
log.Debug("Finished running the restart callback. Clearing caches...")
|
||||
runtime.cleanUpPendingCommandProcessingJobs()
|
||||
runtime.messageBufferLock = sync.Mutex{}
|
||||
}
|
||||
|
||||
log.Debug("Clearing caches...")
|
||||
runtime.cleanUpPendingCommandProcessingJobs()
|
||||
runtime.messageBufferLock = sync.Mutex{}
|
||||
} else {
|
||||
log.Debug("We have not restarted.")
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
log.Debugf("Setting up Hermes keepalive Goroutine")
|
||||
log.Debug("Setting up Hermes keepalive Goroutine")
|
||||
hasFailedBackendRunningCheckAlready := false
|
||||
|
||||
for {
|
||||
|
@ -155,7 +156,7 @@ func (runtime *Runtime) goRoutineHandler() error {
|
|||
}
|
||||
}
|
||||
|
||||
time.Sleep(5)
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -166,7 +167,6 @@ func (runtime *Runtime) goRoutineHandler() error {
|
|||
continue
|
||||
}
|
||||
|
||||
// We don't use the Mutex here as the Mutex is only to prevent cross-talk between multiple ProcessCommand() calls.
|
||||
switch command := messageData.Message.(type) {
|
||||
case *commonbackend.AddProxy:
|
||||
err := handleCommand("addProxy", command, sock, messageData.Channel)
|
||||
|
@ -269,7 +269,7 @@ func (runtime *Runtime) goRoutineHandler() error {
|
|||
}
|
||||
}
|
||||
default:
|
||||
log.Warnf("Recieved unknown command type from channel: %q", command)
|
||||
log.Warnf("Recieved unknown command type from channel: %T", command)
|
||||
messageData.Channel <- fmt.Errorf("unknown command recieved")
|
||||
}
|
||||
|
||||
|
@ -392,7 +392,7 @@ SchedulingLoop:
|
|||
}
|
||||
|
||||
if schedulingAttempts > 50 {
|
||||
return nil, fmt.Errorf("failed to schedule synchronous message transmission after 50 tries with 100ms intervals (REPORT THIS ISSUE)")
|
||||
return nil, fmt.Errorf("failed to schedule message transmission after 50 tries (REPORT THIS ISSUE)")
|
||||
}
|
||||
|
||||
runtime.messageBufferLock.Lock()
|
||||
|
@ -439,7 +439,19 @@ func (runtime *Runtime) cleanUpPendingCommandProcessingJobs() {
|
|||
continue
|
||||
}
|
||||
|
||||
close(message.Channel)
|
||||
timeoutChannel := time.After(100 * time.Millisecond)
|
||||
|
||||
select {
|
||||
case <-timeoutChannel:
|
||||
log.Fatal("Message channel is likely running (timed out reading from it without an error)")
|
||||
close(message.Channel)
|
||||
case _, ok := <-message.Channel:
|
||||
if ok {
|
||||
log.Fatal("Message channel is running, but should be stopped (since message is NOT nil!)")
|
||||
close(message.Channel)
|
||||
}
|
||||
}
|
||||
|
||||
runtime.messageBuffer[messageIndex] = nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -128,6 +128,8 @@ func GetConnections(c *gin.Context) {
|
|||
c.JSON(http.StatusInternalServerError, gin.H{
|
||||
"error": "Failed to get status response from backend",
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
switch responseMessage := backendResponse.(type) {
|
||||
|
|
|
@ -123,6 +123,8 @@ func CreateProxy(c *gin.Context) {
|
|||
c.JSON(http.StatusInternalServerError, gin.H{
|
||||
"error": "Failed to add forward rule to database",
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if autoStart {
|
||||
|
|
|
@ -95,6 +95,8 @@ func LookupProxy(c *gin.Context) {
|
|||
c.JSON(http.StatusBadRequest, gin.H{
|
||||
"error": "Protocol specified in body must either be 'tcp' or 'udp'",
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
|
@ -98,6 +99,96 @@ func apiEntrypoint(cCtx *cli.Context) error {
|
|||
}
|
||||
|
||||
backendInstance := backendruntime.NewBackend(backendRuntimeFilePath)
|
||||
|
||||
backendInstance.OnCrashCallback = func(conn net.Conn) {
|
||||
backendParameters, err := base64.StdEncoding.DecodeString(backend.BackendParameters)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("Failed to decode backend parameters for backend #%d: %s", backend.ID, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
marshalledStartCommand, err := commonbackend.Marshal("start", &commonbackend.Start{
|
||||
Type: "start",
|
||||
Arguments: backendParameters,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("Failed to marshal start command for backend #%d: %s", backend.ID, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := conn.Write(marshalledStartCommand); err != nil {
|
||||
log.Errorf("Failed to send start command for backend #%d: %s", backend.ID, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
_, backendResponse, err := commonbackend.Unmarshal(conn)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get start command response for backend #%d: %s", backend.ID, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
switch responseMessage := backendResponse.(type) {
|
||||
case *commonbackend.BackendStatusResponse:
|
||||
if !responseMessage.IsRunning {
|
||||
log.Errorf("Failed to start backend #%d: %s", backend.ID, responseMessage.Message)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("Backend #%d has been reinitialized successfully", backend.ID)
|
||||
}
|
||||
|
||||
log.Warnf("Backend #%d has reinitialized! Starting up auto-starting proxies...", backend.ID)
|
||||
|
||||
autoStartProxies := []dbcore.Proxy{}
|
||||
|
||||
if err := dbcore.DB.Where("backend_id = ? AND auto_start = true", backend.ID).Find(&autoStartProxies).Error; err != nil {
|
||||
log.Errorf("Failed to query proxies to autostart: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
for _, proxy := range autoStartProxies {
|
||||
log.Infof("Starting up route #%d for backend #%d: %s", proxy.ID, backend.ID, proxy.Name)
|
||||
|
||||
marhalledCommand, err := commonbackend.Marshal("addProxy", &commonbackend.AddProxy{
|
||||
Type: "addProxy",
|
||||
SourceIP: proxy.SourceIP,
|
||||
SourcePort: proxy.SourcePort,
|
||||
DestPort: proxy.DestinationPort,
|
||||
Protocol: proxy.Protocol,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("Failed to marshal proxy adding request for backend #%d and route #%d: %s", proxy.BackendID, proxy.ID, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
if _, err := conn.Write(marhalledCommand); err != nil {
|
||||
log.Errorf("Failed to send proxy adding request for backend #%d and route #%d: %s", proxy.BackendID, proxy.ID, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
_, backendResponse, err := commonbackend.Unmarshal(conn)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get response for backend #%d and route #%d: %s", proxy.BackendID, proxy.ID, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
switch responseMessage := backendResponse.(type) {
|
||||
case *commonbackend.ProxyStatusResponse:
|
||||
if !responseMessage.IsActive {
|
||||
log.Warnf("Failed to start proxy for backend #%d and route #%d", proxy.BackendID, proxy.ID)
|
||||
}
|
||||
default:
|
||||
log.Errorf("Got illegal response type for backend #%d and proxy #%d: %T", proxy.BackendID, proxy.ID, responseMessage)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = backendInstance.Start()
|
||||
|
||||
if err != nil {
|
||||
|
@ -117,9 +208,8 @@ func apiEntrypoint(cCtx *cli.Context) error {
|
|||
Arguments: backendParameters,
|
||||
})
|
||||
|
||||
switch responseMessage := backendStartResponse.(type) {
|
||||
case error:
|
||||
log.Warnf("Failed to get response for backend #%d: %s", backend.ID, responseMessage.Error())
|
||||
if err != nil {
|
||||
log.Warnf("Failed to get response for backend #%d: %s", backend.ID, err.Error())
|
||||
|
||||
err = backendInstance.Stop()
|
||||
|
||||
|
@ -128,6 +218,9 @@ func apiEntrypoint(cCtx *cli.Context) error {
|
|||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
switch responseMessage := backendStartResponse.(type) {
|
||||
case *commonbackend.BackendStatusResponse:
|
||||
if !responseMessage.IsRunning {
|
||||
err = backendInstance.Stop()
|
||||
|
|
|
@ -346,7 +346,7 @@ func Marshal(commandType string, command interface{}) ([]byte, error) {
|
|||
return nil, fmt.Errorf("failed to typecast")
|
||||
}
|
||||
|
||||
statusRequestBytes := make([]byte, 2)
|
||||
statusRequestBytes := make([]byte, 1)
|
||||
statusRequestBytes[0] = BackendStatusRequestID
|
||||
|
||||
return statusRequestBytes, nil
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue