Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: add Subscription::close_reason #1320

Merged
merged 12 commits into from
Mar 15, 2024

Conversation

niklasad1
Copy link
Member

@niklasad1 niklasad1 commented Mar 12, 2024

This PR changes adds a new API to the subscription know how the subscription was closed.

It still drops the subscription if it's lagging but the subscription should easily-extendable to
implement specific handling for lagging, see the example below

Example of custom subscription logic

fn drop_oldest_when_lagging<T: Clone + DeserializeOwned + Send + Sync + 'static>(
	mut sub: Subscription<T>,
	buffer_size: usize,
) -> impl Stream<Item = T> {
	let (mut tx, rx) = async_broadcast::broadcast(buffer_size);

	tx.set_overflow(true);

	tokio::spawn(async move {
		// poll sub, sending msgs to our chan which throws stuff away
		while let Some(n) = sub.next().await {
			let msg = match n {
				Ok(msg) => msg,
				Err(e) => {
					tracing::error!("Failed to recv subscription message: {e}");
					continue;
				}
			};

			tx.try_broadcast(msg).expect("Infallible in overflowing mode; qed");
		}
	});
	rx
}

Resolves #1291

@niklasad1 niklasad1 requested a review from a team as a code owner March 12, 2024 11:01
core/src/client/mod.rs Outdated Show resolved Hide resolved
@niklasad1 niklasad1 added the breaking change Breaking change in the public APIs label Mar 12, 2024
core/src/client/mod.rs Outdated Show resolved Hide resolved
core/src/client/mod.rs Outdated Show resolved Hide resolved
@niklasad1 niklasad1 changed the title refactor(client subscription): don't drop when can't keep with server client: add Subscription::close_reason Mar 15, 2024
mut sub: Subscription<T>,
buffer_size: usize,
) -> impl Stream<Item = Result<T, BroadcastStreamRecvError>> {
let (tx, rx) = tokio::sync::broadcast::channel(buffer_size);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooh I didn't realise that this channel handled lagging messages, TIL :)

Copy link
Collaborator

@jsdw jsdw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool! Looks good to me :)

core/src/client/mod.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@lexnv lexnv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Nice one 👍

With this PR we should have a bit more info about what happens when the subscription stream ends, which makes our life easier when debugging connectivity issues 🙏

@niklasad1 niklasad1 merged commit d40521b into master Mar 15, 2024
11 checks passed
@niklasad1 niklasad1 deleted the na-improve-client-subscription-api-v5 branch March 15, 2024 17:18
@niklasad1 niklasad1 mentioned this pull request Jun 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
breaking change Breaking change in the public APIs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[client]: simplify subscription API
3 participants