
New Feature Proposal: Assistant API - Chaining streams for function execution  #1261

Closed as not planned
@pi-infected

Description


Confirm this is a feature request for the Python library and not the underlying OpenAI API.

  • This is a feature request for the Python library

Describe the feature or improvement you're requesting

Hi,

I have developed a monkey patch that adds the ability to chain streams, which is very beneficial for the Assistant API function-execution workflow. I think it could be integrated into the openai library. So, I guess you want to know the use case, right?

Imagine you are processing the assistant events in a loop (in my case I use the async stream client, but it's basically the same for the non-async streaming one):

async for chunk in assistant_stream_response:
    # Process chunk here

    # Process function calls
    if isinstance(chunk, ThreadRunRequiresAction):
        # Execute the requested functions and gather the outputs in this var
        # (see the sketch below this example for one way to build them)
        tool_outputs = ...

        new_stream = await async_client.beta.threads.runs.submit_tool_outputs(
            thread_id=thread_id,  # stored along the way
            run_id=chunk.data.id,
            tool_outputs=tool_outputs,
            stream=True
        )
        # Chain new_stream at the end of the current one to avoid writing
        # another chunk processing loop
        assistant_stream_response.chain_stream(new_stream)

    yield result

With this, we can chain the tool-output submission stream onto the current one and avoid writing another chunk-processing loop.
Tested & working.

It is very beneficial, especially when integrating the Assistant API into an existing project, because it avoids changing the existing workflow.
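
For completeness, here is a hedged sketch of the elided tool_outputs step above; run_tool is a hypothetical local dispatcher (not part of the library), and the event structure follows the library's Run object as I understand it:

import json

def build_tool_outputs(event) -> list[dict]:
    # event is a ThreadRunRequiresAction; event.data is the Run whose
    # required_action lists the pending function calls
    tool_outputs = []
    for call in event.data.required_action.submit_tool_outputs.tool_calls:
        args = json.loads(call.function.arguments)
        result = run_tool(call.function.name, args)  # hypothetical dispatcher
        tool_outputs.append({"tool_call_id": call.id, "output": str(result)})
    return tool_outputs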
Here is the monkey patch:

#--------------------------------------MONKEY-PATCH-OPENAI--------------------------------------------------------------
import openai
from typing import Any, TypeVar, AsyncIterator, cast
from openai._utils import is_mapping
from openai._exceptions import APIError
from openai import AsyncOpenAI
import httpx

_T = TypeVar("_T")


def monkey_patch__init__(self, *, cast_to: type[_T], response: httpx.Response, client: AsyncOpenAI) -> None:
  self.response = response
  self._cast_to = cast_to
  self._client = client
  self._decoder = client._make_sse_decoder()
  self._iterator = self.__stream__()
  self._chained_stream = None # MOD HERE: stream to drain once this one is exhausted

def chain_stream(self, stream): # NEW FUNCT HERE
  # Append the new stream at the tail of the chain so chained streams are
  # drained in the order they were added
  if self._chained_stream:
    self._chained_stream.chain_stream(stream)
  else:
    self._chained_stream = stream

async def monkey_patch__stream__(self) -> AsyncIterator[_T]:
  cast_to = cast(Any, self._cast_to)
  response = self.response
  process_data = self._client._process_response_data
  iterator = self._iter_events()

  async for sse in iterator:
    if sse.data.startswith("[DONE]"):
      break

    if sse.event is None:
      data = sse.json()
      if is_mapping(data) and data.get("error"):
        message = None
        error = data.get("error")
        if is_mapping(error):
          message = error.get("message")
        if not message or not isinstance(message, str):
          message = "An error occurred during streaming"

        raise APIError(
          message=message,
          request=self.response.request,
          body=data["error"],
        )

      yield process_data(data=data, cast_to=cast_to, response=response)

    else:
      data = sse.json()

      if sse.event == "error" and is_mapping(data) and data.get("error"):
        message = None
        error = data.get("error")
        if is_mapping(error):
          message = error.get("message")
        if not message or not isinstance(message, str):
          message = "An error occurred during streaming"

        raise APIError(
          message=message,
          request=self.response.request,
          body=data["error"],
        )

      yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)

  async for _sse in iterator:
    ...

  if self._chained_stream: # MOD HERE: drain any chained streams once this one completes
    async for chunk in self._chained_stream:
      yield chunk


openai.AsyncStream.__init__ = monkey_patch__init__
openai.AsyncStream.__stream__ = monkey_patch__stream__
openai.AsyncStream.chain_stream = chain_stream
#-----------------------------------------------------------------------------------------------------------------------
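
To round things out, here is a hedged usage sketch (not from the original issue) showing the patch applied end to end; build_tool_outputs is the hypothetical helper sketched above, and the Assistants beta calls are used as I understand them:

import asyncio
from openai import AsyncOpenAI
from openai.types.beta.assistant_stream_event import ThreadRunRequiresAction

async def run_with_tools(thread_id: str, assistant_id: str) -> None:
    client = AsyncOpenAI()
    # With the patch applied, this AsyncStream gains chain_stream()
    stream = await client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id,
        stream=True,
    )
    async for event in stream:
        if isinstance(event, ThreadRunRequiresAction):
            follow_up = await client.beta.threads.runs.submit_tool_outputs(
                thread_id=thread_id,
                run_id=event.data.id,
                tool_outputs=build_tool_outputs(event),  # hypothetical helper sketched above
                stream=True,
            )
            stream.chain_stream(follow_up)  # method added by the monkey patch
        else:
            print(event.event)  # e.g. "thread.message.delta"

# asyncio.run(run_with_tools("thread_...", "asst_..."))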

Best regards,
Paul Irolla

Additional context

I have implemented this inside my personal fork of LiteLLM to integrate the Assistant API into the existing workflow without changing a thousand lines of code.
