Skip to content

Commit

Permalink
experiments
Browse files Browse the repository at this point in the history
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
  • Loading branch information
Keruspe committed Aug 6, 2024
1 parent 69b4c2a commit ae21365
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 2 deletions.
114 changes: 114 additions & 0 deletions examples/t.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use lapin::{
message::DeliveryResult, options::*, publisher_confirm::Confirmation, types::FieldTable,
BasicProperties, Connection, ConnectionProperties,
};
use tracing::info;

fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}

tracing_subscriber::fmt::init();

let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());

async_global_executor::block_on(async {
let conn = Connection::connect(&addr, ConnectionProperties::default())
.await
.expect("connection error");

info!("CONNECTED");

{
let channel1 = conn.create_channel().await.expect("create_channel");
let channel2 = conn.create_channel().await.expect("create_channel");
channel1
.queue_declare(
"recover-test",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.expect("queue_declare");

info!("will consume");
let channel = channel2.clone();
channel2
.basic_consume(
"recover-test",
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("basic_consume")
.set_delegate(move |delivery: DeliveryResult| {
let channel = channel.clone();
async move {
info!(message=?delivery, "received message");
if let Ok(Some(delivery)) = delivery {
delivery
.ack(BasicAckOptions::default())
.await
.expect("basic_ack");
if &delivery.data[..] == b"after" {
channel
.basic_cancel("my_consumer", BasicCancelOptions::default())
.await
.expect("basic_cancel");
}
}
}
});

info!("will publish");
let confirm = channel1
.basic_publish(
"",
"recover-test",
BasicPublishOptions::default(),
b"before",
BasicProperties::default(),
)
.await
.expect("basic_publish")
.await
.expect("publisher-confirms");
assert_eq!(confirm, Confirmation::NotRequested);

info!("before fail");
assert!(channel1
.queue_declare(
"fake queue",
QueueDeclareOptions {
passive: true,
..QueueDeclareOptions::default()
},
FieldTable::default(),
)
.await
.is_err());
info!("after fail");

std::thread::sleep_ms(100);

Check warning on line 94 in examples/t.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable)

use of deprecated function `std::thread::sleep_ms`: replaced by `std::thread::sleep`

Check warning on line 94 in examples/t.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable)

use of deprecated function `std::thread::sleep_ms`: replaced by `std::thread::sleep`

Check warning on line 94 in examples/t.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, nightly)

use of deprecated function `std::thread::sleep_ms`: replaced by `std::thread::sleep`

Check warning on line 94 in examples/t.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, nightly)

use of deprecated function `std::thread::sleep_ms`: replaced by `std::thread::sleep`

Check warning on line 94 in examples/t.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, 1.74.0)

use of deprecated function `std::thread::sleep_ms`: replaced by `std::thread::sleep`

Check warning on line 94 in examples/t.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, beta)

use of deprecated function `std::thread::sleep_ms`: replaced by `std::thread::sleep`

Check warning on line 94 in examples/t.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, beta)

use of deprecated function `std::thread::sleep_ms`: replaced by `std::thread::sleep`

info!("publish after");
let confirm = channel1
.basic_publish(
"",
"recover-test",
BasicPublishOptions::default(),
b"after",
BasicProperties::default(),
)
.await
.expect("basic_publish")
.await
.expect("publisher-confirms");
assert_eq!(confirm, Confirmation::NotRequested);
}

conn.run().expect("conn.run");
});
}
13 changes: 11 additions & 2 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ impl Channel {
}

fn on_channel_close_ok_sent(&self, error: Option<Error>) {

Check warning on line 570 in src/channel.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable)

unused variable: `error`

Check warning on line 570 in src/channel.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable)

unused variable: `error`

Check warning on line 570 in src/channel.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, nightly)

unused variable: `error`

Check warning on line 570 in src/channel.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, nightly)

unused variable: `error`

Check warning on line 570 in src/channel.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, nightly)

unused variable: `error`

Check warning on line 570 in src/channel.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, 1.74.0)

unused variable: `error`

Check warning on line 570 in src/channel.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, 1.74.0)

unused variable: `error`

Check warning on line 570 in src/channel.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, beta)

unused variable: `error`

Check warning on line 570 in src/channel.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, beta)

unused variable: `error`
/*
self.set_closed(
error
.clone()
Expand All @@ -576,6 +577,7 @@ impl Channel {
if let Some(error) = error {
self.error_handler.on_error(error);
}
*/
}

fn on_basic_recover_async_sent(&self) {
Expand Down Expand Up @@ -894,10 +896,17 @@ impl Channel {
Error::ProtocolError(error)
});
self.set_closing(error.clone().ok());
self.set_state(ChannelState::Initial);
let error = error.map_err(|error| info!(channel=%self.id, ?method, code_to_error=%error, "Channel closed with a non-error code")).ok();
let channel = self.clone();
self.internal_rpc
.register_internal_future(async move { channel.channel_close_ok(error).await });
self.internal_rpc.register_internal_future(async move {
channel.channel_close_ok(error.clone()).await?;
if let Some(error) = error {
channel.frames.clear_expected_replies(channel.id, error);
}
channel.clone().channel_open(channel).await?;
Ok(())
});
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions src/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1394,9 +1394,11 @@ impl Channel {
self.on_channel_close_received(method)
}
async fn channel_close_ok(&self, error: Option<Error>) -> Result<()> {
/*
if !self.status.closing() {
return Err(Error::InvalidChannelState(self.status.state()));
}
*/

let method = AMQPClass::Channel(protocol::channel::AMQPMethod::CloseOk(
protocol::channel::CloseOk {},
Expand Down

0 comments on commit ae21365

Please sign in to comment.