Skip to content

Mixing multithreading with async methods hangs when trying to close internal streams #1062

Closed as not planned
@juliomenendez

Description

@juliomenendez

Confirm this is an issue with the Python library and not an underlying OpenAI API

  • This is an issue with the Python library

Describe the bug

When creating multiple threads to make completion requests in parallel the script hangs. After adding some debugging logs I can see that it happens in aiter_raw when calling .aclose().

The same code with just one thread works with no issues.

To Reproduce

I've attached a script that reproduces the issue.

Code snippets

import os
import logging
import threading
import asyncio
from openai import AsyncAzureOpenAI
from dotenv import load_dotenv

load_dotenv()


deployment_name = os.environ.get("AZURE_OPENAI_DEPLOYMENT_NAME")
client = AsyncAzureOpenAI(
    api_version=os.environ.get("AZURE_OPENAI_API_VERSION"),
    api_key=os.environ.get("AZURE_OPENAI_API_KEY"),
    azure_endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT"),
    azure_deployment=deployment_name,
)

texts = ["Hello", "Hola", "Bonjour", "Hallo", "Ciao", "Olá", "Namaste", "Salaam", "Zdras-tvuy-te", "Konnichiwa", "Nǐn hǎo", "Guten Tag", "Shikamoo", "Merhaba", "Sa"]

async def detect_language(text: str) -> str:
    completion = await client.chat.completions.create(
        model="gpt-3.5-turbo-16k",
        messages=[
            {
                "role": "user",
                "content": f"""
## Given this input:
{text}

## Do the following:
## Using only the input provided by the user, you must identify the input language and provide only the two character language code and nothing else:
""",
            },
        ],
    )
    
    if completion.choices:
        return completion.choices[0].message.content if completion.choices[0].message.content else "Unknown"

    return "Unknown"

def thread_function(loop, text):
    logging.info("Thread with text '%s': starting", text)
    asyncio.set_event_loop(loop)
    result = loop.run_until_complete(detect_language(text))
    logging.info("Thread with text '%s': finishing: Detected language: %s", text, result)
    return result

def main():
    threads = []

    for index, text in enumerate(texts):
        loop = asyncio.new_event_loop()
        x = threading.Thread(target=thread_function, args=(loop, text,))
        threads.append(x)
        x.start()

    for index, thread in enumerate(threads):
        logging.info("Before joining thread %d.", index)
        thread.join()
        logging.info("Thread %d done", index)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    main()

OS

Windows 11

Python version

Python 3.9

Library version

openai 1.6.1

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions