Description
Confirm this is a feature request for the Python library and not the underlying OpenAI API.
- This is a feature request for the Python library
Describe the feature or improvement you're requesting
Hi,
I have developed a monkey patch that adds the ability to chain streams, which is very useful for the Assistants API function-execution workflow, and I think it could be integrated into the openai library. Here is the use case:
Imagine you are processing the assistant events in a loop (in my case I use the async streaming client, but it is essentially the same for the non-async one):
async for chunk in assistant_stream_response:
    # Process chunk here
    ...
    # Process function calls
    if isinstance(chunk, ThreadRunRequiresAction):
        tool_outputs = ...  # execute the functions and gather the outputs in this var
        new_stream = await async_client.beta.threads.runs.submit_tool_outputs(
            thread_id=thread_id,  # stored along the way
            run_id=chunk.data.id,
            tool_outputs=tool_outputs,
            stream=True,
        )
        # Chain the new stream onto the end of the current one to avoid writing
        # another chunk-processing loop
        assistant_stream_response.chain_stream(new_stream)
    yield result  # whatever the surrounding generator yields per chunk
With this, the tool-output submission stream is chained onto the end of the current one, so there is no need to write a second chunk-processing loop.
Tested & working.
It is very beneficial, especially when integrating the Assistants API into an existing project, because the existing processing workflow does not have to change.
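For contrast, here is a rough sketch (names are illustrative, not from the patch) of what the same workflow looks like without chaining: the tool-output stream has to be consumed in its own nested loop, duplicating the per-chunk handling, and the nesting deepens if that inner stream triggers another requires_action event:

    # Illustrative only: without chain_stream, the tool-output stream must be
    # consumed in a second, nested loop that duplicates the per-chunk handling.
    async for chunk in assistant_stream_response:
        handle(chunk)  # hypothetical per-chunk handler
        if isinstance(chunk, ThreadRunRequiresAction):
            tool_outputs = ...  # execute the functions and gather the outputs
            new_stream = await async_client.beta.threads.runs.submit_tool_outputs(
                thread_id=thread_id,
                run_id=chunk.data.id,
                tool_outputs=tool_outputs,
                stream=True,
            )
            async for inner_chunk in new_stream:
                handle(inner_chunk)  # the duplicated loop that chain_stream removes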
Here is the monkey patch:
#--------------------------------------MONKEY-PATCH-OPENAI--------------------------------------------------------------
import openai
from typing import Any, TypeVar, AsyncIterator, cast
from openai._utils import is_mapping
from openai._exceptions import APIError
from openai import AsyncOpenAI
import httpx

_T = TypeVar("_T")


def monkey_patch__init__(self, *, cast_to: type[_T], response: httpx.Response, client: AsyncOpenAI) -> None:
    self.response = response
    self._cast_to = cast_to
    self._client = client
    self._decoder = client._make_sse_decoder()
    self._iterator = self.__stream__()
    self._chained_stream = None  # MOD HERE


def chain_stream(self, stream):  # NEW FUNCT HERE
    if self._chained_stream:
        self._chained_stream.chain_stream(stream)
    else:
        self._chained_stream = stream


async def monkey_patch__stream__(self) -> AsyncIterator[_T]:
    cast_to = cast(Any, self._cast_to)
    response = self.response
    process_data = self._client._process_response_data
    iterator = self._iter_events()

    async for sse in iterator:
        if sse.data.startswith("[DONE]"):
            break

        if sse.event is None:
            data = sse.json()
            if is_mapping(data) and data.get("error"):
                message = None
                error = data.get("error")
                if is_mapping(error):
                    message = error.get("message")
                if not message or not isinstance(message, str):
                    message = "An error occurred during streaming"

                raise APIError(
                    message=message,
                    request=self.response.request,
                    body=data["error"],
                )

            yield process_data(data=data, cast_to=cast_to, response=response)
        else:
            data = sse.json()

            if sse.event == "error" and is_mapping(data) and data.get("error"):
                message = None
                error = data.get("error")
                if is_mapping(error):
                    message = error.get("message")
                if not message or not isinstance(message, str):
                    message = "An error occurred during streaming"

                raise APIError(
                    message=message,
                    request=self.response.request,
                    body=data["error"],
                )

            yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)

    async for _sse in iterator:
        ...

    if self._chained_stream:  # MOD HERE
        async for chunk in self._chained_stream:
            yield chunk


openai.AsyncStream.__init__ = monkey_patch__init__
openai.AsyncStream.__stream__ = monkey_patch__stream__
openai.AsyncStream.chain_stream = chain_stream
#-----------------------------------------------------------------------------------------------------------------------
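For completeness, here is a minimal sketch of how the patch would be applied (module and function names here are illustrative, not part of the patch): it only needs to be imported once before any stream is created, after which every AsyncStream exposes chain_stream():

    # Hypothetical usage sketch; importing the patch module applies the patch globally.
    import openai_stream_chain_patch  # the module containing the patch above

    from openai import AsyncOpenAI

    async_client = AsyncOpenAI()

    async def stream_run(thread_id: str, assistant_id: str):
        stream = await async_client.beta.threads.runs.create(
            thread_id=thread_id,
            assistant_id=assistant_id,
            stream=True,
        )
        async for event in stream:
            # Single processing loop, as in the use case above; when a
            # requires_action event produces a tool-output stream, call
            # stream.chain_stream(new_stream) and keep iterating.
            yield event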
Best regards,
Paul Irolla
Additional context
I have implemented this inside my personal fork of LiteLLM to integrate the Assistants API into the existing workflow without changing thousands of lines of code.