Skip to content

1st round of migrating integration tests to testkit #558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions testkitbackend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ def _process(self, request):
if isinstance(e, Neo4jError):
payload["code"] = e.code
self.send_response("DriverError", payload)
except requests.FrontendError as e:
self.send_response("FrontendError", {"msg": str(e)})
except Exception:
tb = traceback.format_exc()
log.error(tb)
Expand Down
2 changes: 1 addition & 1 deletion testkitbackend/fromtestkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def to_meta_and_timeout(data):
metadata.mark_all_as_read()
timeout = data.get('timeout', None)
if timeout:
timeout = float(timeout) / 1000
timeout = timeout / 1000
return metadata, timeout


Expand Down
46 changes: 40 additions & 6 deletions testkitbackend/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
from testkitbackend.fromtestkit import to_meta_and_timeout


class FrontendError(Exception):
pass


def load_config():
with open(path.join(path.dirname(__file__), "test_config.json"), "r") as fd:
config = json.load(fd)
Expand Down Expand Up @@ -193,7 +197,7 @@ def SessionRun(backend, data):
result = session.run(query, parameters=params)
key = backend.next_key()
backend.results[key] = result
backend.send_response("Result", {"id": key})
backend.send_response("Result", {"id": key, "keys": result.keys()})


def SessionClose(backend, data):
Expand Down Expand Up @@ -244,7 +248,7 @@ def func(tx):
if session_tracker.error_id:
raise backend.errors[session_tracker.error_id]
else:
raise Exception("Client said no")
raise FrontendError("Client said no")

if is_read:
session.read_transaction(func)
Expand All @@ -270,7 +274,7 @@ def TransactionRun(backend, data):
result = tx.run(cypher, parameters=params)
key = backend.next_key()
backend.results[key] = result
backend.send_response("Result", {"id": key})
backend.send_response("Result", {"id": key, "keys": result.keys()})


def TransactionCommit(backend, data):
Expand Down Expand Up @@ -300,13 +304,43 @@ def ResultNext(backend, data):
def ResultConsume(backend, data):
result = backend.results[data["resultId"]]
summary = result.consume()
from neo4j.work.summary import ResultSummary
assert isinstance(summary, ResultSummary)
backend.send_response("Summary", {
"serverInfo": {
"address": ":".join(map(str, summary.server.address)),
"agent": summary.server.agent,
"protocolVersion":
".".join(map(str, summary.server.protocol_version)),
"agent": summary.server.agent,
"address": ":".join(map(str, summary.server.address)),
}
},
"counters": None if not summary.counters else {
"constraintsAdded": summary.counters.constraints_added,
"constraintsRemoved": summary.counters.constraints_removed,
"containsSystemUpdates": summary.counters.contains_system_updates,
"containsUpdates": summary.counters.contains_updates,
"indexesAdded": summary.counters.indexes_added,
"indexesRemoved": summary.counters.indexes_removed,
"labelsAdded": summary.counters.labels_added,
"labelsRemoved": summary.counters.labels_removed,
"nodesCreated": summary.counters.nodes_created,
"nodesDeleted": summary.counters.nodes_deleted,
"propertiesSet": summary.counters.properties_set,
"relationshipsCreated": summary.counters.relationships_created,
"relationshipsDeleted": summary.counters.relationships_deleted,
"systemUpdates": summary.counters.system_updates,
},
"database": summary.database,
"notifications": summary.notifications,
"plan": summary.plan,
"profile": summary.profile,
"query": {
"text": summary.query,
"parameters": {k: totestkit.field(v)
for k, v in summary.parameters.items()},
},
"queryType": summary.query_type,
"resultAvailableAfter": summary.result_available_after,
"resultConsumedAfter": summary.result_consumed_after,
})


