Skip to content

Feature/stream tx graph #310

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Oct 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ jobs:
docker-img:
- docker.io/arangodb:3.3.23
- docker.io/arangodb:3.4.8
- docker.io/arangodb:3.5.0
- docker.io/arangodb:3.5.1
- docker.io/arangodb/enterprise:3.4.8
- docker.io/arangodb/enterprise:3.5.0
- docker.io/arangodb/enterprise:3.5.1
topology:
- single
- cluster
Expand Down
8 changes: 8 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) a

## [Unreleased]

### Added

- Stream Transactions support for graph APIs

### Fixed

- `catchExceptions` option in async `getEdge` and `getVertex`

## [6.3.0] - 2019-09-16

### Added
Expand Down
2 changes: 1 addition & 1 deletion docker/start_db_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# ./start_db_cluster.sh <dockerImage>

# EXAMPLE:
# ./start_db_cluster.sh docker.io/arangodb:3.5.0
# ./start_db_cluster.sh docker.io/arangodb/arangodb:3.5.1

docker pull "$1"

Expand Down
2 changes: 1 addition & 1 deletion docker/start_db_single.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# ./start_db_single.sh <dockerImage>

# EXAMPLE:
# ./start_db_single.sh docker.io/arangodb:3.5.0
# ./start_db_single.sh docker.io/arangodb/arangodb:3.5.1

docker pull "$1"

Expand Down
16 changes: 16 additions & 0 deletions docs/Drivers/Java/Reference/Graph/Edges.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ Retrieves the edge document with the given `key` from the collection.

Whether or not catch possible thrown exceptions

- **streamTransactionId**: `String`

If set, the operation will be executed within the transaction

## ArangoEdgeCollection.insertEdge

`ArangoEdgeCollection.insertEdge(T value, EdgeCreateOptions options) : EdgeEntity`
Expand Down Expand Up @@ -88,6 +92,10 @@ a edge and no precondition is violated.

Replace a document based on target revision

- **streamTransactionId**: `String`

If set, the operation will be executed within the transaction

**Examples**

```Java
Expand Down Expand Up @@ -135,6 +143,10 @@ edge and no precondition is violated.
from the existing document that are contained in the patch document with an
attribute value of null.

- **streamTransactionId**: `String`

If set, the operation will be executed within the transaction

**Examples**

```Java
Expand Down Expand Up @@ -169,6 +181,10 @@ Deletes the edge with the given _key_ from the collection.

Remove a document based on target revision

- **streamTransactionId**: `String`

If set, the operation will be executed within the transaction

**Examples**

```Java
Expand Down
24 changes: 24 additions & 0 deletions docs/Drivers/Java/Reference/Graph/Vertices.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ Retrieves the vertex document with the given `key` from the collection.

Whether or not catch possible thrown exceptions

- **streamTransactionId**: `String`

If set, the operation will be executed within the transaction

## ArangoVertexCollection.insertVertex

`ArangoVertexCollection.insertVertex(T value, VertexCreateOptions options) : VertexEntity`
Expand All @@ -48,6 +52,14 @@ Creates a new vertex in the collection.

Wait until document has been synced to disk.

- **streamTransactionId**: `String`

If set, the operation will be executed within the transaction

- **streamTransactionId**: `String`

If set, the operation will be executed within the transaction

**Examples**

```Java
Expand Down Expand Up @@ -88,6 +100,10 @@ a vertex and no precondition is violated.

Replace a document based on target revision

- **streamTransactionId**: `String`

If set, the operation will be executed within the transaction

**Examples**

```Java
Expand Down Expand Up @@ -135,6 +151,10 @@ a vertex and no precondition is violated.
from the existing document that are contained in the patch document with
an attribute value of null.

- **streamTransactionId**: `String`

If set, the operation will be executed within the transaction

**Examples**

```Java
Expand Down Expand Up @@ -169,6 +189,10 @@ Deletes the vertex with the given _key_ from the collection.

Remove a document based on target revision

- **streamTransactionId**: `String`

If set, the operation will be executed within the transaction

**Examples**

