Skip to content

Commit

Permalink
Add timeout for start of indexers and make hammer time configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
zeripath committed Jul 23, 2019
1 parent a92957a commit e93af21
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 26 deletions.
9 changes: 9 additions & 0 deletions custom/conf/app.ini.sample
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ LFS_CONTENT_PATH = data/lfs
LFS_JWT_SECRET =
; LFS authentication validity period (in time.Duration), pushes taking longer than this may fail.
LFS_HTTP_AUTH_EXPIRY = 20m
; Allow graceful restarts using SIGHUP to fork
ALLOW_GRACEFUL_RESTARTS = true
; After a restart the parent will finish ongoing requests before
; shutting down. Force shutdown if this process takes longer than this delay.
; set to a negative value to disable
GRACEFUL_HAMMER_TIME = 60s

; Define allowed algorithms and their minimum key length (use -1 to disable a type)
[ssh.minimum_key_sizes]
Expand Down Expand Up @@ -290,6 +296,9 @@ ISSUE_INDEXER_QUEUE_DIR = indexers/issues.queue
ISSUE_INDEXER_QUEUE_CONN_STR = "addrs=127.0.0.1:6379 db=0"
; Batch queue number, default is 20
ISSUE_INDEXER_QUEUE_BATCH_NUMBER = 20
; Timeout the indexer if it takes longer than this to start.
; Set to a negative value to disable timeout.
STARTUP_TIMEOUT=30s

; repo indexer by default disabled, since it uses a lot of disk space
REPO_INDEXER_ENABLED = false
Expand Down
3 changes: 3 additions & 0 deletions docs/content/doc/advanced/config-cheat-sheet.en-us.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`.
- `LETSENCRYPT_ACCEPTTOS`: **false**: This is an explicit check that you accept the terms of service for Let's Encrypt.
- `LETSENCRYPT_DIRECTORY`: **https**: Directory that Letsencrypt will use to cache information such as certs and private keys.
- `LETSENCRYPT_EMAIL`: **email@example.com**: Email used by Letsencrypt to notify about problems with issued certificates. (No default)
- `ALLOW_GRACEFUL_RESTARTS`: **true**: Perform a graceful restart on SIGHUP
- `GRACEFUL_HAMMER_TIME`: **60s**: After a restart the parent process will stop accepting new connections and will allow requests to finish before stopping. Shutdown will be forced if it takes longer than this time.

## Database (`database`)

Expand Down Expand Up @@ -179,6 +181,7 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`.
- `REPO_INDEXER_PATH`: **indexers/repos.bleve**: Index file used for code search.
- `UPDATE_BUFFER_LEN`: **20**: Buffer length of index request.
- `MAX_FILE_SIZE`: **1048576**: Maximum size in bytes of files to be indexed.
- `STARTUP_TIMEOUT`: **30s**: If the indexer takes longer than this timeout to start - fail. (This timeout will be added to the hammer time above for child processes - as bleve will not start until the previous parent is shutdown.)

## Security (`security`)

Expand Down
18 changes: 18 additions & 0 deletions models/repo_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"fmt"
"strconv"
"strings"
"time"

"code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/indexer"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
Expand Down Expand Up @@ -69,11 +71,27 @@ func InitRepoIndexer() {
if !setting.Indexer.RepoIndexerEnabled {
return
}
waitChannel := make(chan struct{})
repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength)
go func() {
indexer.InitRepoIndexer(populateRepoIndexerAsynchronously)
go processRepoIndexerOperationQueue()
close(waitChannel)
}()
if setting.Indexer.StartupTimeout > 0 {
go func() {
timeout := setting.Indexer.StartupTimeout
if graceful.IsChild && setting.GracefulHammerTime > 0 {
timeout += setting.GracefulHammerTime
}
select {
case <-waitChannel:
case <-time.After(timeout):
log.Fatal("Timedout starting Repo Indexer")
}
}()

}
}

// populateRepoIndexerAsynchronously asynchronously populates the repo indexer
Expand Down
20 changes: 9 additions & 11 deletions modules/graceful/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,15 @@ var (
DefaultWriteTimeOut time.Duration
// DefaultMaxHeaderBytes default max header bytes
DefaultMaxHeaderBytes int
// DefaultHammerTime default hammer time
DefaultHammerTime time.Duration

// We have been forked iff LISTEN_FDS is set and our parents PID is not 1
isChild = len(os.Getenv(listenFDs)) > 0 && os.Getppid() > 1
// IsChild reports if we are a fork iff LISTEN_FDS is set and our parent PID is not 1
IsChild = len(os.Getenv(listenFDs)) > 0 && os.Getppid() > 1
)

func init() {
runningServerReg = sync.RWMutex{}

DefaultMaxHeaderBytes = 0 // use http.DefaultMaxHeaderBytes - which currently is 1 << 20 (1MB)

// after a restart the parent will finish ongoing requests before
// shutting down. set to a negative value to disable
DefaultHammerTime = 60 * time.Second
}

