diff --git a/btrdb/__init__.py b/btrdb/__init__.py
index a7019c7..cd14662 100644
--- a/btrdb/__init__.py
+++ b/btrdb/__init__.py
@@ -20,7 +20,9 @@
 from btrdb.exceptions import ConnectionError
 from btrdb.version import get_version
 from btrdb.utils.credentials import credentials_by_profile, credentials
+from btrdb.utils.ray import register_serializer
 from btrdb.stream import MINIMUM_TIME, MAXIMUM_TIME
+from warnings import warn
 
 ##########################################################################
 ## Module Variables
@@ -40,7 +42,7 @@ def _connect(endpoints=None, apikey=None):
     return BTrDB(Endpoint(Connection(endpoints, apikey=apikey).channel))
 
 
-def connect(conn_str=None, apikey=None, profile=None):
+def connect(conn_str=None, apikey=None, profile=None, shareable=False):
     """
     Connect to a BTrDB server.
 
@@ -57,6 +59,12 @@ def connect(conn_str=None, apikey=None, profile=None):
         The name of a profile containing the required connection information as
         found in the user's predictive grid credentials file
         `~/.predictivegrid/credentials.yaml`.
+    shareable: bool, default=False
+        Whether or not the connection can be "shared" in a distributed setting such
+        as Ray workers. If set to True, the connection can be serialized and sent
+        to other workers so that data can be retrieved in parallel; **however**, this
+        is less secure because it is possible for other users of the Ray cluster to
+        use your API key to fetch data.
 
     Returns
     -------
@@ -68,6 +76,11 @@ def connect(conn_str=None, apikey=None, profile=None):
     if conn_str and profile:
         raise ValueError("Received both conn_str and profile arguments.")
 
+    # check shareable flag and register custom serializer if necessary
+    if shareable:
+        warn("a shareable connection is potentially insecure; other users of the same cluster may be able to access your API key")
+        register_serializer(conn_str=conn_str, apikey=apikey, profile=profile)
+
     # use specific profile if requested
     if profile:
         return _connect(**credentials_by_profile(profile))
diff --git a/btrdb/utils/ray.py b/btrdb/utils/ray.py
index 8df9527..64b2990 100644
--- a/btrdb/utils/ray.py
+++ b/btrdb/utils/ray.py
@@ -2,6 +2,8 @@
 
 import ray
 
+import semver
+
 import btrdb
 from btrdb.conn import BTrDB
 
@@ -22,8 +24,25 @@ def register_serializer(conn_str=None, apikey=None, profile=None):
         found in the user's predictive grid credentials file
         `~/.predictivegrid/credentials.yaml`.
     """
-    ray.register_custom_serializer(
-        BTrDB, serializer=btrdb_serializer, deserializer=partial(btrdb_deserializer, conn_str=conn_str, apikey=apikey, profile=profile))
+    # Raise explicitly instead of asserting: asserts are stripped under `python -O`.
+    if not ray.is_initialized():
+        raise RuntimeError("Need to call ray.init() before registering custom serializer")
+
+    # Both ray APIs take the same arguments, so build the deserializer once.
+    deserializer = partial(btrdb_deserializer, conn_str=conn_str, apikey=apikey, profile=profile)
+
+    # NOTE(review): assumes ray.__version__ is a strict semver string -- confirm for dev/rc builds.
+    ver = semver.VersionInfo.parse(ray.__version__)
+    if ver.major == 0:
+        ray.register_custom_serializer(
+            BTrDB, serializer=btrdb_serializer, deserializer=deserializer)
+    elif (ver.major, ver.minor) >= (1, 2):
+        # Accept every release from 1.2.0 onward (matching the error message below),
+        # not only 1.2.x and 1.3.x.
+        ray.util.register_serializer(
+            BTrDB, serializer=btrdb_serializer, deserializer=deserializer)
+    else:
+        raise Exception("Ray version %s does not have custom serialization. Please upgrade to >= 1.2.0" % ray.__version__)
 
 def btrdb_serializer(_):
     """