-
Notifications
You must be signed in to change notification settings - Fork 513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ingest/ledgerbackend: Handle user initiated shutdown during catchup #3258
Changes from all commits
f876d60
61f93b3
b467745
5baa384
570446a
4ad9ce7
648ebf5
eddef27
acedd53
6acf8f8
f00c19c
0505023
cdd8c09
98732e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,9 @@ | ||
package ledgerbackend | ||
|
||
import ( | ||
"context" | ||
"encoding/hex" | ||
"sync" | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
|
@@ -75,9 +77,16 @@ type CaptiveStellarCore struct { | |
ledgerBuffer *bufferedLedgerMetaReader | ||
stellarCoreRunner stellarCoreRunnerInterface | ||
|
||
// waitIntervalPrepareRange defines a time to wait between checking if the buffer | ||
// is empty. Default 1s, lower in tests to make them faster. | ||
waitIntervalPrepareRange time.Duration | ||
|
||
// For testing | ||
stellarCoreRunnerFactory func(mode stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) | ||
|
||
// mutex protects all values below, ex. Close() can be called concurrently. | ||
mutex sync.Mutex | ||
|
||
// Defines if the blocking mode (off by default) is on or off. In blocking mode, | ||
// calling GetLedger blocks until the requested ledger is available. This is useful | ||
// for scenarios when Horizon consumes ledgers faster than Stellar-Core produces them | ||
|
@@ -91,10 +100,6 @@ type CaptiveStellarCore struct { | |
nextLedger uint32 // next ledger expected, error w/ restart if not seen | ||
lastLedger *uint32 // end of current segment if offline, nil if online | ||
previousLedgerHash *string | ||
|
||
// waitIntervalPrepareRange defines a time to wait between checking if the buffer | ||
// is empty. Default 1s, lower in tests to make them faster. | ||
waitIntervalPrepareRange time.Duration | ||
} | ||
|
||
// CaptiveCoreConfig contains all the parameters required to create a CaptiveStellarCore instance | ||
|
@@ -134,7 +139,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) { | |
c := &CaptiveStellarCore{ | ||
archive: archive, | ||
configAppendPath: config.ConfigAppendPath, | ||
waitIntervalPrepareRange: time.Second, | ||
waitIntervalPrepareRange: 100 * time.Millisecond, | ||
ledgerHashStore: config.LedgerHashStore, | ||
} | ||
c.stellarCoreRunnerFactory = func(mode stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { | ||
|
@@ -153,7 +158,7 @@ func (c *CaptiveStellarCore) getLatestCheckpointSequence() (uint32, error) { | |
} | ||
|
||
func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error { | ||
err := c.Close() | ||
err := c.close() | ||
if err != nil { | ||
return errors.Wrap(err, "error closing existing session") | ||
} | ||
|
@@ -202,7 +207,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error { | |
return nil | ||
} | ||
|
||
err := c.Close() | ||
err := c.close() | ||
if err != nil { | ||
return errors.Wrap(err, "error closing existing session") | ||
} | ||
|
@@ -258,18 +263,6 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error { | |
// read-ahead buffer | ||
c.ledgerBuffer = newBufferedLedgerMetaReader(c.stellarCoreRunner) | ||
go c.ledgerBuffer.start(0) | ||
|
||
// if nextLedger is behind - fast-forward until expected ledger | ||
if c.nextLedger < from { | ||
// make GetFrom blocking temporarily | ||
c.blocking = true | ||
_, _, err := c.GetLedger(from) | ||
c.blocking = false | ||
if err != nil { | ||
return errors.Wrapf(err, "Error fast-forwarding to %d", from) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -340,11 +333,19 @@ func (c *CaptiveStellarCore) runFromParams(from uint32) (runFrom uint32, ledgerH | |
// it normally (including connecting to the Stellar network). | ||
// Please note that using a BoundedRange, currently, requires a full-trust on | ||
// history archive. This issue is being fixed in Stellar-Core. | ||
// | ||
// Returns `context.Cancelled` if `Close` was called from another Go routine. | ||
func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error { | ||
// c.mutex is unlocked on error and before waitloop to allow calling Close() | ||
// to intrerupt the process. | ||
c.mutex.Lock() | ||
|
||
// Range already prepared | ||
if prepared, err := c.IsPrepared(ledgerRange); err != nil { | ||
if prepared, err := c.isPrepared(ledgerRange); err != nil { | ||
c.mutex.Unlock() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be safer / cleaner to do something like defer func() {
if !unlocked {
c.mutex.Unlock()
unlocked = true
}
} instead (and set It just seems like it could be really easy for someone (like me lol) to edit this code in the future, then forget to unlock the mutex and be in a dangerous state. |
||
return errors.Wrap(err, "error in IsPrepared") | ||
} else if prepared { | ||
c.mutex.Unlock() | ||
return nil | ||
} | ||
|
||
|
@@ -355,24 +356,51 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error { | |
err = c.openOnlineReplaySubprocess(ledgerRange.from) | ||
} | ||
if err != nil { | ||
c.mutex.Unlock() | ||
return errors.Wrap(err, "opening subprocess") | ||
} | ||
|
||
c.mutex.Unlock() | ||
|
||
waitloop: | ||
for { | ||
select { | ||
case <-c.stellarCoreRunner.getProcessExitChan(): | ||
// Return only in case of Stellar-Core process error. Normal exit | ||
// is expected in catchup when all ledgers sent to a buffer. | ||
processErr := c.stellarCoreRunner.getProcessExitError() | ||
if processErr != nil { | ||
case <-c.stellarCoreRunner.getTomb().Dead(): | ||
bartekn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// If core is dead we need to handle the Err() value: | ||
// - nil and unbounded range: return error because we don't expect | ||
// Stellar-Core to exit while streaming unbounded range, | ||
// - nil and bounded range: this is expeced in bounded range because | ||
// Stellar-Core can exit after streaming all ledgers, break and | ||
// wait for buffer to fill, | ||
// - context.Cancelled: user initiated exit, return the same error, | ||
// - not nil: exit with error, wrap and return. | ||
processErr := c.stellarCoreRunner.getTomb().Err() | ||
switch { | ||
case processErr == nil && !ledgerRange.bounded: | ||
return errors.New("stellar-core process exited unexpectedly without an error") | ||
case processErr == nil && ledgerRange.bounded: | ||
return nil | ||
case processErr == context.Canceled: | ||
return processErr | ||
default: | ||
return errors.Wrap(processErr, "stellar-core process exited with an error") | ||
} | ||
Comment on lines
+377
to
387
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
default: | ||
} | ||
// Wait for the first ledger or an error | ||
if len(c.ledgerBuffer.getChannel()) > 0 { | ||
break | ||
// Wait/fast-forward to the expected ledger or an error. We need to check | ||
// buffer length because `GetLedger` may be blocking. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would make sense to add a context to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I really wanted to avoid using
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. I will open a new issue. |
||
for len(c.ledgerBuffer.getChannel()) > 0 { | ||
_, _, err := c.GetLedger(c.nextLedger) | ||
if err != nil { | ||
return errors.Wrapf(err, "Error fast-forwarding to %d", ledgerRange.from) | ||
} | ||
|
||
// If nextLedger > ledgerRange.from then ledgerRange.from is cached. | ||
if c.nextLedger > ledgerRange.from { | ||
break waitloop | ||
} | ||
} | ||
|
||
time.Sleep(c.waitIntervalPrepareRange) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really need an explicit sleep here? I would (at the very least) use a ticker and intertwine the check with the other channels in the Even better, It may even be possible to remove the wait and condition by using an asynchronous go-routine for fast-forwarding. It may seem outside of the scope of this PR, but it affects the shutdown wait time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can try it out for sure! Can you create a new issue? I'm not sure if we'll be able to test it before beta. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure |
||
} | ||
|
||
|
@@ -381,6 +409,12 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error { | |
|
||
// IsPrepared returns true if a given ledgerRange is prepared. | ||
func (c *CaptiveStellarCore) IsPrepared(ledgerRange Range) (bool, error) { | ||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
return c.isPrepared(ledgerRange) | ||
} | ||
|
||
func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) (bool, error) { | ||
lastLedger := uint32(0) | ||
if c.lastLedger != nil { | ||
lastLedger = *c.lastLedger | ||
|
@@ -391,10 +425,10 @@ func (c *CaptiveStellarCore) IsPrepared(ledgerRange Range) (bool, error) { | |
cachedLedger = c.cachedMeta.LedgerSequence() | ||
} | ||
|
||
return c.isPrepared(c.nextLedger, lastLedger, cachedLedger, ledgerRange), nil | ||
return c.isPreparedParams(c.nextLedger, lastLedger, cachedLedger, ledgerRange), nil | ||
} | ||
|
||
func (*CaptiveStellarCore) isPrepared(nextLedger, lastLedger, cachedLedger uint32, ledgerRange Range) bool { | ||
func (*CaptiveStellarCore) isPreparedParams(nextLedger, lastLedger, cachedLedger uint32, ledgerRange Range) bool { | ||
if nextLedger == 0 { | ||
return false | ||
} | ||
|
@@ -431,7 +465,15 @@ func (*CaptiveStellarCore) isPrepared(nextLedger, lastLedger, cachedLedger uint3 | |
// * UnboundedRange makes GetLedger non-blocking. The method will return with | ||
// the first argument equal false. | ||
// This is done to provide maximum performance when streaming old ledgers. | ||
// | ||
// Returns `context.Cancelled` if `Close` was called from another Go routine. | ||
func (c *CaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) { | ||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
return c.getLedger(sequence) | ||
} | ||
|
||
func (c *CaptiveStellarCore) getLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) { | ||
if c.cachedMeta != nil && sequence == c.cachedMeta.LedgerSequence() { | ||
// GetLedger can be called multiple times using the same sequence, ex. to create | ||
// change and transaction readers. If we have this ledger buffered, let's return it. | ||
|
@@ -466,7 +508,7 @@ loop: | |
return false, xdr.LedgerCloseMeta{}, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If Close() is called concurrently with this function is there a possible race condition where the ledger buffer is set to nil right before it is accessed here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, I focused on shutdown code and forgot about the obvious things. Fixed in 648ebf5. |
||
} | ||
|
||
// We don't have to handle getProcessExitChan() because this is handled | ||
// We don't have to handle Stellar-Core exit because this is handled | ||
// in bufferedLedgerMetaReader (will send an error to the channel). | ||
result := <-c.ledgerBuffer.getChannel() | ||
if result.err != nil { | ||
|
@@ -507,7 +549,7 @@ loop: | |
|
||
// If we got the _last_ ledger in a segment, close before returning. | ||
if c.lastLedger != nil && *c.lastLedger == seq { | ||
if err := c.Close(); err != nil { | ||
if err := c.close(); err != nil { | ||
return false, xdr.LedgerCloseMeta{}, errors.Wrap(err, "error closing session") | ||
} | ||
} | ||
|
@@ -517,7 +559,7 @@ loop: | |
// All paths above that break out of the loop (instead of return) | ||
// set e to non-nil: there was an error and we should close and | ||
// reset state before retuning an error to our caller. | ||
c.Close() | ||
c.close() | ||
return false, xdr.LedgerCloseMeta{}, errOut | ||
} | ||
|
||
|
@@ -529,6 +571,9 @@ loop: | |
// the latest sequence closed by the network. It's always the last value available | ||
// in the backend. | ||
func (c *CaptiveStellarCore) GetLatestLedgerSequence() (uint32, error) { | ||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
|
||
if c.isClosed() { | ||
return 0, errors.New("stellar-core must be opened to return latest available sequence") | ||
} | ||
|
@@ -544,23 +589,27 @@ func (c *CaptiveStellarCore) isClosed() bool { | |
} | ||
|
||
// Close closes existing Stellar-Core process, streaming sessions and removes all | ||
// temporary files. | ||
// temporary files. Please note that this can be executed concurently, however it | ||
// may block if `GetLedger` is running on a unbounded range (because `GetLedger` | ||
// is blocking in this case). | ||
func (c *CaptiveStellarCore) Close() error { | ||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
return c.close() | ||
} | ||
|
||
func (c *CaptiveStellarCore) close() error { | ||
if c.stellarCoreRunner != nil { | ||
// Closing stellarCoreRunner will automatically close bufferedLedgerMetaReader | ||
// because it's listening for getProcessExitChan(). | ||
// Closing stellarCoreRunner will automatically close bufferedLedgerMetaReader's | ||
// start() go routine because it's checking it's state. | ||
err := c.stellarCoreRunner.close() | ||
c.stellarCoreRunner = nil | ||
if err != nil { | ||
return errors.Wrap(err, "error closing stellar-core subprocess") | ||
} | ||
} | ||
|
||
// c.ledgerBuffer might be nil if stellarCoreRunner.runFrom / stellarCoreRunner.catchup responded with an error | ||
if c.ledgerBuffer != nil { | ||
// Wait for bufferedLedgerMetaReader go routine to return. | ||
c.ledgerBuffer.waitForClose() | ||
c.ledgerBuffer = nil | ||
} | ||
if c.ledgerBuffer != nil { | ||
c.ledgerBuffer.close() | ||
} | ||
|
||
c.nextLedger = 0 | ||
|
@@ -569,11 +618,3 @@ func (c *CaptiveStellarCore) Close() error { | |
|
||
return nil | ||
} | ||
|
||
func wrapStellarCoreRunnerError(r stellarCoreRunnerInterface) error { | ||
processErr := r.getProcessExitError() | ||
if processErr != nil { | ||
return errors.Wrap(processErr, "stellar-core process exited with an error") | ||
} | ||
return errors.New("stellar-core process exited unexpectedly without an error") | ||
} |
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package ledgerbackend | |
|
||
import ( | ||
"bufio" | ||
"context" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
|
@@ -11,23 +12,20 @@ import ( | |
"path/filepath" | ||
"regexp" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/sirupsen/logrus" | ||
|
||
"github.com/stellar/go/support/log" | ||
"gopkg.in/tomb.v1" | ||
) | ||
|
||
type stellarCoreRunnerInterface interface { | ||
catchup(from, to uint32) error | ||
runFrom(from uint32, hash string) error | ||
getMetaPipe() io.Reader | ||
// getProcessExitChan returns a channel that closes on process exit | ||
getProcessExitChan() <-chan struct{} | ||
// getProcessExitError returns an exit error of the process, can be nil | ||
getProcessExitError() error | ||
getTomb() *tomb.Tomb | ||
close() error | ||
} | ||
|
||
|
@@ -38,6 +36,8 @@ const ( | |
stellarCoreRunnerModeOffline | ||
) | ||
|
||
// stellarCoreRunner is a helper for starting stellar-core. Should be used only | ||
// once, for multiple runs create separate instances. | ||
type stellarCoreRunner struct { | ||
executablePath string | ||
configAppendPath string | ||
|
@@ -46,19 +46,25 @@ type stellarCoreRunner struct { | |
httpPort uint | ||
mode stellarCoreRunnerMode | ||
|
||
started bool | ||
wg sync.WaitGroup | ||
shutdown chan struct{} | ||
// tomb is used to handle cmd go routine termination. Err() value can be one | ||
// of the following: | ||
// - nil: process exit without error, not user initiated (this can be an | ||
// error in layers above if they expect more data but process is done), | ||
// - context.Canceled: process killed after user request, | ||
// - not nil: process exit with an error. | ||
// | ||
// tomb is created when a new cmd go routine starts. | ||
tomb *tomb.Tomb | ||
|
||
cmd *exec.Cmd | ||
|
||
// processExit channel receives an error when the process exited with an error | ||
// or nil if the process exited without an error. | ||
processExit chan struct{} | ||
processExitError error | ||
metaPipe io.Reader | ||
tempDir string | ||
nonce string | ||
// There's a gotcha! When cmd.Wait() signal was received it doesn't mean that | ||
// all ledgers have been read from meta pipe. Turns out that OS actually | ||
// maintains a buffer. So don't rely on this. Keep reading until EOF is | ||
// returned. | ||
metaPipe io.Reader | ||
tempDir string | ||
nonce string | ||
|
||
log *log.Entry | ||
} | ||
|
@@ -87,9 +93,6 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode) | |
historyURLs: config.HistoryArchiveURLs, | ||
httpPort: config.HTTPPort, | ||
mode: mode, | ||
shutdown: make(chan struct{}), | ||
processExit: make(chan struct{}), | ||
processExitError: nil, | ||
tempDir: tempDir, | ||
nonce: fmt.Sprintf("captive-stellar-core-%x", r.Uint64()), | ||
log: coreLogger, | ||
|
@@ -237,7 +240,7 @@ func (r *stellarCoreRunner) runCmd(params ...string) error { | |
} | ||
|
||
func (r *stellarCoreRunner) catchup(from, to uint32) error { | ||
if r.started { | ||
if r.tomb != nil { | ||
return errors.New("runner already started") | ||
} | ||
if err := r.runCmd("new-db"); err != nil { | ||
|
@@ -254,17 +257,17 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { | |
return errors.Wrap(err, "error creating `stellar-core catchup` subprocess") | ||
} | ||
r.cmd = cmd | ||
r.tomb = new(tomb.Tomb) | ||
r.metaPipe, err = r.start() | ||
if err != nil { | ||
return errors.Wrap(err, "error starting `stellar-core catchup` subprocess") | ||
} | ||
r.started = true | ||
|
||
return nil | ||
} | ||
|
||
func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { | ||
if r.started { | ||
if r.tomb != nil { | ||
return errors.New("runner already started") | ||
} | ||
var err error | ||
|
@@ -278,11 +281,11 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { | |
if err != nil { | ||
return errors.Wrap(err, "error creating `stellar-core run` subprocess") | ||
} | ||
r.tomb = new(tomb.Tomb) | ||
r.metaPipe, err = r.start() | ||
if err != nil { | ||
return errors.Wrap(err, "error starting `stellar-core run` subprocess") | ||
} | ||
r.started = true | ||
|
||
return nil | ||
} | ||
|
@@ -291,36 +294,38 @@ func (r *stellarCoreRunner) getMetaPipe() io.Reader { | |
return r.metaPipe | ||
} | ||
|
||
func (r *stellarCoreRunner) getProcessExitChan() <-chan struct{} { | ||
return r.processExit | ||
} | ||
|
||
func (r *stellarCoreRunner) getProcessExitError() error { | ||
return r.processExitError | ||
func (r *stellarCoreRunner) getTomb() *tomb.Tomb { | ||
return r.tomb | ||
} | ||
|
||
func (r *stellarCoreRunner) close() error { | ||
var err1, err2 error | ||
|
||
if r.tomb != nil { | ||
// Kill tomb with context.Canceled. Kill will be called again in start() | ||
// when process exit is handled but the error value will not be overwritten. | ||
r.tomb.Kill(context.Canceled) | ||
Comment on lines
+305
to
+307
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The answer to this is probably painfully-obvious, but by "user-initiated shutdown," we generally mean something akin to Ctrl+C and/or via a code path (e.g. calling |
||
} | ||
|
||
if r.processIsAlive() { | ||
err1 = r.cmd.Process.Kill() | ||
r.cmd.Wait() | ||
r.cmd = nil | ||
} | ||
err2 = os.RemoveAll(r.tempDir) | ||
r.tempDir = "" | ||
|
||
if r.started { | ||
close(r.shutdown) | ||
r.wg.Wait() | ||
if r.tomb != nil { | ||
r.tomb.Wait() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
r.started = false | ||
r.tomb = nil | ||
r.cmd = nil | ||
|
||
err2 = os.RemoveAll(r.tempDir) | ||
r.tempDir = "" | ||
|
||
if err1 != nil { | ||
return errors.Wrap(err1, "error killing subprocess") | ||
} | ||
if err2 != nil { | ||
return errors.Wrap(err2, "error removing subprocess tmpdir") | ||
} | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,17 +29,9 @@ func (c *stellarCoreRunner) start() (io.Reader, error) { | |
return io.Reader(nil), err | ||
} | ||
|
||
c.wg.Add(1) | ||
go func() { | ||
err := make(chan error, 1) | ||
select { | ||
case err <- c.cmd.Wait(): | ||
c.processExitError = <-err | ||
close(c.processExit) | ||
close(err) | ||
case <-c.shutdown: | ||
} | ||
c.wg.Done() | ||
c.tomb.Kill(c.cmd.Wait()) | ||
c.tomb.Done() | ||
Comment on lines
+33
to
+34
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
}() | ||
|
||
// Then accept on the server end. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dying
is whenKill
was called but it's possible thattomb
is notDead
yet. We can use this signal to start shutting down go routine.