Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify exponential backoff and refactor env #1185

Merged
merged 12 commits into from
Nov 23, 2024
51 changes: 51 additions & 0 deletions backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package frankenphp

import (
"sync"
"time"
)

type exponentialBackoff struct {
backoff time.Duration
failureCount int
mu sync.RWMutex
maxBackoff time.Duration
minBackoff time.Duration
maxConsecutiveFailures int
}

// recordSuccess resets the backoff and failureCount
func (e *exponentialBackoff) recordSuccess() {
e.mu.Lock()
e.failureCount = 0
e.backoff = e.minBackoff
e.mu.Unlock()
}

// recordFailure increments the failure count and increases the backoff, it returns true if maxConsecutiveFailures has been reached
func (e *exponentialBackoff) recordFailure() bool {
e.mu.Lock()
e.failureCount += 1
if e.backoff < e.minBackoff {
e.backoff = e.minBackoff
}

e.backoff = min(e.backoff*2, e.maxBackoff)

e.mu.Unlock()
return e.failureCount >= e.maxConsecutiveFailures
}

// wait sleeps for the backoff duration if failureCount is non-zero.
// NOTE: this is not tested and should be kept 'obviously correct' (i.e., simple)
func (e *exponentialBackoff) wait() {
e.mu.RLock()
if e.failureCount == 0 {
withinboredom marked this conversation as resolved.
Show resolved Hide resolved
e.mu.RUnlock()

return
}
e.mu.RUnlock()

time.Sleep(e.backoff)
}
41 changes: 41 additions & 0 deletions backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package frankenphp

import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestExponentialBackoff_Reset(t *testing.T) {
e := &exponentialBackoff{
maxBackoff: 5 * time.Second,
minBackoff: 500 * time.Millisecond,
maxConsecutiveFailures: 3,
}

assert.False(t, e.recordFailure())
assert.False(t, e.recordFailure())
e.recordSuccess()

e.mu.RLock()
defer e.mu.RUnlock()
assert.Equal(t, 0, e.failureCount, "expected failureCount to be reset to 0")
assert.Equal(t, e.backoff, e.minBackoff, "expected backoff to be reset to minBackoff")
}

func TestExponentialBackoff_Trigger(t *testing.T) {
e := &exponentialBackoff{
maxBackoff: 500 * 3 * time.Millisecond,
minBackoff: 500 * time.Millisecond,
maxConsecutiveFailures: 3,
}

assert.False(t, e.recordFailure())
assert.False(t, e.recordFailure())
assert.True(t, e.recordFailure())

e.mu.RLock()
defer e.mu.RUnlock()
assert.Equal(t, e.failureCount, e.maxConsecutiveFailures, "expected failureCount to be maxConsecutiveFailures")
assert.Equal(t, e.backoff, e.maxBackoff, "expected backoff to be maxBackoff")
}
74 changes: 74 additions & 0 deletions env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package frankenphp

// #include "frankenphp.h"
import "C"
import (
"os"
"strings"
"unsafe"
)

//export go_putenv
func go_putenv(str *C.char, length C.int) C.bool {
envString := C.GoStringN(str, length)

// Check if '=' is present in the string
if key, val, found := strings.Cut(envString, "="); found {
return os.Setenv(key, val) == nil
}

// No '=', unset the environment variable
return os.Unsetenv(envString) == nil
}

//export go_getfullenv
func go_getfullenv(threadIndex C.uintptr_t) (*C.go_string, C.size_t) {
thread := phpThreads[threadIndex]

env := os.Environ()
goStrings := make([]C.go_string, len(env)*2)

for i, envVar := range env {
key, val, _ := strings.Cut(envVar, "=")
goStrings[i*2] = C.go_string{C.size_t(len(key)), thread.pinString(key)}
goStrings[i*2+1] = C.go_string{C.size_t(len(val)), thread.pinString(val)}
}

value := unsafe.SliceData(goStrings)
thread.Pin(value)

return value, C.size_t(len(env))
}

//export go_getenv
func go_getenv(threadIndex C.uintptr_t, name *C.go_string) (C.bool, *C.go_string) {
thread := phpThreads[threadIndex]

// Create a byte slice from C string with a specified length
envName := C.GoStringN(name.data, C.int(name.len))

// Get the environment variable value
envValue, exists := os.LookupEnv(envName)
if !exists {
// Environment variable does not exist
return false, nil // Return 0 to indicate failure
}

// Convert Go string to C string
value := &C.go_string{C.size_t(len(envValue)), thread.pinString(envValue)}
thread.Pin(value)

return true, value // Return 1 to indicate success
}

//export go_sapi_getenv
func go_sapi_getenv(threadIndex C.uintptr_t, name *C.go_string) *C.char {
envName := C.GoStringN(name.data, C.int(name.len))

envValue, exists := os.LookupEnv(envName)
if !exists {
return nil
}

return phpThreads[threadIndex].pinCString(envValue)
}
82 changes: 0 additions & 82 deletions frankenphp.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,88 +505,6 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error
return nil
}

