Skip to content

Commit

Permalink
Merge pull request #26 from DaanDD/tokio-streams
Browse files Browse the repository at this point in the history
Use tokio streams instead of crossbeam to fix deadlock issues
  • Loading branch information
johnmanjiro13 authored Jul 18, 2023
2 parents 2e22a30 + 0075918 commit e5c21ee
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 32 deletions.
4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ include = [
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.61"
base64 = "0.21.0"
bytes = { version = "1.3.0", features = ["serde"] }
chrono = "0.4.23"
crossbeam = "0.8.2"
log = "0.4.17"
rmp-serde = "1.1.1"
serde = { version = "1.0.152", features = ["derive"] }
tokio = { version = "1.24.2", features = ["net", "time", "io-util", "rt"] }
tokio = { version = "1.24.2", features = ["net", "time", "io-util", "rt", "sync"] }
uuid = { version = "1.2.2", features = ["v4"] }

[dev-dependencies]
Expand Down
49 changes: 26 additions & 23 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
use std::net::SocketAddr;
use std::time::Duration;

use async_trait::async_trait;
use base64::{engine::general_purpose, Engine};
use crossbeam::channel::{self, Sender};
use tokio::{net::TcpStream, time::timeout};
use tokio::{
net::TcpStream,
sync::broadcast::{channel, Sender},
time::timeout,
};
use uuid::Uuid;

use crate::record::Map;
Expand Down Expand Up @@ -79,10 +81,9 @@ impl Default for Config {
}
}

#[async_trait]
pub trait FluentClient: Send + Sync {
fn send(&self, tag: &str, record: Map) -> Result<(), SendError>;
async fn stop(self) -> Result<(), SendError>;
fn stop(self) -> Result<(), SendError>;
}

#[derive(Debug, Clone)]
Expand All @@ -95,7 +96,7 @@ impl Client {
/// Connect to the fluentd server and create a worker with tokio::spawn.
pub async fn new(config: &Config) -> tokio::io::Result<Client> {
let stream = timeout(config.timeout, TcpStream::connect(config.addr)).await??;
let (sender, receiver) = channel::unbounded();
let (sender, receiver) = channel(1024);

let config = config.clone();
let _ = tokio::spawn(async move {
Expand Down Expand Up @@ -127,11 +128,11 @@ impl Client {
.send(Message::Record(record))
.map_err(|e| SendError {
source: e.to_string(),
})
})?;
Ok(())
}
}

#[async_trait]
impl FluentClient for Client {
/// Send a fluent record to the fluentd server.
///
Expand All @@ -144,10 +145,13 @@ impl FluentClient for Client {
}

/// Stop the worker.
async fn stop(self) -> Result<(), SendError> {
self.sender.send(Message::Terminate).map_err(|e| SendError {
source: e.to_string(),
})
fn stop(self) -> Result<(), SendError> {
self.sender
.send(Message::Terminate)
.map_err(|e| SendError {
source: e.to_string(),
})?;
Ok(())
}
}

Expand All @@ -162,13 +166,12 @@ impl Drop for Client {
/// NopClient does nothing.
pub struct NopClient;

#[async_trait]
impl FluentClient for NopClient {
fn send(&self, _tag: &str, _record: Map) -> Result<(), SendError> {
Ok(())
}

async fn stop(self) -> Result<(), SendError> {
fn stop(self) -> Result<(), SendError> {
Ok(())
}
}
Expand All @@ -186,7 +189,7 @@ mod tests {
use crate::record::Value;
use crate::record_map;

let (sender, receiver) = channel::unbounded();
let (sender, mut receiver) = channel(1024);
let client = Client { sender };

let timestamp = chrono::Utc.timestamp_opt(1234567, 0).unwrap().timestamp();
Expand All @@ -196,7 +199,7 @@ mod tests {
"failed to send with time"
);

let got = receiver.recv().expect("failed to receive");
let got = receiver.try_recv().expect("failed to receive");
match got {
Message::Record(r) => {
assert_eq!(r.tag, "test");
Expand All @@ -207,13 +210,13 @@ mod tests {
}
}

#[tokio::test]
async fn test_stop() {
let (sender, receiver) = channel::unbounded();
#[test]
fn test_stop() {
let (sender, mut receiver) = channel(1024);
let client = Client { sender };
assert!(client.stop().await.is_ok(), "faled to stop");
assert!(client.stop().is_ok(), "faled to stop");

let got = receiver.recv().expect("failed to receive");
let got = receiver.try_recv().expect("failed to receive");
match got {
Message::Record(_) => unreachable!("got record message"),
Message::Terminate => {}
Expand All @@ -222,11 +225,11 @@ mod tests {

#[test]
fn test_client_drop_sends_terminate() {
let (sender, receiver) = channel::unbounded();
let (sender, mut receiver) = channel(1024);
{
Client { sender };
}
let got = receiver.recv().expect("failed to receive");
let got = receiver.try_recv().expect("failed to receive");
match got {
Message::Record(_) => unreachable!("got record message"),
Message::Terminate => {}
Expand Down
15 changes: 9 additions & 6 deletions src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use bytes::{Buf, BufMut};
use crossbeam::channel::{self, Receiver};
use log::warn;
use rmp_serde::Serializer;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
sync::broadcast::{error::RecvError, Receiver},
time::Duration,
};

Expand Down Expand Up @@ -37,15 +37,15 @@ impl std::fmt::Display for Error {
}
}

#[derive(Debug, Serialize)]
#[derive(Clone, Debug, Serialize)]
pub struct Record {
pub tag: String,
pub timestamp: i64,
pub record: Map,
pub options: Options,
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Options {
pub chunk: String,
}
Expand All @@ -61,6 +61,7 @@ impl Serialize for Options {
}
}

#[derive(Clone)]
pub enum Message {
Record(Record),
Terminate,
Expand Down Expand Up @@ -100,7 +101,7 @@ impl Worker {

pub async fn run(&mut self) {
loop {
match self.receiver.try_recv() {
match self.receiver.recv().await {
Ok(Message::Record(record)) => {
let record = match self.encode(record) {
Ok(record) => record,
Expand All @@ -115,8 +116,10 @@ impl Worker {
Err(_) => continue,
};
}
Err(channel::TryRecvError::Empty) => continue,
Ok(Message::Terminate) | Err(channel::TryRecvError::Disconnected) => break,
Err(RecvError::Closed) | Ok(Message::Terminate) => {
break;
}
Err(RecvError::Lagged(_)) => continue,
}
}
}
Expand Down

0 comments on commit e5c21ee

Please sign in to comment.