Skip to content

Commit

Permalink
remove: p_metadata and p_tags (parseablehq#1067)
Browse files Browse the repository at this point in the history
remove p_metadata and p_tags from the ingestion flow;
Parseable will no longer add these fields to ingested events

remove from static schema creation
remove from query flow
  • Loading branch information
nikhilsinhaparseable authored and parmesant committed Jan 13, 2025
1 parent 67396be commit 59cdecc
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 221 deletions.
8 changes: 3 additions & 5 deletions src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,14 @@ use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use tracing::error;

use super::{EventFormat, LogSource, Metadata, Tags};
use super::{EventFormat, LogSource};
use crate::{
metadata::SchemaVersion,
utils::{arrow::get_field, json::flatten_json_body},
};

pub struct Event {
pub data: Value,
pub tags: Tags,
pub metadata: Metadata,
}

impl EventFormat for Event {
Expand All @@ -53,7 +51,7 @@ impl EventFormat for Event {
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
let data = flatten_json_body(
self.data,
None,
Expand Down Expand Up @@ -119,7 +117,7 @@ impl EventFormat for Event {
));
}

Ok((value_arr, schema, is_first, self.tags, self.metadata))
Ok((value_arr, schema, is_first))
}

// Convert the Data type (defined above) to arrow record batch
Expand Down
48 changes: 5 additions & 43 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,18 @@ use std::{
};

use anyhow::{anyhow, Error as AnyError};
use arrow_array::{RecordBatch, StringArray};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use chrono::DateTime;
use serde_json::Value;

use crate::{
metadata::SchemaVersion,
utils::{self, arrow::get_field},
};
use crate::{metadata::SchemaVersion, utils::arrow::get_field};

use super::{DEFAULT_METADATA_KEY, DEFAULT_TAGS_KEY, DEFAULT_TIMESTAMP_KEY};
use super::DEFAULT_TIMESTAMP_KEY;

pub mod json;

static TIME_FIELD_NAME_PARTS: [&str; 2] = ["time", "date"];

type Tags = String;
type Metadata = String;
type EventSchema = Vec<Arc<Field>>;

/// Source of the logs, used to perform special processing for certain sources
Expand Down Expand Up @@ -87,7 +81,7 @@ pub trait EventFormat: Sized {
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;
) -> Result<(Self::Data, EventSchema, bool), AnyError>;

fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;

Expand All @@ -99,26 +93,14 @@ pub trait EventFormat: Sized {
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(
let (data, mut schema, is_first) = self.to_data(
storage_schema,
static_schema_flag,
time_partition,
schema_version,
log_source,
)?;

// DEFAULT_TAGS_KEY, DEFAULT_METADATA_KEY and DEFAULT_TIMESTAMP_KEY are reserved field names
if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
};

if get_field(&schema, DEFAULT_METADATA_KEY).is_some() {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_METADATA_KEY
));
};

if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
return Err(anyhow!(
"field {} is a reserved field",
Expand All @@ -136,16 +118,6 @@ pub trait EventFormat: Sized {
)),
);

// p_tags and p_metadata are added to the end of the schema
let tags_index = schema.len();
let metadata_index = tags_index + 1;
schema.push(Arc::new(Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true)));
schema.push(Arc::new(Field::new(
DEFAULT_METADATA_KEY,
DataType::Utf8,
true,
)));

// prepare the record batch and new fields to be added
let mut new_schema = Arc::new(Schema::new(schema));
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
Expand All @@ -154,16 +126,6 @@ pub trait EventFormat: Sized {
new_schema =
update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
let rb = Self::decode(data, new_schema.clone())?;
let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
let metadata_arr =
StringArray::from_iter_values(std::iter::repeat(&metadata).take(rb.num_rows()));
// modify the record batch to add fields to respective indexes
let rb = utils::arrow::replace_columns(
Arc::clone(&new_schema),
&rb,
&[tags_index, metadata_index],
&[Arc::new(tags_arr), Arc::new(metadata_arr)],
);

Ok((rb, is_first))
}
Expand Down
2 changes: 0 additions & 2 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ use chrono::NaiveDateTime;
use std::collections::HashMap;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
pub const DEFAULT_TAGS_KEY: &str = "p_tags";
pub const DEFAULT_METADATA_KEY: &str = "p_metadata";

#[derive(Clone)]
pub struct Event {
Expand Down
Loading

0 comments on commit 59cdecc

Please sign in to comment.