diff --git a/src/connection/helpers.ts b/src/connection/helpers.ts index f89ae5dc..e5b598ca 100644 --- a/src/connection/helpers.ts +++ b/src/connection/helpers.ts @@ -86,6 +86,8 @@ export function connectToWeaviateCloud( grpcHost = `grpc-${url.hostname}`; } + const { authCredentials: auth, headers, ...rest } = options || {}; + return clientMaker({ connectionParams: { http: { @@ -99,8 +101,9 @@ export function connectToWeaviateCloud( port: 443, }, }, - auth: options?.authCredentials, - headers: addWeaviateEmbeddingServiceHeaders(clusterURL, options), + auth, + headers: addWeaviateEmbeddingServiceHeaders(clusterURL, auth, headers), + ...rest, }).catch((e) => { throw new WeaviateStartUpError(`Weaviate failed to startup with message: ${e.message}`); }); @@ -110,21 +113,22 @@ export function connectToLocal( clientMaker: (params: ClientParams) => Promise, options?: ConnectToLocalOptions ): Promise { + const { host, port, grpcPort, authCredentials: auth, ...rest } = options || {}; return clientMaker({ connectionParams: { http: { secure: false, - host: options?.host || 'localhost', - port: options?.port || 8080, + host: host || 'localhost', + port: port || 8080, }, grpc: { secure: false, - host: options?.host || 'localhost', - port: options?.grpcPort || 50051, + host: host || 'localhost', + port: grpcPort || 50051, }, }, - auth: options?.authCredentials, - headers: options?.headers, + auth, + ...rest, }).catch((e) => { throw new WeaviateStartUpError(`Weaviate failed to startup with message: ${e.message}`); }); @@ -134,37 +138,49 @@ export function connectToCustom( clientMaker: (params: ClientParams) => Promise, options?: ConnectToCustomOptions ): Promise { + const { + httpHost, + httpPath, + httpPort, + httpSecure, + grpcHost, + grpcPort, + grpcSecure, + authCredentials: auth, + ...rest + } = options || {}; return clientMaker({ connectionParams: { http: { - secure: options?.httpSecure || false, - host: options?.httpHost || 'localhost', - path: options?.httpPath || '', - port: options?.httpPort || 8080, + secure: httpSecure || false, + host: httpHost || 'localhost', + path: httpPath || '', + port: httpPort || 8080, }, grpc: { - secure: options?.grpcSecure || false, - host: options?.grpcHost || 'localhost', - port: options?.grpcPort || 50051, + secure: grpcSecure || false, + host: grpcHost || 'localhost', + port: grpcPort || 50051, }, }, - auth: options?.authCredentials, - headers: options?.headers, - proxies: options?.proxies, + auth, + ...rest, }).catch((e) => { throw new WeaviateStartUpError(`Weaviate failed to startup with message: ${e.message}`); }); } -function addWeaviateEmbeddingServiceHeaders(clusterURL: string, options?: ConnectToWeaviateCloudOptions) { - const creds = options?.authCredentials; - +function addWeaviateEmbeddingServiceHeaders( + clusterURL: string, + creds?: AuthCredentials, + headers?: Record +) { if (!isApiKey(creds)) { - return options?.headers; + return headers; } return { - ...options?.headers, + ...headers, 'X-Weaviate-Api-Key': mapApiKey(creds).apiKey, 'X-Weaviate-Cluster-Url': clusterURL, }; diff --git a/src/connection/unit.test.ts b/src/connection/unit.test.ts index a622a007..9d2919e0 100644 --- a/src/connection/unit.test.ts +++ b/src/connection/unit.test.ts @@ -1,3 +1,6 @@ +import express from 'express'; +import { Server as HttpServer } from 'http'; + import { testServer } from '../../test/server.js'; import { ApiKey, @@ -7,6 +10,23 @@ import { } from './auth.js'; import Connection from './index.js'; +import { createServer, Server as GrpcServer } from 'nice-grpc'; +import { + HealthCheckRequest, + HealthCheckResponse, + HealthCheckResponse_ServingStatus, + HealthDefinition, + HealthServiceImplementation, +} from '../proto/google/health/v1/health'; +import { TenantsGetReply } from '../proto/v1/tenants'; +import { WeaviateDefinition, WeaviateServiceImplementation } from '../proto/v1/weaviate'; + +import { WeaviateRequestTimeoutError } from '../errors.js'; +import weaviate, { Collection, WeaviateClient } from '../index'; +import { BatchObjectsReply } from '../proto/v1/batch.js'; +import { BatchDeleteReply } from '../proto/v1/batch_delete.js'; +import { SearchReply } from '../proto/v1/search_get.js'; + describe('mock server auth tests', () => { const server = testServer(); describe('OIDC auth flows', () => { @@ -197,3 +217,113 @@ describe('mock server auth tests', () => { return server.close(); }); }); + +const COLLECTION_NAME = 'TestCollectionTimeouts'; + +const makeRestApp = (version: string) => { + const httpApp = express(); + httpApp.get(`/v1/schema/${COLLECTION_NAME}`, (req, res) => + new Promise((r) => setTimeout(r, 2000)).then(() => res.send({ class: COLLECTION_NAME })) + ); + httpApp.get('/v1/meta', (req, res) => res.send({ version })); + return httpApp; +}; + +const makeGrpcApp = () => { + const weaviateMockImpl: WeaviateServiceImplementation = { + tenantsGet: (): Promise => + new Promise((r) => { + setTimeout(r, 2000); + }).then(() => { + return { + took: 5000, + tenants: [], + }; + }), + search: (): Promise => + new Promise((r) => { + setTimeout(r, 2000); + }).then(() => { + return { + results: [], + took: 5000, + groupByResults: [], + }; + }), + batchDelete: (): Promise => + new Promise((r) => { + setTimeout(r, 2000); + }).then(() => { + return { + took: 5000, + status: 'SUCCESS', + failed: 0, + matches: 0, + successful: 0, + objects: [], + }; + }), + batchObjects: (): Promise => + new Promise((r) => { + setTimeout(r, 2000); + }).then(() => { + return { + took: 5000, + errors: [], + }; + }), + }; + const healthMockImpl: HealthServiceImplementation = { + check: (request: HealthCheckRequest): Promise => + Promise.resolve(HealthCheckResponse.create({ status: HealthCheckResponse_ServingStatus.SERVING })), + watch: jest.fn(), + }; + + const grpcApp = createServer(); + grpcApp.add(WeaviateDefinition, weaviateMockImpl); + grpcApp.add(HealthDefinition, healthMockImpl); + + return grpcApp; +}; + +const makeMockServers = async (weaviateVersion: string, httpPort: number, grpcAddress: string) => { + const rest = makeRestApp(weaviateVersion); + const grpc = makeGrpcApp(); + const server = await rest.listen(httpPort); + await grpc.listen(grpcAddress); + return { rest: server, grpc, express }; +}; + +describe('Mock testing of timeout behaviour', () => { + let servers: { + rest: HttpServer; + grpc: GrpcServer; + }; + let client: WeaviateClient; + let collection: Collection; + + beforeAll(async () => { + servers = await makeMockServers('1.28.2', 8954, 'localhost:8955'); + client = await weaviate.connectToLocal({ port: 8954, grpcPort: 8955, timeout: { query: 1, insert: 1 } }); + collection = client.collections.get(COLLECTION_NAME); + }); + + it('should timeout when calling REST GET v1/schema', () => + expect(collection.config.get()).rejects.toThrow(WeaviateRequestTimeoutError)); + + it('should timeout when calling gRPC TenantsGet', () => + expect(collection.tenants.get()).rejects.toThrow(WeaviateRequestTimeoutError)); + + it('should timeout when calling gRPC Search', () => + expect(collection.query.fetchObjects()).rejects.toThrow(WeaviateRequestTimeoutError)); + + it('should timeout when calling gRPC BatchObjects', () => + expect(collection.data.insertMany([{ thing: 'what' }])).rejects.toThrow(WeaviateRequestTimeoutError)); + + it('should timeout when calling gRPC BatchDelete', () => + expect(collection.data.deleteMany(collection.filter.byId().equal('123' as any))).rejects.toThrow( + WeaviateRequestTimeoutError + )); + + afterAll(() => Promise.all([servers.rest.close(), servers.grpc.shutdown()])); +}); diff --git a/src/errors.ts b/src/errors.ts index ad1a21e9..023bf7e6 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -37,6 +37,15 @@ export class WeaviateDeleteManyError extends WeaviateError { } } +/** + * Is thrown if a gRPC tenants get to Weaviate fails in any way. + */ +export class WeaviateTenantsGetError extends WeaviateError { + constructor(message: string) { + super(`Tenants get failed with message: ${message}`); + } +} + /** * Is thrown if a gRPC batch query to Weaviate fails in any way. */ diff --git a/src/grpc/base.ts b/src/grpc/base.ts index 2a0c0e1b..17865f8c 100644 --- a/src/grpc/base.ts +++ b/src/grpc/base.ts @@ -1,9 +1,7 @@ -import { isAbortError } from 'abort-controller-x'; import { ConsistencyLevel } from '../data/index.js'; import { Metadata } from 'nice-grpc'; import { RetryOptions } from 'nice-grpc-client-middleware-retry'; -import { WeaviateRequestTimeoutError } from '../errors.js'; import { ConsistencyLevel as ConsistencyLevelGRPC } from '../proto/v1/base.js'; import { WeaviateClient } from '../proto/v1/weaviate.js'; @@ -47,13 +45,6 @@ export default class Base { protected sendWithTimeout = (send: (signal: AbortSignal) => Promise): Promise => { const controller = new AbortController(); const timeoutId = setTimeout(() => controller.abort(), this.timeout * 1000); - return send(controller.signal) - .catch((error) => { - if (isAbortError(error)) { - throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`); - } - throw error; - }) - .finally(() => clearTimeout(timeoutId)); + return send(controller.signal).finally(() => clearTimeout(timeoutId)); }; } diff --git a/src/grpc/batcher.ts b/src/grpc/batcher.ts index 51179042..aba131fa 100644 --- a/src/grpc/batcher.ts +++ b/src/grpc/batcher.ts @@ -10,11 +10,13 @@ import { WeaviateBatchError, WeaviateDeleteManyError, WeaviateInsufficientPermissionsError, + WeaviateRequestTimeoutError, } from '../errors.js'; import { Filters } from '../proto/v1/base.js'; import { BatchDeleteReply, BatchDeleteRequest } from '../proto/v1/batch_delete.js'; import Base from './base.js'; +import { isAbortError } from 'abort-controller-x'; import { retryOptions } from './retry.js'; export interface Batch { @@ -65,6 +67,9 @@ export default class Batcher extends Base implements Batch { if (err instanceof ServerError && err.code === Status.PERMISSION_DENIED) { throw new WeaviateInsufficientPermissionsError(7, err.message); } + if (isAbortError(err)) { + throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`); + } throw new WeaviateDeleteManyError(err.message); }); } @@ -87,6 +92,9 @@ export default class Batcher extends Base implements Batch { if (err instanceof ServerError && err.code === Status.PERMISSION_DENIED) { throw new WeaviateInsufficientPermissionsError(7, err.message); } + if (isAbortError(err)) { + throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`); + } throw new WeaviateBatchError(err.message); }) ); diff --git a/src/grpc/searcher.ts b/src/grpc/searcher.ts index f3c1cf01..893d0c74 100644 --- a/src/grpc/searcher.ts +++ b/src/grpc/searcher.ts @@ -24,8 +24,13 @@ import { } from '../proto/v1/search_get.js'; import { WeaviateClient } from '../proto/v1/weaviate.js'; +import { isAbortError } from 'abort-controller-x'; import { RetryOptions } from 'nice-grpc-client-middleware-retry'; -import { WeaviateInsufficientPermissionsError, WeaviateQueryError } from '../errors.js'; +import { + WeaviateInsufficientPermissionsError, + WeaviateQueryError, + WeaviateRequestTimeoutError, +} from '../errors.js'; import { GenerativeSearch } from '../proto/v1/generative.js'; import Base from './base.js'; import { retryOptions } from './retry.js'; @@ -160,6 +165,9 @@ export default class Searcher extends Base implements Search { if (err instanceof ServerError && err.code === Status.PERMISSION_DENIED) { throw new WeaviateInsufficientPermissionsError(7, err.message); } + if (isAbortError(err)) { + throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`); + } throw new WeaviateQueryError(err.message, 'gRPC'); }) ); diff --git a/src/grpc/tenantsManager.ts b/src/grpc/tenantsManager.ts index 2e21d6f4..fb703202 100644 --- a/src/grpc/tenantsManager.ts +++ b/src/grpc/tenantsManager.ts @@ -1,6 +1,11 @@ +import { isAbortError } from 'abort-controller-x'; import { Metadata, ServerError, Status } from 'nice-grpc'; import { RetryOptions } from 'nice-grpc-client-middleware-retry'; -import { WeaviateDeleteManyError, WeaviateInsufficientPermissionsError } from '../errors.js'; +import { + WeaviateInsufficientPermissionsError, + WeaviateRequestTimeoutError, + WeaviateTenantsGetError, +} from '../errors.js'; import { TenantsGetReply, TenantsGetRequest } from '../proto/v1/tenants.js'; import { WeaviateClient } from '../proto/v1/weaviate.js'; import Base from './base.js'; @@ -45,7 +50,10 @@ export default class TenantsManager extends Base implements Tenants { if (err instanceof ServerError && err.code === Status.PERMISSION_DENIED) { throw new WeaviateInsufficientPermissionsError(7, err.message); } - throw new WeaviateDeleteManyError(err.message); + if (isAbortError(err)) { + throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`); + } + throw new WeaviateTenantsGetError(err.message); }) ); }