diff --git a/packages/dd-trace/src/llmobs/constants.js b/packages/dd-trace/src/llmobs/constants.js index 4cc63a8d01d..68d426e3745 100644 --- a/packages/dd-trace/src/llmobs/constants.js +++ b/packages/dd-trace/src/llmobs/constants.js @@ -36,5 +36,6 @@ module.exports = { EVP_EVENT_SIZE_LIMIT: (1 << 20) - 1024, // 999KB (actual limit is 1MB) DROPPED_IO_COLLECTION_ERROR: 'dropped_io', - DROPPED_VALUE_TEXT: "[This value has been dropped because this span's size exceeds the 1MB size limit.]" + DROPPED_VALUE_TEXT: "[This value has been dropped because this span's size exceeds the 1MB size limit.]", + UNSERIALIZABLE_VALUE_TEXT: 'Unserializable value' } diff --git a/packages/dd-trace/src/llmobs/span_processor.js b/packages/dd-trace/src/llmobs/span_processor.js new file mode 100644 index 00000000000..c87e798eb00 --- /dev/null +++ b/packages/dd-trace/src/llmobs/span_processor.js @@ -0,0 +1,199 @@ +'use strict' + +const { + SPAN_KIND, + MODEL_NAME, + MODEL_PROVIDER, + METADATA, + INPUT_MESSAGES, + INPUT_VALUE, + OUTPUT_MESSAGES, + INPUT_DOCUMENTS, + OUTPUT_DOCUMENTS, + OUTPUT_VALUE, + METRICS, + ML_APP, + TAGS, + PARENT_ID_KEY, + SESSION_ID, + NAME, + UNSERIALIZABLE_VALUE_TEXT +} = require('./constants') + +const { + ERROR_MESSAGE, + ERROR_TYPE, + ERROR_STACK +} = require('../constants') + +const LLMObsTagger = require('./tagger') +const AgentlessWriter = require('./writers/spans/agentless') +const AgentProxyWriter = require('./writers/spans/agentProxy') + +const tracerVersion = require('../../../../package.json').version +const logger = require('../log') + +class LLMObsSpanProcessor { + constructor (config) { + this._config = config + const { llmobs } = config + + if (llmobs.enabled) { + const LLMObsSpanWriter = llmobs.agentlessEnabled ? AgentlessWriter : AgentProxyWriter + this._writer = new LLMObsSpanWriter(config) + } + } + + // TODO: instead of relying on the tagger's weakmap registry, can we use some namespaced storage correlation? + process ({ span }) { + if (!this._config.llmobs.enabled) return + // if the span is not in our private tagger map, it is not an llmobs span + if (!LLMObsTagger.tagMap.has(span)) return + const formattedEvent = this.format(span) + + try { + this._writer.append(formattedEvent) + } catch (e) { + // this should be a rare case + // we protect against unserializable properties in the format function, and in + // safeguards in the tagger + logger.warn(` + Failed to append span to LLM Observability writer, likely due to an unserializable property. + Span won't be sent to LLM Observability: ${e.message} + `) + } + } + + format (span) { + const spanTags = span.context()._tags + const mlObsTags = LLMObsTagger.tagMap.get(span) + + const spanKind = mlObsTags[SPAN_KIND] + + const meta = { 'span.kind': spanKind, input: {}, output: {} } + const input = {} + const output = {} + + if (['llm', 'embedding'].includes(spanKind) && mlObsTags[MODEL_NAME]) { + meta.model_name = mlObsTags[MODEL_NAME] + meta.model_provider = (mlObsTags[MODEL_PROVIDER] || 'custom').toLowerCase() + } + if (mlObsTags[METADATA]) { + this._addObject(mlObsTags[METADATA], meta.metadata = {}) + } + if (spanKind === 'llm' && mlObsTags[INPUT_MESSAGES]) { + input.messages = mlObsTags[INPUT_MESSAGES] + } + if (mlObsTags[INPUT_VALUE]) { + input.value = mlObsTags[INPUT_VALUE] + } + if (spanKind === 'llm' && mlObsTags[OUTPUT_MESSAGES]) { + output.messages = mlObsTags[OUTPUT_MESSAGES] + } + if (spanKind === 'embedding' && mlObsTags[INPUT_DOCUMENTS]) { + input.documents = mlObsTags[INPUT_DOCUMENTS] + } + if (mlObsTags[OUTPUT_VALUE]) { + output.value = mlObsTags[OUTPUT_VALUE] + } + if (spanKind === 'retrieval' && mlObsTags[OUTPUT_DOCUMENTS]) { + output.documents = mlObsTags[OUTPUT_DOCUMENTS] + } + + const error = spanTags.error || spanTags[ERROR_TYPE] + if (error) { + meta[ERROR_MESSAGE] = spanTags[ERROR_MESSAGE] || error.message || error.code + meta[ERROR_TYPE] = spanTags[ERROR_TYPE] || error.name + meta[ERROR_STACK] = spanTags[ERROR_STACK] || error.stack + } + + if (input) meta.input = input + if (output) meta.output = output + + const metrics = mlObsTags[METRICS] || {} + + const mlApp = mlObsTags[ML_APP] + const sessionId = mlObsTags[SESSION_ID] + const parentId = mlObsTags[PARENT_ID_KEY] + + const name = mlObsTags[NAME] || span._name + + const llmObsSpanEvent = { + trace_id: span.context().toTraceId(true), + span_id: span.context().toSpanId(), + parent_id: parentId, + name, + tags: this._processTags(span, mlApp, sessionId, error), + start_ns: Math.round(span._startTime * 1e6), + duration: Math.round(span._duration * 1e6), + status: error ? 'error' : 'ok', + meta, + metrics, + _dd: { + span_id: span.context().toSpanId(), + trace_id: span.context().toTraceId(true) + } + } + + if (sessionId) llmObsSpanEvent.session_id = sessionId + + return llmObsSpanEvent + } + + // For now, this only applies to metadata, as we let users annotate this field with any object + // However, we want to protect against circular references or BigInts (unserializable) + // This function can be reused for other fields if needed + // Messages, Documents, and Metrics are safeguarded in `llmobs/tagger.js` + _addObject (obj, carrier) { + const seenObjects = new WeakSet() + seenObjects.add(obj) // capture root object + + const isCircular = value => { + if (typeof value !== 'object') return false + if (seenObjects.has(value)) return true + seenObjects.add(value) + return false + } + + const add = (obj, carrier) => { + for (const key in obj) { + const value = obj[key] + if (!Object.prototype.hasOwnProperty.call(obj, key)) continue + if (typeof value === 'bigint' || isCircular(value)) { + // mark as unserializable instead of dropping + logger.warn(`Unserializable property found in metadata: ${key}`) + carrier[key] = UNSERIALIZABLE_VALUE_TEXT + continue + } + if (typeof value === 'object') { + add(value, carrier[key] = {}) + } else { + carrier[key] = value + } + } + } + + add(obj, carrier) + } + + _processTags (span, mlApp, sessionId, error) { + let tags = { + version: this._config.version, + env: this._config.env, + service: this._config.service, + source: 'integration', + ml_app: mlApp, + 'dd-trace.version': tracerVersion, + error: Number(!!error) || 0, + language: 'javascript' + } + const errType = span.context()._tags[ERROR_TYPE] || error?.name + if (errType) tags.error_type = errType + if (sessionId) tags.session_id = sessionId + const existingTags = LLMObsTagger.tagMap.get(span)?.[TAGS] || {} + if (existingTags) tags = { ...tags, ...existingTags } + return Object.entries(tags).map(([key, value]) => `${key}:${value ?? ''}`) + } +} + +module.exports = LLMObsSpanProcessor diff --git a/packages/dd-trace/src/llmobs/tagger.js b/packages/dd-trace/src/llmobs/tagger.js index 7c59ef9241e..0b5a8aa778b 100644 --- a/packages/dd-trace/src/llmobs/tagger.js +++ b/packages/dd-trace/src/llmobs/tagger.js @@ -1,9 +1,6 @@ 'use strict' const logger = require('../log') -const { - SPAN_TYPE -} = require('../../../../ext/tags') const { MODEL_NAME, MODEL_PROVIDER, @@ -25,11 +22,25 @@ const { ROOT_PARENT_ID } = require('./constants') +// maps spans to tag annotations +const tagMap = new WeakMap() + +function setTag (span, key, value) { + const tagsCarrier = tagMap.get(span) || {} + Object.assign(tagsCarrier, { [key]: value }) + if (!tagMap.has(span)) tagMap.set(span, tagsCarrier) +} + class LLMObsTagger { constructor (config) { this._config = config } + static get tagMap () { + return tagMap + } + + // TODO: we're using a weakmap registry of LLMObs spans for now, how can this be used in the core API? setLLMObsSpanTags ( span, kind, @@ -37,26 +48,28 @@ class LLMObsTagger { name ) { if (!this._config.llmobs.enabled) return - if (kind) span.setTag(SPAN_TYPE, 'llm') // only mark it as an llm span if it was a valid kind - if (name) span.setTag(NAME, name) + if (!kind) return // do not register it in the map if it doesn't have an llmobs span kind + if (name) setTag(span, NAME, name) - span.setTag(SPAN_KIND, kind) - if (modelName) span.setTag(MODEL_NAME, modelName) - if (modelProvider) span.setTag(MODEL_PROVIDER, modelProvider) + setTag(span, SPAN_KIND, kind) + if (modelName) setTag(span, MODEL_NAME, modelName) + if (modelProvider) setTag(span, MODEL_PROVIDER, modelProvider) sessionId = sessionId || parentLLMObsSpan?.context()._tags[SESSION_ID] - if (sessionId) span.setTag(SESSION_ID, sessionId) + if (sessionId) setTag(span, SESSION_ID, sessionId) if (!mlApp) mlApp = parentLLMObsSpan?.context()._tags[ML_APP] || this._config.llmobs.mlApp - span.setTag(ML_APP, mlApp) + setTag(span, ML_APP, mlApp) const parentId = parentLLMObsSpan?.context().toSpanId() || span.context()._trace.tags[PROPAGATED_PARENT_ID_KEY] || ROOT_PARENT_ID - span.setTag(PARENT_ID_KEY, parentId) + setTag(span, PARENT_ID_KEY, parentId) } + // TODO: similarly for the following `tag` methods, + // how can we transition from a span weakmap to core API functionality tagLLMIO (span, inputData, outputData) { this._tagMessages(span, inputData, INPUT_MESSAGES) this._tagMessages(span, outputData, OUTPUT_MESSAGES) @@ -78,41 +91,38 @@ class LLMObsTagger { } tagMetadata (span, metadata) { - try { - span.setTag(METADATA, JSON.stringify(metadata)) - } catch { - logger.warn('Failed to parse span metadata. Metadata key-value pairs must be JSON serializable.') - } + setTag(span, METADATA, metadata) } tagMetrics (span, metrics) { - try { - span.setTag(METRICS, JSON.stringify(metrics)) - } catch { - logger.warn('Failed to parse span metrics. Metrics key-value pairs must be JSON serializable.') + const filterdMetrics = {} + for (const [key, value] of Object.entries(metrics)) { + if (typeof value === 'number') { + filterdMetrics[key] = value + } else { + logger.warn(`Value for metric '${key}' must be a number, instead got ${value}`) + } } + + setTag(span, METRICS, filterdMetrics) } tagSpanTags (span, tags) { // new tags will be merged with existing tags - try { - const currentTags = span.context()._tags[TAGS] - if (currentTags) { - Object.assign(tags, JSON.parse(currentTags)) - } - span.setTag(TAGS, JSON.stringify(tags)) - } catch { - logger.warn('Failed to parse span tags. Tag key-value pairs must be JSON serializable.') + const currentTags = tagMap.get(span)?.[TAGS] + if (currentTags) { + Object.assign(tags, currentTags) } + setTag(span, TAGS, tags) } _tagText (span, data, key) { if (data) { if (typeof data === 'string') { - span.setTag(key, data) + setTag(span, key, data) } else { try { - span.setTag(key, JSON.stringify(data)) + setTag(span, key, JSON.stringify(data)) } catch { const type = key === INPUT_VALUE ? 'input' : 'output' logger.warn(`Failed to parse ${type} value, must be JSON serializable.`) @@ -127,57 +137,36 @@ class LLMObsTagger { data = [data] } - try { - const documents = data.map(document => { - if (typeof document === 'string') { - return { text: document } - } - - if (document == null || typeof document !== 'object') { - logger.warn('Documents must be a string, object, or list of objects.') - return undefined - } + const documents = data.map(document => { + if (typeof document === 'string') { + return { text: document } + } - const { text, name, id, score } = document + let validDocument = true - if (typeof text !== 'string') { - logger.warn('Document text must be a string.') - return undefined - } + if (document == null || typeof document !== 'object') { + logger.warn('Documents must be a string, object, or list of objects.') + return undefined // returning here as we need document to be an object + } - const documentObj = { text } + const { text, name, id, score } = document - if (name) { - if (typeof name !== 'string') { - logger.warn('Document name must be a string.') - return undefined - } - documentObj.name = name - } + if (typeof text !== 'string') { + logger.warn('Document text must be a string.') + validDocument = false + } - if (id) { - if (typeof id !== 'string') { - logger.warn('Document ID must be a string.') - return undefined - } - documentObj.id = id - } + const documentObj = { text } - if (score) { - if (typeof score !== 'number') { - logger.warn('Document score must be a number.') - return undefined - } - documentObj.score = score - } + validDocument = this._tagConditionalString(name, 'Document name', documentObj, 'name') && validDocument + validDocument = this._tagConditionalString(id, 'Document ID', documentObj, 'id') && validDocument + validDocument = this._tagConditionalNumber(score, 'Document score', documentObj, 'score') && validDocument - return documentObj - }).filter(doc => !!doc) + return validDocument ? documentObj : undefined + }).filter(doc => !!doc) - span.setTag(key, JSON.stringify(documents)) - } catch { - const type = key === INPUT_DOCUMENTS ? 'input' : 'output' - logger.warn(`Failed to parse ${type} documents.`) + if (documents.length) { + setTag(span, key, documents) } } } @@ -188,100 +177,98 @@ class LLMObsTagger { data = [data] } - try { - const messages = data.map(message => { - if (typeof message === 'string') { - return { content: message } - } + const messages = data.map(message => { + if (typeof message === 'string') { + return { content: message } + } - if (message == null || typeof message !== 'object') { - logger.warn('Messages must be a string, object, or list of objects') - return undefined - } + if (message == null || typeof message !== 'object') { + logger.warn('Messages must be a string, object, or list of objects') + return undefined // returning here as we need message to be an object + } - const { content = '', role } = message - let toolCalls = message.toolCalls - const messageObj = { content } + let validMessage = true - if (typeof content !== 'string') { - logger.warn('Message content must be a string.') - return undefined - } + const { content = '', role } = message + let toolCalls = message.toolCalls + const messageObj = { content } - if (role) { - if (typeof role !== 'string') { - logger.warn('Message role must be a string.') - return undefined - } - messageObj.role = role + if (typeof content !== 'string') { + logger.warn('Message content must be a string.') + validMessage = false + } + + validMessage = this._tagConditionalString(role, 'Message role', messageObj, 'role') && validMessage + + if (toolCalls) { + if (!Array.isArray(toolCalls)) { + toolCalls = [toolCalls] } - if (toolCalls) { - if (!Array.isArray(toolCalls)) { - toolCalls = [toolCalls] + const filteredToolCalls = toolCalls.map(toolCall => { + if (typeof toolCall !== 'object') { + logger.warn('Tool call must be an object.') + return undefined // returning here as we need tool call to be an object } - const filteredToolCalls = toolCalls.map(toolCall => { - if (typeof toolCall !== 'object') { - logger.warn('Tool call must be an object.') - return undefined - } - - const { name, arguments: args, toolId, type } = toolCall - const toolCallObj = {} - - if (name) { - if (typeof name !== 'string') { - logger.warn('Tool name must be a string.') - return undefined - } - toolCallObj.name = name - } - - if (args) { - if (typeof args !== 'object') { - logger.warn('Tool arguments must be an object.') - return undefined - } - toolCallObj.arguments = args - } - - if (toolId) { - if (typeof toolId !== 'string') { - logger.warn('Tool ID must be a string.') - return undefined - } - toolCallObj.toolId = toolId - } - - if (type) { - if (typeof type !== 'string') { - logger.warn('Tool type must be a string.') - return undefined - } - toolCallObj.type = type - } - - return toolCallObj - }).filter(toolCall => !!toolCall) - - if (filteredToolCalls.length) { - messageObj.tool_calls = filteredToolCalls - } - } + let validTool = true + + const { name, arguments: args, toolId, type } = toolCall + const toolCallObj = {} + + validTool = this._tagConditionalString(name, 'Tool name', toolCallObj, 'name') && validTool + validTool = this._tagConditionalObject(args, 'Tool arguments', toolCallObj, 'arguments') && validTool + validTool = this._tagConditionalString(toolId, 'Tool ID', toolCallObj, 'tool_id') && validTool + validTool = this._tagConditionalString(type, 'Tool type', toolCallObj, 'type') && validTool - return messageObj - }).filter(msg => !!msg) + return validTool ? toolCallObj : undefined + }).filter(toolCall => !!toolCall) - if (messages.length) { - span.setTag(key, JSON.stringify(messages)) + if (filteredToolCalls.length) { + messageObj.tool_calls = filteredToolCalls + } } - } catch { - const type = key === INPUT_MESSAGES ? 'input' : 'output' - logger.warn(`Failed to parse ${type} messages.`) + + return validMessage ? messageObj : undefined + }).filter(msg => !!msg) + + if (messages.length) { + setTag(span, key, messages) } } } + + _tagConditionalString (data, type, carrier, key) { + // returning true here means we won't drop the whole object (message/document) + // if the field isn't there. we check for mandatory fields separately + if (!data) return true + if (typeof data !== 'string') { + logger.warn(`${type} must be a string.`) + return false + } + carrier[key] = data + return true + } + + _tagConditionalNumber (data, type, carrier, key) { + if (!data) return true + if (typeof data !== 'number') { + logger.warn(`${type} must be a number.`) + return false + } + carrier[key] = data + return true + } + + _tagConditionalObject (data, type, carrier, key) { + if (!data) return true + if (typeof data !== 'object') { + logger.warn(`${type} must be an object.`) + return false + } + carrier[key] = data + return true + } } module.exports = LLMObsTagger diff --git a/packages/dd-trace/src/span_processor.js b/packages/dd-trace/src/span_processor.js index 6dc19407d56..deb92c02f34 100644 --- a/packages/dd-trace/src/span_processor.js +++ b/packages/dd-trace/src/span_processor.js @@ -10,6 +10,9 @@ const { SpanStatsProcessor } = require('./span_stats') const startedSpans = new WeakSet() const finishedSpans = new WeakSet() +const { channel } = require('dc-polyfill') +const spanProcessCh = channel('dd-trace:span:process') + class SpanProcessor { constructor (exporter, prioritySampler, config) { this._exporter = exporter @@ -45,6 +48,8 @@ class SpanProcessor { const formattedSpan = format(span) this._stats.onSpanFinished(formattedSpan) formatted.push(formattedSpan) + + spanProcessCh.publish({ span }) } else { active.push(span) } diff --git a/packages/dd-trace/test/llmobs/span_processor.spec.js b/packages/dd-trace/test/llmobs/span_processor.spec.js new file mode 100644 index 00000000000..8265d44cdb0 --- /dev/null +++ b/packages/dd-trace/test/llmobs/span_processor.spec.js @@ -0,0 +1,402 @@ +'use strict' + +const { expect } = require('chai') +const proxyquire = require('proxyquire') + +// we will use this to populate the span-tags map +const LLMObsTagger = require('../../src/llmobs/tagger') + +describe('span processor', () => { + let LLMObsSpanProcessor + let processor + let AgentlessWriter + let AgentProxyWriter + let writer + let log + + beforeEach(() => { + writer = { + append: sinon.stub() + } + AgentlessWriter = sinon.stub().returns(writer) + AgentProxyWriter = sinon.stub().returns(writer) + log = { + warn: sinon.stub() + } + + LLMObsSpanProcessor = proxyquire('../../src/llmobs/span_processor', { + './writers/spans/agentless': AgentlessWriter, + './writers/spans/agentProxy': AgentProxyWriter, + '../../../../package.json': { version: 'x.y.z' }, + '../log': log + }) + }) + + describe('initialization', () => { + it('should not create a writer if llmobs is not enabled', () => { + processor = new LLMObsSpanProcessor({ llmobs: { enabled: false } }) + + expect(processor._writer).to.be.undefined + }) + + it('should create an agentless writer if agentless is enabled', () => { + processor = new LLMObsSpanProcessor({ llmobs: { enabled: true, agentlessEnabled: true } }) + + expect(AgentlessWriter).to.have.been.calledOnce + }) + + it('should create an agent proxy writer if agentless is not enabled', () => { + processor = new LLMObsSpanProcessor({ llmobs: { enabled: true, agentlessEnabled: false } }) + + expect(AgentProxyWriter).to.have.been.calledOnce + }) + }) + + describe('process', () => { + let span + + it('should do nothing if llmobs is not enabled', () => { + processor = new LLMObsSpanProcessor({ llmobs: { enabled: false } }) + + expect(() => processor.process({ span })).not.to.throw() + }) + + it('should do nothing if the span is not an llm obs span', () => { + processor = new LLMObsSpanProcessor({ llmobs: { enabled: true } }) + span = { context: () => ({ _tags: {} }) } + + expect(processor._writer.append).to.not.have.been.called + }) + + it('should format the span event for the writer', () => { + span = { + _name: 'test', + _startTime: 0, // this is in ms, will be converted to ns + _duration: 1, // this is in ms, will be converted to ns + context () { + return { + _tags: {}, + toTraceId () { return '123' }, // should not use this + toSpanId () { return '456' } + } + } + } + LLMObsTagger.tagMap.set(span, { + '_ml_obs.meta.span.kind': 'llm', + '_ml_obs.meta.model_name': 'myModel', + '_ml_obs.meta.model_provider': 'myProvider', + '_ml_obs.meta.metadata': { foo: 'bar' }, + '_ml_obs.meta.ml_app': 'myApp', + '_ml_obs.meta.input.value': 'input-value', + '_ml_obs.meta.output.value': 'output-value', + '_ml_obs.meta.input.messages': [{ role: 'user', content: 'hello' }], + '_ml_obs.meta.output.messages': [{ role: 'assistant', content: 'world' }], + '_ml_obs.llmobs_parent_id': '1234' + }) + + processor = new LLMObsSpanProcessor({ llmobs: { enabled: true } }) + + processor.process({ span }) + const payload = writer.append.getCall(0).firstArg + + expect(payload).to.deep.equal({ + trace_id: '123', + span_id: '456', + parent_id: '1234', + name: 'test', + tags: [ + 'version:', + 'env:', + 'service:', + 'source:integration', + 'ml_app:myApp', + 'dd-trace.version:x.y.z', + 'error:0', + 'language:javascript' + ], + start_ns: 0, + duration: 1000000, + status: 'ok', + meta: { + 'span.kind': 'llm', + model_name: 'myModel', + model_provider: 'myprovider', // should be lowercase + input: { + value: 'input-value', + messages: [{ role: 'user', content: 'hello' }] + }, + output: { + value: 'output-value', + messages: [{ role: 'assistant', content: 'world' }] + }, + metadata: { foo: 'bar' } + }, + metrics: {}, + _dd: { + trace_id: '123', + span_id: '456' + } + }) + + expect(writer.append).to.have.been.calledOnce + }) + + it('removes problematic fields from the metadata', () => { + // problematic fields are circular references or bigints + const metadata = { + bigint: BigInt(1), + deep: { + foo: 'bar' + }, + bar: 'baz' + } + metadata.circular = metadata + metadata.deep.circular = metadata.deep + span = { + context () { + return { + _tags: {}, + toTraceId () { return '123' }, + toSpanId () { return '456' } + } + } + } + + LLMObsTagger.tagMap.set(span, { + '_ml_obs.meta.span.kind': 'llm', + '_ml_obs.meta.metadata': metadata + }) + + processor = new LLMObsSpanProcessor({ llmobs: { enabled: true } }) + processor.process({ span }) + const payload = writer.append.getCall(0).firstArg + + expect(payload.meta.metadata).to.deep.equal({ + bar: 'baz', + bigint: 'Unserializable value', + circular: 'Unserializable value', + deep: { foo: 'bar', circular: 'Unserializable value' } + }) + }) + + it('tags output documents for a retrieval span', () => { + span = { + context () { + return { + _tags: {}, + toTraceId () { return '123' }, + toSpanId () { return '456' } + } + } + } + + LLMObsTagger.tagMap.set(span, { + '_ml_obs.meta.span.kind': 'retrieval', + '_ml_obs.meta.output.documents': [{ text: 'hello', name: 'myDoc', id: '1', score: 0.6 }] + }) + + processor = new LLMObsSpanProcessor({ llmobs: { enabled: true } }) + + processor.process({ span }) + const payload = writer.append.getCall(0).firstArg + + expect(payload.meta.output.documents).to.deep.equal([{ + text: 'hello', + name: 'myDoc', + id: '1', + score: 0.6 + }]) + }) + + it('tags input documents for an embedding span', () => { + span = { + context () { + return { + _tags: {}, + toTraceId () { return '123' }, + toSpanId () { return '456' } + } + } + } + + LLMObsTagger.tagMap.set(span, { + '_ml_obs.meta.span.kind': 'embedding', + '_ml_obs.meta.input.documents': [{ text: 'hello', name: 'myDoc', id: '1', score: 0.6 }] + }) + + processor = new LLMObsSpanProcessor({ llmobs: { enabled: true } }) + + processor.process({ span }) + const payload = writer.append.getCall(0).firstArg + + expect(payload.meta.input.documents).to.deep.equal([{ + text: 'hello', + name: 'myDoc', + id: '1', + score: 0.6 + }]) + }) + + it('defaults model provider to custom', () => { + span = { + context () { + return { + _tags: {}, + toTraceId () { return '123' }, + toSpanId () { return '456' } + } + } + } + + LLMObsTagger.tagMap.set(span, { + '_ml_obs.meta.span.kind': 'llm', + '_ml_obs.meta.model_name': 'myModel' + }) + + processor = new LLMObsSpanProcessor({ llmobs: { enabled: true } }) + + processor.process({ span }) + const payload = writer.append.getCall(0).firstArg + + expect(payload.meta.model_provider).to.equal('custom') + }) + + it('sets an error appropriately', () => { + span = { + context () { + return { + _tags: { + 'error.message': 'error message', + 'error.type': 'error type', + 'error.stack': 'error stack' + }, + toTraceId () { return '123' }, + toSpanId () { return '456' } + } + } + } + + LLMObsTagger.tagMap.set(span, { + '_ml_obs.meta.span.kind': 'llm' + }) + + processor = new LLMObsSpanProcessor({ llmobs: { enabled: true } }) + + processor.process({ span }) + const payload = writer.append.getCall(0).firstArg + + expect(payload.meta['error.message']).to.equal('error message') + expect(payload.meta['error.type']).to.equal('error type') + expect(payload.meta['error.stack']).to.equal('error stack') + expect(payload.status).to.equal('error') + + expect(payload.tags).to.include('error_type:error type') + }) + + it('uses the error itself if the span does not have specific error fields', () => { + span = { + context () { + return { + _tags: { + error: new Error('error message') + }, + toTraceId () { return '123' }, + toSpanId () { return '456' } + } + } + } + + LLMObsTagger.tagMap.set(span, { + '_ml_obs.meta.span.kind': 'llm' + }) + + processor = new LLMObsSpanProcessor({ llmobs: { enabled: true } }) + + processor.process({ span }) + const payload = writer.append.getCall(0).firstArg + + expect(payload.meta['error.message']).to.equal('error message') + expect(payload.meta['error.type']).to.equal('Error') + expect(payload.meta['error.stack']).to.exist + expect(payload.status).to.equal('error') + + expect(payload.tags).to.include('error_type:Error') + }) + + it('uses the span name from the tag if provided', () => { + span = { + _name: 'test', + context () { + return { + _tags: {}, + toTraceId () { return '123' }, + toSpanId () { return '456' } + } + } + } + + LLMObsTagger.tagMap.set(span, { + '_ml_obs.meta.span.kind': 'llm', + '_ml_obs.name': 'mySpan' + }) + + processor = new LLMObsSpanProcessor({ llmobs: { enabled: true } }) + + processor.process({ span }) + const payload = writer.append.getCall(0).firstArg + + expect(payload.name).to.equal('mySpan') + }) + + it('attaches session id if provided', () => { + span = { + context () { + return { + _tags: {}, + toTraceId () { return '123' }, + toSpanId () { return '456' } + } + } + } + + LLMObsTagger.tagMap.set(span, { + '_ml_obs.meta.span.kind': 'llm', + '_ml_obs.session_id': '1234' + }) + + processor = new LLMObsSpanProcessor({ llmobs: { enabled: true } }) + + processor.process({ span }) + const payload = writer.append.getCall(0).firstArg + + expect(payload.session_id).to.equal('1234') + expect(payload.tags).to.include('session_id:1234') + }) + + it('sets span tags appropriately', () => { + span = { + context () { + return { + _tags: {}, + toTraceId () { return '123' }, + toSpanId () { return '456' } + } + } + } + + LLMObsTagger.tagMap.set(span, { + '_ml_obs.meta.span.kind': 'llm', + '_ml_obs.tags': { hostname: 'localhost', foo: 'bar', source: 'mySource' } + }) + + processor = new LLMObsSpanProcessor({ llmobs: { enabled: true } }) + + processor.process({ span }) + const payload = writer.append.getCall(0).firstArg + + expect(payload.tags).to.include('foo:bar') + expect(payload.tags).to.include('source:mySource') + expect(payload.tags).to.include('hostname:localhost') + }) + }) +}) diff --git a/packages/dd-trace/test/llmobs/tagger.spec.js b/packages/dd-trace/test/llmobs/tagger.spec.js index 935389cd985..dd21a53f9ee 100644 --- a/packages/dd-trace/test/llmobs/tagger.spec.js +++ b/packages/dd-trace/test/llmobs/tagger.spec.js @@ -51,14 +51,13 @@ describe('tagger', () => { tagger = new Tagger({ llmobs: { enabled: false } }) tagger.setLLMObsSpanTags(span, 'llm') - expect(span.context()._tags).to.deep.equal({}) + expect(Tagger.tagMap.get(span)).to.deep.equal(undefined) }) it('tags an llm obs span with basic and default properties', () => { tagger.setLLMObsSpanTags(span, 'workflow') - expect(span.context()._tags).to.deep.equal({ - 'span.type': 'llm', + expect(Tagger.tagMap.get(span)).to.deep.equal({ '_ml_obs.meta.span.kind': 'workflow', '_ml_obs.meta.ml_app': 'my-default-ml-app', '_ml_obs.llmobs_parent_id': 'undefined' // no parent id provided @@ -73,8 +72,7 @@ describe('tagger', () => { mlApp: 'my-app' }) - expect(span.context()._tags).to.deep.equal({ - 'span.type': 'llm', + expect(Tagger.tagMap.get(span)).to.deep.equal({ '_ml_obs.meta.span.kind': 'llm', '_ml_obs.meta.model_name': 'my-model', '_ml_obs.meta.model_provider': 'my-provider', @@ -87,8 +85,7 @@ describe('tagger', () => { it('uses the name if provided', () => { tagger.setLLMObsSpanTags(span, 'llm', {}, 'my-span-name') - expect(span.context()._tags).to.deep.equal({ - 'span.type': 'llm', + expect(Tagger.tagMap.get(span)).to.deep.equal({ '_ml_obs.meta.span.kind': 'llm', '_ml_obs.meta.ml_app': 'my-default-ml-app', '_ml_obs.llmobs_parent_id': 'undefined', @@ -99,8 +96,7 @@ describe('tagger', () => { it('defaults parent id to undefined', () => { tagger.setLLMObsSpanTags(span, 'llm') - expect(span.context()._tags).to.deep.equal({ - 'span.type': 'llm', + expect(Tagger.tagMap.get(span)).to.deep.equal({ '_ml_obs.meta.span.kind': 'llm', '_ml_obs.meta.ml_app': 'my-default-ml-app', '_ml_obs.llmobs_parent_id': 'undefined' @@ -121,8 +117,7 @@ describe('tagger', () => { } tagger.setLLMObsSpanTags(span, 'llm', { parentLLMObsSpan: parentSpan }) - expect(span.context()._tags).to.deep.equal({ - 'span.type': 'llm', + expect(Tagger.tagMap.get(span)).to.deep.equal({ '_ml_obs.meta.span.kind': 'llm', '_ml_obs.meta.ml_app': 'my-ml-app', '_ml_obs.session_id': 'my-session', @@ -133,8 +128,7 @@ describe('tagger', () => { it('uses the propagated trace id if provided', () => { tagger.setLLMObsSpanTags(span, 'llm') - expect(span.context()._tags).to.deep.equal({ - 'span.type': 'llm', + expect(Tagger.tagMap.get(span)).to.deep.equal({ '_ml_obs.meta.span.kind': 'llm', '_ml_obs.meta.ml_app': 'my-default-ml-app', '_ml_obs.llmobs_parent_id': 'undefined' @@ -146,8 +140,7 @@ describe('tagger', () => { tagger.setLLMObsSpanTags(span, 'llm') - expect(span.context()._tags).to.deep.equal({ - 'span.type': 'llm', + expect(Tagger.tagMap.get(span)).to.deep.equal({ '_ml_obs.meta.span.kind': 'llm', '_ml_obs.meta.ml_app': 'my-default-ml-app', '_ml_obs.llmobs_parent_id': '-567' @@ -157,37 +150,40 @@ describe('tagger', () => { it('does not set span type if the LLMObs span kind is falsy', () => { tagger.setLLMObsSpanTags(span, false) - expect(span.context()._tags['span.type']).to.be.undefined + expect(Tagger.tagMap.get(span)).to.be.undefined }) }) describe('tagMetadata', () => { it('tags a span with metadata', () => { tagger.tagMetadata(span, { a: 'foo', b: 'bar' }) - expect(span.context()._tags).to.deep.equal({ - '_ml_obs.meta.metadata': '{"a":"foo","b":"bar"}' + expect(Tagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.metadata': { a: 'foo', b: 'bar' } }) }) - - it('logs when metadata is not JSON serializable', () => { - const metadata = unserializbleObject() - tagger.tagMetadata(span, metadata) - expect(logger.warn).to.have.been.calledOnce - }) }) describe('tagMetrics', () => { it('tags a span with metrics', () => { tagger.tagMetadata(span, { a: 1, b: 2 }) - expect(span.context()._tags).to.deep.equal({ - '_ml_obs.meta.metadata': '{"a":1,"b":2}' + expect(Tagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.metadata': { a: 1, b: 2 } }) }) - it('logs when metrics is not JSON serializable', () => { - const metadata = unserializbleObject() - tagger.tagMetadata(span, metadata) - expect(logger.warn).to.have.been.calledOnce + it('removes non-number entries', () => { + const metrics = { + a: 1, + b: 'foo', + c: { depth: 1 }, + d: undefined + } + tagger.tagMetrics(span, metrics) + expect(Tagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.metrics': { a: 1 } + }) + + expect(logger.warn).to.have.been.calledThrice }) }) @@ -195,17 +191,17 @@ describe('tagger', () => { it('sets tags on a span', () => { const tags = { foo: 'bar' } tagger.tagSpanTags(span, tags) - expect(span.context()._tags).to.deep.equal({ - '_ml_obs.tags': '{"foo":"bar"}' + expect(Tagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.tags': { foo: 'bar' } }) }) it('merges tags so they do not overwrite', () => { - span.context()._tags['_ml_obs.tags'] = '{"a":1}' + Tagger.tagMap.set(span, { '_ml_obs.tags': { a: 1 } }) const tags = { a: 2, b: 1 } tagger.tagSpanTags(span, tags) - expect(span.context()._tags).to.deep.equal({ - '_ml_obs.tags': '{"a":1,"b":1}' + expect(Tagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.tags': { a: 1, b: 1 } }) }) }) @@ -223,11 +219,15 @@ describe('tagger', () => { const outputData = 'Nice to meet you, human!' tagger.tagLLMIO(span, inputData, outputData) - expect(span.context()._tags).to.deep.equal({ - '_ml_obs.meta.input.messages': '[{"content":"you are an amazing assistant"},' + - '{"content":"hello! my name is foobar"},{"content":"I am a robot","role":"assistant"},' + - '{"content":"I am a human","role":"user"},{"content":""}]', - '_ml_obs.meta.output.messages': '[{"content":"Nice to meet you, human!"}]' + expect(Tagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.input.messages': [ + { content: 'you are an amazing assistant' }, + { content: 'hello! my name is foobar' }, + { content: 'I am a robot', role: 'assistant' }, + { content: 'I am a human', role: 'user' }, + { content: '' } + ], + '_ml_obs.meta.output.messages': [{ content: 'Nice to meet you, human!' }] }) }) @@ -245,8 +245,8 @@ describe('tagger', () => { { content: 'goodbye', role: 5 } ] tagger.tagLLMIO(span, inputData, outputData) - expect(span.context()._tags).to.deep.equal({ - '_ml_obs.meta.input.messages': '[{"content":"hi"}]' + expect(Tagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.input.messages': [{ content: 'hi' }] }) expect(logger.warn.getCall(0).firstArg).to.equal('Messages must be a string, object, or list of objects') @@ -269,11 +269,16 @@ describe('tagger', () => { ] tagger.tagLLMIO(span, inputData, outputData) - expect(span.context()._tags).to.deep.equal({ - '_ml_obs.meta.input.messages': '[{"content":"hello","tool_calls":[{"name":"tool1"},' + - '{"name":"tool2","arguments":{"a":1,"b":2}}]},' + - '{"content":"goodbye","tool_calls":[{"name":"tool3"}]}]', - '_ml_obs.meta.output.messages': '[{"content":"hi","tool_calls":[{"name":"tool4"}]}]' + expect(Tagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.input.messages': [ + { + content: 'hello', + tool_calls: [{ name: 'tool1' }, { name: 'tool2', arguments: { a: 1, b: 2 } }] + }, { + content: 'goodbye', + tool_calls: [{ name: 'tool3' }] + }], + '_ml_obs.meta.output.messages': [{ content: 'hi', tool_calls: [{ name: 'tool4' }] }] }) }) @@ -294,9 +299,15 @@ describe('tagger', () => { ] tagger.tagLLMIO(span, inputData, undefined) - expect(span.context()._tags).to.deep.equal({ - '_ml_obs.meta.input.messages': '[{"content":"a"},{"content":"b"},{"content":"c"},' + - '{"content":"d"},{"content":"e"},{"content":"f"},{"content":"g","tool_calls":[{"name":"tool2"}]}]' + expect(Tagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.input.messages': [ + { content: 'a' }, + { content: 'b' }, + { content: 'c' }, + { content: 'd' }, + { content: 'e' }, + { content: 'f' }, + { content: 'g', tool_calls: [{ name: 'tool2' }] }] }) expect(logger.warn.getCall(0).firstArg).to.equal('Tool call must be an object.') @@ -307,6 +318,20 @@ describe('tagger', () => { expect(logger.warn.getCall(5).firstArg).to.equal('Tool type must be a string.') expect(logger.warn.getCall(6).firstArg).to.equal('Tool arguments must be an object.') }) + + it('logs multiple errors if there are multiple errors for a message and filters it out', () => { + const messages = [ + { content: 'a', toolCalls: [5, { name: 5, type: 7 }], role: 7 } + ] + + tagger.tagLLMIO(span, messages, undefined) + expect(Tagger.tagMap.get(span)).to.deep.equal(undefined) + + expect(logger.warn.getCall(0).firstArg).to.equal('Message role must be a string.') + expect(logger.warn.getCall(1).firstArg).to.equal('Tool call must be an object.') + expect(logger.warn.getCall(2).firstArg).to.equal('Tool name must be a string.') + expect(logger.warn.getCall(3).firstArg).to.equal('Tool type must be a string.') + }) }) }) @@ -322,10 +347,14 @@ describe('tagger', () => { ] const outputData = 'embedded documents' tagger.tagEmbeddingIO(span, inputData, outputData) - expect(span.context()._tags).to.deep.equal({ - '_ml_obs.meta.input.documents': '[{"text":"my string document"},{"text":"my object document"},' + - '{"text":"foo","name":"bar"},{"text":"baz","id":"qux"},{"text":"quux","score":5},' + - '{"text":"foo","name":"bar","id":"qux","score":5}]', + expect(Tagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.input.documents': [ + { text: 'my string document' }, + { text: 'my object document' }, + { text: 'foo', name: 'bar' }, + { text: 'baz', id: 'qux' }, + { text: 'quux', score: 5 }, + { text: 'foo', name: 'bar', id: 'qux', score: 5 }], '_ml_obs.meta.output.value': 'embedded documents' }) }) @@ -341,8 +370,8 @@ describe('tagger', () => { ] const outputData = 'output' tagger.tagEmbeddingIO(span, inputData, outputData) - expect(span.context()._tags).to.deep.equal({ - '_ml_obs.meta.input.documents': '[{"text":"hi"}]', + expect(Tagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.input.documents': [{ text: 'hi' }], '_ml_obs.meta.output.value': 'output' }) @@ -352,6 +381,20 @@ describe('tagger', () => { expect(logger.warn.getCall(3).firstArg).to.equal('Documents must be a string, object, or list of objects.') expect(logger.warn.getCall(4).firstArg).to.equal('Documents must be a string, object, or list of objects.') }) + + it('logs multiple errors if there are multiple errors for a document and filters it out', () => { + const documents = [ + { text: 'a', name: 5, id: 7, score: 9 } + ] + + tagger.tagEmbeddingIO(span, documents, 'output') + expect(Tagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.output.value': 'output' + }) + + expect(logger.warn.getCall(0).firstArg).to.equal('Document name must be a string.') + expect(logger.warn.getCall(1).firstArg).to.equal('Document ID must be a string.') + }) }) describe('tagRetrievalIO', () => { @@ -367,11 +410,15 @@ describe('tagger', () => { ] tagger.tagRetrievalIO(span, inputData, outputData) - expect(span.context()._tags).to.deep.equal({ + expect(Tagger.tagMap.get(span)).to.deep.equal({ '_ml_obs.meta.input.value': 'some query', - '_ml_obs.meta.output.documents': '[{"text":"result 1"},{"text":"result 2"},' + - '{"text":"foo","name":"bar"},{"text":"baz","id":"qux"},{"text":"quux","score":5},' + - '{"text":"foo","name":"bar","id":"qux","score":5}]' + '_ml_obs.meta.output.documents': [ + { text: 'result 1' }, + { text: 'result 2' }, + { text: 'foo', name: 'bar' }, + { text: 'baz', id: 'qux' }, + { text: 'quux', score: 5 }, + { text: 'foo', name: 'bar', id: 'qux', score: 5 }] }) }) @@ -386,9 +433,9 @@ describe('tagger', () => { undefined ] tagger.tagRetrievalIO(span, inputData, outputData) - expect(span.context()._tags).to.deep.equal({ + expect(Tagger.tagMap.get(span)).to.deep.equal({ '_ml_obs.meta.input.value': 'some query', - '_ml_obs.meta.output.documents': '[{"text":"hi"}]' + '_ml_obs.meta.output.documents': [{ text: 'hi' }] }) expect(logger.warn.getCall(0).firstArg).to.equal('Documents must be a string, object, or list of objects.') @@ -404,7 +451,7 @@ describe('tagger', () => { const inputData = { some: 'object' } const outputData = 'some text' tagger.tagTextIO(span, inputData, outputData) - expect(span.context()._tags).to.deep.equal({ + expect(Tagger.tagMap.get(span)).to.deep.equal({ '_ml_obs.meta.input.value': '{"some":"object"}', '_ml_obs.meta.output.value': 'some text' }) @@ -414,7 +461,7 @@ describe('tagger', () => { const data = unserializbleObject() tagger.tagTextIO(span, data, 'output') expect(logger.warn).to.have.been.calledOnceWith('Failed to parse input value, must be JSON serializable.') - expect(span.context()._tags).to.deep.equal({ + expect(Tagger.tagMap.get(span)).to.deep.equal({ '_ml_obs.meta.output.value': 'output' }) })