How to use RunnableWithMessageHistory in a RAG pipeline #16582
-
Example Code

```python
def get_chain(vectorstore: Pinecone, stream_handler) -> RunnableParallel:
    streaming_llm = ChatOpenAI(
        model="gpt-4",
        streaming=True,
        callbacks=[stream_handler],
        verbose=True,
        temperature=0,
        openai_api_key=OPENAI_API_KEY,
    )
    # RAG prompt
    template = (
        variables["promptTemplate"]
        + """
        Context: {context}
        Question: {question}
        History: {history}
        """
    )
    prompt = ChatPromptTemplate.from_template(template)
    retriever = vectorstore.as_retriever(search_kwargs={"k": variables["numberOfKwargs"]})
    # Re-rank
    compressor = CohereRerank(cohere_api_key=COHERE_API_KEY, top_n=variables["numberOfReranks"])
    compression_retriever = ContextualCompressionRetriever(
        base_compressor=compressor, base_retriever=retriever
    )
    # RAG chain with source metadata
    testChain = (
        {
            "context": compression_retriever,
            "question": itemgetter("question"),
            "history": ???  # <- this is what I don't know how to fill in
        }
        | prompt
        | streaming_llm
        | StrOutputParser()
    )
    qa2 = RunnableWithMessageHistory(
        testChain,
        lambda session_id: RedisChatMessageHistory(session_id, url=REDIS_URL),
        input_messages_key="question",
        history_messages_key="history",
    )

    class Question(BaseModel):
        __root__: str

    qa2 = qa2.with_types(input_type=Question)
    print("qa", qa2)
    return qa2
```

Description

I've created a function that builds a chain. I've been using it without memory for some time, and it's been working great. Now I'm trying to add memory to it using Redis, following the examples in the LangChain docs. The example there only uses the memory, and testing that on its own works fine. However, I need to maintain my context as well. If I try to add context to `ChatPromptTemplate.from_messages`, it complains that context is not one of the accepted inputs. And if I try to add history to `ChatPromptTemplate.from_template`, something (I'm at a loss as to what) fails with `TypeError: expected string or buffer`. I think I'm just passing the history object in the wrong way in the `testChain` object, but I barely even know if I'm on the right track.

System Info

langchain==0.1.3
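A minimal sketch of the direction the replies below converge on, assuming the question's variable names (`compression_retriever`, `streaming_llm`, etc.): build the prompt with `from_messages` and a `MessagesPlaceholder` for the history rather than a `{history}` slot in a plain string template, since the injected history is a list of messages, not a string. The system message text here is illustrative.

```python
from operator import itemgetter
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# history arrives as a list of BaseMessages, so it needs a MessagesPlaceholder,
# not a string slot in from_template
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "Use this context to answer.\n\nContext: {context}"),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{question}"),
    ]
)

testChain = (
    {
        "context": itemgetter("question") | compression_retriever,
        "question": itemgetter("question"),
        "history": itemgetter("history"),
    }
    | prompt
    | streaming_llm
    | StrOutputParser()
)
```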
Replies: 8 comments 12 replies
-
🤖 From your code and description, it seems like you're trying to incorporate both context and chat history into your RAG pipeline. The issue you're facing is likely due to the way you're passing the history object in `testChain`.

In LangChain, `RunnableWithMessageHistory` wraps another runnable and manages its chat message history: it loads the stored messages before invoking the wrapped runnable and saves the new exchange afterwards. When you're creating your `RunnableWithMessageHistory`, the `input_messages_key` and `history_messages_key` arguments tell it which keys of the input dictionary hold the question and the history. However, in your `testChain`, the `"history"` slot is never populated from the input, so the history that gets injected never reaches the prompt.

To fix this, you need to ensure that the input messages are being passed through, for example with `itemgetter("history")`:

```python
testChain = (
    {
        "context": compression_retriever,
        "question": itemgetter("question"),
        "history": itemgetter("history"),
    }
    | prompt
    | streaming_llm
    | StrOutputParser()
)
```

Here, `itemgetter("history")` pulls out the history that `RunnableWithMessageHistory` injects into the input dictionary under the `history_messages_key`. I hope this helps! If you're still having issues, could you provide more information about the input you're passing to the chain?
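As a hedged usage sketch (the question text and session id here are made up), the wrapped chain from the question would then be invoked with the question in a dict and the session in the config:

```python
# RunnableWithMessageHistory loads the Redis history for this session_id,
# injects it under "history", and appends the new exchange afterwards
qa2 = get_chain(vectorstore, stream_handler)
answer = qa2.invoke(
    {"question": "What does the context say about pricing?"},
    config={"configurable": {"session_id": "user-123"}},
)
```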
-
@ChristianEvc Here's a reference implementation. This code is broken down into steps to make it easier to invoke each step and see the inputs/outputs. You can always debug by sprinkling a pass-through printer through the LCEL pipeline:

```python
def print_me(inputs):
    print(inputs)
    return inputs
```

The code contains an in-memory implementation of chat history to make it easier to run the code without any complex setup and understand what's happening under the hood.

```python
from operator import itemgetter
from typing import List

from langchain_openai.chat_models import ChatOpenAI
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.documents import Document
from langchain_core.messages import BaseMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import (
    RunnableLambda,
    ConfigurableFieldSpec,
    RunnablePassthrough,
)
from langchain_core.runnables.history import RunnableWithMessageHistory


class InMemoryHistory(BaseChatMessageHistory, BaseModel):
    """In memory implementation of chat message history."""

    messages: List[BaseMessage] = Field(default_factory=list)

    def add_message(self, message: BaseMessage) -> None:
        """Add a self-created message to the store."""
        self.messages.append(message)

    def clear(self) -> None:
        self.messages = []


store = {}


def get_session_history(user_id: str, conversation_id: str) -> BaseChatMessageHistory:
    if (user_id, conversation_id) not in store:
        store[(user_id, conversation_id)] = InMemoryHistory()
    return store[(user_id, conversation_id)]


history = get_session_history("1", "1")
history.add_message(AIMessage(content="hello"))
print(store)


def fake_retriever(query):
    assert isinstance(query, str)
    return [
        Document(page_content="cats are the answer"),
        Document(page_content="CAT POWERS"),
    ]


fake_retriever = RunnableLambda(fake_retriever)
print(fake_retriever.invoke("hello"))

model = ChatOpenAI()
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You're an assistant who's good at {ability}. Here is some {context}",
        ),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{question}"),
    ]
)


def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


context = itemgetter("question") | fake_retriever | format_docs
first_step = RunnablePassthrough.assign(context=context)
chain = first_step | prompt | model

with_message_history = RunnableWithMessageHistory(
    chain,
    get_session_history=get_session_history,
    input_messages_key="question",
    history_messages_key="history",
    history_factory_config=[
        ConfigurableFieldSpec(
            id="user_id",
            annotation=str,
            name="User ID",
            description="Unique identifier for the user.",
            default="",
            is_shared=True,
        ),
        ConfigurableFieldSpec(
            id="conversation_id",
            annotation=str,
            name="Conversation ID",
            description="Unique identifier for the conversation.",
            default="",
            is_shared=True,
        ),
    ],
)

print(context.invoke({"question": "hello", "other_stuff": "ignored"}))
print(first_step.invoke({"question": "hello", "history": "boom"}))
print(chain.invoke({"question": "hello", "history": [], "ability": "math"}))
print(
    with_message_history.invoke(
        {"ability": "math", "question": "What does cosine mean?"},
        config={
            "configurable": {"user_id": "user_id", "conversation_id": "conversation_id"}
        },
    )
)
print(store)
```
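For example, the `print_me` helper above can be spliced between any two steps of the pipeline. A debugging sketch, assuming the names from the reference implementation; it just echoes and forwards its input:

```python
# print whatever flows out of first_step, then continue unchanged
debug_chain = first_step | RunnableLambda(print_me) | prompt | model
debug_chain.invoke({"question": "hello", "history": [], "ability": "math"})
```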
-
@eyurtsev Thanks for this amazing code. Is it possible to use it with Agents?
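A hedged sketch of one way this might look with agents, following the standard `create_openai_functions_agent` pattern: an `AgentExecutor` is itself a Runnable, so it wraps the same way as a chain. Here `tools` is assumed to be your list of Tool objects, and the in-memory store is illustrative.

```python
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI

# the agent prompt needs a history placeholder plus the agent scratchpad
agent_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)
agent = create_openai_functions_agent(ChatOpenAI(), tools, agent_prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools)

store = {}

def get_session_history(session_id: str) -> ChatMessageHistory:
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]

# wrap the executor exactly like a chain; history is injected under "chat_history"
agent_with_history = RunnableWithMessageHistory(
    agent_executor,
    get_session_history,
    input_messages_key="input",
    history_messages_key="chat_history",
)

agent_with_history.invoke(
    {"input": "hello"},
    config={"configurable": {"session_id": "demo"}},
)
```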
-
I can run the code with MongoDBChatMessageHistory. This is the example:

```python
from operator import itemgetter

from langchain_community.chat_message_histories import MongoDBChatMessageHistory
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI


def fake_retriever(query):
    assert isinstance(query, str)
    return [
        Document(page_content="Artificial intelligence is developing rapidly"),
        Document(page_content="Machine learning deserves much of the credit"),
    ]


fake_retriever = RunnableLambda(fake_retriever)

model = ChatOpenAI()
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You're an assistant who's good at something. Here is some {context}",
        ),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{question}"),
    ]
)


def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


context = itemgetter("question") | fake_retriever | format_docs
first_step = RunnablePassthrough.assign(context=context)
chain = first_step | prompt | ChatOpenAI()

with_message_history = RunnableWithMessageHistory(
    chain,
    lambda session_id: MongoDBChatMessageHistory(
        session_id=session_id,
        connection_string="mongodb://root:password@your_ip:27017",
        database_name="my_dbxxx",
        collection_name="tttttt",
    ),
    input_messages_key="question",
    history_messages_key="history",
)

if __name__ == '__main__':
    config = {"configurable": {"session_id": "test_session_id"}}
    result = with_message_history.invoke(
        {"question": "Please generate a title for the content"}, config
    )
    print(result)
```
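A quick hedged follow-up sketch, not in the original reply: invoking again with the same `session_id` should load the prior exchange from MongoDB into `history`, which is an easy way to verify persistence.

```python
# second turn in the same session; the previous question/answer pair is
# read back from MongoDB and injected via the "history" placeholder
follow_up = with_message_history.invoke({"question": "Make the title shorter"}, config)
print(follow_up)
```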
-
Hi everybody, I have a follow-up question for roughly the same scenario, however with one small difference: I would also like to add the retrieved context to the history. In the examples here, this is not happening, because only the raw question (via `input_messages_key`) is recorded, not the full prompt containing the context.

Some minimal code to get the idea:

```python
placeholder_prompt = ChatPromptTemplate.from_messages([
    MessagesPlaceholder(variable_name="history"),
    MessagesPlaceholder(variable_name="input"),
])

llm = <SOME_LLM>

chain = placeholder_prompt | llm

chain_with_history = RunnableWithMessageHistory(
    chain,
    <SOME_SESSION_STORE>,
    input_messages_key="input",
    history_messages_key="history",
)

prompt = ChatPromptTemplate.from_messages([
    ("system", "Context: {context}"),
    ("human", "{question}"),
])

complete_chain_with_history = {"input": prompt} | chain_with_history
# That's where the type mismatch happens.
# How can I extract the messages from the ChatPromptTemplate?
# Can I replace the ChatPromptTemplate with something better?
```

EDIT: I think I found a way using a …

EDIT: Another (better) solution I found was to use a RunnableLambda to extract the list of messages:

```python
complete_chain_with_history = (
    prompt
    | RunnableLambda(lambda v: {"input": v.messages})
    | chain_with_history
)
```

Maybe there are better (native) solutions?
-
If I have this code:

```python
from langchain import hub

def format_docs(docs):
    ...

from langchain_core.runnables import RunnableParallel, RunnablePassthrough

rag_chain_from_docs = (
    ...
)

rag_chain_with_source = RunnableParallel(
    ...
)

response = rag_chain_with_source.invoke("What is LSTM?")
```

how do I convert this kind of RunnableParallel chain to include a history component? I got this reference from the LangChain docs:

```python
def get_session_history(session_id: str) -> BaseChatMessageHistory:
    ...

with_message_history = RunnableWithMessageHistory(
    ...
)

with_message_history.invoke(
    ...
)
```

How can we integrate RunnableWithMessageHistory with RunnableParallel?
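A hedged sketch of one way to do this, assuming `retriever`, `llm`, `format_docs`, and `get_session_history` exist as in the examples above, and that `prompt` contains a `MessagesPlaceholder` named `history`. The key changes are giving the chain dict input so `RunnableWithMessageHistory` can find the question by key, and telling it which output key holds the answer:

```python
from operator import itemgetter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_core.runnables.history import RunnableWithMessageHistory

rag_chain_from_docs = (
    RunnablePassthrough.assign(context=lambda x: format_docs(x["context"]))
    | prompt
    | llm
    | StrOutputParser()
)

# dict input instead of a bare string, so the question is addressable by key
rag_chain_with_source = RunnableParallel(
    {
        "context": itemgetter("question") | retriever,
        "question": itemgetter("question"),
        "history": itemgetter("history"),
    }
).assign(answer=rag_chain_from_docs)

chain_with_history = RunnableWithMessageHistory(
    rag_chain_with_source,
    get_session_history,
    input_messages_key="question",
    history_messages_key="history",
    output_messages_key="answer",  # record the "answer" field of the dict output
)

response = chain_with_history.invoke(
    {"question": "What is LSTM?"},
    config={"configurable": {"session_id": "demo"}},
)
```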
-
@eyurtsev, thanks for the amazing code. I want to add create_history_aware_retriever and create_stuff_documents_chain to your code. How would I rewrite it?
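A hedged sketch based on the standard docs pattern for these two helpers: `retriever` is assumed (e.g. a vectorstore retriever), the prompt texts are illustrative, and `get_session_history` is assumed to be a session-keyed history factory like the ones earlier in the thread.

```python
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI

llm = ChatOpenAI()

# rewrites the user's question into a standalone query using the chat history
contextualize_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "Given the chat history and the latest question, "
                   "formulate a standalone question."),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{input}"),
    ]
)
history_aware_retriever = create_history_aware_retriever(
    llm, retriever, contextualize_prompt
)

# stuffs the retrieved documents into {context} and answers the question
qa_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "Answer using this context:\n\n{context}"),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{input}"),
    ]
)
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)

conversational_rag_chain = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="chat_history",
    output_messages_key="answer",  # create_retrieval_chain returns {"answer": ...}
)
```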
-
I found a similar discussion that might help you: How to input multiple inputs in RunnableWithMessageHistory of LangChain?. It provides an example of how to correctly handle multiple inputs with `RunnableWithMessageHistory`. Here's how you can modify your code to include the history correctly (see the sketch below).
This setup ensures that the history injected by `RunnableWithMessageHistory` actually reaches your prompt alongside the retrieved context. For more detailed information, you can refer to the LangChain documentation on message history.
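A minimal sketch, written in terms of the variables from the original question; the `itemgetter("question") | compression_retriever` piece is an assumption, so the retriever receives just the question string rather than the whole input dict:

```python
from operator import itemgetter

testChain = (
    {
        # pass only the question string to the retriever
        "context": itemgetter("question") | compression_retriever,
        "question": itemgetter("question"),
        # pull the messages RunnableWithMessageHistory injects under "history"
        "history": itemgetter("history"),
    }
    | prompt
    | streaming_llm
    | StrOutputParser()
)

qa2 = RunnableWithMessageHistory(
    testChain,
    lambda session_id: RedisChatMessageHistory(session_id, url=REDIS_URL),
    input_messages_key="question",
    history_messages_key="history",
)
```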