From 62b4d137dd108e348d1d40b8dd8d05fa7ad7e76c Mon Sep 17 00:00:00 2001
From: Winter Zhang
Date: Thu, 19 Sep 2024 13:59:06 +0800
Subject: [PATCH 1/6] chore(executor): support for data exchange between
 different processors within the pipeline (#16467)

chore(executor): add exchange for pipeline
---
 src/query/pipeline/core/src/lib.rs            |   1 +
 src/query/pipeline/core/src/pipeline.rs       |  52 +++++
 src/query/pipeline/core/src/processors/mod.rs |   3 +
 .../pipeline/core/src/processors/processor.rs |   2 +-
 .../core/src/processors/shuffle_processor.rs  | 215 ++++++++++++++++++
 5 files changed, 272 insertions(+), 1 deletion(-)

diff --git a/src/query/pipeline/core/src/lib.rs b/src/query/pipeline/core/src/lib.rs
index be296444ca25..8e0a3ab7dd59 100644
--- a/src/query/pipeline/core/src/lib.rs
+++ b/src/query/pipeline/core/src/lib.rs
@@ -14,6 +14,7 @@
 #![feature(once_cell_try)]
 #![feature(variant_count)]
+#![feature(associated_type_defaults)]
 #![allow(clippy::arc_with_non_send_sync)]
 #![allow(clippy::useless_asref)]

diff --git a/src/query/pipeline/core/src/pipeline.rs b/src/query/pipeline/core/src/pipeline.rs
index 7fe725ddd9de..9d74f386490e 100644
--- a/src/query/pipeline/core/src/pipeline.rs
+++ b/src/query/pipeline/core/src/pipeline.rs
@@ -33,8 +33,11 @@ use crate::finished_chain::FinishedCallbackChain;
 use crate::pipe::Pipe;
 use crate::pipe::PipeItem;
 use crate::processors::DuplicateProcessor;
+use crate::processors::Exchange;
 use crate::processors::InputPort;
+use crate::processors::MergePartitionProcessor;
 use crate::processors::OutputPort;
+use crate::processors::PartitionProcessor;
 use crate::processors::PlanScope;
 use crate::processors::PlanScopeGuard;
 use crate::processors::ProcessorPtr;
@@ -444,6 +447,55 @@ impl Pipeline {
         }
     }
 
+    pub fn exchange<T: Exchange>(&mut self, n: usize, exchange: Arc<T>) {
+        if let Some(pipe) = self.pipes.last() {
+            if pipe.output_length < 1 {
+                return;
+            }
+
+            let input_len = pipe.output_length;
+            let mut items = Vec::with_capacity(input_len);
+
+            for _index in 0..input_len {
+                let input = InputPort::create();
+                let outputs: Vec<_> = (0..n).map(|_| OutputPort::create()).collect();
+                items.push(PipeItem::create(
+                    PartitionProcessor::create(input.clone(), outputs.clone(), exchange.clone()),
+                    vec![input],
+                    outputs,
+                ));
+            }
+
+            // partition data block
+            self.add_pipe(Pipe::create(input_len, input_len * n, items));
+
+            let mut reorder_edges = Vec::with_capacity(input_len * n);
+            for index in 0..input_len * n {
+                reorder_edges.push((index % n) * input_len + (index / n));
+            }
+
+            self.reorder_inputs(reorder_edges);
+
+            let mut items = Vec::with_capacity(input_len);
+            for _index in 0..n {
+                let output = OutputPort::create();
+                let inputs: Vec<_> = (0..input_len).map(|_| InputPort::create()).collect();
+                items.push(PipeItem::create(
+                    MergePartitionProcessor::create(
+                        inputs.clone(),
+                        output.clone(),
+                        exchange.clone(),
+                    ),
+                    inputs,
+                    vec![output],
+                ));
+            }
+
+            // merge partition
+            self.add_pipe(Pipe::create(input_len * n, n, items))
+        }
+    }
+
     #[track_caller]
     pub fn set_on_init<F: FnOnce() -> Result<()> + Send + Sync + 'static>(&mut self, f: F) {
         let location = std::panic::Location::caller();
diff --git a/src/query/pipeline/core/src/processors/mod.rs b/src/query/pipeline/core/src/processors/mod.rs
index 61e6f7005a60..c3b0e1772a34 100644
--- a/src/query/pipeline/core/src/processors/mod.rs
+++ b/src/query/pipeline/core/src/processors/mod.rs
@@ -37,4 +37,7 @@ pub use profile::PlanScope;
 pub use profile::PlanScopeGuard;
 pub use resize_processor::create_resize_item;
 pub use resize_processor::ResizeProcessor;
+pub use shuffle_processor::Exchange;
+pub use shuffle_processor::MergePartitionProcessor;
+pub use shuffle_processor::PartitionProcessor;
 pub use shuffle_processor::ShuffleProcessor;
diff --git a/src/query/pipeline/core/src/processors/processor.rs b/src/query/pipeline/core/src/processors/processor.rs
index 3112c02b513b..ce70053b80de 100644
--- a/src/query/pipeline/core/src/processors/processor.rs
+++ b/src/query/pipeline/core/src/processors/processor.rs
@@ -34,7 +34,7 @@ pub enum Event {
     Finished,
 }
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub enum EventCause {
     Other,
     // Which input of the processor triggers the event
diff --git a/src/query/pipeline/core/src/processors/shuffle_processor.rs b/src/query/pipeline/core/src/processors/shuffle_processor.rs
index ebfeefb6479e..d998c22a2481 100644
--- a/src/query/pipeline/core/src/processors/shuffle_processor.rs
+++ b/src/query/pipeline/core/src/processors/shuffle_processor.rs
@@ -16,12 +16,27 @@
 use std::any::Any;
 use std::sync::Arc;
 
 use databend_common_exception::Result;
+use databend_common_expression::DataBlock;
 
 use crate::processors::Event;
 use crate::processors::EventCause;
 use crate::processors::InputPort;
 use crate::processors::OutputPort;
 use crate::processors::Processor;
+use crate::processors::ProcessorPtr;
+
+pub enum MultiwayStrategy {
+    Random,
+    Custom,
+}
+
+pub trait Exchange: Send + Sync + 'static {
+    const STRATEGY: MultiwayStrategy = MultiwayStrategy::Random;
+
+    fn partition(&self, state: DataBlock, n: usize) -> Result<Vec<DataBlock>>;
+
+    fn multiway_pick(&self, partitions: &[Option<DataBlock>]) -> Result<usize>;
+}
 
 pub struct ShuffleProcessor {
     input2output: Vec<usize>,
@@ -139,3 +154,203 @@ impl Processor for ShuffleProcessor {
         Ok(Event::NeedData)
     }
 }
+
+pub struct PartitionProcessor<T: Exchange> {
+    input: Arc<InputPort>,
+    outputs: Vec<Arc<OutputPort>>,
+
+    exchange: Arc<T>,
+    input_data: Option<DataBlock>,
+    partitioned_data: Vec<Option<DataBlock>>,
+}
+
+impl<T: Exchange> PartitionProcessor<T> {
+    pub fn create(
+        input: Arc<InputPort>,
+        outputs: Vec<Arc<OutputPort>>,
+        exchange: Arc<T>,
+    ) -> ProcessorPtr {
+        let partitioned_data = vec![None; outputs.len()];
+        ProcessorPtr::create(Box::new(PartitionProcessor {
+            input,
+            outputs,
+            exchange,
+            partitioned_data,
+            input_data: None,
+        }))
+    }
+}
+
+impl<T: Exchange> Processor for PartitionProcessor<T> {
+    fn name(&self) -> String {
+        String::from("ShufflePartition")
+    }
+
+    fn as_any(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn event(&mut self) -> Result<Event> {
+        let mut all_output_finished = true;
+        let mut all_data_pushed_output = true;
+
+        for (index, output) in self.outputs.iter().enumerate() {
+            if output.is_finished() {
+                self.partitioned_data[index].take();
+                continue;
+            }
+
+            all_output_finished = false;
+
+            if output.can_push() {
+                if let Some(block) = self.partitioned_data[index].take() {
+                    output.push_data(Ok(block));
+
+                    continue;
+                }
+            }
+
+            if self.partitioned_data[index].is_some() {
+                all_data_pushed_output = false;
+            }
+        }
+
+        if all_output_finished {
+            self.input.finish();
+            return Ok(Event::Finished);
+        }
+
+        if !all_data_pushed_output {
+            self.input.set_not_need_data();
+            return Ok(Event::NeedConsume);
+        }
+
+        if self.input.has_data() {
+            self.input_data = Some(self.input.pull_data().unwrap()?);
+            return Ok(Event::Sync);
+        }
+
+        if self.input.is_finished() {
+            for output in &self.outputs {
+                output.finish();
+            }
+
+            return Ok(Event::Finished);
+        }
+
+        self.input.set_need_data();
+        Ok(Event::NeedData)
+    }
+
+    fn process(&mut self) -> Result<()> {
+        if let Some(block) = self.input_data.take() {
+            let partitioned = self.exchange.partition(block, self.outputs.len())?;
+
+            for (index, block) in partitioned.into_iter().enumerate() {
+                if block.is_empty() && block.get_meta().is_none() {
+                    continue;
+                }
+
+                self.partitioned_data[index] = Some(block);
+            }
+        }
+
+        Ok(())
+    }
+}
+
+pub struct MergePartitionProcessor<T: Exchange> {
+    exchange: Arc<T>,
+
+    output: Arc<OutputPort>,
+    inputs: Vec<Arc<InputPort>>,
+    inputs_data: Vec<Option<DataBlock>>,
+}
+
+impl<T: Exchange> MergePartitionProcessor<T> {
+    pub fn create(
+        inputs: Vec<Arc<InputPort>>,
+        output: Arc<OutputPort>,
+        exchange: Arc<T>,
+    ) -> ProcessorPtr {
+        let inputs_data = vec![None; inputs.len()];
+        ProcessorPtr::create(Box::new(MergePartitionProcessor {
+            output,
+            inputs,
+            exchange,
+            inputs_data,
+        }))
+    }
+}
+
+impl<T: Exchange> Processor for MergePartitionProcessor<T> {
+    fn name(&self) -> String {
+        String::from("ShuffleMergePartition")
+    }
+
+    fn as_any(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn event(&mut self) -> Result<Event> {
+        if self.output.is_finished() {
+            for input in &self.inputs {
+                input.finish();
+            }
+
+            return Ok(Event::Finished);
+        }
+
+        if !self.output.can_push() {
+            return Ok(Event::NeedConsume);
+        }
+
+        let mut all_inputs_finished = true;
+        let mut need_pick_block_to_push = matches!(T::STRATEGY, MultiwayStrategy::Custom);
+
+        for (index, input) in self.inputs.iter().enumerate() {
+            if input.is_finished() {
+                continue;
+            }
+
+            all_inputs_finished = false;
+
+            if input.has_data() {
+                match T::STRATEGY {
+                    MultiwayStrategy::Random => {
+                        if self.output.can_push() {
+                            self.output.push_data(Ok(input.pull_data().unwrap()?));
+                        }
+                    }
+                    MultiwayStrategy::Custom => {
+                        if self.inputs_data[index].is_none() {
+                            self.inputs_data[index] = Some(input.pull_data().unwrap()?);
+                        }
+                    }
+                }
+            }
+
+            if self.inputs_data[index].is_none() {
+                need_pick_block_to_push = false;
+            }
+
+            input.set_need_data();
+        }
+
+        if all_inputs_finished {
+            self.output.finish();
+            return Ok(Event::Finished);
+        }
+
+        if need_pick_block_to_push {
+            let pick_index = self.exchange.multiway_pick(&self.inputs_data)?;
+
+            if let Some(block) = self.inputs_data[pick_index].take() {
+                self.output.push_data(Ok(block));
+                return Ok(Event::NeedConsume);
+            }
+        }
+
+        Ok(Event::NeedData)
+    }
+}

From 31c238357125e37697b28b7c2e0891c2dd9fe30e Mon Sep 17 00:00:00 2001
From: Winter Zhang
Date: Thu, 19 Sep 2024 14:25:02 +0800
Subject: [PATCH 2/6] chore(executor): more edge detail for executing graph
 (#16468)

* chore(executor): more edge detail for executing graph

* chore(executor): more edge detail for executing graph
---
 .../src/pipelines/executor/executor_graph.rs  | 11 ++++++-
 .../it/pipelines/executor/executor_graph.rs   | 30 +++++++++----------
 2 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs
index 3d66db20c40d..7a56e250ee5a 100644
--- a/src/query/service/src/pipelines/executor/executor_graph.rs
+++ b/src/query/service/src/pipelines/executor/executor_graph.rs
@@ -998,7 +998,16 @@ impl Debug for ExecutingGraph {
         write!(
             f,
             "{:?}",
-            Dot::with_config(&self.graph, &[Config::EdgeNoLabel])
+            Dot::with_attr_getters(
+                &self.graph,
+                &[Config::EdgeNoLabel],
+                &|_, edge| format!(
+                    "{} -> {}",
+                    edge.weight().output_index,
+                    edge.weight().input_index
+                ),
+                &|_, (_, _)| String::new(),
+            )
         )
     }
 }
diff --git a/src/query/service/tests/it/pipelines/executor/executor_graph.rs b/src/query/service/tests/it/pipelines/executor/executor_graph.rs
index e46e883d2a66..2efe0cfbcf08 100644
--- a/src/query/service/tests/it/pipelines/executor/executor_graph.rs
+++ b/src/query/service/tests/it/pipelines/executor/executor_graph.rs
@@ -51,8 +51,8 @@ async fn
test_create_simple_pipeline() -> Result<()> { \n 0 [ label = \"BlocksSource\" ]\ \n 1 [ label = \"DummyTransform\" ]\ \n 2 [ label = \"SyncSenderSink\" ]\ - \n 0 -> 1 [ ]\ - \n 1 -> 2 [ ]\ + \n 0 -> 1 [ 0 -> 0]\ + \n 1 -> 2 [ 0 -> 0]\ \n}\n" ); @@ -73,10 +73,10 @@ async fn test_create_parallel_simple_pipeline() -> Result<()> { \n 3 [ label = \"DummyTransform\" ]\ \n 4 [ label = \"SyncSenderSink\" ]\ \n 5 [ label = \"SyncSenderSink\" ]\ - \n 0 -> 2 [ ]\ - \n 1 -> 3 [ ]\ - \n 2 -> 4 [ ]\ - \n 3 -> 5 [ ]\ + \n 0 -> 2 [ 0 -> 0]\ + \n 1 -> 3 [ 0 -> 0]\ + \n 2 -> 4 [ 0 -> 0]\ + \n 3 -> 5 [ 0 -> 0]\ \n}\n" ); @@ -100,15 +100,15 @@ async fn test_create_resize_pipeline() -> Result<()> { \n 6 [ label = \"Resize\" ]\ \n 7 [ label = \"SyncSenderSink\" ]\ \n 8 [ label = \"SyncSenderSink\" ]\ - \n 0 -> 1 [ ]\ - \n 1 -> 2 [ ]\ - \n 1 -> 3 [ ]\ - \n 2 -> 4 [ ]\ - \n 3 -> 4 [ ]\ - \n 4 -> 5 [ ]\ - \n 5 -> 6 [ ]\ - \n 6 -> 7 [ ]\ - \n 6 -> 8 [ ]\ + \n 0 -> 1 [ 0 -> 0]\ + \n 1 -> 2 [ 0 -> 0]\ + \n 1 -> 3 [ 1 -> 0]\ + \n 2 -> 4 [ 0 -> 0]\ + \n 3 -> 4 [ 0 -> 1]\ + \n 4 -> 5 [ 0 -> 0]\ + \n 5 -> 6 [ 0 -> 0]\ + \n 6 -> 7 [ 0 -> 0]\ + \n 6 -> 8 [ 1 -> 0]\ \n}\n" ); From d9131cc58f429209d49cd8377063f02d60b65b85 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Thu, 19 Sep 2024 16:18:21 +0800 Subject: [PATCH 3/6] fix: log query detail and profile with identical format (#16469) --- src/common/tracing/src/init.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/common/tracing/src/init.rs b/src/common/tracing/src/init.rs index 6931df1d1ebf..7d8a28250aec 100644 --- a/src/common/tracing/src/init.rs +++ b/src/common/tracing/src/init.rs @@ -290,7 +290,6 @@ pub fn init_logging( "databend::log::query", LevelFilter::Off, )) - .layout(get_layout(&cfg.file.format)) .append(query_log_file); logger = logger.dispatch(dispatch); } @@ -329,7 +328,6 @@ pub fn init_logging( "databend::log::profile", LevelFilter::Off, )) - .layout(get_layout(&cfg.file.format)) .append(profile_log_file); logger = logger.dispatch(dispatch); } From 2c41faf63b5136ab22a2ab3ac972c9752dc4a7bf Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Thu, 19 Sep 2024 16:41:04 +0800 Subject: [PATCH 4/6] fix: revert log format breakage from #16249 (#16470) * fix: revert log format breakage from #16249 * fix --------- Co-authored-by: Bohu --- Cargo.lock | 7 ++- src/common/tracing/Cargo.toml | 4 +- src/common/tracing/src/init.rs | 52 ++++++++++------- src/common/tracing/src/loggers.rs | 93 +++++++++++++++++++++++++++++-- 4 files changed, 129 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a6a243e7bf1..2388d322e6ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4748,6 +4748,7 @@ dependencies = [ name = "databend-common-tracing" version = "0.1.0" dependencies = [ + "anyhow", "backtrace", "chrono", "color-backtrace", @@ -4764,6 +4765,7 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry_sdk", "serde", + "serde_json", "strip-ansi-escapes", "tonic 0.11.0", ] @@ -10023,8 +10025,9 @@ dependencies = [ [[package]] name = "logforth" -version = "0.11.0" -source = "git+http://github.com/andylokandy/logforth?rev=0ca61ca#0ca61ca0fa3c87b5af5a08aa0354d96604e685c0" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633080680671612565f637d1e33c5bcb7d58fb12c7d658baa166a03487265e80" dependencies = [ "anyhow", "colored", diff --git a/src/common/tracing/Cargo.toml b/src/common/tracing/Cargo.toml index bf3b1cd70ce8..58ebe2903dbf 100644 --- a/src/common/tracing/Cargo.toml +++ b/src/common/tracing/Cargo.toml @@ -11,6 
+11,7 @@ doctest = false test = true [dependencies] +anyhow = { workspace = true } backtrace = { workspace = true } chrono = { workspace = true } color-backtrace = { version = "0.6" } @@ -22,7 +23,7 @@ fastrace-opentelemetry = { workspace = true } itertools = { workspace = true } libc = "0.2.153" log = { workspace = true } -logforth = { version = "0.11", git = "http://github.com/andylokandy/logforth", rev = "0ca61ca", features = [ +logforth = { version = "0.12", features = [ 'json', 'rolling_file', 'opentelemetry', @@ -32,6 +33,7 @@ opentelemetry = { workspace = true } opentelemetry-otlp = { workspace = true } opentelemetry_sdk = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } strip-ansi-escapes = "0.2" tonic = { workspace = true } diff --git a/src/common/tracing/src/init.rs b/src/common/tracing/src/init.rs index 7d8a28250aec..52097dca157c 100644 --- a/src/common/tracing/src/init.rs +++ b/src/common/tracing/src/init.rs @@ -224,15 +224,19 @@ pub fn init_logging( let labels = labels .iter() .chain(&cfg.otlp.endpoint.labels) - .map(|(k, v)| (k.clone().into(), v.clone().into())) - .chain([("category".into(), "system".into())]); - let otel = logforth::append::OpentelemetryLog::new( + .map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone()))) + .chain([(Cow::from("category"), Cow::from("system"))]); + let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new( log_name, &cfg.otlp.endpoint.endpoint, - cfg.otlp.endpoint.protocol.into(), - labels, ) - .expect("initialize opentelemetry logger"); + .with_protocol(cfg.otlp.endpoint.protocol.into()); + for (k, v) in labels { + otel_builder = otel_builder.add_label(k, v); + } + let otel = otel_builder + .build() + .expect("initialize opentelemetry logger"); let dispatch = Dispatch::new() .filter(TargetFilter::level_for( "databend::log::query", @@ -297,15 +301,19 @@ pub fn init_logging( let labels = labels .iter() .chain(&endpoint.labels) - .map(|(k, v)| (k.clone().into(), v.clone().into())) - .chain([("category".into(), "query".into())]); - let otel = logforth::append::OpentelemetryLog::new( + .map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone()))) + .chain([(Cow::from("category"), Cow::from("query"))]); + let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new( log_name, - &endpoint.endpoint, - endpoint.protocol.into(), - labels, + &cfg.otlp.endpoint.endpoint, ) - .expect("initialize opentelemetry logger"); + .with_protocol(cfg.otlp.endpoint.protocol.into()); + for (k, v) in labels { + otel_builder = otel_builder.add_label(k, v); + } + let otel = otel_builder + .build() + .expect("initialize opentelemetry logger"); let dispatch = Dispatch::new() .filter(TargetFilter::level_for_not( "databend::log::query", @@ -335,15 +343,19 @@ pub fn init_logging( let labels = labels .iter() .chain(&endpoint.labels) - .map(|(k, v)| (k.clone().into(), v.clone().into())) - .chain([("category".into(), "profile".into())]); - let otel = logforth::append::OpentelemetryLog::new( + .map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone()))) + .chain([(Cow::from("category"), Cow::from("profile"))]); + let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new( log_name, - &endpoint.endpoint, - endpoint.protocol.into(), - labels, + &cfg.otlp.endpoint.endpoint, ) - .expect("initialize opentelemetry logger"); + .with_protocol(cfg.otlp.endpoint.protocol.into()); + for (k, v) in labels { + otel_builder = otel_builder.add_label(k, v); + } + let otel = 
otel_builder + .build() + .expect("initialize opentelemetry logger"); let dispatch = Dispatch::new() .filter(TargetFilter::level_for_not( "databend::log::profile", diff --git a/src/common/tracing/src/loggers.rs b/src/common/tracing/src/loggers.rs index 08ff7ed96aad..6285a438fb57 100644 --- a/src/common/tracing/src/loggers.rs +++ b/src/common/tracing/src/loggers.rs @@ -12,13 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Arguments; +use std::path::Path; + +use databend_common_base::runtime::ThreadTracker; +use log::Record; use logforth::append::rolling_file::NonBlockingBuilder; use logforth::append::rolling_file::RollingFileWriter; use logforth::append::rolling_file::Rotation; use logforth::append::RollingFile; -use logforth::layout::JsonLayout; -use logforth::layout::TextLayout; +use logforth::layout::collect_kvs; +use logforth::layout::CustomLayout; +use logforth::layout::KvDisplay; use logforth::Layout; +use serde_json::Map; /// Create a `BufWriter` for a rolling file logger. pub(crate) fn new_rolling_file_appender( @@ -41,8 +48,86 @@ pub(crate) fn new_rolling_file_appender( pub fn get_layout(format: &str) -> Layout { match format { - "text" => TextLayout::default().into(), - "json" => JsonLayout::default().into(), + "text" => text_layout(), + "json" => json_layout(), _ => unimplemented!("file logging format {format} is not supported"), } } + +fn text_layout() -> Layout { + CustomLayout::new( + |record: &Record, f: &dyn Fn(Arguments) -> anyhow::Result<()>| { + match ThreadTracker::query_id() { + None => { + f(format_args!( + "{} {:>5} {}: {}:{} {}{}", + chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + record.level(), + record.module_path().unwrap_or(""), + Path::new(record.file().unwrap_or_default()) + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or_default(), + record.line().unwrap_or(0), + record.args(), + KvDisplay::new(record.key_values()), + ))?; + } + Some(query_id) => { + f(format_args!( + "{} {} {:>5} {}: {}:{} {}{}", + query_id, + chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + record.level(), + record.module_path().unwrap_or(""), + Path::new(record.file().unwrap_or_default()) + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or_default(), + record.line().unwrap_or(0), + record.args(), + KvDisplay::new(record.key_values()), + ))?; + } + } + + Ok(()) + }, + ) + .into() +} + +fn json_layout() -> Layout { + CustomLayout::new( + |record: &Record, f: &dyn Fn(Arguments) -> anyhow::Result<()>| { + let mut fields = Map::new(); + fields.insert("message".to_string(), format!("{}", record.args()).into()); + for (k, v) in collect_kvs(record.key_values()) { + fields.insert(k, v.into()); + } + + match ThreadTracker::query_id() { + None => { + f(format_args!( + r#"{{"timestamp":"{}","level":"{}","fields":{}}}"#, + chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + record.level(), + serde_json::to_string(&fields).unwrap_or_default(), + ))?; + } + Some(query_id) => { + f(format_args!( + r#"{{"timestamp":"{}","level":"{}","query_id":"{}","fields":{}}}"#, + chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + record.level(), + query_id, + serde_json::to_string(&fields).unwrap_or_default(), + ))?; + } + } + + Ok(()) + }, + ) + .into() +} From e2c04e9013a277e8cc6c9011850f134a253c92cf Mon Sep 17 00:00:00 2001 From: everpcpc Date: Thu, 19 Sep 2024 19:14:40 +0800 Subject: [PATCH 5/6] fix: typo 
for query & profile logging config (#16476) --- .../deploy/config/databend-query-node-otlp-logs.toml | 4 ++-- src/common/tracing/src/init.rs | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/scripts/ci/deploy/config/databend-query-node-otlp-logs.toml b/scripts/ci/deploy/config/databend-query-node-otlp-logs.toml index 3b9df9113073..ad9abf5cdd29 100644 --- a/scripts/ci/deploy/config/databend-query-node-otlp-logs.toml +++ b/scripts/ci/deploy/config/databend-query-node-otlp-logs.toml @@ -52,14 +52,14 @@ dir = "./.databend/logs_1" [log.query] on = true -otlp_endpoint = "http://127.0.0.1:4317/v1/logs" +otlp_endpoint = "http://127.0.0.1:4317" [log.query.otlp_labels] qkey1 = "qvalue1" qkey2 = "qvalue2" [log.profile] on = true -otlp_endpoint = "http://127.0.0.1:4318/v1/logs" +otlp_endpoint = "http://127.0.0.1:4318" otlp_protocol = "http" [log.profile.otlp_labels] pkey1 = "pvalue1" diff --git a/src/common/tracing/src/init.rs b/src/common/tracing/src/init.rs index 52097dca157c..93b5793dfdea 100644 --- a/src/common/tracing/src/init.rs +++ b/src/common/tracing/src/init.rs @@ -228,7 +228,7 @@ pub fn init_logging( .chain([(Cow::from("category"), Cow::from("system"))]); let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new( log_name, - &cfg.otlp.endpoint.endpoint, + format!("{}/v1/logs", &cfg.otlp.endpoint.endpoint), ) .with_protocol(cfg.otlp.endpoint.protocol.into()); for (k, v) in labels { @@ -305,9 +305,9 @@ pub fn init_logging( .chain([(Cow::from("category"), Cow::from("query"))]); let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new( log_name, - &cfg.otlp.endpoint.endpoint, + format!("{}/v1/logs", &endpoint.endpoint), ) - .with_protocol(cfg.otlp.endpoint.protocol.into()); + .with_protocol(endpoint.protocol.into()); for (k, v) in labels { otel_builder = otel_builder.add_label(k, v); } @@ -347,9 +347,9 @@ pub fn init_logging( .chain([(Cow::from("category"), Cow::from("profile"))]); let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new( log_name, - &cfg.otlp.endpoint.endpoint, + format!("{}/v1/logs", &endpoint.endpoint), ) - .with_protocol(cfg.otlp.endpoint.protocol.into()); + .with_protocol(endpoint.protocol.into()); for (k, v) in labels { otel_builder = otel_builder.add_label(k, v); } From cefeddc88664a0a368fe5377baafda761aca7e50 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Thu, 19 Sep 2024 21:34:10 +0800 Subject: [PATCH 6/6] chore(ci): remove hdfs release (#16477) --- .github/workflows/release.yml | 32 +------------------------------- 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e6f7fb7dbc08..7f892ac3dd28 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -180,35 +180,9 @@ jobs: features: python-udf category: udf - build_hdfs: - runs-on: [self-hosted, "${{ matrix.runner }}", Linux, 16c32g, aws] - needs: create_release - strategy: - fail-fast: false - matrix: - include: - - { target: x86_64-unknown-linux-gnu, runner: X64 } - - { target: aarch64-unknown-linux-gnu, runner: ARM64 } - steps: - - name: Checkout - uses: actions/checkout@v4 - with: - ref: ${{ needs.create_release.outputs.sha }} - fetch-depth: 0 - - name: Build Release - uses: ./.github/actions/build_linux - env: - DATABEND_RELEASE_VERSION: ${{ needs.create_release.outputs.version }} - with: - sha: ${{ github.sha }} - target: ${{ matrix.target }} - artifacts: sqllogictests,sqlsmith,metactl,meta,query - 
features: storage-hdfs - category: hdfs - publish: runs-on: [self-hosted, X64, Linux, 4c8g, aws] - needs: [create_release, build_default, build_musl, build_hdfs] + needs: [create_release, build_default, build_musl] strategy: fail-fast: false matrix: @@ -221,10 +195,6 @@ jobs: target: x86_64-unknown-linux-musl - category: default target: aarch64-unknown-linux-musl - - category: hdfs - target: x86_64-unknown-linux-gnu - - category: hdfs - target: aarch64-unknown-linux-gnu steps: - name: Checkout uses: actions/checkout@v4
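
Usage note for the `Exchange` API introduced in PATCH 1/6: `Pipeline::exchange(n, ...)` first adds one `PartitionProcessor` per existing output, each splitting its stream into `n` partitions, then reorders the edges so that the i-th of the `n` new `MergePartitionProcessor`s receives partition i from every upstream, and finally merges each group back into a single output. The sketch below shows how an implementor of `Exchange` might plug into this. It is illustrative only: `RoundRobinExchange`, `add_round_robin_exchange`, and the `DataBlock::empty()` constructor are assumptions for the example, not part of these patches.

use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_pipeline_core::processors::Exchange;
use databend_common_pipeline_core::Pipeline;

/// Illustrative exchange: sends each incoming block, whole, to one of the
/// `n` partitions in round-robin order. The remaining slots are filled with
/// empty blocks, which `PartitionProcessor` drops (no rows, no meta).
pub struct RoundRobinExchange {
    next: AtomicUsize,
}

impl Exchange for RoundRobinExchange {
    // `STRATEGY` keeps its default (`MultiwayStrategy::Random`), so the merge
    // side forwards blocks as they arrive and never calls `multiway_pick`.

    fn partition(&self, block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
        let target = self.next.fetch_add(1, Ordering::Relaxed) % n;
        // `DataBlock::empty()` is assumed to build a zero-row block here.
        let mut parts = vec![DataBlock::empty(); n];
        parts[target] = block;
        Ok(parts)
    }

    fn multiway_pick(&self, _partitions: &[Option<DataBlock>]) -> Result<usize> {
        unreachable!("only consulted when STRATEGY is MultiwayStrategy::Custom")
    }
}

/// Fan the last pipe out into 4 partitions and merge back to 4 outputs.
fn add_round_robin_exchange(pipeline: &mut Pipeline) {
    let exchange = Arc::new(RoundRobinExchange {
        next: AtomicUsize::new(0),
    });
    pipeline.exchange(4, exchange);
}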