diff --git a/.circleci/config.yml b/.circleci/config.yml index 76f81c19..264b4e1f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -55,6 +55,9 @@ jobs: test-to-run: type: string default: "run-tests-single" + enable-extra-db-features: + type: boolean + default: false steps: - checkout - run: @@ -71,6 +74,7 @@ jobs: GOIMAGE: << pipeline.parameters.goImage >> ALPINE_IMAGE: << pipeline.parameters.alpineImage >> STARTER: << pipeline.parameters.starterImage >> + ENABLE_DATABASE_EXTRA_FEATURES: << parameters.enable-extra-db-features >> TEST_DISALLOW_UNKNOWN_FIELDS: false VERBOSE: 1 @@ -123,6 +127,12 @@ workflows: requires: - download-demo-data test-to-run: run-v2-tests-cluster + - run-integration-tests: + name: Test V2 cluster - DB extra features (compression) + requires: + - download-demo-data + test-to-run: run-v2-tests-cluster + enable-extra-db-features: true - run-integration-tests: name: Test V1 single diff --git a/Makefile b/Makefile index 7fcc38cd..23eda534 100644 --- a/Makefile +++ b/Makefile @@ -414,6 +414,7 @@ __test_go_test: -e TEST_ENABLE_SHUTDOWN=$(TEST_ENABLE_SHUTDOWN) \ -e TEST_REQUEST_LOG=$(TEST_REQUEST_LOG) \ -e TEST_DISALLOW_UNKNOWN_FIELDS=$(TEST_DISALLOW_UNKNOWN_FIELDS) \ + -e ENABLE_DATABASE_EXTRA_FEATURES=$(ENABLE_DATABASE_EXTRA_FEATURES) \ -e GODEBUG=tls13=1 \ -e CGO_ENABLED=$(CGO_ENABLED) \ -w /usr/code/ \ @@ -441,6 +442,7 @@ __test_v2_go_test: -e TEST_BACKUP_REMOTE_CONFIG='$(TEST_BACKUP_REMOTE_CONFIG)' \ -e TEST_DEBUG='$(TEST_DEBUG)' \ -e TEST_ENABLE_SHUTDOWN=$(TEST_ENABLE_SHUTDOWN) \ + -e ENABLE_DATABASE_EXTRA_FEATURES=$(ENABLE_DATABASE_EXTRA_FEATURES) \ -e GODEBUG=tls13=1 \ -e CGO_ENABLED=$(CGO_ENABLED) \ -w /usr/code/v2/ \ @@ -475,7 +477,9 @@ ifdef JWTSECRET echo "$JWTSECRET" > "${JWTSECRETFILE}" endif @-docker rm -f -v $(TESTCONTAINER) &> /dev/null - @TESTCONTAINER=$(TESTCONTAINER) ARANGODB=$(ARANGODB) ALPINE_IMAGE=$(ALPINE_IMAGE) ENABLE_BACKUP=$(ENABLE_BACKUP) ARANGO_LICENSE_KEY=$(ARANGO_LICENSE_KEY) STARTER=$(STARTER) STARTERMODE=$(TEST_MODE) TMPDIR="${TMPDIR}" DEBUG_PORT=$(DEBUG_PORT) $(CLUSTERENV) "${ROOTDIR}/test/cluster.sh" start + @TESTCONTAINER=$(TESTCONTAINER) ARANGODB=$(ARANGODB) ALPINE_IMAGE=$(ALPINE_IMAGE) ENABLE_BACKUP=$(ENABLE_BACKUP) \ + ARANGO_LICENSE_KEY=$(ARANGO_LICENSE_KEY) STARTER=$(STARTER) STARTERMODE=$(TEST_MODE) TMPDIR="${TMPDIR}" \ + ENABLE_DATABASE_EXTRA_FEATURES=$(ENABLE_DATABASE_EXTRA_FEATURES) DEBUG_PORT=$(DEBUG_PORT) $(CLUSTERENV) "${ROOTDIR}/test/cluster.sh" start endif __test_cleanup: diff --git a/test/cluster.sh b/test/cluster.sh index 3490bd9e..1c0fcd9e 100755 --- a/test/cluster.sh +++ b/test/cluster.sh @@ -48,8 +48,8 @@ if [ "$CMD" == "start" ]; then if [ -n "$ENABLE_BACKUP" ]; then STARTERARGS="$STARTERARGS --all.backup.api-enabled=true" fi - if [ -n "$ENABLE_DATABASE_EXTENDED_NAMES" ]; then - STARTERARGS="$STARTERARGS --all.database.extended-names-databases=true" + if [ -n "$ENABLE_DATABASE_EXTRA_FEATURES" ]; then + STARTERARGS="$STARTERARGS --all.database.extended-names-databases=true --args.all.http.compress-response-threshold=1 --args.all.http.handle-content-encoding-for-unauthenticated-requests=true" fi if [[ "$OSTYPE" == "darwin"* ]]; then DOCKERPLATFORMARG="--platform linux/x86_64" diff --git a/v2/CHANGELOG.md b/v2/CHANGELOG.md index 6b0fe6a7..d3357cca 100644 --- a/v2/CHANGELOG.md +++ b/v2/CHANGELOG.md @@ -19,6 +19,7 @@ - Backup API support - Admin Cluster API support - Set Licence API support +- Transparent compression of requests and responses (ArangoDBConfiguration.Compression) ## [2.0.3](https://github.com/arangodb/go-driver/tree/v2.0.3) (2023-10-31) diff --git a/v2/connection/connection.go b/v2/connection/connection.go index e5f3e6fe..06ea6ff2 100644 --- a/v2/connection/connection.go +++ b/v2/connection/connection.go @@ -23,8 +23,12 @@ package connection import ( "context" "io" + "net/http" ) +type EncodingCodec interface { +} + type Wrapper func(c Connection) Connection type Factory func() (Connection, error) @@ -40,8 +44,38 @@ type ArangoDBConfiguration struct { // DriverFlags configure additional flags for the `x-arango-driver` header DriverFlags []string + + // Compression is used to enable compression between client and server + Compression *CompressionConfig +} + +// CompressionConfig is used to enable compression for the connection +type CompressionConfig struct { + // CompressionConfig is used to enable compression for the requests + CompressionType CompressionType + + // ResponseCompressionEnabled is used to enable compression for the responses (requires server side adjustments) + ResponseCompressionEnabled bool + + // RequestCompressionEnabled is used to enable compression for the requests + RequestCompressionEnabled bool + + // RequestCompressionLevel - Sets the compression level between -1 and 9 + // Default: 0 (NoCompression). For Reference see: https://pkg.go.dev/compress/flate#pkg-constants + RequestCompressionLevel int } +type CompressionType string + +const ( + + // RequestCompressionTypeGzip is used to enable gzip compression + RequestCompressionTypeGzip CompressionType = "gzip" + + // RequestCompressionTypeDeflate is used to enable deflate compression + RequestCompressionTypeDeflate CompressionType = "deflate" +) + type Connection interface { // NewRequest initializes Request object NewRequest(method string, urls ...string) (Request, error) @@ -111,4 +145,6 @@ type Response interface { // Header gets the first value associated with the given key. // If there are no values associated with the key, Get returns "". Header(name string) string + + RawResponse() *http.Response } diff --git a/v2/connection/connection_compression.go b/v2/connection/connection_compression.go new file mode 100644 index 00000000..c5eb89b5 --- /dev/null +++ b/v2/connection/connection_compression.go @@ -0,0 +1,122 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package connection + +import ( + "compress/zlib" + "fmt" + "io" + + "github.com/arangodb/go-driver/v2/log" +) + +type Compression interface { + ApplyRequestHeaders(r Request) + ApplyRequestCompression(r *httpRequest, rootWriter io.Writer) (io.WriteCloser, error) +} + +func newCompression(config *CompressionConfig) Compression { + if config == nil { + return noCompression{} + } else if config.CompressionType == "gzip" { + return gzipCompression{config: config} + } else if config.CompressionType == "deflate" { + return deflateCompression{config: config} + } else { + log.Error(fmt.Errorf("unknown compression type: %s", config.CompressionType), "") + return noCompression{config: config} + } +} + +type gzipCompression struct { + config *CompressionConfig +} + +func (g gzipCompression) ApplyRequestHeaders(r Request) { + if g.config != nil && g.config.ResponseCompressionEnabled { + if g.config.CompressionType == "gzip" { + r.AddHeader("Accept-Encoding", "gzip") + } + } +} + +func (g gzipCompression) ApplyRequestCompression(r *httpRequest, rootWriter io.Writer) (io.WriteCloser, error) { + config := g.config + + if config != nil && config.RequestCompressionEnabled { + if config.CompressionType == "deflate" { + r.headers["Content-Encoding"] = "deflate" + + zlibWriter, err := zlib.NewWriterLevel(rootWriter, config.RequestCompressionLevel) + if err != nil { + log.Errorf(err, "error creating zlib writer") + return nil, err + } + + return zlibWriter, nil + } + } + + return nil, nil +} + +type deflateCompression struct { + config *CompressionConfig +} + +func (g deflateCompression) ApplyRequestHeaders(r Request) { + if g.config != nil && g.config.ResponseCompressionEnabled { + if g.config.CompressionType == "deflate" { + r.AddHeader("Accept-Encoding", "deflate") + } + } +} + +func (g deflateCompression) ApplyRequestCompression(r *httpRequest, rootWriter io.Writer) (io.WriteCloser, error) { + config := g.config + + if config != nil && config.RequestCompressionEnabled { + if config.CompressionType == "deflate" { + r.headers["Content-Encoding"] = "deflate" + + zlibWriter, err := zlib.NewWriterLevel(rootWriter, config.RequestCompressionLevel) + if err != nil { + log.Errorf(err, "error creating zlib writer") + return nil, err + } + + return zlibWriter, nil + } + } + + return nil, nil +} + +type noCompression struct { + config *CompressionConfig +} + +func (g noCompression) ApplyRequestHeaders(r Request) { +} + +func (g noCompression) ApplyRequestCompression(r *httpRequest, rootWriter io.Writer) (io.WriteCloser, error) { + return nil, nil +} diff --git a/v2/connection/connection_http_internal.go b/v2/connection/connection_http_internal.go index 357eaade..1c650ca7 100644 --- a/v2/connection/connection_http_internal.go +++ b/v2/connection/connection_http_internal.go @@ -22,6 +22,8 @@ package connection import ( "bytes" + "compress/gzip" + "compress/zlib" "context" "crypto/tls" "io" @@ -248,7 +250,7 @@ func (j *httpConnection) stream(ctx context.Context, req *httpRequest) (*httpRes ctx = context.Background() } - reader := j.bodyReadFunc(j.Decoder(j.contentType), req.body, j.streamSender) + reader := j.bodyReadFunc(j.Decoder(j.contentType), req, j.streamSender) r, err := req.asRequest(ctx, reader) if err != nil { return nil, nil, errors.WithStack(err) @@ -263,9 +265,19 @@ func (j *httpConnection) stream(ctx context.Context, req *httpRequest) (*httpRes log.Debugf("(%s) Response received: %d", id, resp.StatusCode) if b := resp.Body; b != nil { - var body = resp.Body + var resultBody io.ReadCloser + + respEncoding := resp.Header.Get("Content-Encoding") + switch respEncoding { + case "gzip": + resultBody, err = gzip.NewReader(resp.Body) + case "deflate": + resultBody, err = zlib.NewReader(resp.Body) + default: + resultBody = resp.Body + } - return &httpResponse{response: resp, request: req}, body, nil + return &httpResponse{response: resp, request: req}, resultBody, nil } @@ -289,8 +301,8 @@ func getDecoderByContentType(contentType string) Decoder { type bodyReadFactory func() (io.Reader, error) -func (j *httpConnection) bodyReadFunc(decoder Decoder, obj interface{}, stream bool) bodyReadFactory { - if obj == nil { +func (j *httpConnection) bodyReadFunc(decoder Decoder, req *httpRequest, stream bool) bodyReadFactory { + if req.body == nil { return func() (io.Reader, error) { return nil, nil } @@ -299,21 +311,64 @@ func (j *httpConnection) bodyReadFunc(decoder Decoder, obj interface{}, stream b if !stream { return func() (io.Reader, error) { b := bytes.NewBuffer([]byte{}) - if err := decoder.Encode(b, obj); err != nil { - log.Errorf(err, "error encoding body - OBJ: %v", obj) + compressedWriter, err := newCompression(j.config.Compression).ApplyRequestCompression(req, b) + if err != nil { + log.Errorf(err, "error applying compression") return nil, err } - return b, nil + if compressedWriter != nil { + defer func(compressedWriter io.WriteCloser) { + errCompression := compressedWriter.Close() + if errCompression != nil { + log.Error(errCompression, "error closing compressed writer") + if err == nil { + err = errCompression + } + } + }(compressedWriter) + + err = decoder.Encode(compressedWriter, req.body) + } else { + err = decoder.Encode(b, req.body) + } + + if err != nil { + log.Errorf(err, "error encoding body - OBJ: %v", req.body) + return nil, err + } + return b, err } } else { return func() (io.Reader, error) { reader, writer := io.Pipe() + + compressedWriter, err := newCompression(j.config.Compression).ApplyRequestCompression(req, writer) + if err != nil { + log.Errorf(err, "error applying compression") + return nil, err + } + go func() { defer writer.Close() - if err := decoder.Encode(writer, obj); err != nil { - log.Errorf(err, "error encoding body (stream) - OBJ: %v", obj) + var encErr error + if compressedWriter != nil { + defer func(compressedWriter io.WriteCloser) { + errCompression := compressedWriter.Close() + if errCompression != nil { + log.Errorf(errCompression, "error closing compressed writer - stream") + writer.CloseWithError(err) + } + }(compressedWriter) + + encErr = decoder.Encode(compressedWriter, req.body) + } else { + encErr = decoder.Encode(writer, req.body) + } + + if encErr != nil { + log.Errorf(err, "error encoding body stream - OBJ: %v", req.body) writer.CloseWithError(err) } }() diff --git a/v2/connection/connection_http_response.go b/v2/connection/connection_http_response.go index f54efc20..29990ad4 100644 --- a/v2/connection/connection_http_response.go +++ b/v2/connection/connection_http_response.go @@ -58,3 +58,7 @@ func (j *httpResponse) Header(name string) string { } return j.response.Header.Get(name) } + +func (j *httpResponse) RawResponse() *http.Response { + return j.response +} diff --git a/v2/connection/modifiers.go b/v2/connection/modifiers.go index 7e3ff8e5..4e419bee 100644 --- a/v2/connection/modifiers.go +++ b/v2/connection/modifiers.go @@ -104,6 +104,8 @@ func applyArangoDBConfiguration(config ArangoDBConfiguration, ctx context.Contex } } + newCompression(config.Compression).ApplyRequestHeaders(r) + return nil } } diff --git a/v2/tests/call_test.go b/v2/tests/call_test.go index 83511709..9586fe92 100644 --- a/v2/tests/call_test.go +++ b/v2/tests/call_test.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2021-2023 ArangoDB GmbH, Cologne, Germany +// Copyright 2021-2024 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,7 +21,9 @@ package tests import ( + "compress/gzip" "context" + "fmt" "io" "net/http" "testing" @@ -91,3 +93,100 @@ func Test_CallWithChecks(t *testing.T) { }) }) } + +func Test_Compression_Builtin(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + WithDatabase(t, client, nil, func(db arangodb.Database) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + query := "FOR i IN 1..10 RETURN i" + + testCases := []struct { + compression connection.CompressionType + level int + request bool + response bool + }{ + {connection.RequestCompressionTypeGzip, gzip.BestCompression, true, true}, + {connection.RequestCompressionTypeDeflate, gzip.BestCompression, true, true}, + {connection.RequestCompressionTypeGzip, gzip.DefaultCompression, true, false}, + {connection.RequestCompressionTypeGzip, gzip.BestCompression, true, false}, + {connection.RequestCompressionTypeDeflate, gzip.DefaultCompression, true, false}, + {connection.RequestCompressionTypeDeflate, gzip.BestCompression, true, false}, + } + + for _, tc := range testCases { + config := client.Connection().GetConfiguration() + config.Compression = &connection.CompressionConfig{ + CompressionType: tc.compression, + RequestCompressionEnabled: tc.request, + RequestCompressionLevel: tc.level, + ResponseCompressionEnabled: tc.response, + } + client.Connection().SetConfiguration(config) + + t.Run(fmt.Sprintf("compression: %s, %d", tc.compression, tc.level), func(t *testing.T) { + var result []int + q, err := db.QueryBatch(ctx, query, nil, &result) + require.NoError(t, err) + + require.Len(t, result, 10) + require.False(t, q.HasMoreBatches()) + + require.NoError(t, q.Close()) + }) + } + }) + }) + }) +} + +func Test_Compression_Raw(t *testing.T) { + requireExtraDBFeatures(t) + + Wrap(t, func(t *testing.T, client arangodb.Client) { + WithDatabase(t, client, nil, func(db arangodb.Database) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + testCases := []struct { + compression connection.CompressionType + level int + request bool + response bool + }{ + {connection.RequestCompressionTypeGzip, gzip.BestCompression, true, true}, + {connection.RequestCompressionTypeDeflate, gzip.BestCompression, true, true}, + } + + for _, tc := range testCases { + config := client.Connection().GetConfiguration() + config.Compression = &connection.CompressionConfig{ + CompressionType: tc.compression, + RequestCompressionEnabled: tc.request, + RequestCompressionLevel: tc.level, + ResponseCompressionEnabled: tc.response, + } + client.Connection().SetConfiguration(config) + + t.Run(fmt.Sprintf("compression raw: %s, %d", tc.compression, tc.level), func(t *testing.T) { + var request = struct { + Query string `json:"query"` + }{ + Query: "FOR i IN 1..10 RETURN i", + } + + var result = struct { + shared.ResponseStruct `json:",inline"` + Result []int `json:"result"` + }{} + + resp, err := client.Post(ctx, &result, request, "_api", "cursor") + require.NoError(t, err) + require.Equal(t, http.StatusCreated, resp.Code()) + // This header is available only if the response is compressed and server supports it + require.Contains(t, resp.RawResponse().Header.Get("Content-Encoding"), tc.compression) + require.Len(t, result.Result, 10) + }) + } + }) + }) + }) +} diff --git a/v2/tests/run_wrap_test.go b/v2/tests/run_wrap_test.go index 5b4e1148..1db47992 100644 --- a/v2/tests/run_wrap_test.go +++ b/v2/tests/run_wrap_test.go @@ -26,6 +26,7 @@ import ( "math/rand" "net" "net/http" + "os" "testing" "time" @@ -94,6 +95,7 @@ func WrapConnectionFactory(t *testing.T, w WrapperConnectionFactory, wo ...WrapO } waitForConnection(t, arangodb.NewClient(conn)) + applyCompression(conn) return conn }) }) @@ -110,6 +112,7 @@ func WrapConnectionFactory(t *testing.T, w WrapperConnectionFactory, wo ...WrapO } waitForConnection(t, arangodb.NewClient(conn)) + applyCompression(conn) return conn }) }) @@ -129,6 +132,7 @@ func WrapConnectionFactory(t *testing.T, w WrapperConnectionFactory, wo ...WrapO } waitForConnection(t, arangodb.NewClient(conn)) + applyCompression(conn) return conn }) }) @@ -148,11 +152,25 @@ func WrapConnectionFactory(t *testing.T, w WrapperConnectionFactory, wo ...WrapO } waitForConnection(t, arangodb.NewClient(conn)) + applyCompression(conn) return conn }) }) } +func applyCompression(conn connection.Connection) { + if os.Getenv("ENABLE_DATABASE_EXTRA_FEATURES") == "true" { + cmp := conn.GetConfiguration() + cmp.Compression = &connection.CompressionConfig{ + CompressionType: connection.RequestCompressionTypeDeflate, + RequestCompressionLevel: 9, + ResponseCompressionEnabled: true, + RequestCompressionEnabled: true, + } + conn.SetConfiguration(cmp) + } +} + func WrapConnection(t *testing.T, w WrapperConnection, wo ...WrapOptions) { WrapConnectionFactory(t, func(t *testing.T, connFactory ConnectionFactory) { w(t, connFactory(t)) diff --git a/v2/tests/util_test.go b/v2/tests/util_test.go index eb9b4d5d..9fdba6be 100644 --- a/v2/tests/util_test.go +++ b/v2/tests/util_test.go @@ -71,6 +71,12 @@ func skipResilientSingleMode(t testing.TB) { requireMode(t, testModeCluster, testModeSingle) } +func requireExtraDBFeatures(t testing.TB) { + if os.Getenv("ENABLE_DATABASE_EXTRA_FEATURES") != "true" { + t.Skip("Skipping test, extra database features are not enabled") + } +} + func skipNoEnterprise(c arangodb.Client, ctx context.Context, t testing.TB) { version, err := c.Version(ctx) require.NoError(t, err)