Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement graceful shutdown in Fluffy #2645

Merged
merged 4 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 106 additions & 40 deletions fluffy/fluffy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ func optionToOpt[T](o: Option[T]): Opt[T] =
else:
Opt.none(T)

proc run(config: PortalConf) {.raises: [CatchableError].} =
proc run(
config: PortalConf
): (PortalNode, Opt[MetricsHttpServerRef], Opt[RpcHttpServer], Opt[RpcWebSocketServer]) {.
raises: [CatchableError]
.} =
setupLogging(config.logLevel, config.logStdout, none(OutFile))

notice "Launching Fluffy", version = fullVersionStr, cmdParams = commandLineParams()
Expand Down Expand Up @@ -185,26 +189,30 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
quit 1

## Start metrics HTTP server
if config.metricsEnabled:
let
address = config.metricsAddress
port = config.metricsPort
url = "http://" & $address & ":" & $port & "/metrics"

server = MetricsHttpServerRef.new($address, port).valueOr:
error "Could not instantiate metrics HTTP server", url, error
let metricsServer =
if config.metricsEnabled:
let
address = config.metricsAddress
port = config.metricsPort
url = "http://" & $address & ":" & $port & "/metrics"

server = MetricsHttpServerRef.new($address, port).valueOr:
error "Could not instantiate metrics HTTP server", url, error
quit QuitFailure

info "Starting metrics HTTP server", url
try:
waitFor server.start()
except MetricsError as exc:
fatal "Could not start metrics HTTP server",
url, error_msg = exc.msg, error_name = exc.name
quit QuitFailure

info "Starting metrics HTTP server", url
try:
waitFor server.start()
except MetricsError as exc:
fatal "Could not start metrics HTTP server",
url, error_msg = exc.msg, error_name = exc.name
quit QuitFailure

## Start discovery v5 protocol and the Portal node.
d.start()
Opt.some(server)
else:
Opt.none(MetricsHttpServerRef)

## Start the Portal node.
node.start()

## Start the JSON-RPC APIs
Expand Down Expand Up @@ -235,24 +243,32 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =

rpcServer.start()

if config.rpcEnabled:
let
ta = initTAddress(config.rpcAddress, config.rpcPort)
rpcHttpServer = RpcHttpServer.new()
# Note: Set maxRequestBodySize to 4MB instead of 1MB as there are blocks
# that reach that limit (in hex, for gossip method).
rpcHttpServer.addHttpServer(ta, maxRequestBodySize = 4 * 1_048_576)

setupRpcServer(rpcHttpServer)

if config.wsEnabled:
let
ta = initTAddress(config.rpcAddress, config.wsPort)
rpcWsServer = newRpcWebSocketServer(ta, compression = config.wsCompression)

setupRpcServer(rpcWsServer)

runForever()
let rpcHttpServer =
if config.rpcEnabled:
let
ta = initTAddress(config.rpcAddress, config.rpcPort)
rpcHttpServer = RpcHttpServer.new()
# Note: Set maxRequestBodySize to 4MB instead of 1MB as there are blocks
# that reach that limit (in hex, for gossip method).
rpcHttpServer.addHttpServer(ta, maxRequestBodySize = 4 * 1_048_576)
setupRpcServer(rpcHttpServer)

Opt.some(rpcHttpServer)
else:
Opt.none(RpcHttpServer)

let rpcWsServer =
if config.wsEnabled:
let
ta = initTAddress(config.rpcAddress, config.wsPort)
rpcWsServer = newRpcWebSocketServer(ta, compression = config.wsCompression)
setupRpcServer(rpcWsServer)

Opt.some(rpcWsServer)
else:
Opt.none(RpcWebSocketServer)

return (node, metricsServer, rpcHttpServer, rpcWsServer)

when isMainModule:
{.pop.}
Expand All @@ -262,6 +278,56 @@ when isMainModule:
)
{.push raises: [].}

case config.cmd
of PortalCmd.noCommand:
run(config)
let (node, metricsServer, rpcHttpServer, rpcWsServer) =
case config.cmd
of PortalCmd.noCommand:
run(config)

# Ctrl+C handling
proc controlCHandler() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
try:
setupForeignThreadGc()
except Exception as exc:
raiseAssert exc.msg # shouldn't happen

notice "Shutting down after having received SIGINT"
node.state = PortalNodeState.Stopping

try:
setControlCHook(controlCHandler)
except Exception as exc: # TODO Exception
warn "Cannot set ctrl-c handler", msg = exc.msg

while node.state == PortalNodeState.Running:
try:
poll()
except CatchableError as e:
warn "Exception in poll()", exc = e.name, err = e.msg

if rpcWsServer.isSome():
let server = rpcWsServer.get()
try:
server.stop()
waitFor server.closeWait()
except CatchableError as e:
warn "Failed to stop rpc WS server", exc = e.name, err = e.msg

