Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upload large payloads to storage #360

Merged
merged 2 commits into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions app-server/src/api/v1/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ async fn push_to_queue(
output: request_item.output,
events: None,
labels: None,
// TODO: store the input and output in storage if they are too large
input_url: None,
output_url: None,
};

let span_usage = crate::traces::utils::get_llm_usage_for_span(
Expand Down
14 changes: 12 additions & 2 deletions app-server/src/db/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct Span {
pub end_time: DateTime<Utc>,
pub events: Option<Value>,
pub labels: Option<Value>,
pub input_url: Option<String>,
pub output_url: Option<String>,
}

pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Result<()> {
Expand Down Expand Up @@ -101,6 +103,8 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
span_type,
input_preview,
output_preview,
input_url,
output_url,
project_id
)
VALUES(
Expand All @@ -116,7 +120,9 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
$10,
$11,
$12,
$13)
$13,
$14,
$15)
ON CONFLICT (span_id, project_id) DO UPDATE SET
trace_id = EXCLUDED.trace_id,
parent_span_id = EXCLUDED.parent_span_id,
Expand All @@ -128,7 +134,9 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
output = EXCLUDED.output,
span_type = EXCLUDED.span_type,
input_preview = EXCLUDED.input_preview,
output_preview = EXCLUDED.output_preview
output_preview = EXCLUDED.output_preview,
input_url = EXCLUDED.input_url,
output_url = EXCLUDED.output_url
",
)
.bind(&span.span_id)
Expand All @@ -143,6 +151,8 @@ pub async fn record_span(pool: &PgPool, span: &Span, project_id: &Uuid) -> Resul
.bind(&span.span_type as &SpanType)
.bind(&input_preview)
.bind(&output_preview)
.bind(&span.input_url as &Option<String>)
.bind(&span.output_url as &Option<String>)
.bind(&project_id)
.execute(pool)
.await?;
Expand Down
7 changes: 5 additions & 2 deletions app-server/src/routes/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ Set the target version for the pipeline in the pipeline builder."),
let mut short_message = message.clone();
let value: String = short_message.value.clone().into();
if value.len() > 100 {
short_message.value =
format!("{}... [TRUNCATED FOR BREVITY]", &value[..100]).into();
short_message.value = format!(
"{}... [TRUNCATED FOR BREVITY]",
&value.chars().take(100).collect::<String>()
)
.into();
}
(node, short_message)
})
Expand Down
3 changes: 3 additions & 0 deletions app-server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ impl S3Storage {
#[async_trait::async_trait]
impl super::Storage for S3Storage {
async fn store(&self, data: Vec<u8>, key: &str) -> Result<String> {
// TODO: check the performance of this, and, if needed,
// try either multi-part upload or tokio::spawn the upload
// and just return the url
self.client
.put_object()
.bucket(&self.bucket)
Expand Down
2 changes: 1 addition & 1 deletion app-server/src/traces/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async fn inner_process_queue_spans<T: Storage + ?Sized>(

if is_feature_enabled(Feature::Storage) {
if let Err(e) = span
.store_input_media(&rabbitmq_span_message.project_id, storage.clone())
.store_payloads(&rabbitmq_span_message.project_id, storage.clone())
.await
{
log::error!(
Expand Down
41 changes: 40 additions & 1 deletion app-server/src/traces/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ const OVERRIDE_PARENT_SPAN_ATTRIBUTE_NAME: &str = "lmnr.internal.override_parent
const TRACING_LEVEL_ATTRIBUTE_NAME: &str = "lmnr.internal.tracing_level";
const HAS_BROWSER_SESSION_ATTRIBUTE_NAME: &str = "lmnr.internal.has_browser_session";

// Minimal number of tokens in the input or output to store the payload
// in storage instead of database.
//
// We use 7/2 as an estimate of the number of characters per token.
// And 128K is a common input size for LLM calls.
const PAYLOAD_SIZE_THRESHOLD: usize = (7 / 2) * 128_000; // approx 448KB

#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "snake_case")]
enum TracingLevel {
Expand Down Expand Up @@ -503,6 +510,8 @@ impl Span {
span_type: SpanType::PIPELINE,
events: None,
labels: None,
input_url: None,
output_url: None,
}
}

Expand Down Expand Up @@ -563,13 +572,15 @@ impl Span {
},
events: None,
labels: None,
input_url: None,
output_url: None,
};
Some(span)
})
.collect()
}

pub async fn store_input_media<S: Storage + ?Sized>(
pub async fn store_payloads<S: Storage + ?Sized>(
&mut self,
project_id: &Uuid,
storage: Arc<S>,
Expand All @@ -589,6 +600,34 @@ impl Span {
new_messages.push(message);
}
self.input = Some(serde_json::to_value(new_messages).unwrap());
// We cannot parse the input as a Vec<ChatMessage>, but we check if
// it's still large. Obviously serializing to JSON affects the size,
// but we don't need to be exact here.
} else {
let input_str = serde_json::to_string(&self.input).unwrap_or_default();
if input_str.len() > PAYLOAD_SIZE_THRESHOLD {
let key = crate::storage::create_key(project_id, &None);
let mut data = Vec::new();
serde_json::to_writer(&mut data, &self.input)?;
let url = storage.store(data, &key).await?;
self.input_url = Some(url);
self.input = Some(serde_json::Value::String(
input_str.chars().take(100).collect(),
));
}
}
}
if let Some(output) = self.output.clone() {
let output_str = serde_json::to_string(&output).unwrap_or_default();
if output_str.len() > PAYLOAD_SIZE_THRESHOLD {
let key = crate::storage::create_key(project_id, &None);
let mut data = Vec::new();
serde_json::to_writer(&mut data, &output)?;
let url = storage.store(data, &key).await?;
self.output_url = Some(url);
self.output = Some(serde_json::Value::String(
output_str.chars().take(100).collect(),
));
}
}
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export async function GET(
// that the media-type is application/pdf
if (payloadType === 'image') {
return new Response(bytes);
} else if (payloadType === 'raw') {
return new Response(bytes);
} else if (payloadId.endsWith('.pdf')) {
headers.set('Content-Type', 'application/pdf');
} else {
Expand Down
4 changes: 2 additions & 2 deletions frontend/app/api/projects/[projectId]/spans/[spanId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ export async function GET(
const projectId = params.projectId;
const spanId = params.spanId;

const rows = await db.query.spans.findFirst({
const span = await db.query.spans.findFirst({
where: and(eq(spans.spanId, spanId), eq(spans.projectId, projectId)),
});

return NextResponse.json(rows);
return NextResponse.json(span);
}
40 changes: 34 additions & 6 deletions frontend/components/traces/span-view-span.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ interface SpanViewSpanProps {
export function SpanViewSpan({ span }: SpanViewSpanProps) {
const scrollAreaRef = useRef<HTMLDivElement>(null);
const [contentWidth, setContentWidth] = useState<number>(0);
const [spanInput, setSpanInput] = useState(span.input);
const [spanOutput, setSpanOutput] = useState(span.output);

useEffect(() => {
if (!scrollAreaRef.current) return;
Expand All @@ -31,6 +33,28 @@ export function SpanViewSpan({ span }: SpanViewSpanProps) {
return () => resizeObserver.disconnect();
}, [scrollAreaRef.current]);

if (span.inputUrl) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for the fetch calls to handle potential network or parsing errors. This applies to both inputUrl and outputUrl fetch calls.

const url = span.inputUrl.startsWith('/')
? `${span.inputUrl}?payloadType=raw`
: span.inputUrl;
fetch(url).then(response => {
response.json().then(j => {
setSpanInput(j);
});
});
}

if (span.outputUrl) {
const url = span.outputUrl.startsWith('/')
? `${span.outputUrl}?payloadType=raw`
: span.outputUrl;
fetch(url).then(response => {
response.json().then(j => {
setSpanOutput(j);
});
});
}

return (
<ScrollArea ref={scrollAreaRef} className="w-full h-full mt-0" type="scroll">
<div className="max-h-0">
Expand All @@ -42,16 +66,20 @@ export function SpanViewSpan({ span }: SpanViewSpanProps) {
<SpanLabels span={span} />
<SpanDatasets spanId={span.spanId} />
<div className="pb-2 font-medium text-lg">Input</div>
{isChatMessageList(span.input) ? (
{isChatMessageList(spanInput) ? (
<ChatMessageListTab
messages={span.input}
messages={spanInput}
presetKey={`input-${span.attributes['lmnr.span.path'].join('.')}`}
/>
) : (
<Formatter
className="max-h-[400px]"
collapsible
value={JSON.stringify(span.input)}
value={
typeof spanInput === 'string'
? spanInput
: JSON.stringify(spanInput)
}
presetKey={`input-${span.attributes['lmnr.span.path'].join('.')}`}
/>
)}
Expand All @@ -61,9 +89,9 @@ export function SpanViewSpan({ span }: SpanViewSpanProps) {
<Formatter
className="max-h-[400px]"
value={
typeof span.output === 'string'
? span.output
: JSON.stringify(span.output)
typeof spanOutput === 'string'
? spanOutput
: JSON.stringify(spanOutput)
}
presetKey={`output-${span.attributes['lmnr.span.path'].join('.')}`}
collapsible
Expand Down
2 changes: 2 additions & 0 deletions frontend/lib/db/migrations/0017_groovy_senator_kelly.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE "spans" ADD COLUMN "input_url" text;--> statement-breakpoint
ALTER TABLE "spans" ADD COLUMN "output_url" text;
Loading