Skip to content

Commit

Permalink
refactor: replace LogHandler with PipelineHandler (GreptimeTeam#5096)
Browse files Browse the repository at this point in the history
* refactor: replace LogHandler with PipelineHandler

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change method name

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename transform to insert

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia authored Dec 4, 2024
1 parent 5092f5f commit 7d8b256
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 26 deletions.
6 changes: 3 additions & 3 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{
InfluxdbLineProtocolHandler, LogHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
PromStoreProtocolHandler, ScriptHandler,
InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
PipelineHandler, PromStoreProtocolHandler, ScriptHandler,
};
use servers::server::ServerHandlers;
use session::context::QueryContextRef;
Expand Down Expand Up @@ -98,7 +98,7 @@ pub trait FrontendInstance:
+ OpenTelemetryProtocolHandler
+ ScriptHandler
+ PrometheusHandler
+ LogHandler
+ PipelineHandler
+ Send
+ Sync
+ 'static
Expand Down
10 changes: 3 additions & 7 deletions src/frontend/src/instance/log_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,15 @@ use servers::error::{
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
};
use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use servers::query_handler::LogHandler;
use servers::query_handler::PipelineHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;

use crate::instance::Instance;

#[async_trait]
impl LogHandler for Instance {
async fn insert_logs(
&self,
log: RowInsertRequests,
ctx: QueryContextRef,
) -> ServerResult<Output> {
impl PipelineHandler for Instance {
async fn insert(&self, log: RowInsertRequests, ctx: QueryContextRef) -> ServerResult<Output> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
Expand Down
6 changes: 3 additions & 3 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ use crate::metrics_handler::MetricsHandler;
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::query_handler::{
InfluxdbLineProtocolHandlerRef, LogHandlerRef, OpenTelemetryProtocolHandlerRef,
OpentsdbProtocolHandlerRef, PromStoreProtocolHandlerRef, ScriptHandlerRef,
InfluxdbLineProtocolHandlerRef, OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef,
PipelineHandlerRef, PromStoreProtocolHandlerRef, ScriptHandlerRef,
};
use crate::server::Server;

Expand Down Expand Up @@ -576,7 +576,7 @@ impl HttpServerBuilder {

pub fn with_log_ingest_handler(
self,
handler: LogHandlerRef,
handler: PipelineHandlerRef,
validator: Option<LogValidatorRef>,
ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
) -> Self {
Expand Down
10 changes: 5 additions & 5 deletions src/servers/src/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::metrics::{
METRIC_LOKI_LOGS_INGESTION_ELAPSED, METRIC_SUCCESS_VALUE,
};
use crate::prom_store;
use crate::query_handler::LogHandlerRef;
use crate::query_handler::PipelineHandlerRef;

const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
Expand Down Expand Up @@ -502,7 +502,7 @@ pub async fn loki_ingest(
};

let handler = log_state.log_handler;
let output = handler.insert_logs(ins_reqs, ctx).await;
let output = handler.insert(ins_reqs, ctx).await;

if let Ok(Output {
data: OutputData::AffectedRows(rows),
Expand Down Expand Up @@ -599,7 +599,7 @@ fn extract_pipeline_value_by_content_type(
}

async fn ingest_logs_inner(
state: LogHandlerRef,
state: PipelineHandlerRef,
pipeline_name: String,
version: PipelineVersion,
table_name: String,
Expand Down Expand Up @@ -664,7 +664,7 @@ async fn ingest_logs_inner(
let insert_requests = RowInsertRequests {
inserts: vec![insert_request],
};
let output = state.insert_logs(insert_requests, query_ctx).await;
let output = state.insert(insert_requests, query_ctx).await;

if let Ok(Output {
data: OutputData::AffectedRows(rows),
Expand Down Expand Up @@ -701,7 +701,7 @@ pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;
/// axum state struct to hold log handler and validator
#[derive(Clone)]
pub struct LogState {
pub log_handler: LogHandlerRef,
pub log_handler: PipelineHandlerRef,
pub log_validator: Option<LogValidatorRef>,
pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}
Expand Down
18 changes: 10 additions & 8 deletions src/servers/src/query_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler +
pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler + Send + Sync>;
pub type OpenTelemetryProtocolHandlerRef = Arc<dyn OpenTelemetryProtocolHandler + Send + Sync>;
pub type ScriptHandlerRef = Arc<dyn ScriptHandler + Send + Sync>;
pub type LogHandlerRef = Arc<dyn LogHandler + Send + Sync>;
pub type PipelineHandlerRef = Arc<dyn PipelineHandler + Send + Sync>;

#[async_trait]
pub trait ScriptHandler {
Expand Down Expand Up @@ -107,7 +107,7 @@ pub trait PromStoreProtocolHandler {
}

#[async_trait]
pub trait OpenTelemetryProtocolHandler: LogHandler {
pub trait OpenTelemetryProtocolHandler: PipelineHandler {
/// Handling opentelemetry metrics request
async fn metrics(
&self,
Expand All @@ -132,14 +132,16 @@ pub trait OpenTelemetryProtocolHandler: LogHandler {
) -> Result<Output>;
}

/// LogHandler is responsible for handling log related requests.
/// PipelineHandler is responsible for handling pipeline related requests.
///
/// It should be able to insert logs and manage pipelines.
/// The pipeline is a series of transformations that can be applied to logs.
/// The pipeline is stored in the database and can be retrieved by name.
/// The "Pipeline" is a series of transformations that can be applied to unstructured
/// data like logs. This handler is responsible to manage pipelines and accept data for
/// processing.
///
/// The pipeline is stored in the database and can be retrieved by its name.
#[async_trait]
pub trait LogHandler {
async fn insert_logs(&self, log: RowInsertRequests, ctx: QueryContextRef) -> Result<Output>;
pub trait PipelineHandler {
async fn insert(&self, input: RowInsertRequests, ctx: QueryContextRef) -> Result<Output>;

async fn get_pipeline(
&self,
Expand Down

0 comments on commit 7d8b256

Please sign in to comment.