Expand Down
5 changes: 4 additions & 1 deletion testkitbackend/test_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
"Optimization:ImplicitDefaultArguments": true,
"Optimization:MinimalResets": "Driver resets some clean connections when put back into pool",
"Optimization:ConnectionReuse": true,
"Optimization:PullPipelining": true
"Optimization:PullPipelining": true,
"Temporary:ResultKeys": true,
"Temporary:FullSummary": true,
"Temporary:CypherPathAndRelationship": true
}
}
21 changes: 20 additions & 1 deletion testkitbackend/totestkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from neo4j.graph import Node
from neo4j.graph import (
Node,
Path,
Relationship,
)


def record(rec):
Expand Down Expand Up @@ -55,5 +59,20 @@ def to(name, val):
"props": field(v._properties),
}
return {"name": "Node", "data": node}
if isinstance(v, Relationship):
rel = {
"id": field(v.id),
"startNodeId": field(v.start_node.id),
"endNodeId": field(v.end_node.id),
"type": field(v.type),
"props": field(v._properties),
}
return {"name": "Relationship", "data": rel}
if isinstance(v, Path):
path = {
"nodes": field(list(v.nodes)),
"relationships": field(list(v.relationships)),
}
return {"name": "Path", "data": path}

raise Exception("Unhandled type:" + str(type(v)))
221 changes: 2 additions & 219 deletions tests/integration/test_autocommit.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,228 +19,11 @@
# limitations under the License.


import pytest

from neo4j.work.simple import Query
from neo4j.exceptions import Neo4jError, ClientError, TransientError
from neo4j.graph import Node, Relationship
from neo4j.api import Version


def test_can_run_simple_statement(session):
result = session.run("RETURN 1 AS n")
for record in result:
assert record[0] == 1
assert record["n"] == 1
with pytest.raises(KeyError):
_ = record["x"]
assert record["n"] == 1
with pytest.raises(KeyError):
_ = record["x"]
with pytest.raises(TypeError):
_ = record[object()]
assert repr(record)
assert len(record) == 1


def test_can_run_simple_statement_with_params(session):
count = 0
for record in session.run("RETURN $x AS n",
{"x": {"abc": ["d", "e", "f"]}}):
assert record[0] == {"abc": ["d", "e", "f"]}
assert record["n"] == {"abc": ["d", "e", "f"]}
assert repr(record)
assert len(record) == 1
count += 1
assert count == 1


def test_autocommit_transactions_use_bookmarks(neo4j_driver):
bookmarks = []
# Generate an initial bookmark
with neo4j_driver.session() as session:
session.run("CREATE ()").consume()
bookmark = session.last_bookmark()
assert bookmark is not None
bookmarks.append(bookmark)
# Propagate into another session
with neo4j_driver.session(bookmarks=bookmarks) as session:
assert list(session._bookmarks) == bookmarks
session.run("CREATE ()").consume()
bookmark = session.last_bookmark()
assert bookmark is not None
assert bookmark not in bookmarks


def test_fails_on_bad_syntax(session):
with pytest.raises(Neo4jError):
session.run("X").consume()


def test_fails_on_missing_parameter(session):
with pytest.raises(Neo4jError):
session.run("RETURN {x}").consume()


def test_keys_with_an_error(session):
with pytest.raises(Neo4jError):
result = session.run("X")
list(result.keys())


def test_should_not_allow_empty_statements(session):
with pytest.raises(ValueError):
_ = session.run("")


def test_can_run_statement_that_returns_multiple_records(session):
count = 0
for record in session.run("unwind(range(1, 10)) AS z RETURN z"):
assert 1 <= record[0] <= 10
count += 1
assert count == 10


def test_can_use_with_to_auto_close_session(session):
record_list = list(session.run("RETURN 1"))
assert len(record_list) == 1
for record in record_list:
assert record[0] == 1


def test_can_return_node(neo4j_driver):
with neo4j_driver.session() as session:
record_list = list(session.run("CREATE (a:Person {name:'Alice'}) "
"RETURN a"))
assert len(record_list) == 1
for record in record_list:
alice = record[0]
assert isinstance(alice, Node)
assert alice.labels == {"Person"}
assert dict(alice) == {"name": "Alice"}