if rpcHttpServer.isSome():
let server = rpcHttpServer.get()
try:
waitFor server.stop()
waitFor server.closeWait()
except CatchableError as e:
warn "Failed to stop rpc HTTP server", exc = e.name, err = e.msg

if metricsServer.isSome():
let server = metricsServer.get()
try:
waitFor server.stop()
waitFor server.close()
except CatchableError as e:
warn "Failed to stop metrics HTTP server", exc = e.name, err = e.msg

waitFor node.stop()
5 changes: 3 additions & 2 deletions fluffy/network/beacon/beacon_light_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,10 @@ proc start*(lightClient: LightClient) =
info "Starting beacon light client", trusted_block_root = lightClient.trustedBlockRoot
lightClient.manager.start()

proc stop*(lightClient: LightClient) =
proc stop*(lightClient: LightClient) {.async: (raises: []).} =
info "Stopping beacon light client"
discard lightClient.manager.stop()

await lightClient.manager.stop()

proc resetToFinalizedHeader*(
lightClient: LightClient,
Expand Down
10 changes: 5 additions & 5 deletions fluffy/network/beacon/beacon_light_client_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type
GetBoolCallback* = proc(): bool {.gcsafe, raises: [].}
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [].}

LightClientManager* = object
LightClientManager* = ref object
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to change this to a ref object so that the stop proc can become async

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did making the LightClientManager parameter of the stop proc a var not help?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that and it didn't work. This was the error: Error: 'self' is of type <var LightClientManager> which cannot be captured as it would violate memory safety, declared here: /home/user/development/status-im/nimbus-eth1/fluffy/network/beacon/beacon_light_client_manager.nim(323, 12); using '-d:nimNoLentIterators' helps in some cases. Consider using a <ref var LightClientManager> which can be captured.

network: BeaconNetwork
rng: ref HmacDrbgContext
getTrustedBlockRoot: GetTrustedBlockRootCallback
Expand Down Expand Up @@ -315,13 +315,13 @@ proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} =
didLatestSyncTaskProgress = didProgress,
)

proc start*(self: var LightClientManager) =
proc start*(self: LightClientManager) =
## Start light client manager's loop.
doAssert self.loopFuture == nil
self.loopFuture = self.loop()

proc stop*(self: var LightClientManager) {.async: (raises: []).} =
proc stop*(self: LightClientManager) {.async: (raises: []).} =
## Stop light client manager's loop.
if self.loopFuture != nil:
await noCancel self.loopFuture.cancelAndWait()
if not self.loopFuture.isNil():
await noCancel(self.loopFuture.cancelAndWait())
self.loopFuture = nil
16 changes: 11 additions & 5 deletions fluffy/network/beacon/beacon_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,19 @@ proc start*(n: BeaconNetwork) =
n.portalProtocol.start()
n.processContentLoop = processContentLoop(n)

proc stop*(n: BeaconNetwork) =
proc stop*(n: BeaconNetwork) {.async: (raises: []).} =
info "Stopping Portal beacon chain network"

n.portalProtocol.stop()
var futures: seq[Future[void]]
futures.add(n.portalProtocol.stop())

if not n.processContentLoop.isNil:
n.processContentLoop.cancelSoon()
if not n.processContentLoop.isNil():
futures.add(n.processContentLoop.cancelAndWait())

if not n.statusLogLoop.isNil():
n.statusLogLoop.cancelSoon()
futures.add(n.statusLogLoop.cancelAndWait())

await noCancel(allFutures(futures))

n.processContentLoop = nil
n.statusLogLoop = nil
14 changes: 9 additions & 5 deletions fluffy/network/history/history_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -722,13 +722,17 @@ proc start*(n: HistoryNetwork) =
n.statusLogLoop = statusLogLoop(n)
pruneDeprecatedAccumulatorRecords(n.accumulator, n.contentDB)

proc stop*(n: HistoryNetwork) =
proc stop*(n: HistoryNetwork) {.async: (raises: []).} =
info "Stopping Portal execution history network"

n.portalProtocol.stop()
var futures: seq[Future[void]]
futures.add(n.portalProtocol.stop())

if not n.processContentLoop.isNil:
n.processContentLoop.cancelSoon()

futures.add(n.processContentLoop.cancelAndWait())
if not n.statusLogLoop.isNil:
n.statusLogLoop.cancelSoon()
futures.add(n.statusLogLoop.cancelAndWait())
await noCancel(allFutures(futures))

n.processContentLoop = nil
n.statusLogLoop = nil
15 changes: 10 additions & 5 deletions fluffy/network/state/state_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,18 @@ proc start*(n: StateNetwork) =
n.processContentLoop = processContentLoop(n)
n.statusLogLoop = statusLogLoop(n)

