Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: unknown block sync to subscribe/unsubscribe to network events #5654

Merged
merged 3 commits into from
Jun 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ export function createLodestarMetrics(
},

syncUnknownBlock: {
switchNetworkSubscriptions: register.gauge<"action">({
name: "lodestar_sync_unknown_block_network_subscriptions_count",
help: "Switch network subscriptions on/off",
labelNames: ["action"],
}),
requests: register.gauge<"type">({
name: "lodestar_sync_unknown_block_requests_total",
help: "Total number of unknown block events or requests",
Expand Down
63 changes: 39 additions & 24 deletions packages/beacon-node/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@ export class BeaconSync implements IBeaconSync {

// Subscribe to RangeSync completing a SyncChain and recompute sync state
if (!opts.disableRangeSync) {
// prod code
this.logger.debug("RangeSync enabled.");
this.rangeSync.on(RangeSyncEvent.completedChain, this.updateSyncState);
this.network.events.on(NetworkEvent.peerConnected, this.addPeer);
this.network.events.on(NetworkEvent.peerDisconnected, this.removePeer);
} else {
// test code, this is needed for Unknown block sync sim test
this.unknownBlockSync.subscribeToNetwork();
this.logger.debug("RangeSync disabled.");
}

Expand Down Expand Up @@ -196,36 +199,48 @@ export class BeaconSync implements IBeaconSync {
const state = this.state; // Don't run the getter twice

// We have become synced, subscribe to all the gossip core topics
if (
state === SyncState.Synced &&
!this.network.isSubscribedToGossipCoreTopics() &&
this.chain.clock.currentEpoch >= MIN_EPOCH_TO_START_GOSSIP
) {
this.network
.subscribeGossipCoreTopics()
.then(() => {
this.metrics?.syncSwitchGossipSubscriptions.inc({action: "subscribed"});
this.logger.info("Subscribed gossip core topics");
})
.catch((e) => {
this.logger.error("Error subscribing to gossip core topics", {}, e);
});
if (state === SyncState.Synced && this.chain.clock.currentEpoch >= MIN_EPOCH_TO_START_GOSSIP) {
if (!this.network.isSubscribedToGossipCoreTopics()) {
this.network
.subscribeGossipCoreTopics()
.then(() => {
this.metrics?.syncSwitchGossipSubscriptions.inc({action: "subscribed"});
this.logger.info("Subscribed gossip core topics");
})
.catch((e) => {
this.logger.error("Error subscribing to gossip core topics", {}, e);
});
}
Copy link
Contributor

@dapplion dapplion Jun 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rebundle this if statement as before? If you need you can re-order the conditions

    if (
      state === SyncState.Synced &&
      this.chain.clock.currentEpoch >= MIN_EPOCH_TO_START_GOSSIP &&
      !this.network.isSubscribedToGossipCoreTopics()
    ) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we can do it but then the below this.unknownBlockSync.subscribeToNetwork(); has to depend on this.network.isSubscribedToGossipCoreTopics(). Logically it's the same but it'll make it hard to reason about later.


// also start searching for unknown blocks
if (!this.unknownBlockSync.isSubscribedToNetwork()) {
this.unknownBlockSync.subscribeToNetwork();
this.metrics?.syncUnknownBlock.switchNetworkSubscriptions.inc({action: "subscribed"});
}
}

// If we stopped being synced and falled significantly behind, stop gossip
else if (state !== SyncState.Synced && this.network.isSubscribedToGossipCoreTopics()) {
else if (state !== SyncState.Synced) {
const syncDiff = this.chain.clock.currentSlot - this.chain.forkChoice.getHead().slot;
if (syncDiff > this.slotImportTolerance * 2) {
this.logger.warn(`Node sync has fallen behind by ${syncDiff} slots`);
this.network
.unsubscribeGossipCoreTopics()
.then(() => {
this.metrics?.syncSwitchGossipSubscriptions.inc({action: "unsubscribed"});
this.logger.info("Un-subscribed gossip core topics");
})
.catch((e) => {
this.logger.error("Error unsubscribing to gossip core topics", {}, e);
});
if (this.network.isSubscribedToGossipCoreTopics()) {
this.network
.unsubscribeGossipCoreTopics()
.then(() => {
this.metrics?.syncSwitchGossipSubscriptions.inc({action: "unsubscribed"});
this.logger.info("Un-subscribed gossip core topics");
})
.catch((e) => {
this.logger.error("Error unsubscribing to gossip core topics", {}, e);
});
}

// also stop searching for unknown blocks
if (this.unknownBlockSync.isSubscribedToNetwork()) {
this.unknownBlockSync.unsubscribeFromNetwork();
this.metrics?.syncUnknownBlock.switchNetworkSubscriptions.inc({action: "unsubscribed"});
}
}
}
};
Expand Down
38 changes: 27 additions & 11 deletions packages/beacon-node/src/sync/unknownBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,17 @@ export class UnknownBlockSync {
private readonly knownBadBlocks = new Set<RootHex>();
private readonly proposerBoostSecWindow: number;
private readonly maxPendingBlocks;
private subscribedToNetworkEvents = false;

constructor(
private readonly config: ChainForkConfig,
private readonly network: INetwork,
private readonly chain: IBeaconChain,
private readonly logger: Logger,
private readonly metrics: Metrics | null,
opts?: SyncOptions
private readonly opts?: SyncOptions
) {
if (!opts?.disableUnknownBlockSync) {
this.logger.debug("UnknownBlockSync enabled.");
this.network.events.on(NetworkEvent.unknownBlock, this.onUnknownBlock);
this.network.events.on(NetworkEvent.unknownBlockParent, this.onUnknownParent);
this.network.events.on(NetworkEvent.peerConnected, this.triggerUnknownBlockSearch);
} else {
this.logger.debug("UnknownBlockSync disabled.");
}
this.maxPendingBlocks = opts?.maxPendingBlocks ?? MAX_PENDING_BLOCKS;

this.proposerBoostSecWindow = this.config.SECONDS_PER_SLOT / INTERVALS_PER_SLOT;

if (metrics) {
Expand All @@ -61,12 +53,36 @@ export class UnknownBlockSync {
}
}

close(): void {
subscribeToNetwork(): void {
if (!this.opts?.disableUnknownBlockSync) {
// cannot chain to the above if or the log will be incorrect
if (!this.subscribedToNetworkEvents) {
this.logger.debug("UnknownBlockSync enabled.");
this.network.events.on(NetworkEvent.unknownBlock, this.onUnknownBlock);
this.network.events.on(NetworkEvent.unknownBlockParent, this.onUnknownParent);
this.network.events.on(NetworkEvent.peerConnected, this.triggerUnknownBlockSearch);
this.subscribedToNetworkEvents = true;
}
} else {
this.logger.debug("UnknownBlockSync disabled.");
}
}

unsubscribeFromNetwork(): void {
this.network.events.off(NetworkEvent.unknownBlock, this.onUnknownBlock);
this.network.events.off(NetworkEvent.unknownBlockParent, this.onUnknownParent);
this.network.events.off(NetworkEvent.peerConnected, this.triggerUnknownBlockSearch);
}

close(): void {
this.unsubscribeFromNetwork();
// add more in the future if needed
}

isSubscribedToNetwork(): boolean {
return this.subscribedToNetworkEvents;
}

/**
* Process an unknownBlock event and register the block in `pendingBlocks` Map.
*/
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/test/unit/sync/unknownBlock.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ describe("sync / UnknownBlockSync", () => {
...defaultSyncOptions,
maxPendingBlocks,
});
syncService.subscribeToNetwork();
if (event === NetworkEvent.unknownBlockParent) {
network.events?.emit(NetworkEvent.unknownBlockParent, {
blockInput: getBlockInput.preDeneb(config, blockC, BlockSource.gossip),
Expand Down Expand Up @@ -220,6 +221,8 @@ describe("sync / UnknownBlockSync", () => {
"Wrong blocks in mock ForkChoice"
);
}

syncService.close();
});
}
});