Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Fix flaky Captive Core tests #3213

Merged
merged 8 commits into from
Nov 16, 2020
47 changes: 35 additions & 12 deletions ingest/ledgerbackend/buffered_meta_pipe_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,17 @@ type bufferedLedgerMetaReader struct {
r *bufio.Reader
c chan metaResult
runner stellarCoreRunnerInterface
closed chan struct{}
}

// newBufferedLedgerMetaReader creates a new meta reader that will shutdown
// when stellar-core terminates.
func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) bufferedLedgerMetaReader {
return bufferedLedgerMetaReader{
func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) *bufferedLedgerMetaReader {
return &bufferedLedgerMetaReader{
c: make(chan metaResult, ledgerReadAheadBufferSize),
r: bufio.NewReaderSize(runner.getMetaPipe(), metaPipeBufferSize),
runner: runner,
closed: make(chan struct{}),
}
}

Expand All @@ -87,7 +89,7 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet
if err != nil {
select {
case <-b.runner.getProcessExitChan():
return nil, errors.New("stellar-core process not-running")
return nil, wrapStellarCoreRunnerError(b.runner)
default:
return nil, errors.Wrap(err, "error reading frame length")
}
Expand All @@ -97,7 +99,7 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet
// Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage.
select {
case <-b.runner.getProcessExitChan():
return nil, errors.New("stellar-core process not-running")
return nil, wrapStellarCoreRunnerError(b.runner)
case <-time.After(time.Second):
}
}
Expand All @@ -112,24 +114,46 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet

select {
case <-b.runner.getProcessExitChan():
return nil, errors.New("stellar-core process not-running")
return nil, wrapStellarCoreRunnerError(b.runner)
default:
return nil, err
}
}
return &xlcm, nil
}

