Commit 472e023: disable chunking spans (#340)

dinmukhamedm authored Jan 20, 2025
1 parent cda9c7a commit 472e023
Showing 4 changed files with 8 additions and 46 deletions.
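
Read together, the four diffs remove character-split chunking from the span-indexing path: the ChunkerRunner is no longer threaded from main.rs through the RabbitMQ consumer into index_span, each span is indexed as at most two whole documents (its input and its output) instead of 512-character chunks, and the BM25 average document length constant is raised from 150 to 1024 tokens, presumably to account for the longer, unchunked documents.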

app-server/src/main.rs (1 change: 0 additions & 1 deletion)

@@ -376,7 +376,6 @@ fn main() -> anyhow::Result<()> {
             semantic_search.clone(),
             rabbitmq_connection.clone(),
             clickhouse.clone(),
-            chunker_runner.clone(),
             storage.clone(),
         ));
     }

app-server/src/traces/consumer.rs (5 changes: 0 additions & 5 deletions)

@@ -11,7 +11,6 @@ use crate::{
     api::v1::traces::RabbitMqSpanMessage,
     cache::Cache,
     ch::{self, spans::CHSpan},
-    chunk,
     db::{labels::get_registered_label_classes_for_path, spans::Span, stats, DB},
     features::{is_feature_enabled, Feature},
     pipeline::runner::PipelineRunner,
@@ -32,7 +31,6 @@ pub async fn process_queue_spans<T: Storage + ?Sized>(
     semantic_search: Arc<dyn SemanticSearch>,
     rabbitmq_connection: Option<Arc<Connection>>,
     clickhouse: clickhouse::Client,
-    chunker_runner: Arc<chunk::runner::ChunkerRunner>,
     storage: Arc<T>,
 ) {
     loop {
@@ -43,7 +41,6 @@
             semantic_search.clone(),
             rabbitmq_connection.clone(),
             clickhouse.clone(),
-            chunker_runner.clone(),
             storage.clone(),
         )
         .await;
@@ -58,7 +55,6 @@ async fn inner_process_queue_spans<T: Storage + ?Sized>(
     semantic_search: Arc<dyn SemanticSearch>,
     rabbitmq_connection: Option<Arc<Connection>>,
     clickhouse: clickhouse::Client,
-    chunker_runner: Arc<chunk::runner::ChunkerRunner>,
     storage: Arc<T>,
 ) {
     if !is_feature_enabled(Feature::FullBuild) {
@@ -200,7 +196,6 @@
                 &span,
                 semantic_search.clone(),
                 &format!("spans-{}", rabbitmq_span_message.project_id),
-                chunker_runner.clone(),
             )
             .await
             {

app-server/src/traces/index.rs (43 changes: 7 additions & 36 deletions)

@@ -5,19 +5,12 @@ use std::{collections::HashMap, sync::Arc};
 use uuid::Uuid;

 use crate::{
-    chunk::{
-        character_split::CharacterSplitParams,
-        runner::{ChunkParams, ChunkerRunner, ChunkerType},
-    },
     db::spans::Span,
     semantic_search::{semantic_search_grpc::index_request::Datapoint, SemanticSearch},
 };

 use super::span_attributes::SPAN_PATH;

-const CHARACTER_SPLITTER_CHUNK_SIZE: u32 = 512;
-// Since this indexing is for sparse vectors, we can use a smaller stride
-const CHARACTER_SPLITTER_STRIDE: u32 = 32;
 const DATASOURCE_ID: &str = "spans";

 /// internal enum to choose which span field to index
@@ -30,27 +23,22 @@ pub async fn index_span(
     span: &Span,
     semantic_search: Arc<dyn SemanticSearch>,
     collection_name: &String,
-    chunker_runner: Arc<ChunkerRunner>,
 ) -> Result<()> {
     let input_content = get_indexable_content(span, IndexField::Input);
     let output_content = get_indexable_content(span, IndexField::Output);
     if input_content.is_none() && output_content.is_none() {
         return Ok(());
     }
-    let input_chunks = chunk(chunker_runner.clone(), input_content)?;
-    let output_chunks = chunk(chunker_runner.clone(), output_content)?;

-    let mut points = input_chunks
-        .iter()
-        .map(|chunk| create_datapoint(span, chunk.to_string(), "input"))
-        .collect::<Vec<_>>();
+    let mut points = Vec::new();

-    let output_points = output_chunks
-        .iter()
-        .map(|chunk| create_datapoint(span, chunk.to_string(), "output"))
-        .collect::<Vec<_>>();
+    if let Some(input_content) = input_content {
+        points.push(create_datapoint(span, input_content, "input"));
+    }

-    points.extend(output_points);
+    if let Some(output_content) = output_content {
+        points.push(create_datapoint(span, output_content, "output"));
+    }

     semantic_search
         .index(points, collection_name.to_owned(), true)
@@ -59,23 +47,6 @@
     Ok(())
 }

-fn chunk(chunker_runner: Arc<ChunkerRunner>, content: Option<String>) -> Result<Vec<String>> {
-    let chunks = content
-        .map(|content| {
-            chunker_runner.chunk(
-                &ChunkerType::CharacterSplit,
-                &content,
-                &ChunkParams::CharacterSplit(CharacterSplitParams {
-                    chunk_size: CHARACTER_SPLITTER_CHUNK_SIZE,
-                    stride: CHARACTER_SPLITTER_STRIDE,
-                }),
-            )
-        })
-        .transpose()?
-        .unwrap_or_default();
-    Ok(chunks)
-}
-
 fn create_datapoint(span: &Span, content: String, field_type: &str) -> Datapoint {
     let mut data = HashMap::new();
     data.insert("trace_id".to_string(), span.trace_id.to_string());
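
For reference, index_span as it should read after this commit, assembled from the hunks above. The lines between .index(...) and Ok(()) are collapsed in the diff, so the .await? below is an assumption, and get_indexable_content, IndexField, and the body of create_datapoint are crate items referenced but not shown here.

pub async fn index_span(
    span: &Span,
    semantic_search: Arc<dyn SemanticSearch>,
    collection_name: &String,
) -> Result<()> {
    let input_content = get_indexable_content(span, IndexField::Input);
    let output_content = get_indexable_content(span, IndexField::Output);
    if input_content.is_none() && output_content.is_none() {
        return Ok(());
    }
    // A span now yields at most two datapoints, the whole input and the
    // whole output, with no intermediate character-split chunking.
    let mut points = Vec::new();

    if let Some(input_content) = input_content {
        points.push(create_datapoint(span, input_content, "input"));
    }

    if let Some(output_content) = output_content {
        points.push(create_datapoint(span, output_content, "output"));
    }

    semantic_search
        .index(points, collection_name.to_owned(), true)
        .await?; // error handling here is elided in the diff; `?` is assumed

    Ok(())
}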

semantic-search-service/src/embeddings/bm25.rs (5 changes: 1 addition & 4 deletions)

@@ -5,10 +5,7 @@ use indexmap::IndexMap;

 use super::{Embed, Embedding};

-// This has to be tuned together with CHARACTER_SPLITTER_CHUNK_SIZE
-// or any other chunking method. At the time of writing this,
-// chunk size is 512 characters, which is approximately 150 words.
-const DOCUMENT_LENGTH_TOKENS: f32 = 150.0;
+const DOCUMENT_LENGTH_TOKENS: f32 = 1024.0;

 pub struct Bm25 {
     embedder: Embedder,
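
The deleted comment tied DOCUMENT_LENGTH_TOKENS, BM25's assumed average document length (avgdl), to the 512-character chunk size (roughly 150 words). With chunking gone, whole span inputs and outputs are indexed, and the constant moves to 1024. Below is a minimal sketch of where avgdl enters BM25 term weighting; this is not the service's actual implementation (only the constant and the struct header are visible in this diff), and k1 and b are assumed textbook defaults.

const DOCUMENT_LENGTH_TOKENS: f32 = 1024.0; // avgdl after this commit
const K1: f32 = 1.2; // assumed default, not shown in the diff
const B: f32 = 0.75; // assumed default, not shown in the diff

/// BM25 weight of one term: `tf` is its frequency in the document,
/// `doc_len` the document length in tokens, `idf` its inverse document
/// frequency.
fn bm25_term_weight(tf: f32, doc_len: f32, idf: f32) -> f32 {
    // Length normalization: documents longer than avgdl are penalized,
    // shorter ones boosted. A larger avgdl softens the penalty on the
    // long, unchunked span documents this commit introduces.
    let norm = 1.0 - B + B * (doc_len / DOCUMENT_LENGTH_TOKENS);
    idf * (tf * (K1 + 1.0)) / (tf + K1 * norm)
}

With avgdl at 1024 instead of 150, a roughly thousand-token span body is scored as a near-average document rather than a very long one, so its term frequencies are no longer sharply discounted.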
