ingest/ledgerbackend: Handle user initiated shutdown during catchup #3258

Closed
1 change: 1 addition & 0 deletions go.mod
@@ -89,5 +89,6 @@ require (
gopkg.in/gavv/httpexpect.v1 v1.0.0-20170111145843-40724cf1e4a0
gopkg.in/gorp.v1 v1.7.1 // indirect
gopkg.in/square/go-jose.v2 v2.4.1
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7
gopkg.in/tylerb/graceful.v1 v1.2.13
)
59 changes: 36 additions & 23 deletions ingest/ledgerbackend/buffered_meta_pipe_reader.go
@@ -2,9 +2,12 @@ package ledgerbackend

import (
"bufio"
"context"
"io"
"time"

"gopkg.in/tomb.v1"

"github.com/pkg/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
@@ -65,7 +68,7 @@ type bufferedLedgerMetaReader struct {
r *bufio.Reader
c chan metaResult
runner stellarCoreRunnerInterface
closed chan struct{}
tomb *tomb.Tomb
}

// newBufferedLedgerMetaReader creates a new meta reader that will shutdown
@@ -75,7 +78,7 @@ func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) *bufferedLed
c: make(chan metaResult, ledgerReadAheadBufferSize),
r: bufio.NewReaderSize(runner.getMetaPipe(), metaPipeBufferSize),
runner: runner,
closed: make(chan struct{}),
tomb: new(tomb.Tomb),
}
}

@@ -88,7 +91,7 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe(untilSequence uint32)
frameLength, err := xdr.ReadFrameLength(b.r)
if err != nil {
select {
case <-b.runner.getProcessExitChan():
case <-b.runner.getTomb().Dead():
return nil, wrapStellarCoreRunnerError(b.runner)
default:
return nil, errors.Wrap(err, "error reading frame length")
@@ -98,12 +101,12 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe(untilSequence uint32)
for frameLength > metaPipeBufferSize && len(b.c) > 0 {
// Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage.
select {
case <-b.runner.getProcessExitChan():
case <-b.runner.getTomb().Dead():
if untilSequence != 0 {
// If untilSequence != 0 it's possible that Stellar-Core process
// exits but there are still ledgers in a buffer (catchup). In such
// case we ignore cases when Stellar-Core exited with no errors.
processErr := b.runner.getProcessExitError()
processErr := b.runner.getTomb().Err()
if processErr != nil {
return nil, errors.Wrap(processErr, "stellar-core process exited with an error")
}
@@ -124,7 +127,7 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe(untilSequence uint32)
err = errors.Wrap(err, "unmarshalling framed LedgerCloseMeta")

select {
case <-b.runner.getProcessExitChan():
case <-b.runner.getTomb().Dead():
return nil, wrapStellarCoreRunnerError(b.runner)
default:
return nil, err
@@ -137,17 +140,12 @@ func (b *bufferedLedgerMetaReader) getChannel() <-chan metaResult {
return b.c
}

func (b *bufferedLedgerMetaReader) waitForClose() {
// If buffer is full, keep reading to make sure it receives
// a shutdown signal from stellarCoreRunner.
loop:
for {
select {
case <-b.c:
case <-b.closed:
break loop
}
}
// close makes sure that the go routine returns. In general, it will return
// when Stellar-Core exits but there's a special case when it's blocked
// on writing to a full channel and calling close() solves this.
func (b *bufferedLedgerMetaReader) close() {
b.tomb.Kill(nil)
b.tomb.Wait()
}

// Start starts an internal go routine that reads binary ledger data into
@@ -158,7 +156,7 @@ func (b *bufferedLedgerMetaReader) start(untilSequence uint32) {
printBufferOccupation := time.NewTicker(5 * time.Second)
defer func() {
printBufferOccupation.Stop()
close(b.closed)
b.tomb.Done()
}()

for {
@@ -169,14 +167,17 @@ func (b *bufferedLedgerMetaReader) start(untilSequence uint32) {
}

meta, err := b.readLedgerMetaFromPipe(untilSequence)
if err != nil {
// When `GetLedger` sees the error it will close the backend. We shouldn't
// close it now because there may be some ledgers in a buffer.
b.c <- metaResult{nil, err}

select {
case b.c <- metaResult{meta, err}:
case <-b.tomb.Dying():
Contributor Author:

Finally, there are several events we can listen to. In this case, Dying fires when Kill has been called but the tomb may not be Dead yet. We can use this signal to start shutting down the go routine.

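For illustration, a minimal standalone sketch of the Dying/Done pattern described above, using gopkg.in/tomb.v1 (not code from this PR; the writer go routine merely stands in for the buffered reader):

package main

import (
	"fmt"
	"time"

	"gopkg.in/tomb.v1"
)

func main() {
	t := new(tomb.Tomb)
	results := make(chan int, 1)

	go func() {
		defer t.Done() // flags the tomb as Dead once this go routine returns
		for i := 0; ; i++ {
			select {
			case results <- i: // may block once the buffer is full
			case <-t.Dying(): // Kill was called: start shutting down
				return
			}
		}
	}()

	time.Sleep(50 * time.Millisecond) // let the writer fill (and block on) the buffer
	t.Kill(nil)                       // flag the tomb as dying; unblocks the select above
	fmt.Println(t.Wait())             // blocks until Done, i.e. until the tomb is Dead; prints <nil>
}
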
// Go routine can be flagged as dying by calling b.tomb.Kill (in close()).
return
}

b.c <- metaResult{meta, nil}
if err != nil {
return
}

if untilSequence != 0 {
if meta.LedgerSequence() >= untilSequence {
@@ -186,3 +187,15 @@ func (b *bufferedLedgerMetaReader) start(untilSequence uint32) {
}
}
}

func wrapStellarCoreRunnerError(r stellarCoreRunnerInterface) error {
processErr := r.getTomb().Err()
switch processErr {
case nil:
return errors.New("stellar-core process exited unexpectedly without an error")
case context.Canceled:
return processErr
default:
return errors.Wrap(processErr, "stellar-core process exited with an error")
}
}
143 changes: 92 additions & 51 deletions ingest/ledgerbackend/captive_core_backend.go
@@ -1,7 +1,9 @@
package ledgerbackend

import (
"context"
"encoding/hex"
"sync"
"time"

"github.com/pkg/errors"
@@ -75,9 +77,16 @@ type CaptiveStellarCore struct {
ledgerBuffer *bufferedLedgerMetaReader
stellarCoreRunner stellarCoreRunnerInterface

// waitIntervalPrepareRange defines a time to wait between checking if the buffer
// is empty. Default 1s, lower in tests to make them faster.
waitIntervalPrepareRange time.Duration

// For testing
stellarCoreRunnerFactory func(mode stellarCoreRunnerMode) (stellarCoreRunnerInterface, error)

// mutex protects all values below, ex. Close() can be called concurrently.
mutex sync.Mutex

// Defines if the blocking mode (off by default) is on or off. In blocking mode,
// calling GetLedger blocks until the requested ledger is available. This is useful
// for scenarios when Horizon consumes ledgers faster than Stellar-Core produces them
@@ -91,10 +100,6 @@ type CaptiveStellarCore struct {
nextLedger uint32 // next ledger expected, error w/ restart if not seen
lastLedger *uint32 // end of current segment if offline, nil if online
previousLedgerHash *string

// waitIntervalPrepareRange defines a time to wait between checking if the buffer
// is empty. Default 1s, lower in tests to make them faster.
waitIntervalPrepareRange time.Duration
}

// CaptiveCoreConfig contains all the parameters required to create a CaptiveStellarCore instance
@@ -134,7 +139,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {
c := &CaptiveStellarCore{
archive: archive,
configAppendPath: config.ConfigAppendPath,
waitIntervalPrepareRange: time.Second,
waitIntervalPrepareRange: 100 * time.Millisecond,
ledgerHashStore: config.LedgerHashStore,
}
c.stellarCoreRunnerFactory = func(mode stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
@@ -153,7 +158,7 @@ func (c *CaptiveStellarCore) getLatestCheckpointSequence() (uint32, error) {
}

func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error {
err := c.Close()
err := c.close()
if err != nil {
return errors.Wrap(err, "error closing existing session")
}
@@ -202,7 +207,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error {
return nil
}

err := c.Close()
err := c.close()
if err != nil {
return errors.Wrap(err, "error closing existing session")
}
@@ -258,18 +263,6 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error {
// read-ahead buffer
c.ledgerBuffer = newBufferedLedgerMetaReader(c.stellarCoreRunner)
go c.ledgerBuffer.start(0)

// if nextLedger is behind - fast-forward until expected ledger
if c.nextLedger < from {
// make GetFrom blocking temporarily
c.blocking = true
_, _, err := c.GetLedger(from)
c.blocking = false
if err != nil {
return errors.Wrapf(err, "Error fast-forwarding to %d", from)
}
}

return nil
}

@@ -340,11 +333,19 @@ func (c *CaptiveStellarCore) runFromParams(from uint32) (runFrom uint32, ledgerH
// it normally (including connecting to the Stellar network).
// Please note that using a BoundedRange, currently, requires a full-trust on
// history archive. This issue is being fixed in Stellar-Core.
//
// Returns `context.Canceled` if `Close` was called from another Go routine.
func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error {
// c.mutex is unlocked on error and before the waitloop to allow calling Close()
// to interrupt the process.
c.mutex.Lock()

// Range already prepared
if prepared, err := c.IsPrepared(ledgerRange); err != nil {
if prepared, err := c.isPrepared(ledgerRange); err != nil {
c.mutex.Unlock()
Contributor (@Shaptic, Dec 10, 2020):

Would it be safer / cleaner to do something like

unlocked := false
defer func() {
  if !unlocked {
    c.mutex.Unlock()
    unlocked = true
  }
}()

instead (and set unlocked=true below before the waitloop)? Or, alternatively, moving the waitloop to its own function would let us just defer c.mutex.Unlock() directly here.

It just seems like it could be really easy for someone (like me lol) to edit this code in the future, then forget to unlock the mutex and be in a dangerous state.

return errors.Wrap(err, "error in IsPrepared")
} else if prepared {
c.mutex.Unlock()
return nil
}

@@ -355,24 +356,51 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error {
err = c.openOnlineReplaySubprocess(ledgerRange.from)
}
if err != nil {
c.mutex.Unlock()
return errors.Wrap(err, "opening subprocess")
}

c.mutex.Unlock()

waitloop:
for {
select {
case <-c.stellarCoreRunner.getProcessExitChan():
// Return only in case of Stellar-Core process error. Normal exit
// is expected in catchup when all ledgers sent to a buffer.
processErr := c.stellarCoreRunner.getProcessExitError()
if processErr != nil {
case <-c.stellarCoreRunner.getTomb().Dead():
// If core is dead we need to handle the Err() value:
// - nil and unbounded range: return an error because we don't expect
// Stellar-Core to exit while streaming an unbounded range,
// - nil and bounded range: this is expected in a bounded range because
// Stellar-Core can exit after streaming all ledgers, break and
// wait for the buffer to fill,
// - context.Canceled: user-initiated exit, return the same error,
// - not nil: exit with an error, wrap and return.
processErr := c.stellarCoreRunner.getTomb().Err()
switch {
case processErr == nil && !ledgerRange.bounded:
return errors.New("stellar-core process exited unexpectedly without an error")
case processErr == nil && ledgerRange.bounded:
return nil
case processErr == context.Canceled:
return processErr
default:
return errors.Wrap(processErr, "stellar-core process exited with an error")
}
Comment on lines +377 to 387
Contributor Author:

Now, this is where we actually handle the Stellar-Core process exit. I think all the cases are explained pretty well in the comment above. One thing worth noting, as already described in the previous comment: the first case covers the situation in which the error is nil but we still return an error because the exit was unexpected.

default:
}
// Wait for the first ledger or an error
if len(c.ledgerBuffer.getChannel()) > 0 {
break
// Wait/fast-forward to the expected ledger or an error. We need to check
// buffer length because `GetLedger` may be blocking.
Contributor:

I think it would make sense to add a context to GetLedger() which we can cancel on shutdown.

Contributor Author:

I really wanted to avoid using context.WithCancel here because it doesn't provide two things:

  1. a way to wait until the cancel request has been handled, so the caller can be sure it's safe to exit (e.g. the core process exited, files were removed, or the go routine in a buffer returned),
  2. an easy way to pass a final error value, if any.

tomb provides a mechanism for both and, because of this, simplifies the entire process. I'm open for discussion, can we talk in a new issue?

Contributor:

Sure. I will open a new issue.
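
To make the two points above concrete, here is a rough, self-contained sketch of that pattern, assuming gopkg.in/tomb.v1 (the worker and reader here are made up for illustration, not this PR's code):

package main

import (
	"context"
	"fmt"
	"io"
	"strings"

	"gopkg.in/tomb.v1"
)

// drain is an illustrative worker: it reads until EOF or until it is asked to
// stop, and reports the final error (if any) through the tomb.
func drain(t *tomb.Tomb, r io.Reader) {
	defer t.Done() // the caller's Wait() returns only after cleanup finished (point 1)
	buf := make([]byte, 4)
	for {
		select {
		case <-t.Dying(): // caller requested shutdown
			return
		default:
		}
		if _, err := r.Read(buf); err != nil {
			if err != io.EOF {
				t.Kill(err) // pass a final error value back to the caller (point 2)
			}
			return
		}
	}
}

func main() {
	t := new(tomb.Tomb)
	go drain(t, strings.NewReader("ledger meta stream"))

	t.Kill(context.Canceled)             // user-initiated shutdown
	err := t.Wait()                      // blocks until drain has actually returned
	fmt.Println(err == context.Canceled) // true: the shutdown reason is preserved
}
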

for len(c.ledgerBuffer.getChannel()) > 0 {
_, _, err := c.GetLedger(c.nextLedger)
if err != nil {
return errors.Wrapf(err, "Error fast-forwarding to %d", ledgerRange.from)
}

// If nextLedger > ledgerRange.from then ledgerRange.from is cached.
if c.nextLedger > ledgerRange.from {
break waitloop
}
}

time.Sleep(c.waitIntervalPrepareRange)
Contributor:

Do we really need an explicit sleep here?

I would (at the very least) use a ticker and intertwine the check with the other channels in the select statement.

Even better, it may be possible to remove the wait and the condition by using an asynchronous go routine for fast-forwarding.

It may seem outside of the scope of this PR, but it affects the shutdown wait time.

Contributor Author:

We can try it out for sure! Can you create a new issue? I'm not sure if we'll be able to test it before beta.

Contributor:

Sure
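
A rough standalone sketch of the ticker idea discussed here (illustrative only; the done and buffer channels stand in for getTomb().Dead() and the ledger buffer channel):

package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{}) // stand-in for c.stellarCoreRunner.getTomb().Dead()
	buffer := make(chan int, 8) // stand-in for c.ledgerBuffer.getChannel()

	go func() {
		for i := 1; i <= 3; i++ {
			buffer <- i
		}
		time.Sleep(300 * time.Millisecond)
		close(done)
	}()

	ticker := time.NewTicker(100 * time.Millisecond) // replaces the explicit time.Sleep
	defer ticker.Stop()

	for {
		select {
		case <-done:
			fmt.Println("process exited")
			return
		case <-ticker.C:
			for len(buffer) > 0 {
				fmt.Println("fast-forwarded past ledger", <-buffer)
			}
		}
	}
}
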

}

@@ -381,6 +409,12 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error {

// IsPrepared returns true if a given ledgerRange is prepared.
func (c *CaptiveStellarCore) IsPrepared(ledgerRange Range) (bool, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.isPrepared(ledgerRange)
}

func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) (bool, error) {
lastLedger := uint32(0)
if c.lastLedger != nil {
lastLedger = *c.lastLedger
@@ -391,10 +425,10 @@ func (c *CaptiveStellarCore) IsPrepared(ledgerRange Range) (bool, error) {
cachedLedger = c.cachedMeta.LedgerSequence()
}

return c.isPrepared(c.nextLedger, lastLedger, cachedLedger, ledgerRange), nil
return c.isPreparedParams(c.nextLedger, lastLedger, cachedLedger, ledgerRange), nil
}

func (*CaptiveStellarCore) isPrepared(nextLedger, lastLedger, cachedLedger uint32, ledgerRange Range) bool {
func (*CaptiveStellarCore) isPreparedParams(nextLedger, lastLedger, cachedLedger uint32, ledgerRange Range) bool {
if nextLedger == 0 {
return false
}
@@ -431,7 +465,15 @@ func (*CaptiveStellarCore) isPrepared(nextLedger, lastLedger, cachedLedger uint3
// * UnboundedRange makes GetLedger non-blocking. The method will return with
// the first argument equal false.
// This is done to provide maximum performance when streaming old ledgers.
//
// Returns `context.Canceled` if `Close` was called from another Go routine.
func (c *CaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.getLedger(sequence)
}

func (c *CaptiveStellarCore) getLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) {
if c.cachedMeta != nil && sequence == c.cachedMeta.LedgerSequence() {
// GetLedger can be called multiple times using the same sequence, ex. to create
// change and transaction readers. If we have this ledger buffered, let's return it.
@@ -466,7 +508,7 @@ loop:
return false, xdr.LedgerCloseMeta{}, nil
Contributor:

If Close() is called concurrently with this function, is there a possible race condition where the ledger buffer is set to nil right before it is accessed here?

Contributor Author:

Good catch, I focused on shutdown code and forgot about the obvious things. Fixed in 648ebf5.

}

// We don't have to handle getProcessExitChan() because this is handled
// We don't have to handle Stellar-Core exit because this is handled
// in bufferedLedgerMetaReader (will send an error to the channel).
result := <-c.ledgerBuffer.getChannel()
if result.err != nil {
@@ -507,7 +549,7 @@ loop:

// If we got the _last_ ledger in a segment, close before returning.
if c.lastLedger != nil && *c.lastLedger == seq {
if err := c.Close(); err != nil {
if err := c.close(); err != nil {
return false, xdr.LedgerCloseMeta{}, errors.Wrap(err, "error closing session")
}
}
@@ -517,7 +559,7 @@ loop:
// All paths above that break out of the loop (instead of return)
// set errOut to non-nil: there was an error and we should close and
// reset state before returning an error to our caller.
c.Close()
c.close()
return false, xdr.LedgerCloseMeta{}, errOut
}

@@ -529,6 +571,9 @@ loop:
// the latest sequence closed by the network. It's always the last value available
// in the backend.
func (c *CaptiveStellarCore) GetLatestLedgerSequence() (uint32, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

if c.isClosed() {
return 0, errors.New("stellar-core must be opened to return latest available sequence")
}
@@ -544,23 +589,27 @@ func (c *CaptiveStellarCore) isClosed() bool {
}

// Close closes existing Stellar-Core process, streaming sessions and removes all
// temporary files.
// temporary files. Please note that this can be executed concurrently, however it
// may block if `GetLedger` is running on an unbounded range (because `GetLedger`
// is blocking in this case).
func (c *CaptiveStellarCore) Close() error {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.close()
}

func (c *CaptiveStellarCore) close() error {
if c.stellarCoreRunner != nil {
// Closing stellarCoreRunner will automatically close bufferedLedgerMetaReader
// because it's listening for getProcessExitChan().
// Closing stellarCoreRunner will automatically close bufferedLedgerMetaReader's
// start() go routine because it checks its state.
err := c.stellarCoreRunner.close()
c.stellarCoreRunner = nil
if err != nil {
return errors.Wrap(err, "error closing stellar-core subprocess")
}
}

// c.ledgerBuffer might be nil if stellarCoreRunner.runFrom / stellarCoreRunner.catchup responded with an error
if c.ledgerBuffer != nil {
// Wait for bufferedLedgerMetaReader go routine to return.
c.ledgerBuffer.waitForClose()
c.ledgerBuffer = nil
}
if c.ledgerBuffer != nil {
c.ledgerBuffer.close()
}

c.nextLedger = 0
@@ -569,11 +618,3 @@ func (c *CaptiveStellarCore) Close() error {

return nil
}

func wrapStellarCoreRunnerError(r stellarCoreRunnerInterface) error {
processErr := r.getProcessExitError()
if processErr != nil {
return errors.Wrap(processErr, "stellar-core process exited with an error")
}
return errors.New("stellar-core process exited unexpectedly without an error")
}
336 changes: 256 additions & 80 deletions ingest/ledgerbackend/captive_core_backend_test.go

Large diffs are not rendered by default.

77 changes: 41 additions & 36 deletions ingest/ledgerbackend/stellar_core_runner.go
@@ -2,6 +2,7 @@ package ledgerbackend

import (
"bufio"
"context"
"fmt"
"io"
"io/ioutil"
@@ -11,23 +12,20 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/stellar/go/support/log"
"gopkg.in/tomb.v1"
)

type stellarCoreRunnerInterface interface {
catchup(from, to uint32) error
runFrom(from uint32, hash string) error
getMetaPipe() io.Reader
// getProcessExitChan returns a channel that closes on process exit
getProcessExitChan() <-chan struct{}
// getProcessExitError returns an exit error of the process, can be nil
getProcessExitError() error
getTomb() *tomb.Tomb
close() error
}

@@ -38,6 +36,8 @@ const (
stellarCoreRunnerModeOffline
)

// stellarCoreRunner is a helper for starting stellar-core. Should be used only
// once; for multiple runs, create separate instances.
type stellarCoreRunner struct {
executablePath string
configAppendPath string
@@ -46,19 +46,25 @@ type stellarCoreRunner struct {
httpPort uint
mode stellarCoreRunnerMode

started bool
wg sync.WaitGroup
shutdown chan struct{}
// tomb is used to handle cmd go routine termination. Err() value can be one
// of the following:
// - nil: process exited without an error and it was not user initiated (this
// can be an error in layers above if they expect more data but the process
// is done),
// - context.Canceled: process killed after a user request,
// - not nil: process exited with an error.
//
// tomb is created when a new cmd go routine starts.
tomb *tomb.Tomb

cmd *exec.Cmd

// processExit channel receives an error when the process exited with an error
// or nil if the process exited without an error.
processExit chan struct{}
processExitError error
metaPipe io.Reader
tempDir string
nonce string
// There's a gotcha! Receiving the cmd.Wait() signal doesn't mean that all
// ledgers have been read from the meta pipe. It turns out the OS maintains
// a buffer, so don't rely on this signal. Keep reading until EOF is
// returned.
metaPipe io.Reader
tempDir string
nonce string

log *log.Entry
}
@@ -87,9 +93,6 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode)
historyURLs: config.HistoryArchiveURLs,
httpPort: config.HTTPPort,
mode: mode,
shutdown: make(chan struct{}),
processExit: make(chan struct{}),
processExitError: nil,
tempDir: tempDir,
nonce: fmt.Sprintf("captive-stellar-core-%x", r.Uint64()),
log: coreLogger,
@@ -237,7 +240,7 @@ func (r *stellarCoreRunner) runCmd(params ...string) error {
}

func (r *stellarCoreRunner) catchup(from, to uint32) error {
if r.started {
if r.tomb != nil {
return errors.New("runner already started")
}
if err := r.runCmd("new-db"); err != nil {
@@ -254,17 +257,17 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
return errors.Wrap(err, "error creating `stellar-core catchup` subprocess")
}
r.cmd = cmd
r.tomb = new(tomb.Tomb)
r.metaPipe, err = r.start()
if err != nil {
return errors.Wrap(err, "error starting `stellar-core catchup` subprocess")
}
r.started = true

return nil
}

func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
if r.started {
if r.tomb != nil {
return errors.New("runner already started")
}
var err error
@@ -278,11 +281,11 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
if err != nil {
return errors.Wrap(err, "error creating `stellar-core run` subprocess")
}
r.tomb = new(tomb.Tomb)
r.metaPipe, err = r.start()
if err != nil {
return errors.Wrap(err, "error starting `stellar-core run` subprocess")
}
r.started = true

return nil
}
@@ -291,36 +294,38 @@ func (r *stellarCoreRunner) getMetaPipe() io.Reader {
return r.metaPipe
}

func (r *stellarCoreRunner) getProcessExitChan() <-chan struct{} {
return r.processExit
}

func (r *stellarCoreRunner) getProcessExitError() error {
return r.processExitError
func (r *stellarCoreRunner) getTomb() *tomb.Tomb {
return r.tomb
}

func (r *stellarCoreRunner) close() error {
var err1, err2 error

if r.tomb != nil {
// Kill tomb with context.Canceled. Kill will be called again in start()
// when the process exit is handled but the error value will not be overwritten.
r.tomb.Kill(context.Canceled)
Comment on lines +305 to +307
Contributor Author:

As mentioned in the summary above, there are two ways the Stellar-Core process can be killed. The first option is here: a user-initiated shutdown, which the caller (CaptiveStellarCore in our case) can detect later by checking whether the error (tomb.Err()) equals context.Canceled. The nice property of tomb is that the error is not overwritten, so if Kill is called again later we will still know it was a user-initiated shutdown.

Contributor:

The answer to this is probably painfully obvious, but by "user-initiated shutdown," we generally mean something akin to Ctrl+C and/or a code path (e.g. calling app.Close()), right?
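
A tiny illustration of the "not overwritten" property (assuming gopkg.in/tomb.v1; the second error is made up):

package main

import (
	"context"
	"errors"
	"fmt"

	"gopkg.in/tomb.v1"
)

func main() {
	t := new(tomb.Tomb)

	t.Kill(context.Canceled)                 // user-initiated shutdown, e.g. from close()
	t.Kill(errors.New("core exited with 1")) // a later Kill does not replace the first non-nil reason
	t.Done()

	fmt.Println(t.Wait()) // prints "context canceled"
}
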

}

if r.processIsAlive() {
err1 = r.cmd.Process.Kill()
r.cmd.Wait()
r.cmd = nil
}
err2 = os.RemoveAll(r.tempDir)
r.tempDir = ""

if r.started {
close(r.shutdown)
r.wg.Wait()
if r.tomb != nil {
r.tomb.Wait()
Contributor Author:

I just wanted to briefly describe the last property of tomb: waiting for the go routine to return. Obviously you could achieve the same thing with channels or a wait group, but this works really well with the other tomb API methods.

}
r.started = false
r.tomb = nil
r.cmd = nil

err2 = os.RemoveAll(r.tempDir)
r.tempDir = ""

if err1 != nil {
return errors.Wrap(err1, "error killing subprocess")
}
if err2 != nil {
return errors.Wrap(err2, "error removing subprocess tmpdir")
}

return nil
}
12 changes: 2 additions & 10 deletions ingest/ledgerbackend/stellar_core_runner_posix.go
@@ -37,17 +37,9 @@ func (c *stellarCoreRunner) start() (io.Reader, error) {
return readFile, errors.Wrap(err, "error starting stellar-core")
}

c.wg.Add(1)
go func() {
err := make(chan error, 1)
select {
case err <- c.cmd.Wait():
c.processExitError = <-err
close(c.processExit)
close(err)
case <-c.shutdown:
}
c.wg.Done()
c.tomb.Kill(c.cmd.Wait())
c.tomb.Done()
}()

return readFile, nil
12 changes: 2 additions & 10 deletions ingest/ledgerbackend/stellar_core_runner_windows.go
@@ -29,17 +29,9 @@ func (c *stellarCoreRunner) start() (io.Reader, error) {
return io.Reader(nil), err
}

c.wg.Add(1)
go func() {
err := make(chan error, 1)
select {
case err <- c.cmd.Wait():
c.processExitError = <-err
close(c.processExit)
close(err)
case <-c.shutdown:
}
c.wg.Done()
c.tomb.Kill(c.cmd.Wait())
c.tomb.Done()
Comment on lines +33 to +34
Contributor Author:

The second option is that Stellar-Core exited on its own. We pass the error to the caller using Kill again. It's possible that there is no error and Stellar-Core exited gracefully, but please note that this exit may be unexpected in some cases, so it may still trigger an error. Also, this is one of the features of tomb that's missing in other solutions: it provides an easy way to propagate the error back to the caller.
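
A rough sketch of this exit-error propagation path, assuming gopkg.in/tomb.v1 (the `false` command is just a stand-in for the stellar-core subprocess and exits with status 1 on Unix):

package main

import (
	"fmt"
	"os/exec"

	"gopkg.in/tomb.v1"
)

func main() {
	t := new(tomb.Tomb)
	cmd := exec.Command("false") // stand-in subprocess that fails

	if err := cmd.Start(); err != nil {
		panic(err)
	}
	go func() {
		t.Kill(cmd.Wait()) // forward the exit status (nil on a clean exit) to the caller
		t.Done()
	}()

	fmt.Println(t.Wait()) // prints "exit status 1"
}
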

}()

// Then accept on the server end.
4 changes: 2 additions & 2 deletions services/horizon/internal/ingest/fsm.go
@@ -970,7 +970,7 @@ func (s *system) maybePrepareRange(from uint32) (bool, error) {
// Release distributed ingestion lock and prepare the range
s.historyQ.Rollback()
log.Info("Released ingestion lock to prepare range")
log.WithFields(logpkg.F{"ledger": from}).Info("Preparing range")
log.WithFields(logpkg.F{"from": from}).Info("Preparing range")
startTime := time.Now()

err = s.ledgerBackend.PrepareRange(ledgerRange)
@@ -979,7 +979,7 @@ func (s *system) maybePrepareRange(from uint32) (bool, error) {
}

log.WithFields(logpkg.F{
"ledger": from,
"from": from,
"duration": time.Since(startTime).Seconds(),
}).Info("Range prepared")

11 changes: 10 additions & 1 deletion services/horizon/internal/ingest/main.go
@@ -496,11 +496,20 @@ func (s *system) updateCursor(ledgerSequence uint32) error {

func (s *system) Shutdown() {
log.Info("Shutting down ingestion system...")

s.stateVerificationMutex.Lock()
defer s.stateVerificationMutex.Unlock()
if s.stateVerificationRunning {
log.Info("Shutting down state verifier...")
}
s.stateVerificationMutex.Unlock()

if s.ledgerBackend != nil {
log.Info("Shutting down ledger backend...")
err := s.ledgerBackend.Close()
if err != nil {
log.WithError(err).Error("Error shutting down ledger backend...")
}
}
s.cancel()
}