proc stop*(n: StateNetwork) =
proc stop*(n: StateNetwork) {.async: (raises: []).} =
info "Stopping Portal execution state network"

n.portalProtocol.stop()
var futures: seq[Future[void]]
futures.add(n.portalProtocol.stop())

if not n.processContentLoop.isNil():
n.processContentLoop.cancelSoon()

futures.add(n.processContentLoop.cancelAndWait())
if not n.statusLogLoop.isNil():
n.statusLogLoop.cancelSoon()
futures.add(n.statusLogLoop.cancelAndWait())

await noCancel(allFutures(futures))

n.processContentLoop = nil
n.statusLogLoop = nil
19 changes: 13 additions & 6 deletions fluffy/network/wire/portal_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1708,14 +1708,21 @@ proc start*(p: PortalProtocol) =
for i in 0 ..< concurrentOffers:
p.offerWorkers.add(offerWorker(p))

proc stop*(p: PortalProtocol) =
if not p.revalidateLoop.isNil:
p.revalidateLoop.cancelSoon()
if not p.refreshLoop.isNil:
p.refreshLoop.cancelSoon()
proc stop*(p: PortalProtocol) {.async: (raises: []).} =
var futures: seq[Future[void]]

if not p.revalidateLoop.isNil():
futures.add(p.revalidateLoop.cancelAndWait())
if not p.refreshLoop.isNil():
futures.add(p.refreshLoop.cancelAndWait())

for worker in p.offerWorkers:
worker.cancelSoon()
futures.add(worker.cancelAndWait())

await noCancel(allFutures(futures))

p.revalidateLoop = nil
p.refreshLoop = nil
p.offerWorkers = @[]

proc resolve*(
Expand Down
32 changes: 24 additions & 8 deletions fluffy/portal_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ export
beacon_light_client, history_network, state_network, portal_protocol_config, forks

type
PortalNodeState* = enum
Starting
Running
Stopping

PortalNodeConfig* = object
accumulatorFile*: Opt[string]
disableStateRootValidation*: bool
Expand All @@ -33,6 +38,7 @@ type
storageCapacity*: uint64

PortalNode* = ref object
state*: PortalNodeState
discovery: protocol.Protocol
contentDB: ContentDB
streamManager: StreamManager
Expand Down Expand Up @@ -202,6 +208,8 @@ proc statusLogLoop(n: PortalNode) {.async: (raises: []).} =
proc start*(n: PortalNode) =
debug "Starting Portal node"

n.discovery.start()

if n.beaconNetwork.isSome():
n.beaconNetwork.value.start()
if n.historyNetwork.isSome():
Expand All @@ -214,18 +222,26 @@ proc start*(n: PortalNode) =

n.statusLogLoop = statusLogLoop(n)

proc stop*(n: PortalNode) =
n.state = PortalNodeState.Running

proc stop*(n: PortalNode) {.async: (raises: []).} =
debug "Stopping Portal node"

var futures: seq[Future[void]]

if n.beaconNetwork.isSome():
n.beaconNetwork.value.stop()
futures.add(n.beaconNetwork.value.stop())
if n.historyNetwork.isSome():
n.historyNetwork.value.stop()
futures.add(n.historyNetwork.value.stop())
if n.stateNetwork.isSome():
n.stateNetwork.value.stop()

futures.add(n.stateNetwork.value.stop())
if n.beaconLightClient.isSome():
n.beaconLightClient.value.stop()
futures.add(n.beaconLightClient.value.stop())
if not n.statusLogLoop.isNil():
futures.add(n.statusLogLoop.cancelAndWait())

await noCancel(allFutures(futures))

if not n.statusLogLoop.isNil:
n.statusLogLoop.cancelSoon()
await n.discovery.closeWait()
n.contentDB.close()
n.statusLogLoop = nil
2 changes: 1 addition & 1 deletion fluffy/tests/beacon_network_tests/beacon_test_helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ proc start*(n: BeaconNode) =
n.beaconNetwork.start()

proc stop*(n: BeaconNode) {.async.} =
n.beaconNetwork.stop()
discard n.beaconNetwork.stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these calls can simply be awaited now as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that, but it slows the tests down and even breaks them in some cases, probably because the tests stop and start the network multiple times. Adding the discard effectively allows the tests to behave as before.

await n.discoveryProtocol.closeWait()

proc containsId*(n: BeaconNode, contentId: ContentId): bool =
Expand Down
2 changes: 1 addition & 1 deletion fluffy/tests/state_network_tests/state_test_helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ proc start*(sn: StateNode) =
sn.stateNetwork.start()

proc stop*(sn: StateNode) {.async.} =
sn.stateNetwork.stop()
discard sn.stateNetwork.stop()
await sn.discoveryProtocol.closeWait()

proc containsId*(sn: StateNode, contentId: ContentId): bool {.inline.} =
Expand Down
Loading
Loading