Skip to content

Make BTrDB connections serializable in Ray #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion btrdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
from btrdb.exceptions import ConnectionError
from btrdb.version import get_version
from btrdb.utils.credentials import credentials_by_profile, credentials
from btrdb.utils.ray import register_serializer
from btrdb.stream import MINIMUM_TIME, MAXIMUM_TIME
from warnings import warn

##########################################################################
## Module Variables
Expand All @@ -40,7 +42,7 @@
def _connect(endpoints=None, apikey=None):
    """
    Build a BTrDB object from the given connection details.

    Parameters
    ----------
    endpoints: optional
        Forwarded as the first positional argument to `Connection`.
    apikey: optional
        Forwarded to `Connection` as the `apikey` keyword argument.

    Returns
    -------
    BTrDB
        A BTrDB instance wrapping an `Endpoint` created from the
        connection's channel.
    """
    # Open the connection first, then wrap its channel step by step.
    connection = Connection(endpoints, apikey=apikey)
    endpoint = Endpoint(connection.channel)
    return BTrDB(endpoint)

def connect(conn_str=None, apikey=None, profile=None):
def connect(conn_str=None, apikey=None, profile=None, shareable=False):
"""
Connect to a BTrDB server.

Expand All @@ -57,6 +59,12 @@ def connect(conn_str=None, apikey=None, profile=None):
The name of a profile containing the required connection information as
found in the user's predictive grid credentials file
`~/.predictivegrid/credentials.yaml`.
shareable: bool, default=False
Whether or not the connection can be "shared" in a distributed setting such
as Ray workers. If set to True, the connection can be serialized and sent
to other workers so that data can be retrieved in parallel; **however**, this
is less secure because it is possible for other users of the Ray cluster to
use your API key to fetch data.

Returns
-------
Expand All @@ -68,6 +76,11 @@ def connect(conn_str=None, apikey=None, profile=None):
if conn_str and profile:
raise ValueError("Received both conn_str and profile arguments.")

# check shareable flag and register custom serializer if necessary
if shareable:
warn("a shareable connection is potentially insecure; other users of the same cluster may be able to access your API key")
register_serializer(conn_str=conn_str, apikey=apikey, profile=profile)

# use specific profile if requested
if profile:
return _connect(**credentials_by_profile(profile))
Expand Down
16 changes: 14 additions & 2 deletions btrdb/utils/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import ray

import semver

import btrdb
from btrdb.conn import BTrDB

Expand All @@ -22,8 +24,18 @@ def register_serializer(conn_str=None, apikey=None, profile=None):
found in the user's predictive grid credentials file
`~/.predictivegrid/credentials.yaml`.
"""
ray.register_custom_serializer(
BTrDB, serializer=btrdb_serializer, deserializer=partial(btrdb_deserializer, conn_str=conn_str, apikey=apikey, profile=profile))
assert ray.is_initialized(), "Need to call ray.init() before registering custom serializer"
# Parse the installed Ray version with 'semver' to pick the matching serializer-registration API.
ver = semver.VersionInfo.parse(ray.__version__)
if ver.major == 0:
ray.register_custom_serializer(
BTrDB, serializer=btrdb_serializer, deserializer=partial(btrdb_deserializer, conn_str=conn_str, apikey=apikey, profile=profile))
elif ver.major == 1 and ver.minor in range(2, 4):
# TODO: check different versions of ray?
ray.util.register_serializer(
BTrDB, serializer=btrdb_serializer, deserializer=partial(btrdb_deserializer, conn_str=conn_str, apikey=apikey, profile=profile))
else:
raise Exception("Ray version %s does not have custom serialization. Please upgrade to >= 1.2.0" % ray.__version__)

def btrdb_serializer(_):
"""
Expand Down