Open
Description
To test the basic usage of websockets, I want to start a server and test the return string from client, I think server and client should be in separate thread.
The below code passed, but I just want to know if there's a proper/generalized way to do it?
import pytest
import asyncio
import threading
import websockets
async def server(stop_sig):
async def echo(ws):
async for msg in ws:
await ws.send(msg)
async with websockets.serve(echo, "localhost", 8000):
await stop_sig
@pytest.fixture
def threaded_server():
policy = asyncio.get_event_loop_policy()
loop = policy.new_event_loop()
sig = asyncio.Future(loop=loop)
def run_loop(loop, coro):
loop.run_until_complete(coro)
loop.close()
thread = threading.Thread(target=run_loop, args=(loop, server(sig)))
thread.start()
yield
loop.call_soon_threadsafe(sig.set_result, None)
thread.join()
async def test_client_echo(threaded_server):
async with websockets.connect('ws://localhost:8000') as ws:
await ws.send("hello")
assert await ws.recv() == 'hello'