// ServeFunction represents a listen.Accept loop
Expand All @@ -76,7 +70,11 @@ func NewServer(network, address string) *Server {
runningServerReg.Lock()
defer runningServerReg.Unlock()

log.Info("Beginning new server: %s:%s on PID: %d (%t)", network, address, os.Getpid(), isChild)
if IsChild {
log.Info("Restarting new server: %s:%s on PID: %d", network, address, os.Getpid())
} else {
log.Info("Starting new server: %s:%s on PID: %d", network, address, os.Getpid())
}
srv := &Server{
wg: sync.WaitGroup{},
sigChan: make(chan os.Signal),
Expand Down Expand Up @@ -108,7 +106,7 @@ func (srv *Server) ListenAndServe(serve ServeFunction) error {

srv.listener = newWrappedListener(l, srv)

if isChild {
if IsChild {
_ = syscall.Kill(syscall.Getppid(), syscall.SIGTERM)
}

Expand Down Expand Up @@ -154,7 +152,7 @@ func (srv *Server) ListenAndServeTLSConfig(tlsConfig *tls.Config, serve ServeFun
wl := newWrappedListener(l, srv)
srv.listener = tls.NewListener(wl, tlsConfig)

if isChild {
if IsChild {
_ = syscall.Kill(syscall.Getppid(), syscall.SIGTERM)
}
srv.BeforeBegin(srv.network, srv.address)
Expand Down
11 changes: 6 additions & 5 deletions modules/graceful/server_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,20 @@ import (
"time"

"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
)

// shutdown closes the listener so that no new connections are accepted. it also
// starts a goroutine that will hammer (stop all running requests) the server
// after DefaultHammerTime.
// shutdown closes the listener so that no new connections are accepted
// and starts a goroutine that will hammer (stop all running requests) the server
// after setting.GracefulHammerTime.
func (srv *Server) shutdown() {
if srv.getState() != stateRunning {
return
}

srv.setState(stateShuttingDown)
if DefaultHammerTime >= 0 {
go srv.hammerTime(DefaultHammerTime)
if setting.GracefulHammerTime >= 0 {
go srv.hammerTime(setting.GracefulHammerTime)
}

if srv.OnShutdown != nil {
Expand Down
15 changes: 11 additions & 4 deletions modules/graceful/server_signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
)

var hookableSignals []os.Signal
Expand Down Expand Up @@ -42,10 +43,16 @@ func (srv *Server) handleSignals() {
srv.preSignalHooks(sig)
switch sig {
case syscall.SIGHUP:
log.Info("PID: %d. Received SIGHUP. Forking...", pid)
err := srv.fork()
if err != nil {
log.Error("Error whilst forking from PID: %d : %v", pid, err)
if setting.GracefulRestartable {
log.Info("PID: %d. Received SIGHUP. Forking...", pid)
err := srv.fork()
if err != nil {
log.Error("Error whilst forking from PID: %d : %v", pid, err)
}
} else {
log.Info("PID: %d. Received SIGHUP. Not set restartable. Shutting down...", pid)

srv.shutdown()
}
case syscall.SIGUSR1:
log.Info("PID %d. Received SIGUSR1.", pid)
Expand Down
24 changes: 18 additions & 6 deletions modules/indexer/issues/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
package issues

import (
"sync"
"time"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
Expand Down Expand Up @@ -50,13 +51,12 @@ var (
// issueIndexerQueue queue of issue ids to be updated
issueIndexerQueue Queue
issueIndexer Indexer
wg sync.WaitGroup
waitChannel = make(chan struct{})
)

// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
// all issue index done.
func InitIssueIndexer(syncReindex bool) {
wg.Add(1)
go func() {
var populate bool
var dummyQueue bool
Expand Down Expand Up @@ -133,10 +133,22 @@ func InitIssueIndexer(syncReindex bool) {
go populateIssueIndexer()
}
}
wg.Done()
close(waitChannel)
}()
if syncReindex {
wg.Wait()
<-waitChannel
} else if setting.Indexer.StartupTimeout > 0 {
go func() {
timeout := setting.Indexer.StartupTimeout
if graceful.IsChild && setting.GracefulHammerTime > 0 {
timeout += setting.GracefulHammerTime
}
select {
case <-waitChannel:
case <-time.After(timeout):
log.Fatal("Timedout starting Issue Indexer")
}
}()
}
}

Expand Down Expand Up @@ -217,7 +229,7 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {

// SearchIssuesByKeyword search issue ids by keywords and repo id
func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) {
wg.Wait()
<-waitChannel
var issueIDs []int64
res, err := issueIndexer.Search(keyword, repoID, 1000, 0)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions modules/setting/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package setting
import (
"path"
"path/filepath"
"time"
)

// enumerates all the indexer queue types
Expand All @@ -29,6 +30,7 @@ var (
IssueQueueDir string
IssueQueueConnStr string
IssueQueueBatchNumber int
StartupTimeout time.Duration
}{
IssueType: "bleve",
IssuePath: "indexers/issues.bleve",
Expand Down Expand Up @@ -57,4 +59,5 @@ func newIndexerService() {
Indexer.IssueQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue"))
Indexer.IssueQueueConnStr = sec.Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString(path.Join(AppDataPath, ""))
Indexer.IssueQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20)
Indexer.StartupTimeout = sec.Key("STARTUP_TIMEOUT").MustDuration(30 * time.Second)
}
4 changes: 4 additions & 0 deletions modules/setting/setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ var (
LetsEncryptTOS bool
LetsEncryptDirectory string
LetsEncryptEmail string
GracefulRestartable bool
GracefulHammerTime time.Duration

SSH = struct {
Disabled bool `ini:"DISABLE_SSH"`
Expand Down Expand Up @@ -568,6 +570,8 @@ func NewContext() {
Domain = sec.Key("DOMAIN").MustString("localhost")
HTTPAddr = sec.Key("HTTP_ADDR").MustString("0.0.0.0")
HTTPPort = sec.Key("HTTP_PORT").MustString("3000")
GracefulRestartable = sec.Key("ALLOW_GRACEFUL_RESTARTS").MustBool(true)
GracefulHammerTime = sec.Key("GRACEFUL_HAMMER_TIME").MustDuration(60 * time.Second)

defaultAppURL := string(Protocol) + "://" + Domain
if (Protocol == HTTP && HTTPPort != "80") || (Protocol == HTTPS && HTTPPort != "443") {
Expand Down

0 comments on commit e93af21

Please sign in to comment.