//export go_putenv
func go_putenv(str *C.char, length C.int) C.bool {
// Create a byte slice from C string with a specified length
s := C.GoBytes(unsafe.Pointer(str), length)

// Convert byte slice to string
envString := string(s)

// Check if '=' is present in the string
if key, val, found := strings.Cut(envString, "="); found {
if os.Setenv(key, val) != nil {
return false // Failure
}
} else {
// No '=', unset the environment variable
if os.Unsetenv(envString) != nil {
return false // Failure
}
}

return true // Success
}

//export go_getfullenv
func go_getfullenv(threadIndex C.uintptr_t) (*C.go_string, C.size_t) {
thread := phpThreads[threadIndex]

env := os.Environ()
goStrings := make([]C.go_string, len(env)*2)

for i, envVar := range env {
key, val, _ := strings.Cut(envVar, "=")
k := unsafe.StringData(key)
v := unsafe.StringData(val)
thread.Pin(k)
thread.Pin(v)

goStrings[i*2] = C.go_string{C.size_t(len(key)), (*C.char)(unsafe.Pointer(k))}
goStrings[i*2+1] = C.go_string{C.size_t(len(val)), (*C.char)(unsafe.Pointer(v))}
}

value := unsafe.SliceData(goStrings)
thread.Pin(value)

return value, C.size_t(len(env))
}

//export go_getenv
func go_getenv(threadIndex C.uintptr_t, name *C.go_string) (C.bool, *C.go_string) {
thread := phpThreads[threadIndex]

// Create a byte slice from C string with a specified length
envName := C.GoStringN(name.data, C.int(name.len))

// Get the environment variable value
envValue, exists := os.LookupEnv(envName)
if !exists {
// Environment variable does not exist
return false, nil // Return 0 to indicate failure
}

// Convert Go string to C string
val := unsafe.StringData(envValue)
thread.Pin(val)
value := &C.go_string{C.size_t(len(envValue)), (*C.char)(unsafe.Pointer(val))}
thread.Pin(value)

return true, value // Return 1 to indicate success
}

//export go_sapi_getenv
func go_sapi_getenv(threadIndex C.uintptr_t, name *C.go_string) *C.char {
envName := C.GoStringN(name.data, C.int(name.len))

envValue, exists := os.LookupEnv(envName)
if !exists {
return nil
}

return phpThreads[threadIndex].pinCString(envValue)
}

//export go_handle_request
func go_handle_request(threadIndex C.uintptr_t) bool {
select {
Expand Down
60 changes: 12 additions & 48 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ type worker struct {
ready chan struct{}
}

const maxWorkerErrorBackoff = 1 * time.Second
const minWorkerErrorBackoff = 100 * time.Millisecond
const maxWorkerConsecutiveFailures = 6

var (
watcherIsEnabled bool
workerShutdownWG sync.WaitGroup
Expand Down Expand Up @@ -97,33 +93,15 @@ func newWorker(o workerOpt) (*worker, error) {
func (worker *worker) startNewWorkerThread() {
workerShutdownWG.Add(1)
defer workerShutdownWG.Done()

backoff := minWorkerErrorBackoff
failureCount := 0
backingOffLock := sync.RWMutex{}
backoff := &exponentialBackoff{
maxBackoff: 1 * time.Second,
minBackoff: 100 * time.Millisecond,
maxConsecutiveFailures: 6,
}

for {
// if the worker can stay up longer than backoff*2, it is probably an application error
upFunc := sync.Once{}
go func() {
backingOffLock.RLock()
wait := backoff * 2
backingOffLock.RUnlock()
time.Sleep(wait)
upFunc.Do(func() {
backingOffLock.Lock()
defer backingOffLock.Unlock()
// if we come back to a stable state, reset the failure count
if backoff == minWorkerErrorBackoff {
failureCount = 0
}

// earn back the backoff over time
if failureCount > 0 {
backoff = max(backoff/2, 100*time.Millisecond)
}
})
}()
backoff.wait()

metrics.StartWorker(worker.fileName)

Expand Down Expand Up @@ -176,31 +154,17 @@ func (worker *worker) startNewWorkerThread() {
c.Write(zap.String("worker", worker.fileName))
}
metrics.StopWorker(worker.fileName, StopReasonRestart)
backoff.recordSuccess()
continue
}

// on exit status 1 we log the error and apply an exponential backoff when restarting
upFunc.Do(func() {
backingOffLock.Lock()
defer backingOffLock.Unlock()
// if we end up here, the worker has not been up for backoff*2
// this is probably due to a syntax error or another fatal error
if failureCount >= maxWorkerConsecutiveFailures {
if !watcherIsEnabled {
panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
}
logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", failureCount))
if backoff.recordFailure() {
if !watcherIsEnabled {
panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
}
failureCount += 1
})
backingOffLock.RLock()
wait := backoff
backingOffLock.RUnlock()
time.Sleep(wait)
backingOffLock.Lock()
backoff *= 2
backoff = min(backoff, maxWorkerErrorBackoff)
backingOffLock.Unlock()
logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", backoff.failureCount))
}
metrics.StopWorker(worker.fileName, StopReasonCrash)
}

Expand Down
Loading