Skip to content

Commit

Permalink
feat: made a prototype with actix
Browse files Browse the repository at this point in the history
  • Loading branch information
frolvanya committed Aug 19, 2024
1 parent b9f6186 commit 6cec5cf
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 180 deletions.
17 changes: 5 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion rpc-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ anyhow = "1.0.70"
assert-json-diff = { version = "2.0.2", optional = true }
borsh = "1.3.1"
chrono = "0.4.19"
easy-ext = "1"
erased-serde = "0.4.2"
futures = "0.3.24"
futures-locks = "0.7.1"
Expand Down
220 changes: 85 additions & 135 deletions rpc-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
use jsonrpc_v2::{Data, Params, Router, Server};
use actix_web::{
web::{self},
App, HttpResponse, HttpServer,
};
use errors::RPCError;
//use jsonrpc_v2::{Data, Params, Router, Server};
use mimalloc::MiMalloc;
use near_jsonrpc::{primitives::errors::RpcErrorKind, RpcRequest};
use serde_json::Value;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
Expand All @@ -19,52 +26,80 @@ mod utils;
// Categories for logging
pub(crate) const RPC_SERVER: &str = "read_rpc_server";

#[easy_ext::ext(WithMethodAndMetrics)]
impl<R> jsonrpc_v2::ServerBuilder<R>
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct Request {
jsonrpc: Value,
pub method: String,
#[serde(default, skip_serializing_if = "Value::is_null")]
pub params: Value,
pub id: Value,
}

/// Serialises response of a query into JSON to be sent to the client.
///
/// Returns an internal server error if the value fails to serialise.
fn serialize_response(value: impl serde::ser::Serialize) -> Result<Value, RPCError> {
serde_json::to_value(value).map_err(|err| RPCError::internal_error(&err.to_string()))
}