def test_can_return_relationship(neo4j_driver):
with neo4j_driver.session() as session:
record_list = list(session.run("CREATE ()-[r:KNOWS {since:1999}]->() "
"RETURN r"))
assert len(record_list) == 1
for record in record_list:
rel = record[0]
assert isinstance(rel, Relationship)
assert rel.type == "KNOWS"
assert dict(rel) == {"since": 1999}


# TODO: re-enable after server bug is fixed
# def test_can_return_path(session):
# with self.driver.session() as session:
# record_list = list(session.run("MERGE p=({name:'Alice'})-[:KNOWS]->"
# "({name:'Bob'}) RETURN p"))
# assert len(record_list) == 1
# for record in record_list:
# path = record[0]
# assert isinstance(path, Path)
# assert path.start_node["name"] == "Alice"
# assert path.end_node["name"] == "Bob"
# assert path.relationships[0].type == "KNOWS"
# assert len(path.nodes) == 2
# assert len(path.relationships) == 1


def test_keys_are_available_before_and_after_stream(session):
result = session.run("UNWIND range(1, 10) AS n RETURN n")
assert list(result.keys()) == ["n"]
list(result)
assert list(result.keys()) == ["n"]


# TODO: this test will stay until a uniform behavior for `.single()` across the
# drivers has been specified and tests are created in testkit
def test_result_single_record_value(session):
record = session.run(Query("RETURN $x"), x=1).single()
assert record.value() == 1


@pytest.mark.parametrize(
"test_input, neo4j_version",
[
("CALL dbms.getTXMetaData", Version(3, 0)),
("CALL tx.getMetaData", Version(4, 0)),
]
)
def test_autocommit_transactions_should_support_metadata(session, test_input, neo4j_version):
# python -m pytest tests/integration/test_autocommit.py -s -r fEsxX -k test_autocommit_transactions_should_support_metadata
metadata_in = {"foo": "bar"}

result = session.run("RETURN 1")
value = result.single().value()
summary = result.consume()
server_agent = summary.server.agent

try:
statement = Query(test_input, metadata=metadata_in)
result = session.run(statement)
metadata_out = result.single().value()
except ClientError as e:
if e.code == "Neo.ClientError.Procedure.ProcedureNotFound":
pytest.skip("Cannot assert correct metadata as {} does not support procedure '{}' introduced in Neo4j {}".format(server_agent, test_input, neo4j_version))
else:
raise
else:
assert metadata_in == metadata_out


def test_autocommit_transactions_should_support_timeout(neo4j_driver):
with neo4j_driver.session() as s1:
s1.run("CREATE (a:Node)").consume()
with neo4j_driver.session() as s2:
tx1 = s1.begin_transaction()
tx1.run("MATCH (a:Node) SET a.property = 1").consume()
try:
result = s2.run(Query("MATCH (a:Node) SET a.property = 2", timeout=0.25))
result.consume()
# On 4.0 and older
except TransientError:
pass
# On 4.1 and forward
except ClientError:
pass
else:
raise


def test_regex_in_parameter(session):
matches = []
result = session.run("UNWIND ['A', 'B', 'C', 'A B', 'B C', 'A B C', "
"'A BC', 'AB C'] AS t WITH t "
"WHERE t =~ $re RETURN t", re=r'.*\bB\b.*')
for record in result:
matches.append(record.value())
assert matches == ["B", "A B", "B C", "A B C"]


def test_regex_inline(session):
matches = []
result = session.run(r"UNWIND ['A', 'B', 'C', 'A B', 'B C', 'A B C', "
r"'A BC', 'AB C'] AS t WITH t "
r"WHERE t =~ '.*\\bB\\b.*' RETURN t")
for record in result:
matches.append(record.value())
assert matches == ["B", "A B", "B C", "A B C"]


def test_automatic_reset_after_failure(session):
try:
result = session.run("X")
result.consume()
except Neo4jError:
result = session.run("RETURN 1")
record = next(iter(result))
assert record[0] == 1
else:
assert False, "A Cypher error should have occurred"


def test_large_values(bolt_driver):
for i in range(1, 7):
with bolt_driver.session() as session:
session.run("RETURN '{}'".format("A" * 2 ** 20))
Loading