From fa49d4abac753f3ca823fe51a5e9cd44c1738350 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 1 Dec 2020 17:01:40 +0100 Subject: [PATCH 1/2] ingest/ledgerbackend: Remove returning error on Stellar-Core process exit in catchup --- .../ledgerbackend/buffered_meta_pipe_reader.go | 18 +++++++----------- ingest/ledgerbackend/captive_core_backend.go | 7 ++++++- .../ledgerbackend/captive_core_backend_test.go | 3 +-- ingest/ledgerbackend/stellar_core_runner.go | 2 +- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/ingest/ledgerbackend/buffered_meta_pipe_reader.go b/ingest/ledgerbackend/buffered_meta_pipe_reader.go index dff51eeae8..10b8b3e67c 100644 --- a/ingest/ledgerbackend/buffered_meta_pipe_reader.go +++ b/ingest/ledgerbackend/buffered_meta_pipe_reader.go @@ -84,7 +84,7 @@ func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) *bufferedLed // * Meta pipe buffer is full so it will wait until it refills. // * The next ledger available in the buffer exceeds the meta pipe buffer size. // In such case the method will block until LedgerCloseMeta buffer is empty. -func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMeta, error) { +func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe(untilSequence uint32) (*xdr.LedgerCloseMeta, error) { frameLength, err := xdr.ReadFrameLength(b.r) if err != nil { select { @@ -99,6 +99,10 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet // Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage. select { case <-b.runner.getProcessExitChan(): + if untilSequence != 0 { + time.Sleep(time.Second) + continue + } return nil, wrapStellarCoreRunnerError(b.runner) case <-time.After(time.Second): } @@ -152,15 +156,12 @@ func (b *bufferedLedgerMetaReader) start(untilSequence uint32) { for { select { - case <-b.runner.getProcessExitChan(): - b.c <- metaResult{nil, wrapStellarCoreRunnerError(b.runner)} - return case <-printBufferOccupation.C: log.Debug("captive core read-ahead buffer occupation:", len(b.c)) default: } - meta, err := b.readLedgerMetaFromPipe() + meta, err := b.readLedgerMetaFromPipe(untilSequence) if err != nil { // When `GetLedger` sees the error it will close the backend. We shouldn't // close it now because there may be some ledgers in a buffer. @@ -168,12 +169,7 @@ func (b *bufferedLedgerMetaReader) start(untilSequence uint32) { return } - select { - case b.c <- metaResult{meta, nil}: - case <-b.runner.getProcessExitChan(): - b.c <- metaResult{nil, wrapStellarCoreRunnerError(b.runner)} - return - } + b.c <- metaResult{meta, nil} if untilSequence != 0 { if meta.LedgerSequence() >= untilSequence { diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 917aa487a7..567de34112 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -388,7 +388,12 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error { for { select { case <-c.stellarCoreRunner.getProcessExitChan(): - return wrapStellarCoreRunnerError(c.stellarCoreRunner) + // Return only in case of Stellar-Core process error. Normal exit + // is expected in catchup when all ledgers sent to a buffer. + processErr := c.stellarCoreRunner.getProcessExitError() + if processErr != nil { + return errors.Wrap(processErr, "stellar-core process exited with an error") + } default: } // Wait for the first ledger or an error diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index ff4729d8d6..2d364fbedd 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -264,8 +264,7 @@ func TestCaptivePrepareRangeTerminated(t *testing.T) { } err := captiveBackend.PrepareRange(BoundedRange(100, 200)) - assert.Error(t, err) - assert.EqualError(t, err, "stellar-core process exited unexpectedly without an error") + assert.NoError(t, err) } func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) { diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index 689230979f..de99c5e571 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -140,7 +140,7 @@ func (r *stellarCoreRunner) getLogLineWriter() io.Writer { // If there's a logger, we attempt to extract metadata about the log // entry, then redirect it to the logger. Otherwise, we just use stdout. if r.Log == nil { - fmt.Print(line) + fmt.Println(line) continue } From aecfd759c151e707e00c995ff86b3919db6357d1 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 1 Dec 2020 21:09:16 +0100 Subject: [PATCH 2/2] Fix bug --- ingest/ledgerbackend/buffered_meta_pipe_reader.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ingest/ledgerbackend/buffered_meta_pipe_reader.go b/ingest/ledgerbackend/buffered_meta_pipe_reader.go index 10b8b3e67c..1964767d70 100644 --- a/ingest/ledgerbackend/buffered_meta_pipe_reader.go +++ b/ingest/ledgerbackend/buffered_meta_pipe_reader.go @@ -100,6 +100,13 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe(untilSequence uint32) select { case <-b.runner.getProcessExitChan(): if untilSequence != 0 { + // If untilSequence != 0 it's possible that Stellar-Core process + // exits but there are still ledgers in a buffer (catchup). In such + // case we ignore cases when Stellar-Core exited with no errors. + processErr := b.runner.getProcessExitError() + if processErr != nil { + return nil, errors.Wrap(processErr, "stellar-core process exited with an error") + } time.Sleep(time.Second) continue }