-
Notifications
You must be signed in to change notification settings - Fork 513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ingest/ledgerbackend: Fix flaky Captive Core tests #3213
Changes from all commits
c828f31
4074452
873797e
e3a4aa8
7bc7ee4
c3ca633
777694d
8ce4eb5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,21 +68,28 @@ type CaptiveStellarCore struct { | |
historyURLs []string | ||
archive historyarchive.ArchiveInterface | ||
|
||
ledgerBuffer bufferedLedgerMetaReader | ||
// Quick note on how shutdown works: | ||
// If Stellar-Core exits, the exit signal is caught by bufferedLedgerMetaReader | |
// (which ends its goroutine) and later propagated via metaResult to | |
// CaptiveStellarCore. | |
// If the user calls CaptiveStellarCore.Close(), it kills the Stellar-Core process | |
// and the rest is handled by the process explained above. | |
ledgerBuffer *bufferedLedgerMetaReader | ||
stellarCoreRunner stellarCoreRunnerInterface | ||
|
||
// For testing | ||
stellarCoreRunnerFactory func(configPath string) (stellarCoreRunnerInterface, error) | ||
|
||
stellarCoreRunner stellarCoreRunnerInterface | ||
cachedMeta *xdr.LedgerCloseMeta | ||
|
||
// Defines if the blocking mode (off by default) is on or off. In blocking mode, | ||
// calling GetLedger blocks until the requested ledger is available. This is useful | ||
// for scenarios when Horizon consumes ledgers faster than Stellar-Core produces them | ||
// and using `time.Sleep` when ledger is not available can actually slow entire | ||
// ingestion process. | ||
blocking bool | ||
|
||
// cachedMeta keeps the ledger data of the last fetched ledger. Updated in GetLedger(). | |
cachedMeta *xdr.LedgerCloseMeta | ||
|
||
nextLedger uint32 // next ledger expected, error w/ restart if not seen | ||
lastLedger *uint32 // end of current segment if offline, nil if online | ||
|
||
|
@@ -182,7 +189,7 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error | |
|
||
// read-ahead buffer | ||
c.ledgerBuffer = newBufferedLedgerMetaReader(c.stellarCoreRunner) | ||
go c.ledgerBuffer.Start(to) | ||
go c.ledgerBuffer.start(to) | ||
return nil | ||
} | ||
|
||
|
@@ -241,7 +248,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error { | |
|
||
// read-ahead buffer | ||
c.ledgerBuffer = newBufferedLedgerMetaReader(c.stellarCoreRunner) | ||
go c.ledgerBuffer.Start(0) | ||
go c.ledgerBuffer.start(0) | ||
|
||
// if nextLedger is behind - fast-forward until expected ledger | ||
if c.nextLedger < from { | ||
|
@@ -332,25 +339,14 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error { | |
return errors.Wrap(err, "opening subprocess") | ||
} | ||
|
||
metaPipe := c.stellarCoreRunner.getMetaPipe() | ||
if metaPipe == nil { | ||
return errors.New("missing metadata pipe") | ||
} | ||
|
||
for { | ||
select { | ||
case <-c.stellarCoreRunner.getProcessExitChan(): | ||
processErr := c.stellarCoreRunner.getProcessExitError() | ||
if processErr != nil { | ||
err = errors.Wrap(processErr, "stellar-core process exited with an error") | ||
} else { | ||
err = errors.New("stellar-core process exited unexpectedly without an error") | ||
} | ||
return err | ||
return wrapStellarCoreRunnerError(c.stellarCoreRunner) | ||
default: | ||
} | ||
// Wait for the first ledger or an error | ||
if len(c.ledgerBuffer.GetChannel()) > 0 { | ||
if len(c.ledgerBuffer.getChannel()) > 0 { | ||
break | ||
} | ||
time.Sleep(c.waitIntervalPrepareRange) | ||
|
@@ -416,26 +412,25 @@ func (c *CaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMe | |
) | ||
} | ||
|
||
if c.lastLedger != nil && sequence > *c.lastLedger { | ||
return false, xdr.LedgerCloseMeta{}, errors.Errorf( | ||
"reading past bounded range (requested sequence=%d, last ledger in range=%d)", | ||
sequence, | ||
*c.lastLedger, | ||
) | ||
} | ||
|
||
// Now loop along the range until we find the ledger we want. | ||
var errOut error | ||
loop: | ||
for { | ||
if !c.blocking && len(c.ledgerBuffer.GetChannel()) == 0 { | ||
if !c.blocking && len(c.ledgerBuffer.getChannel()) == 0 { | ||
return false, xdr.LedgerCloseMeta{}, nil | ||
} | ||
|
||
var result metaResult | ||
select { | ||
case <-c.stellarCoreRunner.getProcessExitChan(): | ||
processErr := c.stellarCoreRunner.getProcessExitError() | ||
if processErr != nil { | ||
errOut = errors.Wrap(processErr, "stellar-core process exited with an error") | ||
} else { | ||
errOut = errors.New("stellar-core process exited unexpectedly without an error") | ||
} | ||
break loop | ||
case result = <-c.ledgerBuffer.GetChannel(): | ||
} | ||
// We don't have to handle getProcessExitChan() because this is handled | ||
// in bufferedLedgerMetaReader (will send an error to the channel). | ||
result := <-c.ledgerBuffer.getChannel() | ||
if result.err != nil { | ||
errOut = result.err | ||
break loop | ||
|
@@ -481,7 +476,7 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence() (uint32, error) { | |
} | ||
|
||
if c.lastLedger == nil { | ||
return c.nextLedger - 1 + uint32(len(c.ledgerBuffer.GetChannel())), nil | ||
return c.nextLedger - 1 + uint32(len(c.ledgerBuffer.getChannel())), nil | ||
} | ||
return *c.lastLedger, nil | ||
} | ||
|
@@ -493,9 +488,6 @@ func (c *CaptiveStellarCore) isClosed() bool { | |
// Close closes existing Stellar-Core process, streaming sessions and removes all | ||
// temporary files. | ||
func (c *CaptiveStellarCore) Close() error { | ||
c.nextLedger = 0 | ||
c.lastLedger = nil | ||
|
||
if c.stellarCoreRunner != nil { | ||
// Closing stellarCoreRunner will automatically close bufferedLedgerMetaReader | ||
// because it's listening for getProcessExitChan(). | ||
|
@@ -504,7 +496,22 @@ func (c *CaptiveStellarCore) Close() error { | |
if err != nil { | ||
return errors.Wrap(err, "error closing stellar-core subprocess") | ||
} | ||
|
||
// Wait for bufferedLedgerMetaReader go routine to return. | ||
c.ledgerBuffer.waitForClose() | ||
c.ledgerBuffer = nil | ||
} | ||
|
||
c.nextLedger = 0 | ||
c.lastLedger = nil | ||
|
||
return nil | ||
} | ||
|
||
func wrapStellarCoreRunnerError(r stellarCoreRunnerInterface) error { | ||
processErr := r.getProcessExitError() | ||
if processErr != nil { | ||
return errors.Wrap(processErr, "stellar-core process exited with an error") | ||
} | ||
return errors.New("stellar-core process exited unexpectedly without an error") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. In the case of an offline replay, after streaming all the ledgers in the catchup range, we expect stellar-core to exit without error, right? There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. Good catch! I fixed it in 873797e and I was able to simplify |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this function be simplified to just `<-b.closed`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately no, and the reason is in the comment above.
To make it clearer: the `start` goroutine closes `b.closed` only on error or after receiving the process exit signal from Stellar-Core. However, the channel is buffered (up to 20 messages now), so in case it's full we need to read one message to unblock it so that `start` can exit.