Skip to content

Added support for custom auth tokens #86

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions neo4j/v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
instances that are in turn used for managing sessions.
"""


from __future__ import division

from collections import deque
Expand All @@ -39,7 +38,6 @@
from .summary import ResultSummary
from .types import hydrated


DEFAULT_MAX_POOL_SIZE = 50

localhost = re.compile(r"^(localhost|127(\.\d+){3})$", re.IGNORECASE)
Expand All @@ -49,10 +47,17 @@ class AuthToken(object):
""" Container for auth information
"""

def __init__(self, scheme, principal, credentials):
#: By default we should not send any realm
realm = None

def __init__(self, scheme, principal, credentials, realm=None, **parameters):
self.scheme = scheme
self.principal = principal
self.credentials = credentials
if realm:
self.realm = realm
if parameters:
self.parameters = parameters


class GraphDatabase(object):
Expand Down Expand Up @@ -135,7 +140,7 @@ def __init__(self, address, **config):
self.encrypted = encrypted
self.trust = trust = config.get("trust", TRUST_DEFAULT)
if encrypted == ENCRYPTION_ON or \
encrypted == ENCRYPTION_NON_LOCAL and not localhost.match(host):
encrypted == ENCRYPTION_NON_LOCAL and not localhost.match(host):
if not SSL_AVAILABLE:
raise RuntimeError("Bolt over TLS is only available in Python 2.7.9+ and Python 3.3+")
ssl_context = SSLContext(PROTOCOL_SSLv23)
Expand Down Expand Up @@ -510,14 +515,28 @@ def __ne__(self, other):
return not self.__eq__(other)


def basic_auth(user, password):
def basic_auth(user, password, realm=None):
""" Generate a basic auth token for a given user and password.

:param user: user name
:param password: current password
:param realm: specifies the authentication provider
:return: auth token for use with :meth:`GraphDatabase.driver`
"""
return AuthToken("basic", user, password, realm)


def custom_auth(principal, credentials, realm, scheme, **parameters):
""" Generate a basic auth token for a given user and password.

:param principal: specifies who is being authenticated
:param credentials: authenticates the principal
:param realm: specifies the authentication provider
:param scheme: specifies the type of authentication
:param parameters: parameters passed along to the authenticatin provider
:return: auth token for use with :meth:`GraphDatabase.driver`
"""
return AuthToken("basic", user, password)
return AuthToken(scheme, principal, credentials, realm, **parameters)


def run(connection, statement, parameters=None):
Expand Down
25 changes: 24 additions & 1 deletion test/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from neo4j.v1.constants import TRUST_ON_FIRST_USE
from neo4j.v1.exceptions import CypherError, ProtocolError, ResultError
from neo4j.v1.session import GraphDatabase, basic_auth, Record, SSL_AVAILABLE
from neo4j.v1.session import GraphDatabase, basic_auth, custom_auth, Record, SSL_AVAILABLE
from neo4j.v1.types import Node, Relationship, Path

from test.util import ServerTestCase
Expand Down Expand Up @@ -97,6 +97,29 @@ def test_fail_nicely_when_connecting_to_http_port(self):
assert str(context.exception) == "Server responded HTTP. Make sure you are not trying to connect to the http " \
"endpoint (HTTP defaults to port 7474 whereas BOLT defaults to port 7687)"

def test_can_provide_realm_with_basic_auth_token(self):
token = basic_auth("neo4j", "neo4j", "native")
driver = GraphDatabase.driver("bolt://localhost", auth=token)
session = driver.session()
result = session.run("RETURN 1").consume()
session.close()
assert result is not None

def test_can_create_custom_auth_token(self):
token = custom_auth("neo4j", "neo4j", "native", "basic")
driver = GraphDatabase.driver("bolt://localhost", auth=token)
session = driver.session()
result = session.run("RETURN 1").consume()
session.close()
assert result is not None

def test_can_create_custom_auth_token_with_additional_parameters(self):
token = custom_auth("neo4j", "neo4j", "native", "basic", secret=42)
driver = GraphDatabase.driver("bolt://localhost", auth=token)
session = driver.session()
result = session.run("RETURN 1").consume()
session.close()
assert result is not None


class SecurityTestCase(ServerTestCase):
Expand Down