Skip to content

Commit

Permalink
upload large payloads to storage (#360)
Browse files Browse the repository at this point in the history
* wip: upload large payloads to storage

* fixes + v0: download span directly
  • Loading branch information
dinmukhamedm authored Feb 1, 2025
1 parent 74bc7f8 commit 2ef6d99
Show file tree
Hide file tree
Showing 16 changed files with 3,101 additions and 20 deletions.
3 changes: 3 additions & 0 deletions app-server/src/api/v1/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ async fn push_to_queue(
output: request_item.output,
events: None,
labels: None,
// TODO: store the input and output in storage if they are too large
input_url: None,
output_url: None,
};

let span_usage = crate::traces::utils::get_llm_usage_for_span(
Expand Down
14 changes: 12 additions & 2 deletions app-server/src/db/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct Span {
pub end_time: DateTime<Utc>,
pub events: Option<Value>,
pub labels: Option<Value>,
pub input_url: Option<String>,
pub output_url: Option<String>,
}

pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Result<()> {
Expand Down Expand Up @@ -101,6 +103,8 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
span_type,
input_preview,
output_preview,
input_url,
output_url,
project_id
)
VALUES(
Expand All @@ -116,7 +120,9 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
$10,
$11,
$12,
$13)
$13,
$14,
$15)
ON CONFLICT (span_id, project_id) DO UPDATE SET
trace_id = EXCLUDED.trace_id,
parent_span_id = EXCLUDED.parent_span_id,
Expand All @@ -128,7 +134,9 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
output = EXCLUDED.output,
span_type = EXCLUDED.span_type,
input_preview = EXCLUDED.input_preview,
output_preview = EXCLUDED.output_preview
output_preview = EXCLUDED.output_preview,
input_url = EXCLUDED.input_url,
output_url = EXCLUDED.output_url
",
)
.bind(&span.span_id)
Expand All @@ -143,6 +151,8 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
.bind(&span.span_type as &SpanType)
.bind(&input_preview)
.bind(&output_preview)
.bind(&span.input_url as &Option<String>)
.bind(&span.output_url as &Option<String>)
.bind(&project_id)
.execute(pool)
.await?;
Expand Down
7 changes: 5 additions & 2 deletions app-server/src/routes/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ Set the target version for the pipeline in the pipeline builder."),
let mut short_message = message.clone();
let value: String = short_message.value.clone().into();
if value.len() > 100 {
short_message.value =
format!("{}... [TRUNCATED FOR BREVITY]", &value[..100]).into();
short_message.value = format!(
"{}... [TRUNCATED FOR BREVITY]",
&value.chars().take(100).collect::<String>()
)
.into();
}
(node, short_message)
})
Expand Down
3 changes: 3 additions & 0 deletions app-server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ impl S3Storage {
#[async_trait::async_trait]
impl super::Storage for S3Storage {
async fn store(&self, data: Vec<u8>, key: &str) -> Result<String> {
// TODO: check the performance of this, and, if needed,
// try either multi-part upload or tokio::spawn the upload
// and just return the url
self.client
.put_object()
.bucket(&self.bucket)
Expand Down
2 changes: 1 addition & 1 deletion app-server/src/traces/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async fn inner_process_queue_spans<T: Storage + ?Sized>(

if is_feature_enabled(Feature::Storage) {
if let Err(e) = span
.store_input_media(&rabbitmq_span_message.project_id, storage.clone())
.store_payloads(&rabbitmq_span_message.project_id, storage.clone())
.await
{
log::error!(
Expand Down
41 changes: 40 additions & 1 deletion app-server/src/traces/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ const OVERRIDE_PARENT_SPAN_ATTRIBUTE_NAME: &str = "lmnr.internal.override_parent
const TRACING_LEVEL_ATTRIBUTE_NAME: &str = "lmnr.internal.tracing_level";
const HAS_BROWSER_SESSION_ATTRIBUTE_NAME: &str = "lmnr.internal.has_browser_session";

// Minimal number of tokens in the input or output to store the payload
// in storage instead of database.
//
// We use 7/2 as an estimate of the number of characters per token.
// And 128K is a common input size for LLM calls.
const PAYLOAD_SIZE_THRESHOLD: usize = (7 / 2) * 128_000; // approx 448KB

#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "snake_case")]
enum TracingLevel {
Expand Down Expand Up @@ -503,6 +510,8 @@ impl Span {
span_type: SpanType::PIPELINE,
events: None,
labels: None,
input_url: None,
output_url: None,
}
}

Expand Down Expand Up @@ -563,13 +572,15 @@ impl Span {
},
events: None,
labels: None,
input_url: None,
output_url: None,
};
Some(span)
})
.collect()
}

pub async fn store_input_media<S: Storage + ?Sized>(
pub async fn store_payloads<S: Storage + ?Sized>(
&mut self,
project_id: &Uuid,
storage: Arc<S>,
Expand All @@ -589,6 +600,34 @@ impl Span {
new_messages.push(message);
}
self.input = Some(serde_json::to_value(new_messages).unwrap());
// We cannot parse the input as a Vec<ChatMessage>, but we check if
// it's still large. Obviously serializing to JSON affects the size,
// but we don't need to be exact here.
} else {
let input_str = serde_json::to_string(&self.input).unwrap_or_default();
if input_str.len() > PAYLOAD_SIZE_THRESHOLD {
let key = crate::storage::create_key(project_id, &None);
let mut data = Vec::new();
serde_json::to_writer(&mut data, &self.input)?;
let url = storage.store(data, &key).await?;
self.input_url = Some(url);
self.input = Some(serde_json::Value::String(
input_str.chars().take(100).collect(),
));
}
}
}
if let Some(output) = self.output.clone() {
let output_str = serde_json::to_string(&output).unwrap_or_default();
if output_str.len() > PAYLOAD_SIZE_THRESHOLD {
let key = crate::storage::create_key(project_id, &None);
let mut data = Vec::new();
serde_json::to_writer(&mut data, &output)?;
let url = storage.store(data, &key).await?;
self.output_url = Some(url);
self.output = Some(serde_json::Value::String(
output_str.chars().take(100).collect(),
));
}
}
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export async function GET(
// that the media-type is application/pdf
if (payloadType === 'image') {
return new Response(bytes);
} else if (payloadType === 'raw') {
return new Response(bytes);
} else if (payloadId.endsWith('.pdf')) {
headers.set('Content-Type', 'application/pdf');
} else {
Expand Down
4 changes: 2 additions & 2 deletions frontend/app/api/projects/[projectId]/spans/[spanId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ export async function GET(
const projectId = params.projectId;
const spanId = params.spanId;

const rows = await db.query.spans.findFirst({
const span = await db.query.spans.findFirst({
where: and(eq(spans.spanId, spanId), eq(spans.projectId, projectId)),
});

return NextResponse.json(rows);
return NextResponse.json(span);
}
40 changes: 34 additions & 6 deletions frontend/components/traces/span-view-span.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ interface SpanViewSpanProps {
export function SpanViewSpan({ span }: SpanViewSpanProps) {
const scrollAreaRef = useRef<HTMLDivElement>(null);
const [contentWidth, setContentWidth] = useState<number>(0);
const [spanInput, setSpanInput] = useState(span.input);
const [spanOutput, setSpanOutput] = useState(span.output);

useEffect(() => {
if (!scrollAreaRef.current) return;
Expand All @@ -31,6 +33,28 @@ export function SpanViewSpan({ span }: SpanViewSpanProps) {
return () => resizeObserver.disconnect();
}, [scrollAreaRef.current]);

if (span.inputUrl) {
const url = span.inputUrl.startsWith('/')
? `${span.inputUrl}?payloadType=raw`
: span.inputUrl;
fetch(url).then(response => {
response.json().then(j => {
setSpanInput(j);
});
});
}

if (span.outputUrl) {
const url = span.outputUrl.startsWith('/')
? `${span.outputUrl}?payloadType=raw`
: span.outputUrl;
fetch(url).then(response => {
response.json().then(j => {
setSpanOutput(j);
});
});
}

return (
<ScrollArea ref={scrollAreaRef} className="w-full h-full mt-0" type="scroll">
<div className="max-h-0">
Expand All @@ -42,16 +66,20 @@ export function SpanViewSpan({ span }: SpanViewSpanProps) {
<SpanLabels span={span} />
<SpanDatasets spanId={span.spanId} />
<div className="pb-2 font-medium text-lg">Input</div>
{isChatMessageList(span.input) ? (
{isChatMessageList(spanInput) ? (
<ChatMessageListTab
messages={span.input}
messages={spanInput}
presetKey={`input-${span.attributes['lmnr.span.path'].join('.')}`}
/>
) : (
<Formatter
className="max-h-[400px]"
collapsible
value={JSON.stringify(span.input)}
value={
typeof spanInput === 'string'
? spanInput
: JSON.stringify(spanInput)
}
presetKey={`input-${span.attributes['lmnr.span.path'].join('.')}`}
/>
)}
Expand All @@ -61,9 +89,9 @@ export function SpanViewSpan({ span }: SpanViewSpanProps) {
<Formatter
className="max-h-[400px]"
value={
typeof span.output === 'string'
? span.output
: JSON.stringify(span.output)
typeof spanOutput === 'string'
? spanOutput
: JSON.stringify(spanOutput)
}
presetKey={`output-${span.attributes['lmnr.span.path'].join('.')}`}
collapsible
Expand Down
2 changes: 2 additions & 0 deletions frontend/lib/db/migrations/0017_groovy_senator_kelly.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE "spans" ADD COLUMN "input_url" text;--> statement-breakpoint
ALTER TABLE "spans" ADD COLUMN "output_url" text;
Loading

0 comments on commit 2ef6d99

Please sign in to comment.