func (b *bufferedLedgerMetaReader) GetChannel() <-chan metaResult {
func (b *bufferedLedgerMetaReader) getChannel() <-chan metaResult {
return b.c
}

func (b *bufferedLedgerMetaReader) Start(untilSequence uint32) {
func (b *bufferedLedgerMetaReader) waitForClose() {
// If buffer is full, keep reading to make sure it receives
// a shutdown signal from stellarCoreRunner.
loop:
for {
select {
case <-b.c:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this function be simplified to just <-b.closed ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately no and the reason is in the comment above:

	// If buffer is full, keep reading to make sure it receives
	// a shutdown signal from stellarCoreRunner.

To make it more clear start go routine closes b.close only on error or after receiving process exit signal from Stellar-Core. However, the channel is buffered (up to 20 messages now) so in case it's full we need to read one message to unblock it so start can exit.

case <-b.closed:
break loop
}
}
}

// Start starts an internal go routine that reads binary ledger data into
// internal buffers. The go routine returns when Stellar-Core process exits
// however it won't happen instantly when data is read. A blocking method:
// waitForClose() can be used to block until go routine returns.
func (b *bufferedLedgerMetaReader) start(untilSequence uint32) {
printBufferOccupation := time.NewTicker(5 * time.Second)
defer printBufferOccupation.Stop()
defer func() {
printBufferOccupation.Stop()
close(b.closed)
}()

for {
select {
case <-b.runner.getProcessExitChan():
b.c <- metaResult{nil, wrapStellarCoreRunnerError(b.runner)}
return
case <-printBufferOccupation.C:
log.Debug("captive core read-ahead buffer occupation:", len(b.c))
Expand All @@ -140,15 +164,14 @@ func (b *bufferedLedgerMetaReader) Start(untilSequence uint32) {
if err != nil {
// When `GetLedger` sees the error it will close the backend. We shouldn't
// close it now because there may be some ledgers in a buffer.
select {
case b.c <- metaResult{nil, err}:
case <-b.runner.getProcessExitChan():
}
b.c <- metaResult{nil, err}
return
}

select {
case b.c <- metaResult{meta, nil}:
case <-b.runner.getProcessExitChan():
b.c <- metaResult{nil, wrapStellarCoreRunnerError(b.runner)}
return
}

Expand Down
79 changes: 43 additions & 36 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,28 @@ type CaptiveStellarCore struct {
historyURLs []string
archive historyarchive.ArchiveInterface

ledgerBuffer bufferedLedgerMetaReader
// Quick note on how shutdown works:
// If Stellar-Core exits, the exit signal is "catched" by bufferedLedgerMetaReader
// (which ends it's go routine) and later propagated via metaResult to
// CaptiveStellarCore.
// If user calls CaptiveStellarCore.Close(), it kills Stellar-Core process
// and the rest is handles by the process explained above.
ledgerBuffer *bufferedLedgerMetaReader
stellarCoreRunner stellarCoreRunnerInterface

// For testing
stellarCoreRunnerFactory func(configPath string) (stellarCoreRunnerInterface, error)

stellarCoreRunner stellarCoreRunnerInterface
cachedMeta *xdr.LedgerCloseMeta

// Defines if the blocking mode (off by default) is on or off. In blocking mode,
// calling GetLedger blocks until the requested ledger is available. This is useful
// for scenarios when Horizon consumes ledgers faster than Stellar-Core produces them
// and using `time.Sleep` when ledger is not available can actually slow entire
// ingestion process.
blocking bool

// cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger().
cachedMeta *xdr.LedgerCloseMeta

nextLedger uint32 // next ledger expected, error w/ restart if not seen
lastLedger *uint32 // end of current segment if offline, nil if online

Expand Down Expand Up @@ -182,7 +189,7 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error

// read-ahead buffer
c.ledgerBuffer = newBufferedLedgerMetaReader(c.stellarCoreRunner)
go c.ledgerBuffer.Start(to)
go c.ledgerBuffer.start(to)
return nil
}

Expand Down Expand Up @@ -241,7 +248,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error {

// read-ahead buffer
c.ledgerBuffer = newBufferedLedgerMetaReader(c.stellarCoreRunner)
go c.ledgerBuffer.Start(0)
go c.ledgerBuffer.start(0)

// if nextLedger is behind - fast-forward until expected ledger
if c.nextLedger < from {
Expand Down Expand Up @@ -332,25 +339,14 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error {
return errors.Wrap(err, "opening subprocess")
}

metaPipe := c.stellarCoreRunner.getMetaPipe()
if metaPipe == nil {
return errors.New("missing metadata pipe")
}

for {
select {
case <-c.stellarCoreRunner.getProcessExitChan():
processErr := c.stellarCoreRunner.getProcessExitError()
if processErr != nil {
err = errors.Wrap(processErr, "stellar-core process exited with an error")
} else {
err = errors.New("stellar-core process exited unexpectedly without an error")
}
return err
return wrapStellarCoreRunnerError(c.stellarCoreRunner)
default:
}
// Wait for the first ledger or an error
if len(c.ledgerBuffer.GetChannel()) > 0 {
if len(c.ledgerBuffer.getChannel()) > 0 {
break
}
time.Sleep(c.waitIntervalPrepareRange)
Expand Down Expand Up @@ -416,26 +412,25 @@ func (c *CaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMe
)
}

if c.lastLedger != nil && sequence > *c.lastLedger {
return false, xdr.LedgerCloseMeta{}, errors.Errorf(
"reading past bounded range (requested sequence=%d, last ledger in range=%d)",
sequence,
*c.lastLedger,
)
}

// Now loop along the range until we find the ledger we want.
var errOut error
loop:
for {
if !c.blocking && len(c.ledgerBuffer.GetChannel()) == 0 {
if !c.blocking && len(c.ledgerBuffer.getChannel()) == 0 {
return false, xdr.LedgerCloseMeta{}, nil
}

var result metaResult
select {
case <-c.stellarCoreRunner.getProcessExitChan():
processErr := c.stellarCoreRunner.getProcessExitError()
if processErr != nil {
errOut = errors.Wrap(processErr, "stellar-core process exited with an error")
} else {
errOut = errors.New("stellar-core process exited unexpectedly without an error")
}
break loop
case result = <-c.ledgerBuffer.GetChannel():
}
// We don't have to handle getProcessExitChan() because this is handled
// in bufferedLedgerMetaReader (will send an error to the channel).
result := <-c.ledgerBuffer.getChannel()
if result.err != nil {
errOut = result.err
break loop
Expand Down Expand Up @@ -481,7 +476,7 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence() (uint32, error) {
}

if c.lastLedger == nil {
return c.nextLedger - 1 + uint32(len(c.ledgerBuffer.GetChannel())), nil
return c.nextLedger - 1 + uint32(len(c.ledgerBuffer.getChannel())), nil
}
return *c.lastLedger, nil
}
Expand All @@ -493,9 +488,6 @@ func (c *CaptiveStellarCore) isClosed() bool {
// Close closes existing Stellar-Core process, streaming sessions and removes all
// temporary files.
func (c *CaptiveStellarCore) Close() error {
c.nextLedger = 0
c.lastLedger = nil

if c.stellarCoreRunner != nil {
// Closing stellarCoreRunner will automatically close bufferedLedgerMetaReader
// because it's listening for getProcessExitChan().
Expand All @@ -504,7 +496,22 @@ func (c *CaptiveStellarCore) Close() error {
if err != nil {
return errors.Wrap(err, "error closing stellar-core subprocess")
}

// Wait for bufferedLedgerMetaReader go routine to return.
c.ledgerBuffer.waitForClose()
c.ledgerBuffer = nil
}

c.nextLedger = 0
c.lastLedger = nil

return nil
}

func wrapStellarCoreRunnerError(r stellarCoreRunnerInterface) error {
processErr := r.getProcessExitError()
if processErr != nil {
return errors.Wrap(processErr, "stellar-core process exited with an error")
}
return errors.New("stellar-core process exited unexpectedly without an error")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the case of an offline replay, after streaming all the ledgers in the catchup range, we expect stellar-core to exit without error, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I fixed it in 873797e and I was able to simplify GetLedger: it no longer needs to handle getProcessExitChan() because if Stellar-Core exits, bufferedLedgerMetaReader will send an error to the channel.

}
Loading