```Java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

/**
* @author Mark Vollmary
Expand Down Expand Up @@ -116,29 +114,7 @@ public <T> CompletableFuture<T> getDocument(
DocumentUtil.validateDocumentKey(key);
boolean isCatchException = options != null ? options.isCatchException() : new DocumentReadOptions().isCatchException();
return (CompletableFuture<T>) executor.execute(getDocumentRequest(key, options), type)
.exceptionally(handleGetDocumentExceptions(isCatchException));
}

private <T> Function<Throwable, T> handleGetDocumentExceptions(Boolean isCatchException) {
return throwable -> {
if (throwable instanceof CompletionException) {
if (throwable.getCause() instanceof ArangoDBException) {
ArangoDBException arangoDBException = (ArangoDBException) throwable.getCause();

// handle Response: 404, Error: 1655 - transaction not found
if (arangoDBException.getErrorNum() != null && arangoDBException.getErrorNum() == 1655) {
throw (CompletionException) throwable;
}

if ((arangoDBException.getResponseCode() != null && (arangoDBException.getResponseCode() == 404 || arangoDBException.getResponseCode() == 304
|| arangoDBException.getResponseCode() == 412)) && isCatchException) {
return null;
}
}
throw (CompletionException) throwable;
}
throw new CompletionException(throwable);
};
.exceptionally(ExceptionUtil.catchGetDocumentExceptions(isCatchException));
}

@Override
Expand Down Expand Up @@ -260,7 +236,7 @@ public CompletableFuture<Boolean> documentExists(final String key) {
public CompletableFuture<Boolean> documentExists(final String key, final DocumentExistsOptions options) {
boolean isCatchException = options != null ? options.isCatchException() : new DocumentExistsOptions().isCatchException();
return executor.execute(documentExistsRequest(key, options), response -> response)
.exceptionally(handleGetDocumentExceptions(isCatchException))
.exceptionally(ExceptionUtil.catchGetDocumentExceptions(isCatchException))
.thenApply(Objects::nonNull);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ public <T> CompletableFuture<EdgeEntity> insertEdge(final T value, final EdgeCre

@Override
public <T> CompletableFuture<T> getEdge(final String key, final Class<T> type) {
return executor.execute(getEdgeRequest(key, new GraphDocumentReadOptions()), getEdgeResponseDeserializer(type));
return getEdge(key, type, null);
}

@Override
public <T> CompletableFuture<T> getEdge(final String key, final Class<T> type, final GraphDocumentReadOptions options) {
return executor.execute(getEdgeRequest(key, options), getEdgeResponseDeserializer(type));
boolean isCatchException = options != null ? options.isCatchException() : new GraphDocumentReadOptions().isCatchException();
return executor.execute(getEdgeRequest(key, options), getEdgeResponseDeserializer(type))
.exceptionally(ExceptionUtil.catchGetDocumentExceptions(isCatchException));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,17 @@ public <T> CompletableFuture<VertexEntity> insertVertex(final T value, final Ver

@Override
public <T> CompletableFuture<T> getVertex(final String key, final Class<T> type) {
return executor.execute(getVertexRequest(key, new GraphDocumentReadOptions()), getVertexResponseDeserializer(type));
return getVertex(key, type, null);
}

@Override
public <T> CompletableFuture<T> getVertex(
final String key,
final Class<T> type,
final GraphDocumentReadOptions options) {
return executor.execute(getVertexRequest(key, options), getVertexResponseDeserializer(type));
boolean isCatchException = options != null ? options.isCatchException() : new GraphDocumentReadOptions().isCatchException();
return executor.execute(getVertexRequest(key, options), getVertexResponseDeserializer(type))
.exceptionally(ExceptionUtil.catchGetDocumentExceptions(isCatchException));
}

@Override
Expand Down
52 changes: 52 additions & 0 deletions src/main/java/com/arangodb/async/internal/ExceptionUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.arangodb.async.internal;/*
* DISCLAIMER
*
* Copyright 2016 ArangoDB GmbH, Cologne, Germany
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright holder is ArangoDB GmbH, Cologne, Germany
*/


import com.arangodb.ArangoDBException;

import java.util.concurrent.CompletionException;
import java.util.function.Function;

/**
* @author Michele Rastelli
*/
class ExceptionUtil {
static <T> Function<Throwable, T> catchGetDocumentExceptions(Boolean isCatchException) {
return throwable -> {
if (throwable instanceof CompletionException) {
if (throwable.getCause() instanceof ArangoDBException) {
ArangoDBException arangoDBException = (ArangoDBException) throwable.getCause();

// handle Response: 404, Error: 1655 - transaction not found
if (arangoDBException.getErrorNum() != null && arangoDBException.getErrorNum() == 1655) {
throw (CompletionException) throwable;
}

if ((arangoDBException.getResponseCode() != null && (arangoDBException.getResponseCode() == 404 || arangoDBException.getResponseCode() == 304
|| arangoDBException.getResponseCode() == 412)) && isCatchException) {
return null;
}
}
throw (CompletionException) throwable;
}
throw new CompletionException(throwable);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public abstract class InternalArangoEdgeCollection<A extends InternalArangoDB<E>
private static final String PATH_API_GHARIAL = "/_api/gharial";
private static final String EDGE = "edge";

private static final String TRANSACTION_ID = "x-arango-trx-id";

private final G graph;
private final String name;

Expand All @@ -67,6 +69,7 @@ protected <T> Request insertEdgeRequest(final T value, final EdgeCreateOptions o
final Request request = request(graph.db().name(), RequestType.POST, PATH_API_GHARIAL, graph.name(), EDGE,
name);
final EdgeCreateOptions params = (options != null ? options : new EdgeCreateOptions());
request.putHeaderParam(TRANSACTION_ID, params.getStreamTransactionId());
request.putQueryParam(ArangoRequestParam.WAIT_FOR_SYNC, params.getWaitForSync());
request.setBody(util(Serializer.CUSTOM).serialize(value));
return request;
Expand All @@ -89,6 +92,7 @@ protected Request getEdgeRequest(final String key, final GraphDocumentReadOption
final Request request = request(graph.db().name(), RequestType.GET, PATH_API_GHARIAL, graph.name(), EDGE,
DocumentUtil.createDocumentHandle(name, key));
final GraphDocumentReadOptions params = (options != null ? options : new GraphDocumentReadOptions());
request.putHeaderParam(TRANSACTION_ID, params.getStreamTransactionId());
request.putHeaderParam(ArangoRequestParam.IF_NONE_MATCH, params.getIfNoneMatch());
request.putHeaderParam(ArangoRequestParam.IF_MATCH, params.getIfMatch());
if (params.getAllowDirtyRead() == Boolean.TRUE) {
Expand All @@ -105,6 +109,7 @@ protected <T> Request replaceEdgeRequest(final String key, final T value, final
final Request request = request(graph.db().name(), RequestType.PUT, PATH_API_GHARIAL, graph.name(), EDGE,
DocumentUtil.createDocumentHandle(name, key));
final EdgeReplaceOptions params = (options != null ? options : new EdgeReplaceOptions());
request.putHeaderParam(TRANSACTION_ID, params.getStreamTransactionId());
request.putQueryParam(ArangoRequestParam.WAIT_FOR_SYNC, params.getWaitForSync());
request.putHeaderParam(ArangoRequestParam.IF_MATCH, params.getIfMatch());
request.setBody(util(Serializer.CUSTOM).serialize(value));
Expand All @@ -127,6 +132,7 @@ protected <T> Request updateEdgeRequest(final String key, final T value, final E
request = request(graph.db().name(), RequestType.PATCH, PATH_API_GHARIAL, graph.name(), EDGE,
DocumentUtil.createDocumentHandle(name, key));
final EdgeUpdateOptions params = (options != null ? options : new EdgeUpdateOptions());
request.putHeaderParam(TRANSACTION_ID, params.getStreamTransactionId());
request.putQueryParam(ArangoRequestParam.KEEP_NULL, params.getKeepNull());
request.putQueryParam(ArangoRequestParam.WAIT_FOR_SYNC, params.getWaitForSync());
request.putHeaderParam(ArangoRequestParam.IF_MATCH, params.getIfMatch());
Expand All @@ -150,6 +156,7 @@ protected Request deleteEdgeRequest(final String key, final EdgeDeleteOptions op
final Request request = request(graph.db().name(), RequestType.DELETE, PATH_API_GHARIAL, graph.name(), EDGE,
DocumentUtil.createDocumentHandle(name, key));
final EdgeDeleteOptions params = (options != null ? options : new EdgeDeleteOptions());
request.putHeaderParam(TRANSACTION_ID, params.getStreamTransactionId());
request.putQueryParam(ArangoRequestParam.WAIT_FOR_SYNC, params.getWaitForSync());
request.putHeaderParam(ArangoRequestParam.IF_MATCH, params.getIfMatch());
return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected Request dropRequest() {
protected Request dropRequest(final boolean dropCollections) {
final Request request = request(db.name(), RequestType.DELETE, PATH_API_GHARIAL, name);
if (dropCollections) {
request.putQueryParam("dropCollections", dropCollections);
request.putQueryParam("dropCollections", true);
}
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public abstract class InternalArangoVertexCollection<A extends InternalArangoDB<
private static final String PATH_API_GHARIAL = "/_api/gharial";
private static final String VERTEX = "vertex";

private static final String TRANSACTION_ID = "x-arango-trx-id";

private final G graph;
private final String name;

Expand All @@ -71,6 +73,7 @@ protected <T> Request insertVertexRequest(final T value, final VertexCreateOptio
final Request request = request(graph.db().name(), RequestType.POST, PATH_API_GHARIAL, graph.name(), VERTEX,
name);
final VertexCreateOptions params = (options != null ? options : new VertexCreateOptions());
request.putHeaderParam(TRANSACTION_ID, params.getStreamTransactionId());
request.putQueryParam(ArangoRequestParam.WAIT_FOR_SYNC, params.getWaitForSync());
request.setBody(util(Serializer.CUSTOM).serialize(value));
return request;
Expand All @@ -93,6 +96,7 @@ protected Request getVertexRequest(final String key, final GraphDocumentReadOpti
final Request request = request(graph.db().name(), RequestType.GET, PATH_API_GHARIAL, graph.name(), VERTEX,
DocumentUtil.createDocumentHandle(name, key));
final GraphDocumentReadOptions params = (options != null ? options : new GraphDocumentReadOptions());
request.putHeaderParam(TRANSACTION_ID, params.getStreamTransactionId());
request.putHeaderParam(ArangoRequestParam.IF_NONE_MATCH, params.getIfNoneMatch());
request.putHeaderParam(ArangoRequestParam.IF_MATCH, params.getIfMatch());
if (params.getAllowDirtyRead() == Boolean.TRUE) {
Expand All @@ -109,6 +113,7 @@ protected <T> Request replaceVertexRequest(final String key, final T value, fina
final Request request = request(graph.db().name(), RequestType.PUT, PATH_API_GHARIAL, graph.name(), VERTEX,
DocumentUtil.createDocumentHandle(name, key));
final VertexReplaceOptions params = (options != null ? options : new VertexReplaceOptions());
request.putHeaderParam(TRANSACTION_ID, params.getStreamTransactionId());
request.putQueryParam(ArangoRequestParam.WAIT_FOR_SYNC, params.getWaitForSync());
request.putHeaderParam(ArangoRequestParam.IF_MATCH, params.getIfMatch());
request.setBody(util(Serializer.CUSTOM).serialize(value));
Expand All @@ -131,6 +136,7 @@ protected <T> Request updateVertexRequest(final String key, final T value, final
request = request(graph.db().name(), RequestType.PATCH, PATH_API_GHARIAL, graph.name(), VERTEX,
DocumentUtil.createDocumentHandle(name, key));
final VertexUpdateOptions params = (options != null ? options : new VertexUpdateOptions());
request.putHeaderParam(TRANSACTION_ID, params.getStreamTransactionId());
request.putQueryParam(ArangoRequestParam.KEEP_NULL, params.getKeepNull());
request.putQueryParam(ArangoRequestParam.WAIT_FOR_SYNC, params.getWaitForSync());
request.putHeaderParam(ArangoRequestParam.IF_MATCH, params.getIfMatch());
Expand All @@ -154,6 +160,7 @@ protected Request deleteVertexRequest(final String key, final VertexDeleteOption
final Request request = request(graph.db().name(), RequestType.DELETE, PATH_API_GHARIAL, graph.name(), VERTEX,
DocumentUtil.createDocumentHandle(name, key));
final VertexDeleteOptions params = (options != null ? options : new VertexDeleteOptions());
request.putHeaderParam(TRANSACTION_ID, params.getStreamTransactionId());
request.putQueryParam(ArangoRequestParam.WAIT_FOR_SYNC, params.getWaitForSync());
request.putHeaderParam(ArangoRequestParam.IF_MATCH, params.getIfMatch());
return request;
Expand Down
Loading