Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Static Worker Pool #9

Merged
merged 1 commit into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
393 changes: 214 additions & 179 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
hyper = { git = "https://github.com/hyperium/hyper" }
tokio = { version = "0.3.0", features = ["full"] }
hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1.5", features = ["net"] }
anyhow = "1.0.33"
structopt = "0.3.20"
log = "0.4.11"
env_logger = "0.8.1"
num-traits = "0.2"
num-derive = "0.3"
num-derive = "0.3"
async-trait = "0.1.48"
futures = "0.3.13"
34 changes: 11 additions & 23 deletions php/Relay.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class Relay

private const TYPE_LENGTH = 1;
private const SIZE_LENGTH = 8;
private const HEADER_LENGTH = Relay::TYPE_LENGTH + Relay::SIZE_LENGTH;
private $fp;

public function __construct(string $sock, int $connectTimeout = 10)
Expand All @@ -30,11 +31,10 @@ public function __construct(string $sock, int $connectTimeout = 10)
public function next(): ?string
{
try {
$type = $this->readType();
[$type, $size] = $this->readHeader();
if ($type !== self::MESSAGE_TYPE_REQUEST) {
throw new \Exception("expected Request message, got: %d", $type);
}
$size = $this->readSize();
$body = $this->read($size);
return $body;
} catch (ReadException $e) {
Expand All @@ -53,29 +53,19 @@ public function __destruct()
fclose($this->fp);
}

private function readType(): int
private function readHeader(): array
{
$data = $this->read(self::TYPE_LENGTH);
$type = unpack("Ctype", $data);
if (false === $type) {
throw new \Exception(sprintf("could not unpack type: %s", $data));
$data = $this->read(self::HEADER_LENGTH);
$header = unpack("Ctype/Jsize", $data);
if (false === $header) {
throw new \Exception(sprintf("could not unpack header: %s", $data));
}
return $type["type"];
}

private function readSize(): int
{
$data = $this->read(self::SIZE_LENGTH);
$size = unpack("Jsize", $data);
if (false === $size) {
throw new \Exception(sprintf("could not unpack size: %s", $data));
}
return $size["size"];
return [$header["type"], $header["size"]];
}

private function sendIdentity()
{
$this->write(self::MESSAGE_TYPE_IDENTITY, pack("N", getmypid()));
$this->write(self::MESSAGE_TYPE_IDENTITY, (string)getmypid());
}

private function read(int $length): string
Expand All @@ -99,13 +89,11 @@ private function write(int $type, string $payload): void
{
switch ($type) {
case self::MESSAGE_TYPE_IDENTITY:
fwrite($this->fp, pack("C", $type));
fwrite($this->fp, $payload);
fwrite($this->fp, pack("CJ", $type, $payload));
break;

case self::MESSAGE_TYPE_RESPONSE:
fwrite($this->fp, pack("C", $type));
fwrite($this->fp, pack("J", mb_strlen($payload, "8bit")));
fwrite($this->fp, pack("CJ", $type, mb_strlen($payload, "8bit")));
fwrite($this->fp, $payload);
break;

Expand Down
58 changes: 32 additions & 26 deletions src/ipc/message.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::convert::TryInto;
use std::mem;

use anyhow::{
Expand All @@ -12,7 +13,7 @@ use tokio::io::{
AsyncWriteExt,
};

pub type Pid = u32;
pub type Pid = usize;

#[repr(u8)]
#[derive(Debug, FromPrimitive)]
Expand All @@ -36,6 +37,9 @@ pub enum Message {
}

impl Message {
const HEADER_SIZE: usize =
mem::size_of::<MessageType>() + mem::size_of::<usize>();

pub async fn write_to(
self,
dst: impl AsyncWrite,
Expand All @@ -44,9 +48,7 @@ impl Message {

match self {
Message::Identity(id) => {
let mut buf = Vec::with_capacity(
mem::size_of::<MessageType>() + mem::size_of::<Pid>(),
);
let mut buf = Vec::with_capacity(Message::HEADER_SIZE);
buf.push(MessageType::Identity as u8);
buf.extend(&id.to_be_bytes());
dst.write_all(&buf).await?;
Expand All @@ -71,9 +73,7 @@ impl Message {
) -> Result<()> {
tokio::pin!(dst);

let mut header = Vec::with_capacity(
mem::size_of::<MessageType>() + mem::size_of::<usize>(),
);
let mut header = Vec::with_capacity(Message::HEADER_SIZE);
header.push(ty as u8);
header.extend(&buf.len().to_be_bytes());
dst.write_all(&header).await?;
Expand All @@ -87,31 +87,37 @@ impl Message {
pub async fn read_from(src: impl AsyncRead) -> Result<Message> {
tokio::pin!(src);

// TODO: read full header, that contains type+length for req&res.
let ty = MessageType::from_u8(src.read_u8().await?)
.ok_or_else(|| anyhow!("unexpected message type"))?;
let mut header = vec![0u8; Message::HEADER_SIZE];
src.read_exact(&mut header).await?;

let ty = MessageType::from_u8(
*header
.first()
.ok_or_else(|| anyhow!("missing message type"))?,
)
.ok_or_else(|| anyhow!("unexpected message type"))?;
let size = usize::from_be_bytes(
header[mem::size_of::<MessageType>()..].try_into()?,
);

return match ty {
MessageType::Identity => {
let mut buf = [0; mem::size_of::<Pid>()];
src.read_exact(&mut buf).await?;
Ok(Message::Identity(Pid::from_be_bytes(buf)))
}
MessageType::Request => {
read_u8_vec(src).await.map(Request).map(Message::Request)
}
MessageType::Response => {
read_u8_vec(src).await.map(Response).map(Message::Response)
}
MessageType::Identity => Ok(Message::Identity(size as Pid)),
MessageType::Request => read_u8_vec(size, src)
.await
.map(Request)
.map(Message::Request),
MessageType::Response => read_u8_vec(size, src)
.await
.map(Response)
.map(Message::Response),
};

async fn read_u8_vec(src: impl AsyncRead) -> Result<Vec<u8>> {
async fn read_u8_vec(
size: usize,
src: impl AsyncRead,
) -> Result<Vec<u8>> {
tokio::pin!(src);

let mut size_buf = [0; mem::size_of::<usize>()];
src.read_exact(&mut size_buf).await?;
let size = usize::from_be_bytes(size_buf);

let mut buf = vec![0; size];
src.read_exact(&mut buf).await?;

Expand Down
14 changes: 9 additions & 5 deletions src/ipc/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ use tokio::net::{
UnixListener,
UnixStream,
};
use tokio::stream::{
use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio_stream::{
wrappers::{
UnboundedReceiverStream,
UnixListenerStream,
},
Stream,
StreamExt,
};
use tokio::sync::mpsc;
use tokio::time::timeout;

use super::message::{
Message,
Expand Down Expand Up @@ -69,7 +73,7 @@ impl Connection {

pub fn listen(path: &str) -> Result<impl Stream<Item = Connection> + Unpin> {
let _ = std::fs::remove_file(path);
let mut listener = UnixListener::bind(path)?;
let mut listener = UnixListenerStream::new(UnixListener::bind(path)?);
let (tx, rx) = mpsc::unbounded_channel();

tokio::spawn(async move {
Expand Down Expand Up @@ -104,7 +108,7 @@ pub fn listen(path: &str) -> Result<impl Stream<Item = Connection> + Unpin> {
}
});

Ok(rx)
Ok(UnboundedReceiverStream::new(rx))
}

#[cfg(test)]
Expand Down
39 changes: 19 additions & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,21 @@ use std::{

use anyhow::Result;
use env_logger::Env;
use hyper::service::{
make_service_fn,
service_fn,
use hyper::{
service::{
make_service_fn,
service_fn,
},
StatusCode,
};
use hyper::{
Body,
Method,
Request,
Response,
Server,
StatusCode,
};
use tokio::sync::Mutex;

#[macro_use]
extern crate num_derive;
extern crate test;
Expand All @@ -29,20 +31,16 @@ mod opt;
mod worker;

async fn handle(
worker: Arc<Mutex<worker::Worker>>,
req: Request<Body>,
pool: Arc<impl worker::pool::Pool>,
) -> Result<Response<Body>> {
match (req.method(), req.uri().path()) {
(&Method::GET, path) if path.starts_with("/hello/") => {
let name = path.trim_start_matches("/hello/");

let mut worker = worker.lock().await;
let response =
worker.exec(&format!(r#"{{"name":"{}"}}"#, name)).await?;

pool.exec(format!(r#"{{"name":"{}"}}"#, name)).await?;
Ok(Response::new(Body::from(response)))
}

_ => {
let mut not_found = Response::default();
*not_found.status_mut() = StatusCode::NOT_FOUND;
Expand All @@ -57,20 +55,21 @@ async fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("info"))
.init();

let connections = ipc::listen(&opts.unix_socket)?;
let linker = worker::Linker::new(Box::pin(connections));
let worker =
worker::Worker::new(&opts.worker_script, &opts.unix_socket, linker)
.await?;
let worker = Arc::new(Mutex::new(worker));

let pool = Arc::new(
worker::pool::Static::new(
&opts.unix_socket,
&opts.worker_script,
opts.worker_count,
)
.await?,
);
let addr = opts.http_listen.parse()?;

let make_svc = make_service_fn(move |_| {
let worker = worker.clone();
let pool = pool.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
handle(worker.clone(), req)
handle(req, pool.clone())
}))
}
});
Expand Down
4 changes: 4 additions & 0 deletions src/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ pub struct Opt {
/// PHP Worker script to use.
#[structopt(long, default_value = "worker.php")]
pub worker_script: String,

/// PHP Worker count.
#[structopt(long, default_value = "60")]
pub worker_count: usize,
}

impl Opt {
Expand Down
8 changes: 4 additions & 4 deletions src/worker/linker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use anyhow::{
anyhow,
Result,
};
use tokio::stream::{
Stream,
StreamExt,
};
use tokio::sync::{
oneshot,
Mutex,
};
use tokio_stream::{
Stream,
StreamExt,
};

use crate::ipc::{
Connection,
Expand Down
1 change: 1 addition & 0 deletions src/worker/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod linker;
pub mod pool;
#[allow(clippy::module_inception)]
mod worker;

Expand Down
14 changes: 14 additions & 0 deletions src/worker/pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use anyhow::Result;
use async_trait::async_trait;

mod static_;

pub use static_::Static;

#[async_trait]
pub trait Pool {
async fn exec(
&self,
req: String,
) -> Result<String>;
}
Loading