/// Processes a specific method call.
///
/// The arguments for the method (which is implemented by the `callback`) will
/// be parsed (using [`RpcRequest::parse`]) from the `request.params`. Ok
/// results of the `callback` will be converted into a [`Value`] via serde
/// serialisation.
async fn process_method_call<R, V, E, F>(
request: Request,
callback: impl FnOnce(R) -> F,
) -> Result<Value, RPCError>
where
R: Router,
R: RpcRequest,
V: serde::ser::Serialize,
RPCError: std::convert::From<E>,
F: std::future::Future<Output = Result<V, E>>,
{
fn with_method_and_metrics<F, T, P>(self, name: &'static str, handler: &'static F) -> Self
where
F: jsonrpc_v2::Factory<T, errors::RPCError, (Data<config::ServerContext>, Params<P>)>
+ Send
+ Sync
+ 'static,
T: serde::Serialize + Send + 'static,
P: serde::de::DeserializeOwned + Send + 'static,
{
self.with_method(name, |data, params| async {
let result = handler.call((data, params)).await;
serialize_response(callback(R::parse(request.params)?).await?)
}

if let Err(err) = &result {
match &err.error_struct {
Some(near_jsonrpc::primitives::errors::RpcErrorKind::RequestValidationError(
near_jsonrpc::primitives::errors::RpcRequestValidationErrorKind::ParseError { .. },
)) => {
metrics::METHOD_ERRORS_TOTAL
.with_label_values(&[name, "PARSE_ERROR"])
.inc();
}
Some(near_jsonrpc::primitives::errors::RpcErrorKind::HandlerError(error_struct)) => {
if let Some(stringified_error_name) = error_struct.get("name").and_then(serde_json::Value::as_str) {
metrics::METHOD_ERRORS_TOTAL
.with_label_values(&[name, stringified_error_name])
.inc();
}
}
Some(near_jsonrpc::primitives::errors::RpcErrorKind::InternalError(_)) => {
metrics::METHOD_ERRORS_TOTAL
.with_label_values(&[name, "INTERNAL_ERROR"])
.inc();
}
Some(near_jsonrpc::primitives::errors::RpcErrorKind::RequestValidationError(_))
| None => {}
async fn rpc_handler(
data: web::Data<config::ServerContext>,
payload: web::Json<Request>,
) -> HttpResponse {
let result = match payload.method.as_str() {
"block" => {
process_method_call(payload.0, |params| {
modules::blocks::methods::block(data, params)
})
.await
}
"view_receipt_record" => {
process_method_call(payload.0, |params| {
modules::receipts::methods::view_receipt_record(data, params)
})
.await
}
_ => return HttpResponse::NotFound().finish(),
};

// TODO: add metrics here
let mut response = match &result {
Ok(_) => HttpResponse::Ok(),
Err(err) => match &err.error_struct {
Some(RpcErrorKind::RequestValidationError(_)) => HttpResponse::BadRequest(),
Some(RpcErrorKind::HandlerError(error_struct)) => {
if error_struct["name"] == "TIMEOUT_ERROR" {
HttpResponse::RequestTimeout()
} else {
HttpResponse::Ok()
}
}
Some(RpcErrorKind::InternalError(_)) => HttpResponse::InternalServerError(),
None => HttpResponse::Ok(),
},
};

result
})
}
response.json(result)
}

#[actix_web::main]
Expand All @@ -91,8 +126,9 @@ async fn main() -> anyhow::Result<()> {

let server_port = rpc_server_config.general.server_port;

let server_context =
config::ServerContext::init(rpc_server_config.clone(), near_rpc_client.clone()).await?;
let server_context = web::Data::new(
config::ServerContext::init(rpc_server_config.clone(), near_rpc_client.clone()).await?,
);

let blocks_cache_clone = std::sync::Arc::clone(&server_context.blocks_cache);
let blocks_info_by_finality_clone =
Expand Down Expand Up @@ -152,105 +188,19 @@ async fn main() -> anyhow::Result<()> {
});
}

let rpc = Server::new()
.with_data(Data::new(server_context.clone()))
// custom requests methods
.with_method_and_metrics(
"view_state_paginated",
&modules::state::methods::view_state_paginated,
)
.with_method_and_metrics(
"view_receipt_record",
&modules::receipts::methods::view_receipt_record,
)
// requests methods
.with_method_and_metrics("query", &modules::queries::methods::query)
// basic requests methods
.with_method_and_metrics("block", &modules::blocks::methods::block)
.with_method_and_metrics(
"broadcast_tx_async",
&modules::transactions::methods::broadcast_tx_async,
)
.with_method_and_metrics(
"broadcast_tx_commit",
&modules::transactions::methods::broadcast_tx_commit,
)
.with_method_and_metrics("chunk", &modules::blocks::methods::chunk)
.with_method_and_metrics("gas_price", &modules::gas::methods::gas_price)
.with_method_and_metrics("health", &modules::network::methods::health)
.with_method_and_metrics(
"light_client_proof",
&modules::clients::methods::light_client_proof,
)
.with_method_and_metrics(
"next_light_client_block",
&modules::clients::methods::next_light_client_block,
)
.with_method_and_metrics("network_info", &modules::network::methods::network_info)
.with_method_and_metrics("send_tx", &modules::transactions::methods::send_tx)
.with_method_and_metrics("status", &modules::network::methods::status)
.with_method_and_metrics("tx", &modules::transactions::methods::tx)
.with_method_and_metrics("validators", &modules::network::methods::validators)
.with_method_and_metrics("client_config", &modules::network::methods::client_config)
.with_method_and_metrics(
"EXPERIMENTAL_changes",
&modules::blocks::methods::changes_in_block_by_type,
)
.with_method_and_metrics(
"EXPERIMENTAL_changes_in_block",
&modules::blocks::methods::changes_in_block,
)
.with_method_and_metrics(
"EXPERIMENTAL_genesis_config",
&modules::network::methods::genesis_config,
)
.with_method_and_metrics(
"EXPERIMENTAL_light_client_proof",
&modules::clients::methods::light_client_proof,
)
.with_method_and_metrics(
"EXPERIMENTAL_protocol_config",
&modules::network::methods::protocol_config,
)
.with_method_and_metrics("EXPERIMENTAL_receipt", &modules::receipts::methods::receipt)
.with_method_and_metrics(
"EXPERIMENTAL_tx_status",
&modules::transactions::methods::tx_status,
)
.with_method_and_metrics(
"EXPERIMENTAL_validators_ordered",
&modules::network::methods::validators_ordered,
)
.with_method_and_metrics(
"EXPERIMENTAL_maintenance_windows",
&modules::network::methods::maintenance_windows,
)
.with_method_and_metrics(
"EXPERIMENTAL_split_storage_info",
&modules::network::methods::split_storage_info,
)
.finish();

// TODO: decide what to do with metrics here
// Insert all rpc methods to the hashmap after init the server
metrics::RPC_METHODS.insert(rpc.router.routers()).await;

actix_web::HttpServer::new(move || {
let rpc = rpc.clone();
// metrics::RPC_METHODS.insert(rpc.router.routers()).await;

// Configure CORS
HttpServer::new(move || {
let cors = actix_cors::Cors::permissive();

actix_web::App::new()
App::new()
.wrap(cors)
.wrap(tracing_actix_web::TracingLogger::default())
// wrapper to count rpc total requests
.wrap(middlewares::RequestsCounters)
.app_data(actix_web::web::Data::new(server_context.clone()))
.service(
actix_web::web::service("/")
.guard(actix_web::guard::Post())
.finish(rpc.into_web_service()),
)
.app_data(server_context.clone())
.service(web::scope("/").route("", web::post().to(rpc_handler)))
.service(metrics::get_metrics)
.service(health::get_health_status)
})
Expand Down
2 changes: 1 addition & 1 deletion rpc-server/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ lazy_static! {
/// It should help to analyze the most popular requests
/// And build s better caching strategy
pub async fn increase_request_category_metrics(
data: &jsonrpc_v2::Data<crate::config::ServerContext>,
data: &actix_web::web::Data<crate::config::ServerContext>,
block_reference: &near_primitives::types::BlockReference,
method_name: &str,
block_height: Option<u64>,
Expand Down
Loading

0 comments on commit 6cec5cf

Please sign in to comment.