diff --git a/.eslintrc b/.eslintrc index 02b99b4..de412fb 100644 --- a/.eslintrc +++ b/.eslintrc @@ -51,6 +51,12 @@ "rules": { "unicorn/no-array-for-each": 0 } + }, + { + "files": ["src/errors/errors.ts"], + "rules": { + "@typescript-eslint/naming-convention": 0 + } } ], "settings": { diff --git a/package-lock.json b/package-lock.json index a7842fe..96d1864 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,10 +18,13 @@ "dot-prop": "^8.0.2", "luxon": "^3.4.0", "matcher": "^5.0.0", + "modern-errors": "^6.0.0", + "modern-errors-bugs": "^4.0.0", "p-map": "^6.0.0", "p-timeout": "^6.1.2", "uuid": "^9.0.0", "winston": "^3.10.0", + "winston-error-format": "^2.0.0", "yargs": "^17.7.2" }, "devDependencies": { @@ -2137,6 +2140,25 @@ "node": ">=10.13.0" } }, + "node_modules/error-class-utils": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/error-class-utils/-/error-class-utils-3.0.0.tgz", + "integrity": "sha512-L26cyYkaV6nzbUbmDRNSXAZfcuQy4cvEDvD+WoRF6c6nIEEydfgn7grd+idf2xLVYaTHnn7yYQjaz+Dnx+N1lQ==", + "engines": { + "node": ">=16.17.0" + } + }, + "node_modules/error-custom-class": { + "version": "9.0.0", + "resolved": "https://registry.npmjs.org/error-custom-class/-/error-custom-class-9.0.0.tgz", + "integrity": "sha512-cfXOxbwRQpXLUSecZctO/GPtKm9auTd2v1eY4CsclMgRkse/h5w59V1u1p7LdStVnw/SCbROcsd5zLenauvlRw==", + "dependencies": { + "error-class-utils": "^3.0.0" + }, + "engines": { + "node": ">=16.17.0" + } + }, "node_modules/error-ex": { "version": "1.3.2", "resolved": "https://registry.npmjs.org/error-ex/-/error-ex-1.3.2.tgz", @@ -2146,6 +2168,21 @@ "is-arrayish": "^0.2.1" } }, + "node_modules/error-serializer": { + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/error-serializer/-/error-serializer-6.0.1.tgz", + "integrity": "sha512-SDEXcpWyys6yd6zLcC+s5bGnfe+xWxBJoC7p+o72c5F+hDdgdWc8LB8EOvcdqs7U+rzInYldFpiqSwmC3VZUeg==", + "dependencies": { + "is-error-instance": "^2.0.0", + "is-plain-obj": "^4.1.0", + "normalize-exception": "^3.0.0", + "safe-json-value": "^2.0.1", + "set-error-class": "^2.0.0" + }, + "engines": { + "node": ">=16.17.0" + } + }, "node_modules/es-abstract": { "version": "1.22.1", "resolved": "https://registry.npmjs.org/es-abstract/-/es-abstract-1.22.1.tgz", @@ -3094,6 +3131,17 @@ "node": ">=8" } }, + "node_modules/filter-obj": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/filter-obj/-/filter-obj-5.1.0.tgz", + "integrity": "sha512-qWeTREPoT7I0bifpPUXtxkZJ1XJzxWtfoWWkdVGqa+eCr3SHW/Ocp89o8vLvbUuQnadybJpjOKu4V+RwO6sGng==", + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/find-replace": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/find-replace/-/find-replace-3.0.0.tgz", @@ -3742,6 +3790,14 @@ "integrity": "sha512-IOQqts/aHWbiisY5DuPJQ0gcbvaLFCa7fBa9xoLfxBZvQ+ZI/Zh9xoI7Gk+G64N0FdK4AbibytHht2tWgpJWLg==", "dev": true }, + "node_modules/is-error-instance": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/is-error-instance/-/is-error-instance-2.0.0.tgz", + "integrity": "sha512-5RuM+oFY0P5MRa1nXJo6IcTx9m2VyXYhRtb4h0olsi2GHci4bqZ6akHk+GmCYvDrAR9yInbiYdr2pnoqiOMw/Q==", + "engines": { + "node": ">=16.17.0" + } + }, "node_modules/is-extglob": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/is-extglob/-/is-extglob-2.1.1.tgz", @@ -3834,6 +3890,17 @@ "node": ">=8" } }, + "node_modules/is-plain-obj": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/is-plain-obj/-/is-plain-obj-4.1.0.tgz", + "integrity": "sha512-+Pgi+vMuUNkJyExiMBt5IlFoMyKnr5zhJ4Uspz58WOhBF5QoIZkFyNHIbBAtHwzVAgk5RtndVNsDRN61/mmDqg==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/is-plain-object": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/is-plain-object/-/is-plain-object-5.0.0.tgz", @@ -4246,6 +4313,20 @@ "url": "https://github.com/sindresorhus/mem?sponsor=1" } }, + "node_modules/merge-error-cause": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/merge-error-cause/-/merge-error-cause-4.0.1.tgz", + "integrity": "sha512-fTPQshSNjhq6BGvoe5F6xezzcWTn98rog8Ra0gJ0jqgwZXizPNRyg/pjhWX5+pXYanecSPUXa17uEM/RwZfKXw==", + "dependencies": { + "normalize-exception": "^3.0.0", + "set-error-class": "^2.0.0", + "set-error-props": "^5.0.0", + "wrap-error-message": "^2.0.1" + }, + "engines": { + "node": ">=16.17.0" + } + }, "node_modules/merge-stream": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/merge-stream/-/merge-stream-2.0.0.tgz", @@ -4322,6 +4403,36 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/modern-errors": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/modern-errors/-/modern-errors-6.0.0.tgz", + "integrity": "sha512-IgtbY9ITQfbtZUdoIiqOwReV+Z2iL82OtwWTNV9cusKT/SvNivIAXKyGjEGcoCpLc+32UZp0DuqXViIDAG44Zg==", + "dependencies": { + "error-class-utils": "^3.0.0", + "error-custom-class": "^9.0.0", + "filter-obj": "^5.1.0", + "is-plain-obj": "^4.1.0", + "merge-error-cause": "^4.0.1", + "normalize-exception": "^3.0.0", + "set-error-message": "^2.0.1", + "set-error-props": "^5.0.0", + "set-error-stack": "^2.0.0" + }, + "engines": { + "node": ">=16.17.0" + } + }, + "node_modules/modern-errors-bugs": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/modern-errors-bugs/-/modern-errors-bugs-4.0.0.tgz", + "integrity": "sha512-kFKGiT1JtXreUOs6lQTTElIx1g/ZRZOsnI+vfZWCC7GRCgZQysmhXwN+7ypKqM/j3LNJbbSub2NI71ZZAUlkjw==", + "engines": { + "node": ">=16.17.0" + }, + "peerDependencies": { + "modern-errors": "^6.0.0" + } + }, "node_modules/ms": { "version": "2.1.3", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", @@ -4348,6 +4459,18 @@ "node": ">=12.19" } }, + "node_modules/normalize-exception": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/normalize-exception/-/normalize-exception-3.0.0.tgz", + "integrity": "sha512-SMZtWSLjls45KBgwvS2jWyXLtOI9j90JyQ6tJstl91Gti4W7QwZyF/nWwlFRz/Cx4Gy70DAtLT0EzXYXcPJJUw==", + "dependencies": { + "is-error-instance": "^2.0.0", + "is-plain-obj": "^4.1.0" + }, + "engines": { + "node": ">=16.17.0" + } + }, "node_modules/normalize-package-data": { "version": "2.5.0", "resolved": "https://registry.npmjs.org/normalize-package-data/-/normalize-package-data-2.5.0.tgz", @@ -5150,6 +5273,17 @@ "node": ">=8.10.0" } }, + "node_modules/redefine-property": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/redefine-property/-/redefine-property-2.0.0.tgz", + "integrity": "sha512-7UfHFiHkePd9mb/vYMPYuAPjAa/77xGQ1S6laaWNQkz5gVJAtYpoWYQ5iFL/ZcDxXZVqnD7N4aFFnIn4T36Sbw==", + "dependencies": { + "is-plain-obj": "^4.1.0" + }, + "engines": { + "node": ">=16.17.0" + } + }, "node_modules/reduce-flatten": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/reduce-flatten/-/reduce-flatten-2.0.0.tgz", @@ -5471,6 +5605,18 @@ } ] }, + "node_modules/safe-json-value": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/safe-json-value/-/safe-json-value-2.0.1.tgz", + "integrity": "sha512-vvoBxKVyksxwqzNDoD2vLVkcvbjYBFXS/CghUrFDsrP0wgTaw+/gIyOADNYa1vyPmICLUWH7RNh0FtwmFsEQCQ==", + "dependencies": { + "is-plain-obj": "^4.1.0", + "normalize-exception": "^3.0.0" + }, + "engines": { + "node": ">=16.17.0" + } + }, "node_modules/safe-regex-test": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/safe-regex-test/-/safe-regex-test-1.0.0.tgz", @@ -5523,6 +5669,52 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/set-error-class": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/set-error-class/-/set-error-class-2.0.0.tgz", + "integrity": "sha512-ZBXDmoj+bWd+vJbA8VZE/aVQ6NL5iu2AVMtUyVIVXVMEi4oozCGPZAPjaJJZ4k8koLYb0OAFcyIRb0T6XiCuXg==", + "dependencies": { + "normalize-exception": "^3.0.0" + }, + "engines": { + "node": ">=16.17.0" + } + }, + "node_modules/set-error-message": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/set-error-message/-/set-error-message-2.0.1.tgz", + "integrity": "sha512-s/eeP0f4ed1S3fl0KbxZoy5Pbeg5D6Nbple9nut4VPwHTvEIk5r7vKq0FwjNjszdUPdlTrs4GJCOkWUqWeTeWg==", + "dependencies": { + "normalize-exception": "^3.0.0" + }, + "engines": { + "node": ">=16.17.0" + } + }, + "node_modules/set-error-props": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/set-error-props/-/set-error-props-5.0.0.tgz", + "integrity": "sha512-AKeNtJ7f9HUzB9Vw9KWiKKe6NR5b8hJoVVnXGN+ZkEj0jTfM0ggL+I2O/14zfJn9lgUqGgMgyjjRhldp7eTpeA==", + "dependencies": { + "is-error-instance": "^2.0.0", + "is-plain-obj": "^4.1.0", + "redefine-property": "^2.0.0" + }, + "engines": { + "node": ">=16.17.0" + } + }, + "node_modules/set-error-stack": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/set-error-stack/-/set-error-stack-2.0.0.tgz", + "integrity": "sha512-mABWr7mmaY1EVBMXWo32t6byRkKclJ3gipglE2+XGBZxDEk0+zVumRfWyAK3s/EB/TbbUm1Gp0H8VvqlFkMa+g==", + "dependencies": { + "normalize-exception": "^3.0.0" + }, + "engines": { + "node": ">=16.17.0" + } + }, "node_modules/shebang-command": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/shebang-command/-/shebang-command-2.0.0.tgz", @@ -6335,6 +6527,26 @@ "node": ">= 12.0.0" } }, + "node_modules/winston-error-format": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/winston-error-format/-/winston-error-format-2.0.0.tgz", + "integrity": "sha512-1/x1VfKXef/6wFFeMsrdbjz44lJGHJHzy/VQe89oDE4CikRQ5Y31ewNt4u9g4+lpEbacPOtNAF2MPS0vxc+ZRQ==", + "dependencies": { + "error-serializer": "^6.0.1", + "is-error-instance": "^2.0.0", + "is-plain-obj": "^4.1.0", + "logform": "^2.5.1", + "normalize-exception": "^3.0.0", + "safe-json-value": "^2.0.1", + "triple-beam": "^1.3.0" + }, + "engines": { + "node": ">=16.17.0" + }, + "peerDependencies": { + "winston": "^3.8.2" + } + }, "node_modules/winston-transport": { "version": "4.5.0", "resolved": "https://registry.npmjs.org/winston-transport/-/winston-transport-4.5.0.tgz", @@ -6395,6 +6607,18 @@ "url": "https://github.com/chalk/wrap-ansi?sponsor=1" } }, + "node_modules/wrap-error-message": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/wrap-error-message/-/wrap-error-message-2.0.1.tgz", + "integrity": "sha512-LrBMsWJ85HKjLs5ABjhZeW7mWpwsAoV16iqmhEXUf4Y2GvdLwrqK4FPGNNoAi7a20wy4wHU2ci61wQfcOgz/Kw==", + "dependencies": { + "normalize-exception": "^3.0.0", + "set-error-message": "^2.0.1" + }, + "engines": { + "node": ">=16.17.0" + } + }, "node_modules/wrappy": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", diff --git a/package.json b/package.json index db65e8f..eba236e 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,14 @@ ], "type": "module", "exports": { - "./serve": "./dist/serve/serve.js" + "./arrow": "./dist/arrow/arrow.js", + "./plugin/plugin": "./dist/plugin/plugin.js", + "./plugin/serve": "./dist/plugin/serve.js", + "./scheduler": "./dist/scheduler/scheduler.js", + "./schema/table": "./dist/schema/table.js", + "./schema/column": "./dist/schema/column.js", + "./schema/resolvers": "./dist/schema/resolvers.js", + "./types/*": "./dist/types/*.js" }, "scripts": { "dev": "ts-node --esm src/main.ts serve", @@ -88,10 +95,13 @@ "dot-prop": "^8.0.2", "luxon": "^3.4.0", "matcher": "^5.0.0", + "modern-errors": "^6.0.0", + "modern-errors-bugs": "^4.0.0", "p-map": "^6.0.0", "p-timeout": "^6.1.2", "uuid": "^9.0.0", "winston": "^3.10.0", + "winston-error-format": "^2.0.0", "yargs": "^17.7.2" } } diff --git a/src/arrow/arrow.ts b/src/arrow/arrow.ts new file mode 100644 index 0000000..92d1629 --- /dev/null +++ b/src/arrow/arrow.ts @@ -0,0 +1 @@ +export * from '@apache-arrow/esnext-esm'; diff --git a/src/errors/errors.ts b/src/errors/errors.ts new file mode 100644 index 0000000..c3bf50f --- /dev/null +++ b/src/errors/errors.ts @@ -0,0 +1,69 @@ +import ModernError from 'modern-errors'; +import modernErrorsBugs from 'modern-errors-bugs'; + +import type { Column } from '../schema/column.js'; +import type { ClientMeta } from '../schema/meta.js'; +import type { ResourceType } from '../schema/resource.js'; +import type { Table } from '../schema/table.js'; + +export const BaseError = ModernError.subclass('BaseError', { + plugins: [modernErrorsBugs], +}); + +export const UnknownError = BaseError.subclass('UnknownError', { + bugs: 'https://github.com/cloudquery/plugin-sdk-javascript/issues', +}); + +export const ValidationError = BaseError.subclass('ValidationError', { props: { spec: '' } }); +export const TableError = BaseError.subclass('TableError'); +export const WriteError = BaseError.subclass('WriteError', { props: { message: '' } }); +export const UnimplementedError = BaseError.subclass('UnimplementedError'); +export const InitializationError = BaseError.subclass('InitializationError'); +export const FormatError = BaseError.subclass('FormatError', { props: { value: undefined as unknown } }); +export const ResourceError = BaseError.subclass('ResourceError', { + props: { resource: undefined as unknown as ResourceType }, +}); +export const ResolverError = BaseError.subclass('ResolverError', { + props: { + column: undefined as unknown as Column, + resource: undefined as unknown as ResourceType, + }, +}); + +export const SyncError = BaseError.subclass('SyncError'); +export const SyncValidationError = SyncError.subclass('SyncValidationError'); +export const SyncColumnResolveError = SyncError.subclass('SyncColumnResolveError', { + props: { + column: undefined as unknown as Column, + table: undefined as unknown as Table, + resource: undefined as unknown as ResourceType, + clientMeta: undefined as unknown as ClientMeta, + }, +}); +export const SyncPreResolveError = SyncError.subclass('SyncPreResolveError', { + props: { + table: undefined as unknown as Table, + resource: undefined as unknown as ResourceType, + clientMeta: undefined as unknown as ClientMeta, + }, +}); +export const SyncPostResolveError = SyncError.subclass('SyncPostResolveError', { + props: { + table: undefined as unknown as Table, + resource: undefined as unknown as ResourceType, + clientMeta: undefined as unknown as ClientMeta, + }, +}); +export const SyncTableResolveError = SyncError.subclass('SyncTableResolveError', { + props: { + table: undefined as unknown as Table, + }, +}); + +export const TransformError = BaseError.subclass('TransformError', { props: { value: undefined as unknown } }); + +export const SyncResourceEncodeError = BaseError.subclass('SyncResourceEncodeError', { + props: { + resource: undefined as unknown as ResourceType, + }, +}); diff --git a/src/grpc/server.ts b/src/grpc/server.ts index 50049a7..6e0f9cd 100644 --- a/src/grpc/server.ts +++ b/src/grpc/server.ts @@ -18,7 +18,7 @@ export const startServer = (logger: winston.Logger, address: string, plugin: Plu server.bindAsync(address, grpc.ServerCredentials.createInsecure(), (error, port) => { if (error) { - logger.error(error); + logger.error('failed to start server', error); return; } server.start(); diff --git a/src/logger/logger.ts b/src/logger/logger.ts index 66a3445..d73fbd0 100644 --- a/src/logger/logger.ts +++ b/src/logger/logger.ts @@ -1,4 +1,5 @@ -import { createLogger as createWinstonLogger, format as winstonFormat, transports } from 'winston'; +import { createLogger as createWinstonLogger, format, transports } from 'winston'; +import { fullFormat, shortFormat } from 'winston-error-format'; export enum LogLevel { trace = 'trace', @@ -13,12 +14,20 @@ export enum LogFormat { text = 'text', } -export const createLogger = (level: LogLevel, format: LogFormat) => { +export const createLogger = (level: LogLevel, logFormat: LogFormat) => { // Winston doesn't have a TRACE level, so we need to normalize it to DEBUG. const normalizedLevel = level === LogLevel.trace ? LogLevel.debug : level; + + const consoleFormat = format.printf(({ level, message, timestamp }) => { + return `[${timestamp}] ${level} ${message}`; + }); + const logger = createWinstonLogger({ level: normalizedLevel, - format: format == LogFormat.json ? winstonFormat.json() : winstonFormat.simple(), + format: + logFormat == LogFormat.json + ? format.combine(fullFormat(), format.timestamp(), format.json()) + : format.combine(shortFormat(), format.timestamp(), format.colorize(), consoleFormat), transports: [new transports.Console()], }); diff --git a/src/memdb/memdb.ts b/src/memdb/memdb.ts index f169b25..4ce3fac 100644 --- a/src/memdb/memdb.ts +++ b/src/memdb/memdb.ts @@ -1,5 +1,6 @@ import { default as Ajv } from 'ajv'; +import { ValidationError } from '../errors/errors.js'; import type { Plugin, SyncOptions, TableOptions, NewClientFunction } from '../plugin/plugin.js'; import { newPlugin } from '../plugin/plugin.js'; import { sync } from '../scheduler/scheduler.js'; @@ -84,7 +85,7 @@ export const newMemDBPlugin = (): Plugin => { const validSchema = validate(parsedSpec); if (!validSchema) { const messages = validate.errors?.map((error) => error.message).join(', '); - return Promise.reject(new Error(`Invalid spec: ${messages}`)); + return Promise.reject(new ValidationError(`Invalid spec: ${messages}`, { props: { spec } })); } const { concurrency = 10_000 } = parsedSpec; pluginClient.spec = { concurrency }; diff --git a/src/memdb/write.ts b/src/memdb/write.ts index e9c3330..e363b3a 100644 --- a/src/memdb/write.ts +++ b/src/memdb/write.ts @@ -1,3 +1,4 @@ +import { WriteError } from '../errors/errors.js'; import type { WriteStream, WriteRequest } from '../grpc/plugin.js'; import type { Table } from '../schema/table.js'; import { decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js'; @@ -48,7 +49,7 @@ export const createWrite = ( } default: { - throw new Error(`Unknown request message type: ${request.message}`); + throw new WriteError(`Unknown request message type`, { props: { message: request.message } }); } } } diff --git a/src/plugin/plugin.ts b/src/plugin/plugin.ts index 0b1dc6f..3adc424 100644 --- a/src/plugin/plugin.ts +++ b/src/plugin/plugin.ts @@ -1,5 +1,6 @@ import type { Logger } from 'winston'; +import { UnimplementedError, InitializationError } from '../errors/errors.js'; import type { SyncStream, ReadStream, WriteStream } from '../grpc/plugin.js'; import type { Table } from '../schema/table.js'; @@ -53,15 +54,15 @@ export interface Plugin extends Client { export const newUnimplementedSource = (): SourceClient => { return { - tables: () => Promise.reject(new Error('unimplemented')), - sync: () => Promise.reject(new Error('unimplemented')), + tables: () => Promise.reject(new UnimplementedError('unimplemented', { props: { method: 'tables' } })), + sync: () => Promise.reject(new UnimplementedError('unimplemented', { props: { method: 'sync' } })), }; }; export const newUnimplementedDestination = (): DestinationClient => { return { - read: () => Promise.reject(new Error('unimplemented')), - write: () => Promise.reject(new Error('unimplemented')), + read: () => Promise.reject(new UnimplementedError('unimplemented', { props: { method: 'read' } })), + write: () => Promise.reject(new UnimplementedError('unimplemented', { props: { method: 'write' } })), }; }; @@ -72,10 +73,10 @@ export const newPlugin = (name: string, version: string, newClient: NewClientFun name: () => name, version: () => version, write: (stream: WriteStream) => { - return plugin.client?.write(stream) ?? Promise.reject(new Error('client not initialized')); + return plugin.client?.write(stream) ?? Promise.reject(new InitializationError('client not initialized')); }, read: (stream: ReadStream) => { - return plugin.client?.read(stream) ?? new Error('client not initialized'); + return plugin.client?.read(stream) ?? new InitializationError('client not initialized'); }, getLogger: () => { return plugin.logger!; @@ -84,15 +85,15 @@ export const newPlugin = (name: string, version: string, newClient: NewClientFun plugin.logger = logger; }, sync: (options: SyncOptions) => { - return plugin.client?.sync(options) ?? new Error('client not initialized'); + return plugin.client?.sync(options) ?? new InitializationError('client not initialized'); }, tables: (options: TableOptions) => { - return plugin.client?.tables(options) ?? Promise.reject(new Error('client not initialized')); + return plugin.client?.tables(options) ?? Promise.reject(new InitializationError('client not initialized')); }, init: async (spec: string, options: NewClientOptions) => { plugin.client = await newClient(plugin.logger!, spec, options); }, - close: () => plugin.client?.close() ?? Promise.reject(new Error('client not initialized')), + close: () => plugin.client?.close() ?? Promise.reject(new InitializationError('client not initialized')), }; return plugin; diff --git a/src/scalar/bool.test.ts b/src/scalar/bool.test.ts index bc7fa21..487dc6f 100644 --- a/src/scalar/bool.test.ts +++ b/src/scalar/bool.test.ts @@ -33,5 +33,5 @@ import { Bool } from './bool.js'; }); test('should throw when unable to set value', (t) => { - t.throws(() => new Bool({ value: {} }), { message: "Unable to set '[object Object]' as Bool" }); + t.throws(() => new Bool({ value: {} }), { message: 'Unable to set Bool from value' }); }); diff --git a/src/scalar/bool.ts b/src/scalar/bool.ts index 8a70e32..f45ecda 100644 --- a/src/scalar/bool.ts +++ b/src/scalar/bool.ts @@ -1,12 +1,15 @@ import { Bool as ArrowBool } from '@apache-arrow/esnext-esm'; import { boolean, isBooleanable } from 'boolean'; +import { FormatError } from '../errors/errors.js'; +import type { Nullable } from '../schema/types.js'; + import type { Scalar } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; -export class Bool implements Scalar { +export class Bool implements Scalar> { private _valid = false; - private _value = false; + private _value: Nullable = null; public constructor(v?: unknown) { this.value = v; @@ -21,7 +24,10 @@ export class Bool implements Scalar { return this._valid; } - public get value(): boolean { + public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -43,12 +49,12 @@ export class Bool implements Scalar { return; } - throw new Error(`Unable to set '${value}' as Bool`); + throw new FormatError(`Unable to set Bool from value`, { props: { value } }); } public toString() { if (this._valid) { - return String(this._value); + return String(this._value!); } return NULL_VALUE; diff --git a/src/scalar/date.ts b/src/scalar/date.ts index da388c1..cd0310c 100644 --- a/src/scalar/date.ts +++ b/src/scalar/date.ts @@ -2,12 +2,15 @@ import type { DataType, DateUnit } from '@apache-arrow/esnext-esm'; import { Date_ as ArrowDate } from '@apache-arrow/esnext-esm'; import { DateTime } from 'luxon'; +import { FormatError } from '../errors/errors.js'; +import type { Nullable } from '../schema/types.js'; + import type { Scalar } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; -export class Date implements Scalar { +export class Date implements Scalar> { private _valid = false; - private _value: DateTime = DateTime.fromMillis(0); + private _value: Nullable = null; private _unit: DateUnit; public constructor(unit: DateUnit, v?: unknown) { @@ -24,7 +27,10 @@ export class Date implements Scalar { return this._valid; } - public get value(): DateTime { + public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -50,18 +56,26 @@ export class Date implements Scalar { } } + if (value instanceof DateTime) { + dateValue = value; + } + + if (value instanceof globalThis.Date) { + dateValue = DateTime.fromJSDate(value, { zone: 'utc' }); + } + if (dateValue && dateValue.isValid) { - this._value = dateValue; + this._value = dateValue.toJSDate(); this._valid = true; return; } - throw new Error(`Unable to set '${value}' as Date`); + throw new FormatError(`Unable to set Date from value`, { props: { value } }); } public toString(): string { if (this._valid) { - return this._value.toISO()!; + return this._value!.toISOString(); } return NULL_VALUE; diff --git a/src/scalar/float32.ts b/src/scalar/float32.ts index 7ac2e6e..756218b 100644 --- a/src/scalar/float32.ts +++ b/src/scalar/float32.ts @@ -1,12 +1,15 @@ import type { DataType } from '@apache-arrow/esnext-esm'; import { Float32 as ArrowFloat32 } from '@apache-arrow/esnext-esm'; +import { FormatError } from '../errors/errors.js'; +import type { Nullable } from '../schema/types.js'; + import type { Scalar } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; -export class Float32 implements Scalar { +export class Float32 implements Scalar> { private _valid = false; - private _value: number = 0; + private _value: Nullable = null; public constructor(v?: unknown) { this.value = v; @@ -21,7 +24,10 @@ export class Float32 implements Scalar { return this._valid; } - public get value(): number { + public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -58,12 +64,12 @@ export class Float32 implements Scalar { return; } - throw new Error(`Unable to set '${value}' as Float32`); + throw new FormatError(`Unable to set Float32 from value`, { props: { value } }); } public toString() { if (this._valid) { - return String(this._value); + return String(this._value!); } return NULL_VALUE; diff --git a/src/scalar/float64.ts b/src/scalar/float64.ts index ff8c77c..336b644 100644 --- a/src/scalar/float64.ts +++ b/src/scalar/float64.ts @@ -1,12 +1,15 @@ import type { DataType } from '@apache-arrow/esnext-esm'; import { Float64 as ArrowFloat64 } from '@apache-arrow/esnext-esm'; +import { FormatError } from '../errors/errors.js'; +import type { Nullable } from '../schema/types.js'; + import type { Scalar } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; -export class Float64 implements Scalar { +export class Float64 implements Scalar> { private _valid = false; - private _value: number = 0; + private _value: Nullable = null; public constructor(v?: unknown) { this.value = v; @@ -21,7 +24,10 @@ export class Float64 implements Scalar { return this._valid; } - public get value(): number { + public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -50,12 +56,12 @@ export class Float64 implements Scalar { return; } - throw new Error(`Unable to set '${value}' as Float64`); + throw new FormatError(`Unable to set Float64 from value`, { props: { value } }); } public toString() { if (this._valid) { - return String(this._value); + return String(this._value!); } return NULL_VALUE; diff --git a/src/scalar/int16.ts b/src/scalar/int16.ts index a88bc16..ddbb5e4 100644 --- a/src/scalar/int16.ts +++ b/src/scalar/int16.ts @@ -2,12 +2,15 @@ import type { DataType } from '@apache-arrow/esnext-esm'; import { Int16 as ArrowInt16 } from '@apache-arrow/esnext-esm'; import { bigIntToNumber } from '@apache-arrow/esnext-esm/util/bigint.js'; +import { FormatError } from '../errors/errors.js'; +import type { Nullable } from '../schema/types.js'; + import type { Scalar } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; -export class Int16 implements Scalar { +export class Int16 implements Scalar> { private _valid = false; - private _value: bigint = BigInt(0); + private _value: Nullable = null; public constructor(v?: unknown) { this.value = v; @@ -22,7 +25,10 @@ export class Int16 implements Scalar { return this._valid; } - public get value(): bigint { + public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -57,12 +63,12 @@ export class Int16 implements Scalar { return; } - throw new Error(`Unable to set '${value}' as Int16`); + throw new FormatError(`Unable to set Int16 from value`, { props: { value } }); } public toString() { if (this._valid) { - return String(this._value); + return String(this._value!); } return NULL_VALUE; diff --git a/src/scalar/int32.ts b/src/scalar/int32.ts index 3db3deb..8c8f0a9 100644 --- a/src/scalar/int32.ts +++ b/src/scalar/int32.ts @@ -2,12 +2,15 @@ import type { DataType } from '@apache-arrow/esnext-esm'; import { Int32 as ArrowInt32 } from '@apache-arrow/esnext-esm'; import { bigIntToNumber } from '@apache-arrow/esnext-esm/util/bigint.js'; +import { FormatError } from '../errors/errors.js'; +import type { Nullable } from '../schema/types.js'; + import type { Scalar } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; -export class Int32 implements Scalar { +export class Int32 implements Scalar> { private _valid = false; - private _value: bigint = BigInt(0); + private _value: Nullable = null; public constructor(v?: unknown) { this.value = v; @@ -22,7 +25,10 @@ export class Int32 implements Scalar { return this._valid; } - public get value(): bigint { + public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -57,12 +63,12 @@ export class Int32 implements Scalar { return; } - throw new Error(`Unable to set '${value}' as Int32`); + throw new FormatError(`Unable to set Int32 from value`, { props: { value } }); } public toString() { if (this._valid) { - return String(this._value); + return String(this._value!); } return NULL_VALUE; diff --git a/src/scalar/int64.ts b/src/scalar/int64.ts index 7b1f651..c27eb6e 100644 --- a/src/scalar/int64.ts +++ b/src/scalar/int64.ts @@ -2,12 +2,15 @@ import type { DataType } from '@apache-arrow/esnext-esm'; import { Int64 as ArrowInt64 } from '@apache-arrow/esnext-esm'; import { bigIntToNumber } from '@apache-arrow/esnext-esm/util/bigint.js'; +import { FormatError } from '../errors/errors.js'; +import type { Nullable } from '../schema/types.js'; + import type { Scalar } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; -export class Int64 implements Scalar { +export class Int64 implements Scalar> { private _valid = false; - private _value: bigint = BigInt(0); + private _value: Nullable = null; public constructor(v?: unknown) { this.value = v; @@ -22,7 +25,10 @@ export class Int64 implements Scalar { return this._valid; } - public get value(): bigint { + public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -57,12 +63,12 @@ export class Int64 implements Scalar { return; } - throw new Error(`Unable to set '${value}' as Int64`); + throw new FormatError(`Unable to set Int64 from value`, { props: { value } }); } public toString() { if (this._valid) { - return String(this._value); + return String(this._value!); } return NULL_VALUE; diff --git a/src/scalar/json.ts b/src/scalar/json.ts index f0a8930..744da7b 100644 --- a/src/scalar/json.ts +++ b/src/scalar/json.ts @@ -1,5 +1,6 @@ import { Utf8 as ArrowString } from '@apache-arrow/esnext-esm'; +import { FormatError } from '../errors/errors.js'; import type { Nullable } from '../schema/types.js'; import type { Scalar } from './scalar.js'; @@ -32,6 +33,9 @@ class JSONType implements Scalar> { } public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -62,8 +66,8 @@ class JSONType implements Scalar> { try { this._value = new TextEncoder().encode(JSON.stringify(value)); this._valid = true; - } catch { - throw new Error(`Unable to set '${value}' as JSON`); + } catch (error) { + throw new FormatError(`Unable to set JSON from value`, { cause: error, props: { value } }); } } diff --git a/src/scalar/list.ts b/src/scalar/list.ts index 61b6c20..341443a 100644 --- a/src/scalar/list.ts +++ b/src/scalar/list.ts @@ -1,15 +1,18 @@ import type { DataType } from '@apache-arrow/esnext-esm'; import { List as ArrowList } from '@apache-arrow/esnext-esm'; +import { FormatError } from '../errors/errors.js'; +import type { Nullable } from '../schema/types.js'; + import type { Scalar } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; type TVector> = T[]; -export class List> implements Scalar> { +export class List> implements Scalar>> { private _childScalarInstance: T; private _valid = false; - private _value: TVector = []; + private _value: Nullable> = null; constructor(childScalarInstance: T, initialValue?: TVector) { this._childScalarInstance = childScalarInstance; @@ -45,8 +48,9 @@ export class List> implements Scalar> { try { this._childScalarInstance.value = item; } catch { - throw new Error( + throw new FormatError( `Type mismatch: All items should be of the same type as the first item. Expected type ${firstItemType.name}`, + { props: { value: inputValue } }, ); } @@ -68,7 +72,10 @@ export class List> implements Scalar> { return this._valid; } - get value(): TVector { + get value(): Nullable> { + if (!this._valid) { + return null; + } return this._value; } @@ -76,11 +83,14 @@ export class List> implements Scalar> { if (!this._valid) { return NULL_VALUE; } - return `[${this._value.map((v) => v.toString()).join(', ')}]`; + return `[${this._value!.map((v) => v.toString()).join(', ')}]`; } get length(): number { - return this._value.length; + if (!this._valid) { + return 0; + } + return this._value!.length; } // If you need an equality method, you can add an equals method similar to the Python __eq__ diff --git a/src/scalar/scalar.ts b/src/scalar/scalar.ts index 259a228..ec88821 100644 --- a/src/scalar/scalar.ts +++ b/src/scalar/scalar.ts @@ -64,8 +64,7 @@ export const newScalar = (dataType: DataType): Scalar => { if (DataType.isFloat(dataType)) { switch (dataType.precision) { - // case Precision.HALF: { - // TODO + // TODO: case Precision.HALF: { // } case Precision.SINGLE: { return new Float32(); @@ -76,8 +75,13 @@ export const newScalar = (dataType: DataType): Scalar => { } } + if (DataType.isDecimal(dataType)) { + // TODO: Add Decimal support + return new Float64(); + } + if (DataType.isTimestamp(dataType)) { - return new Timestamp(); + return new Timestamp(dataType.unit); } if (DataType.isList(dataType)) { diff --git a/src/scalar/text.test.ts b/src/scalar/text.test.ts index c7ce80b..4469c62 100644 --- a/src/scalar/text.test.ts +++ b/src/scalar/text.test.ts @@ -5,9 +5,9 @@ import { Text } from './text.js'; // eslint-disable-next-line unicorn/no-null [null, undefined, new Text()].forEach((v) => { - test(`should set values to empty string when ${v} is passed`, (t) => { + test(`should set values to null string when ${v} is passed`, (t) => { const s = new Text(v); - t.is(s.value, ''); + t.is(s.value, null); t.is(s.valid, false); t.true(DataType.isUtf8(s.dataType)); }); @@ -29,5 +29,5 @@ import { Text } from './text.js'; }); test('should throw when unable to set value', (t) => { - t.throws(() => new Text({ value: {} }), { message: "Unable to set '[object Object]' as Text" }); + t.throws(() => new Text({ value: {} }), { message: 'Unable to set Text from value' }); }); diff --git a/src/scalar/text.ts b/src/scalar/text.ts index c7672e8..73c93b4 100644 --- a/src/scalar/text.ts +++ b/src/scalar/text.ts @@ -1,11 +1,14 @@ import { Utf8 as ArrowString } from '@apache-arrow/esnext-esm'; +import { FormatError } from '../errors/errors.js'; +import type { Nullable } from '../schema/types.js'; + import type { Scalar } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; -export class Text implements Scalar { +export class Text implements Scalar> { private _valid = false; - private _value = ''; + private _value: Nullable = null; public constructor(v?: unknown) { this.value = v; @@ -20,7 +23,10 @@ export class Text implements Scalar { return this._valid; } - public get value(): string { + public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -54,12 +60,12 @@ export class Text implements Scalar { return; } - throw new Error(`Unable to set '${value}' as Text`); + throw new FormatError(`Unable to set Text from value`, { props: { value } }); } public toString() { if (this._valid) { - return this._value; + return this._value!; } return NULL_VALUE; diff --git a/src/scalar/timestamp.ts b/src/scalar/timestamp.ts index 6a130ef..5c7e1bd 100644 --- a/src/scalar/timestamp.ts +++ b/src/scalar/timestamp.ts @@ -2,15 +2,18 @@ import type { DataType } from '@apache-arrow/esnext-esm'; import { Timestamp as ArrowTimestamp, TimeUnit } from '@apache-arrow/esnext-esm'; import { DateTime } from 'luxon'; +import { FormatError } from '../errors/errors.js'; +import type { Nullable } from '../schema/types.js'; + import type { Scalar } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; -export class Timestamp implements Scalar { +export class Timestamp implements Scalar> { private _valid = false; - private _value: DateTime = DateTime.fromMillis(0); + private _value: Nullable = null; private _unit: TimeUnit = TimeUnit.NANOSECOND; - public constructor(v?: unknown, unit?: TimeUnit) { + public constructor(unit?: TimeUnit, v?: unknown) { this.value = v; if (unit) { this._unit = unit; @@ -26,7 +29,10 @@ export class Timestamp implements Scalar { return this._valid; } - public get value(): DateTime { + public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -60,18 +66,26 @@ export class Timestamp implements Scalar { } } + if (value instanceof DateTime) { + dateValue = value; + } + + if (value instanceof globalThis.Date) { + dateValue = DateTime.fromJSDate(value, { zone: 'utc' }); + } + if (dateValue && dateValue.isValid) { - this._value = dateValue; + this._value = dateValue.toJSDate(); this._valid = true; return; } - throw new Error(`Unable to set '${value}' as Timestamp`); + throw new FormatError(`Unable to set Timestamp from value`, { props: { value } }); } public toString(): string { if (this._valid) { - return this._value.toISO()!; + return this._value!.toISOString(); } return NULL_VALUE; diff --git a/src/scalar/uint16.ts b/src/scalar/uint16.ts index 7238567..f0cbbd5 100644 --- a/src/scalar/uint16.ts +++ b/src/scalar/uint16.ts @@ -2,12 +2,15 @@ import type { DataType } from '@apache-arrow/esnext-esm'; import { Uint16 as ArrowUint16 } from '@apache-arrow/esnext-esm'; import { bigIntToNumber } from '@apache-arrow/esnext-esm/util/bigint.js'; +import { FormatError } from '../errors/errors.js'; +import type { Nullable } from '../schema/types.js'; + import type { Scalar } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; -export class Uint16 implements Scalar { +export class Uint16 implements Scalar> { private _valid = false; - private _value: bigint = BigInt(0); + private _value: Nullable = null; public constructor(v?: unknown) { this.value = v; @@ -22,7 +25,10 @@ export class Uint16 implements Scalar { return this._valid; } - public get value(): bigint { + public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -57,12 +63,12 @@ export class Uint16 implements Scalar { return; } - throw new Error(`Unable to set '${value}' as Uint16`); + throw new FormatError(`Unable to set Uint16 from value`, { props: { value } }); } public toString() { if (this._valid) { - return String(this._value); + return String(this._value!); } return NULL_VALUE; diff --git a/src/scalar/uint32.ts b/src/scalar/uint32.ts index 442c4e5..aa46560 100644 --- a/src/scalar/uint32.ts +++ b/src/scalar/uint32.ts @@ -2,12 +2,15 @@ import type { DataType } from '@apache-arrow/esnext-esm'; import { Uint32 as ArrowUint32 } from '@apache-arrow/esnext-esm'; import { bigIntToNumber } from '@apache-arrow/esnext-esm/util/bigint.js'; +import { FormatError } from '../errors/errors.js'; +import type { Nullable } from '../schema/types.js'; + import type { Scalar } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; -export class Uint32 implements Scalar { +export class Uint32 implements Scalar> { private _valid = false; - private _value: bigint = BigInt(0); + private _value: Nullable = null; public constructor(v?: unknown) { this.value = v; @@ -22,7 +25,10 @@ export class Uint32 implements Scalar { return this._valid; } - public get value(): bigint { + public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -57,12 +63,12 @@ export class Uint32 implements Scalar { return; } - throw new Error(`Unable to set '${value}' as Uint32`); + throw new FormatError(`Unable to set Uint32 from value`, { props: { value } }); } public toString() { if (this._valid) { - return String(this._value); + return String(this._value!); } return NULL_VALUE; diff --git a/src/scalar/uint64.ts b/src/scalar/uint64.ts index 8690030..2d47f92 100644 --- a/src/scalar/uint64.ts +++ b/src/scalar/uint64.ts @@ -2,12 +2,15 @@ import type { DataType } from '@apache-arrow/esnext-esm'; import { Uint64 as ArrowUint64 } from '@apache-arrow/esnext-esm'; import { bigIntToNumber } from '@apache-arrow/esnext-esm/util/bigint.js'; +import { FormatError } from '../errors/errors.js'; +import type { Nullable } from '../schema/types.js'; + import type { Scalar } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; -export class Uint64 implements Scalar { +export class Uint64 implements Scalar> { private _valid = false; - private _value: bigint = BigInt(0); + private _value: Nullable = null; public constructor(v?: unknown) { this.value = v; @@ -22,7 +25,10 @@ export class Uint64 implements Scalar { return this._valid; } - public get value(): bigint { + public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -57,12 +63,12 @@ export class Uint64 implements Scalar { return; } - throw new Error(`Unable to set '${value}' as Uint64`); + throw new FormatError(`Unable to set Uint64 from value`, { props: { value } }); } public toString() { if (this._valid) { - return String(this._value); + return String(this._value!); } return NULL_VALUE; diff --git a/src/scalar/uuid.ts b/src/scalar/uuid.ts index e5edfbf..6e4b102 100644 --- a/src/scalar/uuid.ts +++ b/src/scalar/uuid.ts @@ -1,6 +1,7 @@ import { FixedSizeBinary } from '@apache-arrow/esnext-esm'; import { parse, stringify } from 'uuid'; +import { FormatError } from '../errors/errors.js'; import type { Nullable } from '../schema/types.js'; import type { Scalar } from './scalar.js'; @@ -23,6 +24,9 @@ export class UUID implements Scalar> { } public get value(): Nullable { + if (!this._valid) { + return null; + } return this._value; } @@ -58,7 +62,7 @@ export class UUID implements Scalar> { return; } - throw new Error(`Unable to set '${value}' as UUID`); + throw new FormatError(`Unable to set UUID from value`, { props: { value } }); } public toString() { diff --git a/src/scheduler/scheduler.ts b/src/scheduler/scheduler.ts index 1ce97a0..a1996ab 100644 --- a/src/scheduler/scheduler.ts +++ b/src/scheduler/scheduler.ts @@ -4,6 +4,14 @@ import pMap from 'p-map'; import pTimeout from 'p-timeout'; import type { Logger } from 'winston'; +import { + SyncValidationError, + SyncColumnResolveError, + SyncTableResolveError, + SyncPreResolveError, + SyncPostResolveError, + SyncResourceEncodeError, +} from '../errors/errors.js'; import type { SyncStream } from '../grpc/plugin.js'; import { SyncResponse, MigrateTable, Insert } from '../grpc/plugin.js'; import type { Column } from '../schema/column.js'; @@ -59,7 +67,7 @@ const validateResource = (resource: Resource) => { .map((column) => column.name); if (missingPKs.length > 0) { - throw new Error(`missing primary key(s) ${missingPKs.join(', ')}`); + throw new SyncValidationError(`missing primary key(s) ${missingPKs.join(', ')} for table ${resource.table.name}`); } }; @@ -67,7 +75,10 @@ const resolveColumn = async (client: ClientMeta, table: Table, resource: Resourc try { return await column.resolver(client, resource, column); } catch (error) { - throw new Error(`error resolving column ${column.name} for table ${table.name}: ${error}`); + throw new SyncColumnResolveError(`error resolving column ${column.name} for table ${table.name}`, { + cause: error, + props: { resource, column, table, client }, + }); } }; @@ -84,21 +95,29 @@ const resolveTable = async ( try { await table.resolver(client, null, stream); } catch (error) { - logger.error(`error resolving table ${table.name}: ${error}`); + const tableError = new SyncTableResolveError(`error resolving table ${table.name}`, { + cause: error, + props: { table, client }, + }); + logger.error(`error resolving table ${table.name}`, tableError); return; } finally { stream.end(); } for await (const data of stream) { - logger.info(`resolving resource for table ${table.name}`); + logger.debug(`resolving resource for table ${table.name}`); const resolveResourceTimeout = 10 * 60 * 1000; const resource = new Resource(table, parent, data); try { await pTimeout(table.preResourceResolver(client, resource), { milliseconds: resolveResourceTimeout }); } catch (error) { - logger.error(`error resolving preResourceResolver for table ${table.name}: ${error}`); + const preResolverError = new SyncPreResolveError(`error calling preResourceResolver for table ${table.name}`, { + cause: error, + props: { resource, table, client }, + }); + logger.error(preResolverError); continue; } @@ -108,26 +127,51 @@ const resolveTable = async ( }); await pTimeout(allColumnsPromise, { milliseconds: resolveResourceTimeout }); } catch (error) { - logger.error(`error resolving columns for table ${table.name}: ${error}`); + logger.error(`error resolving columns for table ${table.name}`, error); continue; } try { await table.postResourceResolver(client, resource); } catch (error) { - logger.error(`error resolving postResourceResolver for table ${table.name}: ${error}`); + const postResolveError = new SyncPostResolveError(`error calling postResourceResolver for table ${table.name}`, { + cause: error, + props: { resource, table, client }, + }); + logger.error(postResolveError); continue; } setCQId(resource, deterministicCQId); - validateResource(resource); - syncStream.write(new SyncResponse({ insert: new Insert({ record: encodeResource(resource) }) })); + try { + validateResource(resource); + } catch (error) { + logger.error(error); + continue; + } + + try { + syncStream.write(new SyncResponse({ insert: new Insert({ record: encodeResource(resource) }) })); + } catch (error) { + const encodeError = new SyncResourceEncodeError(`error encoding resource for table ${table.name}`, { + cause: error, + props: { + resource, + }, + }); + logger.error(encodeError); + continue; + } + + logger.debug(`done resolving resource for table ${table.name}`); await pMap(table.relations, (child) => resolveTable(logger, client, child, resource, syncStream, deterministicCQId), ); } + + logger.info(`done resolving table ${table.name}`); }; const syncDfs = async ({ @@ -213,7 +257,7 @@ export const sync = async ({ break; } default: { - throw new Error(`unknown strategy ${strategy}`); + throw new SyncValidationError(`unknown strategy ${strategy}`); } } diff --git a/src/schema/resolvers.ts b/src/schema/resolvers.ts index c40ac28..c14f924 100644 --- a/src/schema/resolvers.ts +++ b/src/schema/resolvers.ts @@ -1,5 +1,7 @@ import { getProperty } from 'dot-prop'; +import { ResolverError } from '../errors/errors.js'; + import type { ColumnResolver } from './column.js'; export const pathResolver = (path: string): ColumnResolver => { @@ -13,11 +15,11 @@ export const parentColumnResolver = (parentColumn: string): ColumnResolver => { return (_, resource, c) => { const parent = resource.parent; if (!parent) { - throw new Error(`parent not found for column ${c.name}`); + throw new ResolverError(`parent not found for column ${c.name}`, { props: { resource, column: c } }); } const parentData = parent.getColumnData(parentColumn); if (!parentData) { - throw new Error(`parent data not found for column ${c.name}`); + throw new ResolverError(`parent data not found for column ${c.name}`, { props: { resource, column: c } }); } resource.setColumData(c.name, parentData); return Promise.resolve(); diff --git a/src/schema/resource.ts b/src/schema/resource.ts index fb24804..e80304a 100644 --- a/src/schema/resource.ts +++ b/src/schema/resource.ts @@ -1,5 +1,6 @@ import { tableToIPC, Table as ArrowTable, RecordBatch, vectorFromArray } from '@apache-arrow/esnext-esm'; +import { ResourceError } from '../errors/errors.js'; import type { Scalar, Vector } from '../scalar/scalar.js'; import { newScalar } from '../scalar/scalar.js'; import { isExtensionType } from '../types/extensions.js'; @@ -25,7 +26,7 @@ export class Resource { getColumnData(columnName: string): Scalar { const columnIndex = this.table.columns.findIndex((c) => c.name === columnName); if (columnIndex === undefined) { - throw new Error(`Column '${columnName}' not found`); + throw new ResourceError(`Column '${columnName}' not found`, { props: { resource: this } }); } return this.data[columnIndex]; } @@ -33,7 +34,7 @@ export class Resource { setColumData(columnName: string, value: unknown): void { const columnIndex = this.table.columns.findIndex((c) => c.name === columnName); if (columnIndex === undefined) { - throw new Error(`Column '${columnName}' not found`); + throw new ResourceError(`Column '${columnName}' not found`, { props: { resource: this } }); } this.data[columnIndex].value = value; } @@ -58,6 +59,7 @@ export class Resource { export const encodeResource = (resource: Resource): Uint8Array => { const { table } = resource; const schema = toArrowSchema(table); + // TODO: Check if this can be simplified let batch = new RecordBatch(schema, undefined); for (let index = 0; index < table.columns.length; index++) { @@ -74,3 +76,5 @@ export const encodeResource = (resource: Resource): Uint8Array => { const bytes = tableToIPC(arrowTable); return bytes; }; + +export type ResourceType = Resource; diff --git a/src/schema/table.ts b/src/schema/table.ts index efe5c7b..7c84e3c 100644 --- a/src/schema/table.ts +++ b/src/schema/table.ts @@ -4,6 +4,8 @@ import type { RecordBatch } from '@apache-arrow/esnext-esm'; import { Table as ArrowTable, tableFromIPC, tableToIPC, Schema } from '@apache-arrow/esnext-esm'; import { isMatch } from 'matcher'; +import { TableError } from '../errors/errors.js'; + import * as arrow from './arrow.js'; import type { Column } from './column.js'; import { fromArrowField, toArrowField } from './column.js'; @@ -139,7 +141,7 @@ export const filterTables = ( .map((table) => table.parent!.name); if (skippedParents.length > 0) { - throw new Error( + throw new TableError( `Can't skip parent table when child table is included. Skipped parents are: ${skippedParents.join(', ')}`, ); } diff --git a/src/transformers/transform.ts b/src/transformers/transform.ts index 335f1b5..547d84c 100644 --- a/src/transformers/transform.ts +++ b/src/transformers/transform.ts @@ -1,6 +1,7 @@ import type { DataType } from '@apache-arrow/esnext-esm'; import { Field, Utf8, Int64, Float64, Bool, List } from '@apache-arrow/esnext-esm'; +import { TransformError } from '../errors/errors.js'; import type { Column } from '../schema/column.js'; import { createColumn } from '../schema/column.js'; import { JSONType } from '../types/json.js'; @@ -31,7 +32,7 @@ function defaultGetTypeFromValue(key: string, value: unknown): DataType | null { } } default: { - throw new Error(`Unsupported type: ${typeof value}`); + throw new TransformError(`Unsupported type: ${typeof value}`, { props: { value } }); } } }