
Running concurrent event loops #383

Open
@hyansuper

To test basic usage of websockets, I want to start a server and check the string the client gets back. I think the server and the client should run in separate threads.

The code below passes, but is there a proper, more general way to do this?

import pytest
import asyncio
import threading
import websockets

async def server(stop_sig):
    async def echo(ws):
        async for msg in ws:
            await ws.send(msg)
    async with websockets.serve(echo, "localhost", 8000):
        await stop_sig

@pytest.fixture
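def threaded_server():
    # Give the server its own event loop, driven by a background thread.
    loop = asyncio.new_event_loop()
    # Future bound to that loop; resolving it tells the server to shut down.
    sig = loop.create_future()
    def run_loop(loop, coro):
        loop.run_until_complete(coro)
        loop.close()
    thread = threading.Thread(target=run_loop, args=(loop, server(sig)))
    thread.start()
    # Note: nothing here waits for the server to start listening before the test runs.
    yield
    # Futures are not thread-safe, so resolve it on the server loop's own thread.
    loop.call_soon_threadsafe(sig.set_result, None)
    thread.join()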

async def test_client_echo(threaded_server):
    async with websockets.connect('ws://localhost:8000') as ws:
        await ws.send("hello")
        assert await ws.recv() == 'hello'
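
One possible way to generalize the fixture above, as a rough sketch rather than an established pattern: wrap the "own loop in a background thread" logic in a reusable context manager that also waits until the server is ready before handing control back. The names `run_in_background_loop`, `echo_server`, `client`, and `roundtrip` are made up for illustration, and the standard library's `asyncio.start_server` stands in for `websockets.serve` so the example runs without extra dependencies.

```python
import asyncio
import contextlib
import threading


@contextlib.contextmanager
def run_in_background_loop(make_server):
    """Run the async context manager from make_server() on its own loop in a thread."""
    loop = asyncio.new_event_loop()
    stop = loop.create_future()   # resolved later to shut the server down
    ready = threading.Event()     # set once startup has succeeded or failed
    errors = []

    async def main():
        try:
            async with make_server():
                ready.set()       # server is up; unblock the caller
                await stop
        except Exception as exc:  # report startup failures to the caller
            errors.append(exc)
        finally:
            ready.set()

    def runner():
        try:
            loop.run_until_complete(main())
        finally:
            loop.close()

    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    ready.wait()
    if errors:
        thread.join()
        raise errors[0]
    try:
        yield
    finally:
        # The future belongs to the other loop, so resolve it thread-safely.
        loop.call_soon_threadsafe(stop.set_result, None)
        thread.join()


async def echo(reader, writer):
    # Echo each line back until the client closes the connection.
    while data := await reader.readline():
        writer.write(data)
        await writer.drain()
    writer.close()


@contextlib.asynccontextmanager
async def echo_server(ports):
    # Port 0 asks the OS for a free port; publish it through the `ports` list.
    server = await asyncio.start_server(echo, "127.0.0.1", 0)
    ports.append(server.sockets[0].getsockname()[1])
    async with server:
        yield


async def client(port):
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"hello\n")
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return reply


def roundtrip():
    ports = []
    with run_in_background_loop(lambda: echo_server(ports)):
        return asyncio.run(client(ports[0]))
```

In a pytest fixture, the `with run_in_background_loop(...)` block would presumably wrap the `yield`, so the test body only starts once the server is listening and teardown still joins the thread.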
