bug(PgListener): periodic protocol error due to unknown message type #2956

Closed
OliverNChalk opened this issue Dec 30, 2023 · 0 comments · Fixed by #3467

OliverNChalk commented Dec 30, 2023

Bug Description

Roughly once or twice every 24 hours, our PgListener returns the following error:

Protocol("unknown message type: '\\0'")

This happens both on local dev machines and in our production environment. There does not seem to be any correlation between one connection receiving the error and another receiving it (i.e. one will disconnect while the other stays connected).
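
For context on what the message means, here is a minimal, standard-library-only sketch of how a Postgres backend frame header is laid out (one type byte followed by a 4-byte big-endian length that includes itself) and how a reader that starts one byte off a frame boundary would see `'\0'` as the message type. This is an illustration only, not SQLx's code and not a diagnosis of this issue; `MsgType`, `parse_header`, and `notification_frame` are hypothetical names invented for the example, and only two message types are modelled.

```rust
/// Backend message types recognised by this sketch (a small subset, for illustration).
#[derive(Debug, PartialEq)]
enum MsgType {
    NotificationResponse, // type byte b'A'
    ReadyForQuery,        // type byte b'Z'
}

/// Parse one frame header: 1 type byte + 4-byte big-endian length.
/// The length counts itself but not the type byte.
fn parse_header(buf: &[u8]) -> Result<(MsgType, usize), String> {
    if buf.len() < 5 {
        return Err("incomplete header".to_string());
    }
    let ty = match buf[0] {
        b'A' => MsgType::NotificationResponse,
        b'Z' => MsgType::ReadyForQuery,
        // Any other byte is rejected; a NUL byte is rendered as '\0'.
        other => return Err(format!("unknown message type: {:?}", other as char)),
    };
    let len = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
    Ok((ty, len))
}

/// Build a NotificationResponse frame:
/// 'A', Int32 length, Int32 sender pid, channel C-string, payload C-string.
fn notification_frame(pid: u32, channel: &str, payload: &str) -> Vec<u8> {
    let len = 4 + 4 + channel.len() + 1 + payload.len() + 1;
    let mut frame = vec![b'A'];
    frame.extend_from_slice(&(len as u32).to_be_bytes());
    frame.extend_from_slice(&pid.to_be_bytes());
    frame.extend_from_slice(channel.as_bytes());
    frame.push(0); // C-string terminator for the channel name
    frame.extend_from_slice(payload.as_bytes());
    frame.push(0); // C-string terminator for the payload
    frame
}
```

Parsing a frame from its first byte yields the notification header, while parsing the same bytes starting at offset 1 lands on the high byte of the length field, which is zero for any small message, and fails with the same text as the error above. Whether a misaligned read is what actually happens here is an open question.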

Minimal Reproduction

I don't have a minimal reproduction repo because, at this stage, I'm not sure how to trigger the error reliably. I have seen similar issues: #886 and #1097. Let me know how I can help investigate further and provide more info.

Here is how we are creating & driving the PgListener:

        // Setup PgNotify listener for relevant streams.
        let mut listener = PgListener::connect_with(pool).await.unwrap();
        listener.listen("PARAM_INTERMEDIATES").await.unwrap();
        listener.listen("PARAM_LATEST").await.unwrap();

        loop {
            tokio::select! {
                biased;

                res = self.listener.try_recv() => {
                    let notification = res.unwrap().unwrap();
                    match notification.channel() {
                        "PARAM_INTERMEDIATES" => self.on_intermediary_param(notification),
                        "PARAM_LATEST" => self.on_param(notification),
                        _ => panic!("Unexpected notification; notification={notification:?}")
                    }
                }

                opt = self.worker_handles.next() => {
                    let (name, res) = opt.unwrap();

                    res.unwrap();

                    panic!("Worker exited without panic; worker={name}");
                }

                _ = self.snapshot_interval.tick() => {
                    self.pub_tx.try_send((
                        PUBLISHER_TOPIC,
                        ParamMsg::Snapshot(self.params.clone())
                    )).unwrap();
                }

                _ = self.cxl.cancelled() => break,
            }
        }

Info

  • SQLx version: 0.7.3
  • SQLx features enabled: [
    "chrono",
    "postgres",
    "runtime-tokio",
    "time",
    "tls-rustls",
    "bigdecimal",
    ]
  • Database server and version: Postgres 15.5
  • Operating system: Ubuntu
  • rustc --version: 1.72