Commit 8918f8d

Merge branch 'main' into merge-path
forsaken628 committed Sep 19, 2024
2 parents d2ff843 + cefeddc
Showing 13 changed files with 430 additions and 80 deletions.
32 changes: 1 addition & 31 deletions .github/workflows/release.yml
@@ -180,35 +180,9 @@ jobs:
           features: python-udf
           category: udf
 
-  build_hdfs:
-    runs-on: [self-hosted, "${{ matrix.runner }}", Linux, 16c32g, aws]
-    needs: create_release
-    strategy:
-      fail-fast: false
-      matrix:
-        include:
-          - { target: x86_64-unknown-linux-gnu, runner: X64 }
-          - { target: aarch64-unknown-linux-gnu, runner: ARM64 }
-    steps:
-      - name: Checkout
-        uses: actions/checkout@v4
-        with:
-          ref: ${{ needs.create_release.outputs.sha }}
-          fetch-depth: 0
-      - name: Build Release
-        uses: ./.github/actions/build_linux
-        env:
-          DATABEND_RELEASE_VERSION: ${{ needs.create_release.outputs.version }}
-        with:
-          sha: ${{ github.sha }}
-          target: ${{ matrix.target }}
-          artifacts: sqllogictests,sqlsmith,metactl,meta,query
-          features: storage-hdfs
-          category: hdfs
-
   publish:
     runs-on: [self-hosted, X64, Linux, 4c8g, aws]
-    needs: [create_release, build_default, build_musl, build_hdfs]
+    needs: [create_release, build_default, build_musl]
     strategy:
       fail-fast: false
       matrix:
@@ -221,10 +195,6 @@ jobs:
             target: x86_64-unknown-linux-musl
           - category: default
             target: aarch64-unknown-linux-musl
-          - category: hdfs
-            target: x86_64-unknown-linux-gnu
-          - category: hdfs
-            target: aarch64-unknown-linux-gnu
     steps:
       - name: Checkout
         uses: actions/checkout@v4
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions scripts/ci/deploy/config/databend-query-node-otlp-logs.toml
@@ -52,14 +52,14 @@ dir = "./.databend/logs_1"
 
 [log.query]
 on = true
-otlp_endpoint = "http://127.0.0.1:4317/v1/logs"
+otlp_endpoint = "http://127.0.0.1:4317"
 [log.query.otlp_labels]
 qkey1 = "qvalue1"
 qkey2 = "qvalue2"
 
 [log.profile]
 on = true
-otlp_endpoint = "http://127.0.0.1:4318/v1/logs"
+otlp_endpoint = "http://127.0.0.1:4318"
 otlp_protocol = "http"
 [log.profile.otlp_labels]
 pkey1 = "pvalue1"
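
Note that both endpoints drop their `/v1/logs` suffix: the path is now appended in code (see the `format!("{}/v1/logs", ...)` calls in src/common/tracing/src/init.rs below), so the config carries only the base OTLP address. A minimal sketch of that join; `otlp_logs_url` is an illustrative helper name, not code from this commit:

    fn otlp_logs_url(base: &str) -> String {
        // The logs path is appended in code now, so configs carry only the
        // base endpoint, e.g. "http://127.0.0.1:4317".
        format!("{}/v1/logs", base)
    }

    fn main() {
        assert_eq!(
            otlp_logs_url("http://127.0.0.1:4317"),
            "http://127.0.0.1:4317/v1/logs"
        );
    }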
4 changes: 3 additions & 1 deletion src/common/tracing/Cargo.toml
@@ -11,6 +11,7 @@ doctest = false
 test = true
 
 [dependencies]
+anyhow = { workspace = true }
 backtrace = { workspace = true }
 chrono = { workspace = true }
 color-backtrace = { version = "0.6" }
@@ -22,7 +23,7 @@ fastrace-opentelemetry = { workspace = true }
 itertools = { workspace = true }
 libc = "0.2.153"
 log = { workspace = true }
-logforth = { version = "0.11", git = "http://github.com/andylokandy/logforth", rev = "0ca61ca", features = [
+logforth = { version = "0.12", features = [
     'json',
     'rolling_file',
     'opentelemetry',
@@ -32,6 +33,7 @@ opentelemetry = { workspace = true }
 opentelemetry-otlp = { workspace = true }
 opentelemetry_sdk = { workspace = true }
 serde = { workspace = true }
+serde_json = { workspace = true }
 strip-ansi-escapes = "0.2"
 tonic = { workspace = true }

56 changes: 33 additions & 23 deletions src/common/tracing/src/init.rs
@@ -224,15 +224,19 @@ pub fn init_logging(
         let labels = labels
             .iter()
             .chain(&cfg.otlp.endpoint.labels)
-            .map(|(k, v)| (k.clone().into(), v.clone().into()))
-            .chain([("category".into(), "system".into())]);
-        let otel = logforth::append::OpentelemetryLog::new(
+            .map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone())))
+            .chain([(Cow::from("category"), Cow::from("system"))]);
+        let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new(
             log_name,
-            &cfg.otlp.endpoint.endpoint,
-            cfg.otlp.endpoint.protocol.into(),
-            labels,
+            format!("{}/v1/logs", &cfg.otlp.endpoint.endpoint),
         )
-        .expect("initialize opentelemetry logger");
+        .with_protocol(cfg.otlp.endpoint.protocol.into());
+        for (k, v) in labels {
+            otel_builder = otel_builder.add_label(k, v);
+        }
+        let otel = otel_builder
+            .build()
+            .expect("initialize opentelemetry logger");
         let dispatch = Dispatch::new()
             .filter(TargetFilter::level_for(
                 "databend::log::query",
@@ -290,23 +294,26 @@ pub fn init_logging(
"databend::log::query",
LevelFilter::Off,
))
.layout(get_layout(&cfg.file.format))
.append(query_log_file);
logger = logger.dispatch(dispatch);
}
if let Some(endpoint) = &cfg.query.otlp {
let labels = labels
.iter()
.chain(&endpoint.labels)
.map(|(k, v)| (k.clone().into(), v.clone().into()))
.chain([("category".into(), "query".into())]);
let otel = logforth::append::OpentelemetryLog::new(
.map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone())))
.chain([(Cow::from("category"), Cow::from("query"))]);
let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new(
log_name,
&endpoint.endpoint,
endpoint.protocol.into(),
labels,
format!("{}/v1/logs", &endpoint.endpoint),
)
.expect("initialize opentelemetry logger");
.with_protocol(endpoint.protocol.into());
for (k, v) in labels {
otel_builder = otel_builder.add_label(k, v);
}
let otel = otel_builder
.build()
.expect("initialize opentelemetry logger");
let dispatch = Dispatch::new()
.filter(TargetFilter::level_for_not(
"databend::log::query",
@@ -329,23 +336,26 @@
                 "databend::log::profile",
                 LevelFilter::Off,
             ))
+            .layout(get_layout(&cfg.file.format))
             .append(profile_log_file);
         logger = logger.dispatch(dispatch);
     }
     if let Some(endpoint) = &cfg.profile.otlp {
         let labels = labels
             .iter()
             .chain(&endpoint.labels)
-            .map(|(k, v)| (k.clone().into(), v.clone().into()))
-            .chain([("category".into(), "profile".into())]);
-        let otel = logforth::append::OpentelemetryLog::new(
+            .map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone())))
+            .chain([(Cow::from("category"), Cow::from("profile"))]);
+        let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new(
             log_name,
-            &endpoint.endpoint,
-            endpoint.protocol.into(),
-            labels,
+            format!("{}/v1/logs", &endpoint.endpoint),
        )
-        .expect("initialize opentelemetry logger");
+        .with_protocol(endpoint.protocol.into());
+        for (k, v) in labels {
+            otel_builder = otel_builder.add_label(k, v);
+        }
+        let otel = otel_builder
+            .build()
+            .expect("initialize opentelemetry logger");
         let dispatch = Dispatch::new()
             .filter(TargetFilter::level_for_not(
                 "databend::log::profile",
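
All three OTLP appenders above (system, query, profile) now share the same logforth 0.12 builder shape. A condensed sketch of the pattern, using only the calls that appear at those call sites; `log_name`, `endpoint`, and `labels` stand in for the config-derived values, so this fragment is illustrative rather than standalone:

    // Sketch of the shared pattern condensed from the three call sites.
    let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new(
        log_name,
        // The "/v1/logs" path is appended here instead of in the TOML config.
        format!("{}/v1/logs", &endpoint.endpoint),
    )
    .with_protocol(endpoint.protocol.into());
    for (k, v) in labels {
        // `labels` yields (Cow<'static, str>, Cow<'static, str>) pairs.
        otel_builder = otel_builder.add_label(k, v);
    }
    let otel = otel_builder
        .build()
        .expect("initialize opentelemetry logger");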
93 changes: 89 additions & 4 deletions src/common/tracing/src/loggers.rs
@@ -12,13 +12,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::fmt::Arguments;
 use std::path::Path;
 
 use databend_common_base::runtime::ThreadTracker;
+use log::Record;
 use logforth::append::rolling_file::NonBlockingBuilder;
 use logforth::append::rolling_file::RollingFileWriter;
 use logforth::append::rolling_file::Rotation;
 use logforth::append::RollingFile;
-use logforth::layout::JsonLayout;
-use logforth::layout::TextLayout;
+use logforth::layout::collect_kvs;
+use logforth::layout::CustomLayout;
+use logforth::layout::KvDisplay;
 use logforth::Layout;
+use serde_json::Map;

 /// Create a `BufWriter<NonBlocking>` for a rolling file logger.
 pub(crate) fn new_rolling_file_appender(
@@ -41,8 +48,86 @@ pub(crate) fn new_rolling_file_appender(
 
 pub fn get_layout(format: &str) -> Layout {
     match format {
-        "text" => TextLayout::default().into(),
-        "json" => JsonLayout::default().into(),
+        "text" => text_layout(),
+        "json" => json_layout(),
         _ => unimplemented!("file logging format {format} is not supported"),
     }
 }
+
+fn text_layout() -> Layout {
+    CustomLayout::new(
+        |record: &Record, f: &dyn Fn(Arguments) -> anyhow::Result<()>| {
+            match ThreadTracker::query_id() {
+                None => {
+                    f(format_args!(
+                        "{} {:>5} {}: {}:{} {}{}",
+                        chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
+                        record.level(),
+                        record.module_path().unwrap_or(""),
+                        Path::new(record.file().unwrap_or_default())
+                            .file_name()
+                            .and_then(|name| name.to_str())
+                            .unwrap_or_default(),
+                        record.line().unwrap_or(0),
+                        record.args(),
+                        KvDisplay::new(record.key_values()),
+                    ))?;
+                }
+                Some(query_id) => {
+                    f(format_args!(
+                        "{} {} {:>5} {}: {}:{} {}{}",
+                        query_id,
+                        chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
+                        record.level(),
+                        record.module_path().unwrap_or(""),
+                        Path::new(record.file().unwrap_or_default())
+                            .file_name()
+                            .and_then(|name| name.to_str())
+                            .unwrap_or_default(),
+                        record.line().unwrap_or(0),
+                        record.args(),
+                        KvDisplay::new(record.key_values()),
+                    ))?;
+                }
+            }
+
+            Ok(())
+        },
+    )
+    .into()
+}
+
+fn json_layout() -> Layout {
+    CustomLayout::new(
+        |record: &Record, f: &dyn Fn(Arguments) -> anyhow::Result<()>| {
+            let mut fields = Map::new();
+            fields.insert("message".to_string(), format!("{}", record.args()).into());
+            for (k, v) in collect_kvs(record.key_values()) {
+                fields.insert(k, v.into());
+            }
+
+            match ThreadTracker::query_id() {
+                None => {
+                    f(format_args!(
+                        r#"{{"timestamp":"{}","level":"{}","fields":{}}}"#,
+                        chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
+                        record.level(),
+                        serde_json::to_string(&fields).unwrap_or_default(),
+                    ))?;
+                }
+                Some(query_id) => {
+                    f(format_args!(
+                        r#"{{"timestamp":"{}","level":"{}","query_id":"{}","fields":{}}}"#,
+                        chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
+                        record.level(),
+                        query_id,
+                        serde_json::to_string(&fields).unwrap_or_default(),
+                    ))?;
+                }
+            }
+
+            Ok(())
+        },
+    )
+    .into()
+}
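
The `anyhow` and `serde_json` dependencies added in Cargo.toml above serve these `CustomLayout` callbacks: `anyhow::Result` is the callback's return type, and `serde_json` serializes the `fields` map. For reference, the layouts emit lines of roughly the following shape, as read off the format strings (timestamps, query ids, and messages are illustrative values, not captured output):

    2024-09-19T10:30:00.123456+08:00  INFO databend_query::servers: server.rs:128 shutdown handler registered
    a1b2c3d4 2024-09-19T10:30:00.123456+08:00  INFO databend_query::interpreters: interpreter.rs:42 query finished
    {"timestamp":"2024-09-19T10:30:00.123456+08:00","level":"INFO","query_id":"a1b2c3d4","fields":{"message":"query finished"}}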
1 change: 1 addition & 0 deletions src/query/pipeline/core/src/lib.rs
@@ -14,6 +14,7 @@
 
 #![feature(once_cell_try)]
 #![feature(variant_count)]
+#![feature(associated_type_defaults)]
 #![allow(clippy::arc_with_non_send_sync)]
 #![allow(clippy::useless_asref)]

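For context, `associated_type_defaults` is the nightly feature that lets a trait provide a default for an associated type, which implementors may then omit. A minimal illustration of the language feature itself (the trait here is hypothetical, not taken from this crate):

    #![feature(associated_type_defaults)]

    trait Transform {
        // Implementors inherit Infallible unless they override Error.
        type Error = std::convert::Infallible;
    }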
