Skip to content

Commit b84b44b

Browse files
committed
add async interface
1 parent 1c97c6d commit b84b44b

File tree

2 files changed

+74
-0
lines changed

2 files changed

+74
-0
lines changed

pylsp_jsonrpc/endpoint.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
import uuid
66
import sys
7+
import asyncio
78

89
from concurrent import futures
910
from .exceptions import (JsonRpcException, JsonRpcRequestCancelled,
@@ -35,6 +36,17 @@ def __init__(self, dispatcher, consumer, id_generator=lambda: str(uuid.uuid4()),
3536
self._client_request_futures = {}
3637
self._server_request_futures = {}
3738
self._executor_service = futures.ThreadPoolExecutor(max_workers=max_workers)
39+
self._cancelledRequests = set()
40+
self._messageQueue = asyncio.Queue()
41+
42+
async def consume_task(self):
43+
log.warning("starting task")
44+
loop = asyncio.get_running_loop()
45+
while True:
46+
message = await self._messageQueue.get()
47+
await loop.run_in_executor(None, self.consume, message)
48+
log.warning("got message in task")
49+
self._messageQueue.task_done()
3850

3951
def shutdown(self):
4052
self._executor_service.shutdown()
@@ -94,7 +106,15 @@ def callback(future):
94106
future.set_exception(JsonRpcRequestCancelled())
95107
return callback
96108

109+
async def consume_async(self, message):
110+
log.warning("got message put in queue")
111+
if message['method'] == CANCEL_METHOD:
112+
self._cancelledRequests.add(message.get('params')['id'])
113+
await self._messageQueue.put(message)
114+
115+
97116
def consume(self, message):
117+
log.warning("consume message")
98118
"""Consume a JSON RPC message from the client.
99119
100120
Args:
@@ -182,6 +202,9 @@ def _handle_request(self, msg_id, method, params):
182202
except KeyError as e:
183203
raise JsonRpcMethodNotFound.of(method) from e
184204

205+
if msg_id in self._cancelledRequests:
206+
raise JsonRpcRequestCancelled()
207+
185208
handler_result = handler(params)
186209

187210
if callable(handler_result):

pylsp_jsonrpc/streams.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
import logging
55
import threading
6+
import asyncio
7+
from concurrent.futures import Future
68

79
try:
810
import ujson as json
@@ -65,6 +67,55 @@ def _read_message(self):
6567
# Grab the body
6668
return self._rfile.read(content_length)
6769

70+
async def listen_async(self, message_consumer):
71+
"""Blocking call to listen for messages on the rfile.
72+
73+
Args:
74+
message_consumer (fn): function that is passed each message as it is read off the socket.
75+
"""
76+
77+
while not self._rfile.closed:
78+
try:
79+
request_str = await self._read_message_async()
80+
except ValueError:
81+
if self._rfile.closed:
82+
return
83+
log.exception("Failed to read from rfile")
84+
85+
if request_str is None:
86+
break
87+
88+
try:
89+
await message_consumer(json.loads(request_str.decode('utf-8')))
90+
except ValueError:
91+
log.exception("Failed to parse JSON message %s", request_str)
92+
continue
93+
94+
async def _read_message_async(self):
95+
"""Reads the contents of a message.
96+
97+
Returns:
98+
body of message if parsable else None
99+
"""
100+
101+
loop = asyncio.get_running_loop()
102+
line = await loop.run_in_executor(None, self._rfile.readline)
103+
104+
if not line:
105+
return None
106+
107+
content_length = self._content_length(line)
108+
109+
# Blindly consume all header lines
110+
while line and line.strip():
111+
line = await loop.run_in_executor(None, self._rfile.readline)
112+
113+
if not line:
114+
return None
115+
116+
# Grab the body
117+
return await loop.run_in_executor(None, self._rfile.read, content_length)
118+
68119
@staticmethod
69120
def _content_length(line):
70121
"""Extract the content length from an input line."""

0 commit comments

Comments
 (0)