From ae213658c618305d274d84b92c2dbe0ff3d1d84e Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Tue, 6 Aug 2024 22:27:00 +0200 Subject: [PATCH] experiments Signed-off-by: Marc-Antoine Perennou --- examples/t.rs | 114 +++++++++++++++++++++++++++++++++++++++++++++++ src/channel.rs | 13 +++++- src/generated.rs | 2 + 3 files changed, 127 insertions(+), 2 deletions(-) create mode 100644 examples/t.rs diff --git a/examples/t.rs b/examples/t.rs new file mode 100644 index 00000000..651a4550 --- /dev/null +++ b/examples/t.rs @@ -0,0 +1,114 @@ +use lapin::{ + message::DeliveryResult, options::*, publisher_confirm::Confirmation, types::FieldTable, + BasicProperties, Connection, ConnectionProperties, +}; +use tracing::info; + +fn main() { + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "info"); + } + + tracing_subscriber::fmt::init(); + + let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); + + async_global_executor::block_on(async { + let conn = Connection::connect(&addr, ConnectionProperties::default()) + .await + .expect("connection error"); + + info!("CONNECTED"); + + { + let channel1 = conn.create_channel().await.expect("create_channel"); + let channel2 = conn.create_channel().await.expect("create_channel"); + channel1 + .queue_declare( + "recover-test", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + .expect("queue_declare"); + + info!("will consume"); + let channel = channel2.clone(); + channel2 + .basic_consume( + "recover-test", + "my_consumer", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .expect("basic_consume") + .set_delegate(move |delivery: DeliveryResult| { + let channel = channel.clone(); + async move { + info!(message=?delivery, "received message"); + if let Ok(Some(delivery)) = delivery { + delivery + .ack(BasicAckOptions::default()) + .await + .expect("basic_ack"); + if &delivery.data[..] == b"after" { + channel + .basic_cancel("my_consumer", BasicCancelOptions::default()) + .await + .expect("basic_cancel"); + } + } + } + }); + + info!("will publish"); + let confirm = channel1 + .basic_publish( + "", + "recover-test", + BasicPublishOptions::default(), + b"before", + BasicProperties::default(), + ) + .await + .expect("basic_publish") + .await + .expect("publisher-confirms"); + assert_eq!(confirm, Confirmation::NotRequested); + + info!("before fail"); + assert!(channel1 + .queue_declare( + "fake queue", + QueueDeclareOptions { + passive: true, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await + .is_err()); + info!("after fail"); + + std::thread::sleep_ms(100); + + info!("publish after"); + let confirm = channel1 + .basic_publish( + "", + "recover-test", + BasicPublishOptions::default(), + b"after", + BasicProperties::default(), + ) + .await + .expect("basic_publish") + .await + .expect("publisher-confirms"); + assert_eq!(confirm, Confirmation::NotRequested); + } + + conn.run().expect("conn.run"); + }); +} diff --git a/src/channel.rs b/src/channel.rs index 12938411..35d3dd15 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -568,6 +568,7 @@ impl Channel { } fn on_channel_close_ok_sent(&self, error: Option) { + /* self.set_closed( error .clone() @@ -576,6 +577,7 @@ impl Channel { if let Some(error) = error { self.error_handler.on_error(error); } + */ } fn on_basic_recover_async_sent(&self) { @@ -894,10 +896,17 @@ impl Channel { Error::ProtocolError(error) }); self.set_closing(error.clone().ok()); + self.set_state(ChannelState::Initial); let error = error.map_err(|error| info!(channel=%self.id, ?method, code_to_error=%error, "Channel closed with a non-error code")).ok(); let channel = self.clone(); - self.internal_rpc - .register_internal_future(async move { channel.channel_close_ok(error).await }); + self.internal_rpc.register_internal_future(async move { + channel.channel_close_ok(error.clone()).await?; + if let Some(error) = error { + channel.frames.clear_expected_replies(channel.id, error); + } + channel.clone().channel_open(channel).await?; + Ok(()) + }); Ok(()) } diff --git a/src/generated.rs b/src/generated.rs index 4abae5f2..7a4276a0 100644 --- a/src/generated.rs +++ b/src/generated.rs @@ -1394,9 +1394,11 @@ impl Channel { self.on_channel_close_received(method) } async fn channel_close_ok(&self, error: Option) -> Result<()> { + /* if !self.status.closing() { return Err(Error::InvalidChannelState(self.status.state())); } + */ let method = AMQPClass::Channel(protocol::channel::AMQPMethod::CloseOk( protocol::channel::CloseOk {},