
Commit ab180fd

Merge remote-tracking branch 'upstream/master' into update-btrdb-custom-serialization
2 parents cd27a0b + aa08c59 commit ab180fd

File tree

3 files changed (+7, -97 lines)


btrdb/version.py

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@
 ## Module Info
 ##########################################################################
 
-__version_info__ = { 'major': 5, 'minor': 10, 'micro': 3, 'releaselevel': 'final'}
+__version_info__ = { 'major': 5, 'minor': 11, 'micro': 0, 'releaselevel': 'final'}
 
 ##########################################################################
 ## Helper Functions
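
Aside (editor's illustration, not part of the diff): a ``__version_info__`` mapping like the one bumped here is conventionally flattened into a dotted version string by one of the helper functions that follow in the file. A minimal sketch of that pattern, using a hypothetical ``get_version`` rather than the helper actually shipped in btrdb/version.py:

    # Hypothetical sketch only; not the helper defined in btrdb/version.py.
    __version_info__ = {'major': 5, 'minor': 11, 'micro': 0, 'releaselevel': 'final'}

    def get_version(info=__version_info__):
        """Flatten the version-info mapping into a dotted string, e.g. '5.11.0'."""
        version = "{major}.{minor}.{micro}".format(**info)
        if info.get("releaselevel", "final") != "final":
            # Non-final builds would carry a pre-release suffix such as 'a' or 'b'
            version += info["releaselevel"][0]
        return version

    print(get_version())  # -> 5.11.0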

docs/source/working/multiprocessing.rst

Lines changed: 4 additions & 94 deletions
@@ -3,15 +3,9 @@
 Multiprocessing
 ===============
 
-.. warning::
-
-    If using btrdb-python with multiprocessing, you must fork (e.g. start your workers)
-    before creating a connection to the database, otherwise the gRPC connection will
-    hang. See: https://github.com/grpc/grpc/issues/15334 for details.
-
 Complex analytics in Python may require additional speedups that can be gained by using the Python multiprocessing library. Other libraries like web applications take advantage of multiprocessing to serve a large number of users. Because btrdb-python uses `grpc <https://grpc.io/docs/tutorials/basic/python.html>`_ under the hood, it is important to understand how to connect and reuse connections to the database in a multiprocess or multithread context.
 
-The first and most critical thing to note is that ``btrdb.Connection`` objects *are not thread- or multiprocess-safe*. This means that in your code you should use either a lock or a semaphore to share a single connection object or that each process or thread should create their own connection object and clean up after themselves when they are done using the connection. Moreover, because of the forking issue discribed in the warning above, you must also take care when to create connections in worker processes.
+The most critical thing to note is that ``btrdb.Connection`` objects *are not thread or multiprocess-safe*. This means that in your code you should use either a lock or a semaphore to share a single connection object or that each process or thread should create their own connection object and clean up after themselves when they are done using the connection.
 
 Let's take the following simple example: we want to perform a data quality analysis on 12 hour chunks of data for all the streams in our ``staging/sensors`` collection. If we have hundreds of sensor streams across many months, this job can be sped up dramatically by using multiprocessing. Instead of having a single process churning through the each chunk of data one at a time, several workers can process multiple data chunks simultanously using multiple CPU cores and taking advantage of other CPU scheduling optimizations.
 
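
Aside (editor's sketch, not part of the diff): the guidance retained above offers two options. For threaded code, the lock-sharing option could look like the following, where the host, API key, and collection are placeholders borrowed from the surrounding example:

    import threading

    import btrdb

    # Placeholder credentials; one connection is shared by all threads and every
    # use of it is serialized with a lock, per the guidance above.
    db = btrdb.connect("sensors.predictivegrid.com", apikey="mysupersecretkey")
    db_lock = threading.Lock()

    def report_version(stream):
        # Guard the shared connection object while the RPC is in flight.
        with db_lock:
            version = stream.version()
        print(stream.uuid, version)

    threads = [
        threading.Thread(target=report_version, args=(stream,))
        for stream in db.streams_in_collection("staging/sensors")
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()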
@@ -66,7 +60,7 @@ Consider the processing architecture shown in :numref:`architecture`. At first g
             quality.append(data_quality(values))
 
         # Return the quality scores
-        return json.dumps({"uuid": uuid, "version": version, "quality": quality})
+        return json.dumps({"uuid": str(uuid), "version": version, "quality": quality})
 
 
     if __name__ == "__main__":
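
The one-line change in this hunk avoids a ``TypeError``: ``json.dumps`` cannot serialize a ``uuid.UUID`` object directly, so it is converted with ``str()`` first. A standalone check of that behaviour:

    import json
    import uuid

    uu = uuid.uuid4()

    try:
        json.dumps({"uuid": uu})              # UUID objects are not JSON serializable
    except TypeError as exc:
        print("raises:", exc)

    print(json.dumps({"uuid": str(uu)}))      # fine once converted to a string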
@@ -82,90 +76,6 @@ Consider the processing architecture shown in :numref:`architecture`. At first g
 
 Let's break this down quickly since this is a very common design pattern. First the ``time_ranges`` function gets the earliest and latest timestamp from a stream, then returns all 12 hour intervals between those two timestamps with no overlap. An imaginary ``stream_quality`` function takes a uuid for a stream, connects to the database and then applies the example ``data_quality`` method to all 12 hour chunks of data using the ``time_ranges`` method, returning a JSON string with the results.
 
-We expect the ``stream_quality`` function to be our parallelizable function (e.g. computing the data quality for multiple streams at a time). Depending on how long the ``data_quality`` function takes to compute we may also want to parallelize ``(stream, start, end)`` tuples. It seems that the ``multiprocessing.Pool`` would be perfect for this.
-
-The problem, however, occurs because in order to get the UUIDs of the streams to queue to the ``Pool``, we must first connect to the database and perform a search on the specified collection. This connection appears before the fork (which occurs when ``imap_unordered`` is called) and therefore gRPC fails. Unfortunately this means we have to be a bit more verbose.
-
-The solution is to create a custom worker that connects to BTrDB after the fork. Unfortunately, at the time of this writing there is no way to pass a custom worker to the ``Pool`` object. The worker is as follows:
-
-.. code-block:: python
-
-    class Worker(mp.Process):
-
-        def __init__(self, host, apikey, handler, tasks, results):
-            self.host = host
-            self.apikey = apikey
-            self.handler = handler
-            self.tasks = tasks
-            self.results = results
-
-            self.db = None
-            super(Worker, self).__init__()
-
-        def connect(self):
-            self.db = btrdb.connect(self.host, apikey=self.apikey)
-
-        def run(self):
-            # connect when started to ensure connection is in the fork
-            self.connect()
-
-            while True:
-                task = self.tasks.get()
-                if task is None:
-                    # poison pill means shutdown
-                    return
-
-                try:
-                    # Pass the task to the handler
-                    result = self.handler(task)
-                except Exception as e:
-                    # Send any exceptions back to main process
-                    result = {"task": task, "error": str(e)}
-
-                self.results.put_nowait(result)
-
-This simple worker process accepts BTrDB connection arguments, the URL and API key to connect to the database as well as a handler function and tasks and resuls queues. It only connects to the database on ``run()``, ensuring that the connection occurs after the fork. Then it simply reads off the task queue, executing the task and putting the results (or exceptions) on the results queue. If it gets ``None`` from the tasks queue, it shuts down.
-
-We can change our multiprocessing method to use this new worker and connect after fork as follows:
-
-.. code-block:: python
-
-    if __name__ == "__main__":
-
-        # BTrDB connection credentials
-        HOST = "sensors.predictivegrid.com"
-        APIKEY = "mysupersecretkey"
-
-        # Tasks and results queues
-        tasks, results = mp.Queue(), mp.Queue()
-
-        # Create the workers with credentials and queues
-        workers = [
-            Worker(HOST, APIKEY, stream_quality, tasks, results)
-            for _ in range(mp.cpu_count())
-        ]
-
-        # Start the workers, this is where the fork occurs
-        for worker in workers:
-            worker.start()
-
-        # Now we can connect to the database and enqueue the streams
-        n_tasks = 0
-        db = btrdb.connect(HOST, apikey=APIKEY)
-        for stream in db.streams_in_collection("staging/sensors"):
-            tasks.put_nowait(stream.uuid)
-            n_tasks += 1
-
-        # Enqueue the poison pill to shut the workers down
-        for _ in range(len(workers)):
-            tasks.put_nowait(None)
-
-        # Begin reading off of the results queue
-        for _ in range(n_tasks):
-            print(results.get())
-
-        # Join on the workers to ensure they clean up
-        for worker in workers:
-            worker.join()
+The ``stream_quality`` function is our parallelizable function (e.g. computing the data quality for multiple streams at a time). Depending on how long the ``data_quality`` function takes to compute, we may also want to parallelize ``(stream, start, end)`` tuples.
 
-This method is certainly a lot more verbose than using `mp.Pool`, but unfortunately is the only work around to the forking issue that exists in BTrDB. If you would like features like a connection pool object (as other databases have) or multiprocessing helpers, please leave us a note in our GitHub issues!
+If you would like features like a connection pool object (as other databases have) or multiprocessing helpers, please leave us a note in our GitHub issues!
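
Aside (editor's sketch, not part of the diff): with the custom ``Worker`` example removed from the docs and the grpcio floor raised below, the pattern the trimmed page now implies is the plain ``multiprocessing.Pool`` one, where each worker opens its own connection inside the handler. A minimal sketch of that shape, reusing the host, API key, and collection from the removed example and substituting a stand-in body for ``stream_quality``:

    import multiprocessing as mp

    import btrdb

    HOST = "sensors.predictivegrid.com"
    APIKEY = "mysupersecretkey"

    def stream_quality(uu):
        # Stand-in handler: each worker process opens its own connection because
        # btrdb.Connection objects are not multiprocess-safe.
        db = btrdb.connect(HOST, apikey=APIKEY)
        stream = db.stream_from_uuid(uu)
        return str(uu), stream.version()

    if __name__ == "__main__":
        db = btrdb.connect(HOST, apikey=APIKEY)
        uuids = [stream.uuid for stream in db.streams_in_collection("staging/sensors")]

        with mp.Pool(processes=mp.cpu_count()) as pool:
            for uu, version in pool.imap_unordered(stream_quality, uuids):
                print(uu, version)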

requirements.txt

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 # GRPC / Protobuff related
-grpcio>=1.16.1
-grpcio-tools>=1.16.1
+grpcio>=1.19.0
+grpcio-tools>=1.19.0
 
 # Time related utils
 pytz
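
A quick runtime check that an existing environment already satisfies the raised floor (throwaway snippet, not part of the package):

    import grpc

    # grpcio exposes its own version string, e.g. "1.19.0"
    major, minor = (int(part) for part in grpc.__version__.split(".")[:2])
    if (major, minor) < (1, 19):
        raise RuntimeError(f"grpcio {grpc.__version__} is older than the required 1.19.0")
    print(f"grpcio {grpc.__version__} satisfies >=1.19.0")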
