Skip to content

Commit

Permalink
experiments
Browse files Browse the repository at this point in the history
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
  • Loading branch information
Keruspe committed Feb 14, 2025
1 parent 8144442 commit 2827009
Show file tree
Hide file tree
Showing 14 changed files with 493 additions and 34 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ waker-fn = "^1.1"

[dev-dependencies]
async-global-executor = "^2.0"
async-io = "^2.0"
futures-lite = "^2.0"
serde_json = "^1.0"
waker-fn = "^1.1"
Expand Down
58 changes: 58 additions & 0 deletions examples/c.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use futures_lite::StreamExt;
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
use tracing::info;

fn main() {
if std::env::var("RUST_LOG").is_err() {
unsafe { std::env::set_var("RUST_LOG", "info") };
}

tracing_subscriber::fmt::init();

let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());

async_global_executor::block_on(async {
let conn = Connection::connect(&addr, ConnectionProperties::default())
.await
.expect("connection error");

info!("CONNECTED");

//receive channel
let channel = conn.create_channel().await.expect("create_channel");
info!(state=?conn.status().state());

let queue = channel
.queue_declare(
"hello-recover",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.expect("queue_declare");
info!(state=?conn.status().state());
info!(?queue, "Declared queue");

info!("will consume");
let mut consumer = channel
.basic_consume(
"hello-recover",
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("basic_consume");
info!(state=?conn.status().state());

while let Some(delivery) = consumer.next().await {
info!(message=?delivery, "received message");
if let Ok(delivery) = delivery {
delivery
.ack(BasicAckOptions::default())
.await
.expect("basic_ack");
}
}
})
}
96 changes: 96 additions & 0 deletions examples/p.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
use tracing::info;

fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}

tracing_subscriber::fmt::init();

let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
let recovery_config = lapin::experimental::RecoveryConfig::default().auto_recover_channels();

async_global_executor::block_on(async {
let conn = Connection::connect(
&addr,
ConnectionProperties::default().with_experimental_recovery_config(recovery_config),
)
.await
.expect("connection error");

info!("CONNECTED");

let channel1 = conn.create_channel().await.expect("create_channel");
channel1
.confirm_select(ConfirmSelectOptions::default())
.await
.expect("confirm_select");
channel1
.queue_declare(
"hello-recover",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.expect("queue_declare");

let ch = channel1.clone();
async_global_executor::spawn(async move {
loop {
async_io::Timer::after(std::time::Duration::from_secs(1)).await;
info!("Trigger failure");
assert!(ch
.queue_declare(
"fake queue",
QueueDeclareOptions {
passive: true,
..QueueDeclareOptions::default()
},
FieldTable::default(),
)
.await
.is_err());
}
})
.detach();

let mut published = 0;
let mut errors = 0;
info!("will publish");
loop {
let res = channel1
.basic_publish(
"",
"recover-test",
BasicPublishOptions::default(),
b"before",
BasicProperties::default(),
)
.await;
let res = if let Ok(res) = res {
res.await.map(|_| ())
} else {
res.map(|_| ())
};
match res {
Ok(()) => {
println!("GOT OK");
published += 1;
}
Err(err) => {
println!("GOT ERROR");
let (soft, notifier) = err.is_amqp_soft_error();

Check failure on line 83 in examples/p.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable)

mismatched types

Check failure on line 83 in examples/p.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, beta)

mismatched types

Check failure on line 83 in examples/p.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, nightly)

mismatched types

Check failure on line 83 in examples/p.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, beta)

mismatched types

Check failure on line 83 in examples/p.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable)

mismatched types

Check failure on line 83 in examples/p.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, 1.74.0)

mismatched types

Check failure on line 83 in examples/p.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, nightly)

mismatched types
if !soft {
panic!("{}", err);
}
errors += 1;
if let Some(notifier) = notifier {
notifier.await
}
}
}
println!("Published {} with {} errors", published, errors);
}
});
}
120 changes: 120 additions & 0 deletions examples/t.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use lapin::{
message::DeliveryResult, options::*, publisher_confirm::Confirmation, types::FieldTable,
BasicProperties, Connection, ConnectionProperties,
};
use tracing::info;

fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}

tracing_subscriber::fmt::init();

let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
let recovery_config = lapin::experimental::RecoveryConfig::default().auto_recover_channels();

async_global_executor::block_on(async {
let conn = Connection::connect(
&addr,
ConnectionProperties::default().with_experimental_recovery_config(recovery_config),
)
.await
.expect("connection error");

info!("CONNECTED");

{
let channel1 = conn.create_channel().await.expect("create_channel");
let channel2 = conn.create_channel().await.expect("create_channel");
channel1
.confirm_select(ConfirmSelectOptions::default())
.await
.expect("confirm_select");
channel1
.queue_declare(
"recover-test",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.expect("queue_declare");

info!("will consume");
let channel = channel2.clone();
channel2
.basic_consume(
"recover-test",
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("basic_consume")
.set_delegate(move |delivery: DeliveryResult| {
let channel = channel.clone();
async move {
info!(message=?delivery, "received message");
if let Ok(Some(delivery)) = delivery {
delivery
.ack(BasicAckOptions::default())
.await
.expect("basic_ack");
if &delivery.data[..] == b"after" {
channel
.basic_cancel("my_consumer", BasicCancelOptions::default())
.await
.expect("basic_cancel");
}
}
}
});

info!("will publish");
let confirm = channel1
.basic_publish(
"",
"recover-test",
BasicPublishOptions::default(),
b"before",
BasicProperties::default(),
)
.await
.expect("basic_publish")
.await
.expect("publisher-confirms");
assert_eq!(confirm, Confirmation::Ack(None));

info!("before fail");
assert!(channel1
.queue_declare(
"fake queue",
QueueDeclareOptions {
passive: true,
..QueueDeclareOptions::default()
},
FieldTable::default(),
)
.await
.is_err());
info!("after fail");

info!("publish after");
let confirm = channel1
.basic_publish(
"",
"recover-test",
BasicPublishOptions::default(),
b"after",
BasicProperties::default(),
)
.await
.expect("basic_publish")
.await
.expect("publisher-confirms");
assert_eq!(confirm, Confirmation::Ack(None));
}

conn.run().expect("conn.run");
});
}
9 changes: 9 additions & 0 deletions src/acknowledgement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ impl Acknowledgements {
pub(crate) fn on_channel_error(&self, error: Error) {
self.0.lock().on_channel_error(error);
}

pub(crate) fn reset(&self, error: Error) {
self.0.lock().reset(error);
}
}

impl fmt::Debug for Acknowledgements {
Expand Down Expand Up @@ -174,4 +178,9 @@ impl Inner {
}
}
}

fn reset(&mut self, error: Error) {
self.delivery_tag = IdSequence::new(false);
self.on_channel_error(error);
}
}
Loading

0 comments on commit 2827009

Please sign in to comment.