From 76077fa169d6f6dcb8dfe8ca79984156aae3213f Mon Sep 17 00:00:00 2001 From: Sam Brenner Date: Wed, 2 Oct 2024 11:25:43 -0400 Subject: [PATCH 1/3] add streamed calls for langchain llmobs --- ddtrace/contrib/internal/langchain/patch.py | 29 +++- ddtrace/contrib/internal/langchain/utils.py | 3 + ddtrace/llmobs/_integrations/langchain.py | 78 ++++++++-- .../langchain/test_langchain_llmobs.py | 140 +++++++++++++++--- ...ngchain_community.test_streamed_chain.json | 15 +- ...angchain_community.test_streamed_chat.json | 11 +- ...nity.test_streamed_json_output_parser.json | 15 +- ...langchain_community.test_streamed_llm.json | 11 +- 8 files changed, 237 insertions(+), 65 deletions(-) diff --git a/ddtrace/contrib/internal/langchain/patch.py b/ddtrace/contrib/internal/langchain/patch.py index 8db95a9dae1..ce72e1affff 100644 --- a/ddtrace/contrib/internal/langchain/patch.py +++ b/ddtrace/contrib/internal/langchain/patch.py @@ -954,8 +954,6 @@ def _on_span_started(span: Span): span.set_tag_str("langchain.request.inputs.%d.%s" % (idx, k), integration.trunc(str(v))) def _on_span_finished(span: Span, streamed_chunks): - if span.error or not integration.is_pc_sampled_span(span): - return if ( streamed_chunks and langchain_core @@ -970,6 +968,9 @@ def _on_span_finished(span: Span, streamed_chunks): else: # best effort to join chunks together content = "".join([str(chunk) for chunk in streamed_chunks]) + integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=content, operation="chain") + if span.error or not integration.is_pc_sampled_span(span): + return span.set_tag_str("langchain.response.outputs", integration.trunc(content)) return shared_stream( @@ -989,6 +990,7 @@ def _on_span_finished(span: Span, streamed_chunks): def traced_chat_stream(langchain, pin, func, instance, args, kwargs): integration: LangChainIntegration = langchain._datadog_integration llm_provider = instance._llm_type + model = _extract_model_name(instance) def _on_span_started(span: Span): if not integration.is_pc_sampled_span(span): @@ -1004,12 +1006,19 @@ def _on_span_started(span: Span): span.set_tag_str("langchain.request.%s.parameters.%s.%s" % (llm_provider, param, k), str(v)) def _on_span_finished(span: Span, streamed_chunks): - if span.error or not integration.is_pc_sampled_span(span): + # guard: the stream may have produced no chunks, so join defensively + joined_chunks = streamed_chunks[0] if streamed_chunks else None + for chunk in (streamed_chunks or [])[1:]: + joined_chunks += chunk # base message types support __add__ for concatenation + integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=joined_chunks, operation="chat") + if ( + span.error + or not integration.is_pc_sampled_span(span) + or joined_chunks is None + ): return - content = "".join([str(getattr(chunk, "content", chunk)) for chunk in streamed_chunks]) - role = ( - streamed_chunks[0].__class__.__name__.replace("Chunk", "") if streamed_chunks else None - ) # AIMessageChunk --> AIeMessage + content = str(getattr(joined_chunks, "content", joined_chunks)) + role = joined_chunks.__class__.__name__.replace("Chunk", "") # AIMessageChunk --> AIMessage span.set_tag_str("langchain.response.content", integration.trunc(content)) if role: span.set_tag_str("langchain.response.message_type", role) @@ -1032,6 +1041,7 @@ def _on_span_finished(span: Span, streamed_chunks): on_span_finished=_on_span_finished, api_key=_extract_api_key(instance), provider=llm_provider, + model=model, ) @@ -1039,6 +1049,7 @@ def _on_span_finished(span: Span, streamed_chunks): def traced_llm_stream(langchain, pin, func, instance, args, kwargs):
integration: LangChainIntegration = langchain._datadog_integration llm_provider = instance._llm_type + model = _extract_model_name(instance) def _on_span_start(span: Span): if not integration.is_pc_sampled_span(span): @@ -1053,9 +1064,10 @@ def _on_span_start(span: Span): span.set_tag_str("langchain.request.%s.parameters.%s.%s" % (llm_provider, param, k), str(v)) def _on_span_finished(span: Span, streamed_chunks): + content = "".join([str(chunk) for chunk in streamed_chunks]) + integration.llmobs_set_tags(span, args=args, kwargs=kwargs, response=content, operation="llm") if span.error or not integration.is_pc_sampled_span(span): return - content = "".join([str(chunk) for chunk in streamed_chunks]) span.set_tag_str("langchain.response.content", integration.trunc(content)) return shared_stream( @@ -1070,6 +1082,7 @@ def _on_span_finished(span: Span, streamed_chunks): on_span_finished=_on_span_finished, api_key=_extract_api_key(instance), provider=llm_provider, + model=model, ) diff --git a/ddtrace/contrib/internal/langchain/utils.py b/ddtrace/contrib/internal/langchain/utils.py index 8a2d67f31a5..df14d0dfd6c 100644 --- a/ddtrace/contrib/internal/langchain/utils.py +++ b/ddtrace/contrib/internal/langchain/utils.py @@ -34,6 +34,7 @@ def __next__(self): except Exception: self._dd_span.set_exc_info(*sys.exc_info()) self._dd_integration.metric(self._dd_span, "incr", "request.error", 1) + self._dd_span.finish() raise @@ -60,6 +61,7 @@ async def __anext__(self): except Exception: self._dd_span.set_exc_info(*sys.exc_info()) self._dd_integration.metric(self._dd_span, "incr", "request.error", 1) + self._dd_span.finish() raise @@ -79,6 +81,7 @@ def shared_stream( "pin": pin, "operation_id": f"{instance.__module__}.{instance.__class__.__name__}", "interface_type": interface_type, + "submit_to_llmobs": True, } options.update(extra_options) diff --git a/ddtrace/llmobs/_integrations/langchain.py b/ddtrace/llmobs/_integrations/langchain.py index 87a2ba482dc..7f7075e874e 100644 --- a/ddtrace/llmobs/_integrations/langchain.py +++ b/ddtrace/llmobs/_integrations/langchain.py @@ -89,7 +89,7 @@ def _llmobs_set_tags( elif operation == "chat": self._llmobs_set_meta_tags_from_chat_model(span, args, kwargs, response, is_workflow=is_workflow) elif operation == "chain": - self._llmobs_set_meta_tags_from_chain(span, inputs=kwargs, outputs=response) + self._llmobs_set_meta_tags_from_chain(span, args, kwargs, outputs=response) elif operation == "embedding": self._llmobs_set_meta_tags_from_embedding(span, args, kwargs, response, is_workflow=is_workflow) elif operation == "retrieval": @@ -129,16 +129,25 @@ def _llmobs_set_meta_tags_from_llm( input_tag_key = INPUT_VALUE if is_workflow else INPUT_MESSAGES output_tag_key = OUTPUT_VALUE if is_workflow else OUTPUT_MESSAGES + stream = span.get_tag("langchain.request.stream") - prompts = get_argument_value(args, kwargs, 0, "prompts") + prompts = get_argument_value(args, kwargs, 0, "input" if stream else "prompts") if isinstance(prompts, str) or not isinstance(prompts, list): prompts = [prompts] - span.set_tag_str(input_tag_key, safe_json([{"content": str(prompt)} for prompt in prompts])) + if stream: + # chat and llm take the same input types for streamed calls + span.set_tag_str(input_tag_key, safe_json(self._handle_stream_input_messages(prompts))) + else: + span.set_tag_str(input_tag_key, safe_json([{"content": str(prompt)} for prompt in prompts])) + if span.error: span.set_tag_str(output_tag_key, safe_json([{"content": ""}])) return - message_content = [{"content": 
completion[0].text} for completion in completions.generations] + if stream: + message_content = [{"content": completions}] # single completion for streams + else: + message_content = [{"content": completion[0].text} for completion in completions.generations] span.set_tag_str(output_tag_key, safe_json(message_content)) def _llmobs_set_meta_tags_from_chat_model( @@ -155,20 +164,36 @@ def _llmobs_set_meta_tags_from_chat_model( input_tag_key = INPUT_VALUE if is_workflow else INPUT_MESSAGES output_tag_key = OUTPUT_VALUE if is_workflow else OUTPUT_MESSAGES + stream = span.get_tag("langchain.request.stream") input_messages = [] - chat_messages = get_argument_value(args, kwargs, 0, "messages", optional=True) or [] - for message_set in chat_messages: - for message in message_set: - content = message.get("content", "") if isinstance(message, dict) else getattr(message, "content", "") - role = getattr(message, "role", ROLE_MAPPING.get(message.type, "")) - input_messages.append({"content": str(content), "role": str(role)}) + if stream: + chat_messages = get_argument_value(args, kwargs, 0, "input") + input_messages = self._handle_stream_input_messages(chat_messages) + else: + chat_messages = get_argument_value(args, kwargs, 0, "messages", optional=True) or [] + if not isinstance(chat_messages, list): + chat_messages = [chat_messages] + for message_set in chat_messages: + for message in message_set: + content = ( + message.get("content", "") if isinstance(message, dict) else getattr(message, "content", "") + ) + role = getattr(message, "role", ROLE_MAPPING.get(message.type, "")) + input_messages.append({"content": str(content), "role": str(role)}) span.set_tag_str(input_tag_key, safe_json(input_messages)) if span.error: span.set_tag_str(output_tag_key, json.dumps([{"content": ""}])) return + output_messages = [] + if stream: + content = chat_completions.content + role = chat_completions.__class__.__name__.replace("MessageChunk", "").lower() # AIMessageChunk --> ai + span.set_tag_str(output_tag_key, safe_json([{"content": content, "role": ROLE_MAPPING.get(role, "")}])) + return + for message_set in chat_completions.generations: for chat_completion in message_set: chat_completion_msg = chat_completion.message @@ -196,9 +221,38 @@ def _extract_tool_calls(self, chat_completion_msg: Any) -> List[Dict[str, Any]]: tool_calls_info.append(tool_call_info) return tool_calls_info - def _llmobs_set_meta_tags_from_chain(self, span: Span, outputs: Any, inputs: Optional[Any] = None) -> None: - span.set_tag_str(SPAN_KIND, "workflow") + def _handle_stream_input_messages(self, inputs): + input_messages = [] + if hasattr(inputs, "to_messages"): # isinstance(inputs, langchain_core.prompt_values.PromptValue) + inputs = inputs.to_messages() + elif not isinstance(inputs, list): + inputs = [inputs] + for inp in inputs: + inp_message = {} + content, role = None, None + if isinstance(inp, dict): + content = str(inp.get("content", "")) + role = inp.get("role") + elif hasattr(inp, "content"): # isinstance(inp, langchain_core.messages.BaseMessage) + content = str(inp.content) + role = inp.__class__.__name__ + else: + content = str(inp) + + inp_message["content"] = content + if role is not None: + inp_message["role"] = role + input_messages.append(inp_message) + return input_messages + + def _llmobs_set_meta_tags_from_chain(self, span: Span, args, kwargs, outputs: Any) -> None: + span.set_tag_str(SPAN_KIND, "workflow") + stream = span.get_tag("langchain.request.stream") + if stream: + inputs = get_argument_value(args, kwargs, 0, 
"input") + else: + inputs = kwargs if inputs is not None: formatted_inputs = self.format_io(inputs) span.set_tag_str(INPUT_VALUE, safe_json(formatted_inputs)) diff --git a/tests/contrib/langchain/test_langchain_llmobs.py b/tests/contrib/langchain/test_langchain_llmobs.py index 3f3f61ad3be..d02ecd2c2fa 100644 --- a/tests/contrib/langchain/test_langchain_llmobs.py +++ b/tests/contrib/langchain/test_langchain_llmobs.py @@ -91,39 +91,52 @@ class BaseTestLLMObsLangchain: @classmethod def _invoke_llm(cls, llm, prompt, mock_tracer, cassette_name): LLMObs.enable(ml_app=cls.ml_app, integrations_enabled=False, _tracer=mock_tracer) - with get_request_vcr(subdirectory_name=cls.cassette_subdirectory_name).use_cassette(cassette_name): - if LANGCHAIN_VERSION < (0, 1): - llm(prompt) - else: - llm.invoke(prompt) + if cassette_name is not None: + with get_request_vcr(subdirectory_name=cls.cassette_subdirectory_name).use_cassette(cassette_name): + if LANGCHAIN_VERSION < (0, 1): + llm(prompt) + else: + llm.invoke(prompt) + else: # streams do not use casettes + for _ in llm.stream(prompt): + pass LLMObs.disable() return mock_tracer.pop_traces()[0][0] @classmethod def _invoke_chat(cls, chat_model, prompt, mock_tracer, cassette_name, role="user"): LLMObs.enable(ml_app=cls.ml_app, integrations_enabled=False, _tracer=mock_tracer) - with get_request_vcr(subdirectory_name=cls.cassette_subdirectory_name).use_cassette(cassette_name): - if role == "user": - messages = [HumanMessage(content=prompt)] - else: - messages = [ChatMessage(content=prompt, role="custom")] - if LANGCHAIN_VERSION < (0, 1): - chat_model(messages) - else: - chat_model.invoke(messages) + if cassette_name is not None: + with get_request_vcr(subdirectory_name=cls.cassette_subdirectory_name).use_cassette(cassette_name): + if role == "user": + messages = [HumanMessage(content=prompt)] + else: + messages = [ChatMessage(content=prompt, role="custom")] + if LANGCHAIN_VERSION < (0, 1): + chat_model(messages) + else: + chat_model.invoke(messages) + else: # streams do not use casettes + for _ in chat_model.stream(prompt): + pass LLMObs.disable() return mock_tracer.pop_traces()[0][0] @classmethod def _invoke_chain(cls, chain, prompt, mock_tracer, cassette_name, batch=False): LLMObs.enable(ml_app=cls.ml_app, integrations_enabled=False, _tracer=mock_tracer) - with get_request_vcr(subdirectory_name=cls.cassette_subdirectory_name).use_cassette(cassette_name): - if batch: - chain.batch(inputs=prompt) - elif LANGCHAIN_VERSION < (0, 1): - chain.run(prompt) - else: - chain.invoke(prompt) + if cassette_name is not None: + with get_request_vcr(subdirectory_name=cls.cassette_subdirectory_name).use_cassette(cassette_name): + if batch: + chain.batch(inputs=prompt) + elif LANGCHAIN_VERSION < (0, 1): + chain.run(prompt) + else: + chain.invoke(prompt) + + else: # streams do not use casettes + for _ in chain.stream(prompt): + pass LLMObs.disable() return mock_tracer.pop_traces()[0] @@ -797,6 +810,91 @@ def circumference_tool(radius: float) -> float: ) ) + def test_llmobs_streamed_chain( + self, langchain_core, langchain_openai, mock_llmobs_span_writer, mock_tracer, streamed_response_responder + ): + client = streamed_response_responder( + module="openai", + client_class_key="OpenAI", + http_client_key="http_client", + endpoint_path=["chat", "completions"], + file="lcel_openai_chat_streamed_response.txt", + ) + + prompt = langchain_core.prompts.ChatPromptTemplate.from_messages( + [("system", "You are a world class technical documentation writer."), ("user", "{input}")] + ) + 
llm = langchain_openai.ChatOpenAI(model="gpt-4o", client=client) + parser = langchain_core.output_parsers.StrOutputParser() + + chain = prompt | llm | parser + + trace = self._invoke_chain( + chain=chain, + prompt={"input": "how can langsmith help with testing?"}, + mock_tracer=mock_tracer, + cassette_name=None, # do not use cassette, + ) + + assert mock_llmobs_span_writer.enqueue.call_count == 2 + _assert_expected_llmobs_chain_span( + trace[0], + mock_llmobs_span_writer, + input_value=json.dumps({"input": "how can langsmith help with testing?"}), + output_value="Python is\n\nthe best!", + ) + mock_llmobs_span_writer.enqueue.assert_any_call( + _expected_llmobs_llm_span_event( + trace[1], + model_name=trace[1].get_tag("langchain.request.model"), + model_provider=trace[1].get_tag("langchain.request.provider"), + input_messages=[ + {"content": "You are a world class technical documentation writer.", "role": "SystemMessage"}, + {"content": "how can langsmith help with testing?", "role": "HumanMessage"}, + ], + output_messages=[{"content": "Python is\n\nthe best!", "role": "assistant"}], + metadata={"temperature": 0.7}, + token_metrics={}, + tags={"ml_app": "langchain_test"}, + ) + ) + + def test_llmobs_streamed_llm( + self, langchain_openai, mock_llmobs_span_writer, mock_tracer, streamed_response_responder + ): + client = streamed_response_responder( + module="openai", + client_class_key="OpenAI", + http_client_key="http_client", + endpoint_path=["completions"], + file="lcel_openai_llm_streamed_response.txt", + ) + + llm = langchain_openai.OpenAI(client=client) + + span = self._invoke_llm( + cassette_name=None, # do not use cassette + llm=llm, + mock_tracer=mock_tracer, + prompt="Hello!", + ) + + assert mock_llmobs_span_writer.enqueue.call_count == 1 + mock_llmobs_span_writer.enqueue.assert_any_call( + _expected_llmobs_llm_span_event( + span, + model_name=span.get_tag("langchain.request.model"), + model_provider=span.get_tag("langchain.request.provider"), + input_messages=[ + {"content": "Hello!"}, + ], + output_messages=[{"content": "\n\nPython is cool!"}], + metadata={"temperature": 0.7, "max_tokens": 256}, + token_metrics={}, + tags={"ml_app": "langchain_test"}, + ) + ) + @pytest.mark.skipif(LANGCHAIN_VERSION < (0, 1), reason="These tests are for langchain >= 0.1.0") class TestTraceStructureWithLLMIntegrations(SubprocessTestCase): diff --git a/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chain.json b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chain.json index a5b61fa5663..ad34fcb3343 100644 --- a/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chain.json +++ b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chain.json @@ -10,23 +10,23 @@ "error": 0, "meta": { "_dd.p.dm": "-0", - "_dd.p.tid": "66f4277f00000000", + "_dd.p.tid": "66fc049a00000000", "langchain.request.inputs.0.input": "how can langsmith help with testing?", "langchain.request.stream": "True", "langchain.request.type": "chain", "langchain.response.outputs": "Python is\\n\\nthe best!", "language": "python", - "runtime-id": "dd02a535c69f47fd835358ebdcba41f3" + "runtime-id": "afbd64218eba449eb252c9dd82c75727" }, "metrics": { "_dd.measured": 1, "_dd.top_level": 1, "_dd.tracer_kr": 1.0, "_sampling_priority_v1": 1, - "process_id": 88304 + "process_id": 43442 }, - "duration": 7541000, - "start": 1727276927507127000 + "duration": 7276000, + "start": 1727792282788378000 }, { "name": "langchain.request", @@ 
-43,6 +43,7 @@ "langchain.request.messages.0.message_type": "SystemMessage", "langchain.request.messages.1.content": "how can langsmith help with testing?", "langchain.request.messages.1.message_type": "HumanMessage", + "langchain.request.model": "gpt-3.5-turbo", "langchain.request.openai-chat.parameters.model": "gpt-3.5-turbo", "langchain.request.openai-chat.parameters.model_name": "gpt-3.5-turbo", "langchain.request.openai-chat.parameters.n": "1", @@ -57,6 +58,6 @@ "metrics": { "_dd.measured": 1 }, - "duration": 4894000, - "start": 1727276927509619000 + "duration": 4675000, + "start": 1727792282790857000 }]] diff --git a/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chat.json b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chat.json index bc83c61268b..abec415263d 100644 --- a/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chat.json +++ b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chat.json @@ -10,9 +10,10 @@ "error": 0, "meta": { "_dd.p.dm": "-0", - "_dd.p.tid": "66f4277f00000000", + "_dd.p.tid": "66fc049a00000000", "langchain.request.api_key": "...key>", "langchain.request.messages.0.content": "how can langsmith help with testing?", + "langchain.request.model": "gpt-3.5-turbo", "langchain.request.openai-chat.parameters.model": "gpt-3.5-turbo", "langchain.request.openai-chat.parameters.model_name": "gpt-3.5-turbo", "langchain.request.openai-chat.parameters.n": "1", @@ -24,15 +25,15 @@ "langchain.response.content": "Python is\\n\\nthe best!", "langchain.response.message_type": "AIMessage", "language": "python", - "runtime-id": "dd02a535c69f47fd835358ebdcba41f3" + "runtime-id": "afbd64218eba449eb252c9dd82c75727" }, "metrics": { "_dd.measured": 1, "_dd.top_level": 1, "_dd.tracer_kr": 1.0, "_sampling_priority_v1": 1, - "process_id": 88304 + "process_id": 43442 }, - "duration": 55964000, - "start": 1727276927342138000 + "duration": 33999000, + "start": 1727792282655302000 }]] diff --git a/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_json_output_parser.json b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_json_output_parser.json index 8376cc7626a..9a840eea4bb 100644 --- a/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_json_output_parser.json +++ b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_json_output_parser.json @@ -10,24 +10,24 @@ "error": 0, "meta": { "_dd.p.dm": "-0", - "_dd.p.tid": "66f4277f00000000", + "_dd.p.tid": "66fc049a00000000", "langchain.request.inputs.0": "content='You know everything about the world.'", "langchain.request.inputs.1": "content='output a list of the country france their population in JSON format. 
Use a dict with an outer key of \"countries\" which ...", "langchain.request.stream": "True", "langchain.request.type": "chain", "langchain.response.outputs": "{\"countries\": \"France is a country!\"}", "language": "python", - "runtime-id": "dd02a535c69f47fd835358ebdcba41f3" + "runtime-id": "afbd64218eba449eb252c9dd82c75727" }, "metrics": { "_dd.measured": 1, "_dd.top_level": 1, "_dd.tracer_kr": 1.0, "_sampling_priority_v1": 1, - "process_id": 88304 + "process_id": 43442 }, - "duration": 15160000, - "start": 1727276927466650000 + "duration": 13421000, + "start": 1727792282751749000 }, { "name": "langchain.request", @@ -44,6 +44,7 @@ "langchain.request.messages.0.message_type": "SystemMessage", "langchain.request.messages.1.content": "output a list of the country france their population in JSON format. Use a dict with an outer key of \"countries\" which contains ...", "langchain.request.messages.1.message_type": "HumanMessage", + "langchain.request.model": "gpt-4o", "langchain.request.openai-chat.parameters.max_tokens": "50", "langchain.request.openai-chat.parameters.model": "gpt-4o", "langchain.request.openai-chat.parameters.model_name": "gpt-4o", @@ -59,6 +60,6 @@ "metrics": { "_dd.measured": 1 }, - "duration": 8959000, - "start": 1727276927472597000 + "duration": 9210000, + "start": 1727792282755690000 }]] diff --git a/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_llm.json b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_llm.json index b69cc467915..8b5043f37c4 100644 --- a/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_llm.json +++ b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_llm.json @@ -10,9 +10,10 @@ "error": 0, "meta": { "_dd.p.dm": "-0", - "_dd.p.tid": "66eda7d800000000", + "_dd.p.tid": "66fc049a00000000", "langchain.request.api_key": "...key>", "langchain.request.messages.0.content": "How do I write technical documentation?", + "langchain.request.model": "gpt-3.5-turbo-instruct", "langchain.request.openai.parameters.frequency_penalty": "0", "langchain.request.openai.parameters.max_tokens": "256", "langchain.request.openai.parameters.model_name": "gpt-3.5-turbo-instruct", @@ -25,15 +26,15 @@ "langchain.request.type": "llm", "langchain.response.content": "\\n\\nPython is cool!", "language": "python", - "runtime-id": "0de46c287bae4eb081fc848584384768" + "runtime-id": "afbd64218eba449eb252c9dd82c75727" }, "metrics": { "_dd.measured": 1, "_dd.top_level": 1, "_dd.tracer_kr": 1.0, "_sampling_priority_v1": 1, - "process_id": 21857 + "process_id": 43442 }, - "duration": 7765000, - "start": 1726851032766410000 + "duration": 7867000, + "start": 1727792282719000000 }]] From e1a684d7912968857fa098a7a458248b44e70f4e Mon Sep 17 00:00:00 2001 From: Sam Brenner Date: Wed, 2 Oct 2024 15:46:41 -0400 Subject: [PATCH 2/3] add release note --- .../llmobs-langchain-streamed-calls-23a13029ac5d8907.yaml | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 releasenotes/notes/llmobs-langchain-streamed-calls-23a13029ac5d8907.yaml diff --git a/releasenotes/notes/llmobs-langchain-streamed-calls-23a13029ac5d8907.yaml b/releasenotes/notes/llmobs-langchain-streamed-calls-23a13029ac5d8907.yaml new file mode 100644 index 00000000000..1c11ceacf1c --- /dev/null +++ b/releasenotes/notes/llmobs-langchain-streamed-calls-23a13029ac5d8907.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + LLM Observability: LangChain streamed calls (``llm.stream``, ``chat_model.stream``, and 
``chain.stream``) are now submitted to LLM Observability. From b2eef10b33679d4e2f0972386eb06af6d1273099 Mon Sep 17 00:00:00 2001 From: Sam Brenner Date: Wed, 2 Oct 2024 15:50:03 -0400 Subject: [PATCH 3/3] fmt --- tests/contrib/langchain/test_langchain_llmobs.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/contrib/langchain/test_langchain_llmobs.py b/tests/contrib/langchain/test_langchain_llmobs.py index d02ecd2c2fa..450e7402d85 100644 --- a/tests/contrib/langchain/test_langchain_llmobs.py +++ b/tests/contrib/langchain/test_langchain_llmobs.py @@ -97,7 +97,7 @@ def _invoke_llm(cls, llm, prompt, mock_tracer, cassette_name): llm(prompt) else: llm.invoke(prompt) - else: # streams do not use cassettes + else:  # streams do not use cassettes for _ in llm.stream(prompt): pass LLMObs.disable() @@ -116,7 +116,7 @@ def _invoke_chat(cls, chat_model, prompt, mock_tracer, cassette_name, role="user chat_model(messages) else: chat_model.invoke(messages) - else: # streams do not use cassettes + else:  # streams do not use cassettes for _ in chat_model.stream(prompt): pass LLMObs.disable() @@ -134,7 +134,7 @@ def _invoke_chain(cls, chain, prompt, mock_tracer, cassette_name, batch=False): else: chain.invoke(prompt) - else: # streams do not use cassettes + else:  # streams do not use cassettes for _ in chain.stream(prompt): pass LLMObs.disable()
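
Usage sketch (illustrative; not part of the patch series). A minimal example of the behavior these patches add, assuming ``ddtrace`` and ``langchain-openai`` are installed and OpenAI plus Datadog LLM Observability credentials are configured; the ``ml_app`` value, model name, and prompts below are placeholders:

    # With the LangChain integration patched, each streamed call below is
    # finished and submitted to LLM Observability once its stream is consumed.
    from ddtrace.llmobs import LLMObs
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_openai import ChatOpenAI

    LLMObs.enable(ml_app="my-ml-app")  # placeholder app name

    llm = ChatOpenAI(model="gpt-4o")  # placeholder model

    # chat model streaming: AIMessageChunk chunks are joined via __add__
    for chunk in llm.stream("how can langsmith help with testing?"):
        print(chunk.content, end="", flush=True)

    # LCEL chain streaming: the chain produces a workflow span and the nested
    # chat call an LLM span, mirroring test_llmobs_streamed_chain above
    chain = (
        ChatPromptTemplate.from_messages(
            [("system", "You are a world class technical documentation writer."), ("user", "{input}")]
        )
        | llm
        | StrOutputParser()
    )
    for part in chain.stream({"input": "how can langsmith help with testing?"}):
        print(part, end="", flush=True)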