Skip to content

Commit

Permalink
Expose HTTP options to Python (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
gi0baro authored Jan 17, 2024
1 parent 571d443 commit 38cf97a
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 59 deletions.
85 changes: 75 additions & 10 deletions granian/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from .__version__ import __version__
from .constants import HTTPModes, Interfaces, Loops, ThreadModes
from .http import HTTP1Settings, HTTP2Settings
from .log import LogLevels
from .server import Granian

Expand All @@ -21,18 +22,68 @@ def version_callback(value: bool):

@cli.command()
def main(
app: str = typer.Argument(..., help='Application target to serve.'),
host: str = typer.Option('127.0.0.1', help='Host address to bind to.'),
app: str = typer.Argument(..., help='Application target to serve'),
host: str = typer.Option('127.0.0.1', help='Host address to bind to'),
port: int = typer.Option(8000, help='Port to bind to.'),
interface: Interfaces = typer.Option(Interfaces.RSGI.value, help='Application interface type.'),
http: HTTPModes = typer.Option(HTTPModes.auto.value, help='HTTP version.'),
interface: Interfaces = typer.Option(Interfaces.RSGI.value, help='Application interface type'),
http: HTTPModes = typer.Option(HTTPModes.auto.value, help='HTTP version'),
websockets: bool = typer.Option(True, '--ws/--no-ws', help='Enable websockets handling', show_default='enabled'),
workers: int = typer.Option(1, min=1, help='Number of worker processes.'),
threads: int = typer.Option(1, min=1, help='Number of threads.'),
threading_mode: ThreadModes = typer.Option(ThreadModes.workers.value, help='Threading mode to use.'),
workers: int = typer.Option(1, min=1, help='Number of worker processes'),
threads: int = typer.Option(1, min=1, help='Number of threads'),
threading_mode: ThreadModes = typer.Option(ThreadModes.workers.value, help='Threading mode to use'),
loop: Loops = typer.Option(Loops.auto.value, help='Event loop implementation'),
loop_opt: bool = typer.Option(False, '--opt/--no-opt', help='Enable loop optimizations', show_default='disabled'),
backlog: int = typer.Option(1024, min=128, help='Maximum number of connections to hold in backlog.'),
backlog: int = typer.Option(1024, min=128, help='Maximum number of connections to hold in backlog'),
http1_buffer_size: int = typer.Option(
HTTP1Settings.max_buffer_size, min=8192, help='Set the maximum buffer size for HTTP/1 connections'
),
http1_keep_alive: bool = typer.Option(
HTTP1Settings.keep_alive,
'--http1-keep-alive/--no-http1-keep-alive',
show_default='enabled',
help='Enables or disables HTTP/1 keep-alive',
),
http1_pipeline_flush: bool = typer.Option(
HTTP1Settings.pipeline_flush,
'--http1-pipeline-flush/--no-http1-pipeline-flush',
show_default='disabled',
help='Aggregates HTTP/1 flushes to better support pipelined responses (experimental)',
),
http2_adaptive_window: bool = typer.Option(
HTTP2Settings.adaptive_window,
'--http2-adaptive-window/--no-http2-adaptive-window',
show_default='disabled',
help='Sets whether to use an adaptive flow control for HTTP2',
),
http2_initial_connection_window_size: int = typer.Option(
HTTP2Settings.initial_connection_window_size, help='Sets the max connection-level flow control for HTTP2'
),
http2_initial_stream_window_size: int = typer.Option(
HTTP2Settings.initial_stream_window_size,
help='Sets the `SETTINGS_INITIAL_WINDOW_SIZE` option for HTTP2 stream-level flow control',
),
http2_keep_alive_interval: Optional[int] = typer.Option(
HTTP2Settings.keep_alive_interval,
help='Sets an interval for HTTP2 Ping frames should be sent to keep a connection alive',
show_default='disabled',
),
http2_keep_alive_timeout: int = typer.Option(
HTTP2Settings.keep_alive_timeout,
help='Sets a timeout for receiving an acknowledgement of the HTTP2 keep-alive ping',
),
http2_max_concurrent_streams: int = typer.Option(
HTTP2Settings.max_concurrent_streams,
help='Sets the SETTINGS_MAX_CONCURRENT_STREAMS option for HTTP2 connections',
),
http2_max_frame_size: int = typer.Option(
HTTP2Settings.max_frame_size, help='Sets the maximum frame size to use for HTTP2'
),
http2_max_headers_size: int = typer.Option(
HTTP2Settings.max_headers_size, help='Sets the max size of received header frames'
),
http2_max_send_buffer_size: int = typer.Option(
HTTP2Settings.max_send_buffer_size, help='Set the maximum write buffer size for each HTTP/2 stream'
),
log_enabled: bool = typer.Option(True, '--log/--no-log', help='Enable logging', show_default='enabled'),
log_level: LogLevels = typer.Option(LogLevels.info.value, help='Log level', case_sensitive=False),
log_config: Optional[Path] = typer.Option(
Expand All @@ -46,14 +97,14 @@ def main(
),
url_path_prefix: Optional[str] = typer.Option(None, help='URL path prefix the app is mounted on'),
reload: bool = typer.Option(
False, '--reload/--no-reload', help="Enable auto reload on application's files changes"
False, '--reload/--no-reload', help="Enable auto reload on application's files changes", show_default='disabled'
),
_: Optional[bool] = typer.Option(
None,
'--version',
callback=version_callback,
is_eager=True,
help='Shows the version and exit.',
help='Shows the version and exit',
allow_from_autoenv=False,
),
):
Expand All @@ -80,6 +131,20 @@ def main(
http=http,
websockets=websockets,
backlog=backlog,
http1_settings=HTTP1Settings(
keep_alive=http1_keep_alive, max_buffer_size=http1_buffer_size, pipeline_flush=http1_pipeline_flush
),
http2_settings=HTTP2Settings(
adaptive_window=http2_adaptive_window,
initial_connection_window_size=http2_initial_connection_window_size,
initial_stream_window_size=http2_initial_stream_window_size,
keep_alive_interval=http2_keep_alive_interval,
keep_alive_timeout=http2_keep_alive_timeout,
max_concurrent_streams=http2_max_concurrent_streams,
max_frame_size=http2_max_frame_size,
max_headers_size=http2_max_headers_size,
max_send_buffer_size=http2_max_send_buffer_size,
),
log_enabled=log_enabled,
log_level=log_level,
log_dictconfig=log_dictconfig,
Expand Down
22 changes: 22 additions & 0 deletions granian/http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from dataclasses import dataclass
from typing import Optional


@dataclass
class HTTP1Settings:
keep_alive: bool = True
max_buffer_size: int = 8192 + 4096 * 100
pipeline_flush: bool = False


@dataclass
class HTTP2Settings:
adaptive_window: bool = False
initial_connection_window_size: int = 1024 * 1024
initial_stream_window_size: int = 1024 * 1024
keep_alive_interval: Optional[int] = None
keep_alive_timeout: int = 20
max_concurrent_streams: int = 200
max_frame_size: int = 1024 * 16
max_headers_size: int = 16 * 1024 * 1024
max_send_buffer_size: int = 1024 * 400
25 changes: 16 additions & 9 deletions granian/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ._internal import load_target
from .asgi import LifespanProtocol, _callback_wrapper as _asgi_call_wrap
from .constants import HTTPModes, Interfaces, Loops, ThreadModes
from .http import HTTP1Settings, HTTP2Settings
from .log import LogLevels, configure_logging, logger
from .net import SocketHolder
from .wsgi import _callback_wrapper as _wsgi_call_wrap
Expand Down Expand Up @@ -79,7 +80,8 @@ def __init__(
http: HTTPModes = HTTPModes.auto,
websockets: bool = True,
backlog: int = 1024,
http1_buffer_size: int = 65535,
http1_settings: Optional[HTTP1Settings] = None,
http2_settings: Optional[HTTP2Settings] = None,
log_enabled: bool = True,
log_level: LogLevels = LogLevels.info,
log_dictconfig: Optional[Dict[str, Any]] = None,
Expand All @@ -101,7 +103,8 @@ def __init__(
self.http = http
self.websockets = websockets
self.backlog = max(128, backlog)
self.http1_buffer_size = http1_buffer_size
self.http1_settings = http1_settings
self.http2_settings = http2_settings
self.log_enabled = log_enabled
self.log_level = log_level
self.log_config = log_dictconfig
Expand Down Expand Up @@ -140,7 +143,8 @@ def _spawn_asgi_worker(
pthreads,
threading_mode,
http_mode,
http1_buffer_size,
http1_settings,
http2_settings,
websockets,
loop_opt,
log_enabled,
Expand Down Expand Up @@ -170,7 +174,7 @@ def _spawn_asgi_worker(
wcallback = future_watcher_wrapper(wcallback)

worker = ASGIWorker(
worker_id, sfd, threads, pthreads, http_mode, http1_buffer_size, websockets, loop_opt, *ssl_ctx
worker_id, sfd, threads, pthreads, http_mode, http1_settings, http2_settings, websockets, loop_opt, *ssl_ctx
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
serve(wcallback, loop, contextvars.copy_context(), shutdown_event)
Expand All @@ -186,7 +190,8 @@ def _spawn_rsgi_worker(
pthreads,
threading_mode,
http_mode,
http1_buffer_size,
http1_settings,
http2_settings,
websockets,
loop_opt,
log_enabled,
Expand All @@ -211,7 +216,7 @@ def _spawn_rsgi_worker(
callback_init(loop)

worker = RSGIWorker(
worker_id, sfd, threads, pthreads, http_mode, http1_buffer_size, websockets, loop_opt, *ssl_ctx
worker_id, sfd, threads, pthreads, http_mode, http1_settings, http2_settings, websockets, loop_opt, *ssl_ctx
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
serve(
Expand All @@ -231,7 +236,8 @@ def _spawn_wsgi_worker(
pthreads,
threading_mode,
http_mode,
http1_buffer_size,
http1_settings,
http2_settings,
websockets,
loop_opt,
log_enabled,
Expand All @@ -250,7 +256,7 @@ def _spawn_wsgi_worker(

shutdown_event = set_loop_signals(loop, [signal.SIGTERM, signal.SIGINT])

worker = WSGIWorker(worker_id, sfd, threads, pthreads, http_mode, http1_buffer_size, *ssl_ctx)
worker = WSGIWorker(worker_id, sfd, threads, pthreads, http_mode, http1_settings, http2_settings, *ssl_ctx)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
serve(_wsgi_call_wrap(callback, scope_opts), loop, contextvars.copy_context(), shutdown_event)

Expand All @@ -276,7 +282,8 @@ def _spawn_proc(self, idx, target, callback_loader, socket_loader) -> Worker:
self.pthreads,
self.threading_mode,
self.http,
self.http1_buffer_size,
self.http1_settings,
self.http2_settings,
self.websockets,
self.loop_opt,
self.log_enabled,
Expand Down
12 changes: 9 additions & 3 deletions src/asgi/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use super::http::{
handle_rtb, handle_rtb_pyw, handle_rtb_ws, handle_rtb_ws_pyw, handle_rtt, handle_rtt_pyw, handle_rtt_ws,
handle_rtt_ws_pyw,
};

use crate::conversion::{worker_http1_config_from_py, worker_http2_config_from_py};
use crate::workers::{serve_rth, serve_rth_ssl, serve_wth, serve_wth_ssl, WorkerConfig, WorkerSignal};

#[pyclass(module = "granian._granian")]
Expand Down Expand Up @@ -40,7 +42,8 @@ impl ASGIWorker {
threads=1,
pthreads=1,
http_mode="1",
http1_buffer_max=65535,
http1_opts=None,
http2_opts=None,
websockets_enabled=false,
opt_enabled=true,
ssl_enabled=false,
Expand All @@ -49,12 +52,14 @@ impl ASGIWorker {
)
)]
fn new(
py: Python,
worker_id: i32,
socket_fd: i32,
threads: usize,
pthreads: usize,
http_mode: &str,
http1_buffer_max: usize,
http1_opts: Option<PyObject>,
http2_opts: Option<PyObject>,
websockets_enabled: bool,
opt_enabled: bool,
ssl_enabled: bool,
Expand All @@ -68,7 +73,8 @@ impl ASGIWorker {
threads,
pthreads,
http_mode,
http1_buffer_max,
worker_http1_config_from_py(py, http1_opts)?,
worker_http2_config_from_py(py, http2_opts)?,
websockets_enabled,
opt_enabled,
ssl_enabled,
Expand Down
49 changes: 49 additions & 0 deletions src/conversion.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use pyo3::prelude::*;
use std::ops::{Deref, DerefMut};

use crate::workers::{HTTP1Config, HTTP2Config};

pub(crate) struct BytesToPy(pub hyper::body::Bytes);

impl Deref for BytesToPy {
Expand Down Expand Up @@ -30,3 +32,50 @@ impl ToPyObject for BytesToPy {
(&self[..]).into_py(py)
}
}

pub(crate) fn worker_http1_config_from_py(py: Python, cfg: Option<PyObject>) -> PyResult<HTTP1Config> {
let ret = match cfg {
Some(cfg) => HTTP1Config {
keep_alive: cfg.getattr(py, "keep_alive")?.extract(py)?,
max_buffer_size: cfg.getattr(py, "max_buffer_size")?.extract(py)?,
pipeline_flush: cfg.getattr(py, "pipeline_flush")?.extract(py)?,
},
None => HTTP1Config {
keep_alive: true,
max_buffer_size: 8192 + 4096 * 100,
pipeline_flush: false,
},
};
Ok(ret)
}

pub(crate) fn worker_http2_config_from_py(py: Python, cfg: Option<PyObject>) -> PyResult<HTTP2Config> {
let ret = match cfg {
Some(cfg) => HTTP2Config {
adaptive_window: cfg.getattr(py, "adaptive_window")?.extract(py)?,
initial_connection_window_size: cfg.getattr(py, "initial_connection_window_size")?.extract(py)?,
initial_stream_window_size: cfg.getattr(py, "initial_stream_window_size")?.extract(py)?,
keep_alive_interval: match cfg.getattr(py, "keep_alive_interval")?.extract(py) {
Ok(v) => Some(core::time::Duration::from_secs(v)),
_ => None,
},
keep_alive_timeout: core::time::Duration::from_secs(cfg.getattr(py, "keep_alive_timeout")?.extract(py)?),
max_concurrent_streams: cfg.getattr(py, "max_concurrent_streams")?.extract(py)?,
max_frame_size: cfg.getattr(py, "max_frame_size")?.extract(py)?,
max_headers_size: cfg.getattr(py, "max_headers_size")?.extract(py)?,
max_send_buffer_size: cfg.getattr(py, "max_send_buffer_size")?.extract(py)?,
},
None => HTTP2Config {
adaptive_window: false,
initial_connection_window_size: 1024 * 1024,
initial_stream_window_size: 1024 * 1024,
keep_alive_interval: None,
keep_alive_timeout: core::time::Duration::from_secs(20),
max_concurrent_streams: 200,
max_frame_size: 1024 * 16,
max_headers_size: 16 * 1024 * 1024,
max_send_buffer_size: 1024 * 400,
},
};
Ok(ret)
}
12 changes: 9 additions & 3 deletions src/rsgi/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use super::http::{
handle_rtb, handle_rtb_pyw, handle_rtb_ws, handle_rtb_ws_pyw, handle_rtt, handle_rtt_pyw, handle_rtt_ws,
handle_rtt_ws_pyw,
};

use crate::conversion::{worker_http1_config_from_py, worker_http2_config_from_py};
use crate::workers::{serve_rth, serve_rth_ssl, serve_wth, serve_wth_ssl, WorkerConfig, WorkerSignal};

#[pyclass(module = "granian._granian")]
Expand Down Expand Up @@ -40,7 +42,8 @@ impl RSGIWorker {
threads=1,
pthreads=1,
http_mode="1",
http1_buffer_max=65535,
http1_opts=None,
http2_opts=None,
websockets_enabled=false,
opt_enabled=true,
ssl_enabled=false,
Expand All @@ -49,12 +52,14 @@ impl RSGIWorker {
)
)]
fn new(
py: Python,
worker_id: i32,
socket_fd: i32,
threads: usize,
pthreads: usize,
http_mode: &str,
http1_buffer_max: usize,
http1_opts: Option<PyObject>,
http2_opts: Option<PyObject>,
websockets_enabled: bool,
opt_enabled: bool,
ssl_enabled: bool,
Expand All @@ -68,7 +73,8 @@ impl RSGIWorker {
threads,
pthreads,
http_mode,
http1_buffer_max,
worker_http1_config_from_py(py, http1_opts)?,
worker_http2_config_from_py(py, http2_opts)?,
websockets_enabled,
opt_enabled,
ssl_enabled,
Expand Down
Loading

0 comments on commit 38cf97a

Please sign in to comment.