The last line of `process_message` above calls `stream_reply` in `base.py`.
Note that a custom pending message may also be passed.
The `stream_reply` function leverages the LCEL `Runnable`. It takes the
runnable input, the human message being replied to, and an optional
pending-message string and runnable configuration, as shown below:

```python
async def stream_reply(
    self,
    input: Input,
    human_msg: HumanChatMessage,
    pending_msg="Generating response",
    config: Optional[RunnableConfig] = None,
):
    """
    Streams a reply to a human message by invoking `self.llm_chain.astream()`.
    A LangChain `Runnable` must be bound to `self.llm_chain` before this
    method is called.
    """
    assert self.llm_chain
    assert isinstance(self.llm_chain, Runnable)

    received_first_chunk = False
    metadata_handler = MetadataCallbackHandler()
    base_config: RunnableConfig = {
        "configurable": {"last_human_msg": human_msg},
        "callbacks": [metadata_handler],
    }
    merged_config: RunnableConfig = merge_runnable_configs(base_config, config)

    # start with a pending message
    with self.pending(pending_msg, human_msg) as pending_message:
        # stream response in chunks. this works even if a provider does not
        # implement streaming, as `astream()` defaults to yielding `_call()`
        # when `_stream()` is not implemented on the LLM class.
        chunk_generator = self.llm_chain.astream(input, config=merged_config)
        stream_interrupted = False
        async for chunk in chunk_generator:
            if not received_first_chunk:
                # when receiving the first chunk, close the pending message and
                # start the stream.
                self.close_pending(pending_message)
                stream_id = self._start_stream(human_msg=human_msg)
                received_first_chunk = True
                self.message_interrupted[stream_id] = asyncio.Event()

            if self.message_interrupted[stream_id].is_set():
                try:
                    # notify the model provider that streaming was interrupted
                    # (this is essential to allow the model to stop generating)
                    #
                    # note: `mypy` flags this line, claiming that `athrow` is
                    # not defined on `AsyncIterator`. This is why an ignore
                    # comment is placed here.
                    await chunk_generator.athrow(  # type:ignore[attr-defined]
                        GenerationInterrupted()
                    )
                except GenerationInterrupted:
                    # do not let the exception bubble up in case if
                    # the provider did not handle it
                    pass
                stream_interrupted = True
                break

            if isinstance(chunk, AIMessageChunk) and isinstance(chunk.content, str):
                self._send_stream_chunk(stream_id, chunk.content)
            elif isinstance(chunk, str):
                self._send_stream_chunk(stream_id, chunk)
            else:
                self.log.error(f"Unrecognized type of chunk yielded: {type(chunk)}")
                break

        # complete stream after all chunks have been streamed
        stream_tombstone = (
            "\n\n(AI response stopped by user)" if stream_interrupted else ""
        )
        self._send_stream_chunk(
            stream_id,
            stream_tombstone,
            complete=True,
            metadata=metadata_handler.jai_metadata,
        )
        del self.message_interrupted[stream_id]
```
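
For orientation, here is a minimal, hypothetical sketch of the calling side: a
custom chat handler whose `process_message` delegates to `stream_reply`,
optionally overriding the default pending message. The class name, import
paths, and prompt-variable key below are illustrative assumptions, not code
from the Jupyter AI repository.

```python
from jupyter_ai.chat_handlers.base import BaseChatHandler
from jupyter_ai.models import HumanChatMessage


class ExampleChatHandler(BaseChatHandler):
    """Hypothetical handler; assumes `self.llm_chain` is built elsewhere."""

    async def process_message(self, message: HumanChatMessage):
        # the keys of `inputs` must match the input variables of the prompt
        # template used to build `self.llm_chain`
        inputs = {"input": message.body}

        # stream the reply, replacing the default pending message
        await self.stream_reply(inputs, message, pending_msg="Thinking...")
```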

The function `stream_reply` in `base.py` first shows a pending message and then
streams the response one chunk at a time. Streaming is handled by the `astream`
method of the `llm_chain` runnable, which falls back to a single non-streamed
response when the provider does not implement streaming. If the user interrupts
the stream, `stream_interrupted` is set to `True` and a short tombstone note is
appended to the reply. The stream is marked complete once all chunks have been
sent.
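
The interruption path works by throwing an exception into the async generator
returned by `astream()`, which lets the provider stop generating. The following
self-contained sketch illustrates that pattern with plain `asyncio`;
`fake_token_stream` and the local `GenerationInterrupted` class are stand-ins
for illustration, not Jupyter AI code:

```python
import asyncio


class GenerationInterrupted(Exception):
    """Stand-in for the exception thrown into the generator on interruption."""


async def fake_token_stream():
    # pretend to yield tokens from a model, one per iteration
    try:
        for token in ["Hello", " ", "world", "!"]:
            yield token
            await asyncio.sleep(0.01)
    except GenerationInterrupted:
        # the producer sees the exception and can stop generating cleanly
        print("generation interrupted by consumer")
        raise


async def main():
    chunks = []
    gen = fake_token_stream()
    async for chunk in gen:
        chunks.append(chunk)
        if len(chunks) == 2:  # simulate the user pressing "stop"
            try:
                # throw the exception into the generator, as `stream_reply` does
                await gen.athrow(GenerationInterrupted())
            except (GenerationInterrupted, StopAsyncIteration):
                # do not let it bubble up if the producer re-raises or returns
                pass
            break
    print("received:", "".join(chunks))


asyncio.run(main())
```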

## Custom message footer
