Skip to content

Commit

Permalink
Merge pull request #6 from upupnoah/main
Browse files Browse the repository at this point in the history
feat(network): support network for simple redis
  • Loading branch information
upupnoah authored Jul 28, 2024
2 parents baefc78 + 8599d4a commit a994ab7
Show file tree
Hide file tree
Showing 8 changed files with 609 additions and 12 deletions.
447 changes: 446 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

21 changes: 20 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,24 @@ anyhow = "^1.0"
bytes = "^1.6.1"
dashmap = "6.0.1"
enum_dispatch = "^0.3.13"
lazy_static = "^1.5.0"
futures = { version = "^0.3.30", default-features = false }
# lazy_static = "^1.5.0"
thiserror = "^1.0.62"
tokio = { version = "^1.37.0", features = [
"rt",
"rt-multi-thread",
"macros",
"net",
] }
tokio-stream = "^0.1.15"
tokio-util = { version = "^0.7.10", features = ["codec"] }
tracing = "^0.1.40"
tracing-subscriber = { version = "^0.3.18", features = ["env-filter"] }
winnow = { version = "^0.6.8", features = ["simd"] }

# [dev-dependencies]
# criterion = { version = "^0.5.1", features = ["html_reports"] }

# [[bench]]
# name = "resp"
# harness = false
4 changes: 2 additions & 2 deletions src/cmd/map.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{
cmd::{extract_args, validate_command, Get, Set},
CommandError, RespArray, RespFrame, RespNull,
RespArray, RespFrame, RespNull,
};

use super::{CommandExecutor, RESP_OK};
use super::{CommandError, CommandExecutor, RESP_OK};

impl CommandExecutor for Get {
fn execute(self, backend: &crate::Backend) -> RespFrame {
Expand Down
25 changes: 21 additions & 4 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::LazyLock;

use enum_dispatch::enum_dispatch;
use lazy_static::lazy_static;
use thiserror::Error;

use crate::{Backend, RespArray, RespError, RespFrame, SimpleString};
Expand All @@ -12,9 +13,13 @@ mod map;
// 1. init in runtime
// 2. thread safe
// 3. improve performance
lazy_static! {
static ref RESP_OK: RespFrame = SimpleString::new("OK").into();
}
// lazy_static! {
// static ref RESP_OK: RespFrame = SimpleString::new("OK").into();
// }

// > Rust 1.80.0
// https://blog.rust-lang.org/2024/07/25/Rust-1.80.0.html
static RESP_OK: LazyLock<RespFrame> = LazyLock::new(|| SimpleString::new("OK").into());

// region: --- Traits
#[enum_dispatch]
Expand Down Expand Up @@ -108,6 +113,18 @@ impl TryFrom<RespArray> for Command {
}
}
}

impl TryFrom<RespFrame> for Command {
type Error = CommandError;
fn try_from(v: RespFrame) -> Result<Self, Self::Error> {
match v {
RespFrame::Array(array) => array.try_into(),
_ => Err(CommandError::InvalidCommand(
"Command must be an Array".to_string(),
)),
}
}
}
// endregion: --- impls

// region: --- functions
Expand Down
8 changes: 6 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
mod backend;
mod cmd;

mod resp;

pub mod cmd;
pub mod network;

pub use backend::*;
pub use cmd::*;
// pub use cmd::*;
// pub use network::*;
pub use resp::*;
28 changes: 26 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,27 @@
fn main() {
println!("Hello, world!");
use anyhow::Result;
use simple_redis::{network, Backend};
use tokio::net::TcpListener;
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();

let addr = "0.0.0.0:6389";
let listener = TcpListener::bind(addr).await?;
info!("Simple-Redis-Server is listening on {}", addr);

let backend = Backend::new();
loop {
let (stream, remote_addr) = listener.accept().await?;
info!("Accepted connection from {}", remote_addr);
let cloned_backend = backend.clone();
tokio::spawn(async move {
// handling of stream
match network::stream_handler(stream, cloned_backend).await {
Ok(_) => info!("Connection from {} closed", remote_addr),
Err(e) => info!("Connection from {} closed with error: {}", remote_addr, e),
}
});
}
}
81 changes: 81 additions & 0 deletions src/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use anyhow::Result;
use futures::SinkExt;
use tokio::net::TcpStream;
use tokio_stream::StreamExt;
use tokio_util::codec::{Decoder, Encoder, Framed};
use tracing::info;

use crate::{
cmd::{Command, CommandExecutor},
Backend, RespDecode, RespEncode, RespError, RespFrame,
};

#[derive(Debug)]
struct RedisRequest {
frame: RespFrame,
backend: Backend,
}

#[derive(Debug)]
struct RedisResponse {
frame: RespFrame,
}

#[derive(Debug)]
struct RespFrameCodec;

pub async fn stream_handler(stream: TcpStream, backend: Backend) -> Result<()> {
// how to get a frame from the stream?
let mut framed = Framed::new(stream, RespFrameCodec);
loop {
match framed.next().await {
Some(Ok(frame)) => {
info!("Received frame: {:?}", frame);
let request = RedisRequest {
frame,
backend: backend.clone(),
};
let response = request_handler(request).await?;
info!("Sending response: {:?}", response.frame);
framed.send(response.frame).await?;
}
Some(Err(e)) => return Err(e),
None => return Ok(()),
}
}
}

// NOTE: need a backend to process the frame
// async fn request_handler(request: RespFrame) -> Result<RespFrame> {
// todo!()
// }
async fn request_handler(request: RedisRequest) -> Result<RedisResponse> {
let (frame, backend) = (request.frame, request.backend);
let cmd = Command::try_from(frame)?;
info!("Executing command: {:?}", cmd);
let frame = cmd.execute(&backend);
Ok(RedisResponse { frame })
}

impl Encoder<RespFrame> for RespFrameCodec {
type Error = anyhow::Error;

fn encode(&mut self, item: RespFrame, dst: &mut bytes::BytesMut) -> Result<()> {
let encoded = item.encode();
dst.extend_from_slice(&encoded);
Ok(())
}
}

impl Decoder for RespFrameCodec {
type Item = RespFrame;
type Error = anyhow::Error;

fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<RespFrame>> {
match RespFrame::decode(src) {
Ok(frame) => Ok(Some(frame)),
Err(RespError::NotComplete) => Ok(None),
Err(e) => Err(e.into()),
}
}
}
7 changes: 7 additions & 0 deletions src/resp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ pub enum RespError {
// fn expect_length(buf: &[u8]) -> Result<usize, RespError>;
// }

// 关于 enum 的知识点
// 枚举变体: 直接包含数据, 结构体类型, 无数据

// 元组变体: 当枚举变体直接包含一组命名未指定的值时-> SimpleString(String) 和 Integer(i64),
// 结构体变体: 枚举的变体被定义为包含具有名称的字段-> StructVariant { name: String, id: i32 }
// 单元变体: RespNull

// 之所以要定义一些新的结构体, 是因为要在实现 trait 的时候, 要区分开这些类型
#[enum_dispatch(RespEncode)]
#[derive(Debug, Clone, PartialEq)]
Expand Down

0 comments on commit a994ab7

Please sign in to comment.