From ecf332bd19f104321b54c3ff6e79e99d9a28c598 Mon Sep 17 00:00:00 2001
From: erezrokah
Date: Wed, 9 Aug 2023 19:28:48 +0200
Subject: [PATCH 1/2] feat: Add init sync, scheduler

---
 README.md                  |   6 +++
 package-lock.json          |   3 +-
 package.json               |   1 +
 src/grpc/plugin.ts         |  70 ++++++++++++------------
 src/main.ts                |   4 +-
 src/memdb/memdb.ts         |  41 ++++++++++++++
 src/plugin/plugin.ts       |  60 ++++++++++-----------
 src/plugin/serve.test.ts   |   5 +-
 src/scheduler/scheduler.ts |  17 ++++++
 src/schema/arrow.ts        |   2 +
 src/schema/table.test.ts   |  84 +++++++++++++++++++++++++++++
 src/schema/table.ts        | 107 ++++++++++++++++++++++++++++++++++++-
 12 files changed, 326 insertions(+), 74 deletions(-)
 create mode 100644 src/memdb/memdb.ts
 create mode 100644 src/scheduler/scheduler.ts
 create mode 100644 src/schema/table.test.ts

diff --git a/README.md b/README.md
index 197feb9..64d050f 100644
--- a/README.md
+++ b/README.md
@@ -32,6 +32,12 @@ npm run build
 npm test
 ```

+### Start a local memory based plugin server
+
+```bash
+npm run dev
+```
+
 ### Formatting and Linting

 ```bash
diff --git a/package-lock.json b/package-lock.json
index 01756dc..ff70190 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -14,6 +14,7 @@
         "@types/luxon": "^3.3.1",
         "boolean": "^3.2.0",
         "luxon": "^3.4.0",
+        "matcher": "^5.0.0",
         "winston": "^3.10.0",
         "yargs": "^17.7.2"
       },
@@ -2191,7 +2192,6 @@
       "version": "5.0.0",
       "resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-5.0.0.tgz",
       "integrity": "sha512-/veY75JbMK4j1yjvuUxuVsiS/hr/4iHs9FTT6cgTexxdE0Ly/glccBAkloH/DofkjRbZU3bnoj38mOmhkZ0lHw==",
-      "dev": true,
       "engines": {
         "node": ">=12"
       },
@@ -4130,7 +4130,6 @@
      "version": "5.0.0",
      "resolved": "https://registry.npmjs.org/matcher/-/matcher-5.0.0.tgz",
      "integrity": "sha512-s2EMBOWtXFc8dgqvoAzKJXxNHibcdJMV0gwqKUaw9E2JBJuGUK7DrNKrA6g/i+v72TT16+6sVm5mS3thaMLQUw==",
-      "dev": true,
      "dependencies": {
        "escape-string-regexp": "^5.0.0"
      },
diff --git a/package.json b/package.json
index 7a2edea..3f9a1b3 100644
--- a/package.json
+++ b/package.json
@@ -84,6 +84,7 @@
    "@types/luxon": "^3.3.1",
    "boolean": "^3.2.0",
    "luxon": "^3.4.0",
+    "matcher": "^5.0.0",
    "winston": "^3.10.0",
    "yargs": "^17.7.2"
  }
 }
diff --git a/src/grpc/plugin.ts b/src/grpc/plugin.ts
index 633074f..5340141 100644
--- a/src/grpc/plugin.ts
+++ b/src/grpc/plugin.ts
@@ -2,6 +2,27 @@ import { pluginV3 } from '@cloudquery/plugin-pb-javascript';
 import grpc = require('@grpc/grpc-js');

 import { Plugin } from '../plugin/plugin.js';
+import { encode as encodeTables } from '../schema/table.js';
+
+export class MigrateTable extends pluginV3.cloudquery.plugin.v3.Sync.MessageMigrateTable {}
+export class SyncResponse extends pluginV3.cloudquery.plugin.v3.Sync.Response {}
+export class ReadResponse extends pluginV3.cloudquery.plugin.v3.Read.Response {}
+export class WriteResponse extends pluginV3.cloudquery.plugin.v3.Write.Response {}
+
+export type SyncStream = grpc.ServerWritableStream<
+  pluginV3.cloudquery.plugin.v3.Sync.Request,
+  pluginV3.cloudquery.plugin.v3.Sync.Response
+>;
+
+export type ReadStream = grpc.ServerWritableStream<
+  pluginV3.cloudquery.plugin.v3.Read.Request,
+  pluginV3.cloudquery.plugin.v3.Read.Response
+>;
+
+export type WriteStream = grpc.ServerReadableStream<
+  pluginV3.cloudquery.plugin.v3.Write.Request,
+  pluginV3.cloudquery.plugin.v3.Write.Response
+>;

 export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPluginService {
   // Needed due to some TypeScript nonsense
@@ -34,9 +55,7 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
     call: grpc.ServerUnaryCall<pluginV3.cloudquery.plugin.v3.Init.Request, pluginV3.cloudquery.plugin.v3.Init.Response>,
     callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Init.Response>,
   ): void {
-    const {
-      request: { spec, no_connection: noConnection },
-    } = call;
+    const { spec = new Uint8Array(), no_connection: noConnection = false } = call.request.toObject();
     const stringSpec = new TextDecoder().decode(spec);

     this.plugin
@@ -58,36 +77,30 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
     callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.GetTables.Response>,
   ): void {
     const {
-      request: { tables, skip_tables: skipTables, skip_dependent_tables: skipDependentTables },
-    } = call;
+      tables = [],
+      skip_tables: skipTables = [],
+      skip_dependent_tables: skipDependentTables = false,
+    } = call.request.toObject();

     this.plugin
       .tables({ tables, skipTables, skipDependentTables })
       .then((tables) => {
-        const encodedTables = tables.map((table) => new TextEncoder().encode(table));
         // eslint-disable-next-line promise/no-callback-in-promise
-        return callback(null, new pluginV3.cloudquery.plugin.v3.GetTables.Response({ tables: encodedTables }));
+        return callback(null, new pluginV3.cloudquery.plugin.v3.GetTables.Response({ tables: encodeTables(tables) }));
       })
       .catch((error) => {
         // eslint-disable-next-line promise/no-callback-in-promise
         return callback(error, null);
       });
   }
-  Sync(
-    call: grpc.ServerWritableStream<
-      pluginV3.cloudquery.plugin.v3.Sync.Request,
-      pluginV3.cloudquery.plugin.v3.Sync.Response
-    >,
-  ): void {
+  Sync(call: SyncStream): void {
     const {
-      request: {
-        tables,
-        skip_tables: skipTables,
-        skip_dependent_tables: skipDependentTables,
-        deterministic_cq_id: deterministicCQId,
-        backend: { connection, table_name: tableName },
-      },
-    } = call;
+      tables = [],
+      skip_tables: skipTables = [],
+      skip_dependent_tables: skipDependentTables = false,
+      deterministic_cq_id: deterministicCQId = false,
+      backend: { connection = '', table_name: tableName = '' } = {},
+    } = call.request.toObject();

     this.plugin.sync({
       tables,
@@ -98,21 +111,10 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
       stream: call,
     });
   }
-  Read(
-    call: grpc.ServerWritableStream<
-      pluginV3.cloudquery.plugin.v3.Read.Request,
-      pluginV3.cloudquery.plugin.v3.Read.Response
-    >,
-  ): void {
+  Read(call: ReadStream): void {
     this.plugin.read(call);
   }
-  Write(
-    call: grpc.ServerReadableStream<
-      pluginV3.cloudquery.plugin.v3.Write.Request,
-      pluginV3.cloudquery.plugin.v3.Write.Response
-    >,
-    callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Write.Response>,
-  ): void {
+  Write(call: WriteStream, callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Write.Response>): void {
     this.plugin.write(call);
     callback(null, new pluginV3.cloudquery.plugin.v3.Write.Response());
   }
diff --git a/src/main.ts b/src/main.ts
index 1960522..4c7ffde 100644
--- a/src/main.ts
+++ b/src/main.ts
@@ -1,4 +1,4 @@
-import { newPlugin, newUnimplementedClient } from './plugin/plugin.js';
+import { newMemDBPlugin } from './memdb/memdb.js';
 import { createServeCommand } from './plugin/serve.js';

-createServeCommand(newPlugin('test', 'v1.0.0', newUnimplementedClient)).parse();
+createServeCommand(newMemDBPlugin()).parse();
diff --git a/src/memdb/memdb.ts b/src/memdb/memdb.ts
new file mode 100644
index 0000000..8d4d2b5
--- /dev/null
+++ b/src/memdb/memdb.ts
@@ -0,0 +1,41 @@
+import {
+  Plugin,
+  newUnimplementedDestination,
+  newPlugin,
+  SyncOptions,
+  TableOptions,
+  NewClientOptions,
+} from '../plugin/plugin.js';
+import { sync } from '../scheduler/scheduler.js';
+import { Table, createTable, filterTables } from '../schema/table.js';
+
+export const createMemDBClient = () => {
+  return { id: () => 'memdb' };
+};
+
+export const newMemDBPlugin = (): Plugin => {
+  const memdbClient = createMemDBClient();
+
+  const allTables: Table[] = [
+    createTable({ name: 'table1', title: 'Table 1', description: 'Table 1 description' }),
+    createTable({ name: 'table2', title: 'Table 2', description: 'Table 2 description' }),
+  ];
+
+  const pluginClient = {
+    ...newUnimplementedDestination(),
+    init: (spec: string, options: NewClientOptions) => Promise.resolve(),
+    close: () => Promise.resolve(),
+    tables: (options: TableOptions) => {
+      const { tables, skipTables, skipDependentTables } = options;
+      const filtered = filterTables(allTables, tables, skipTables, skipDependentTables);
+      return Promise.resolve(filtered);
+    },
+    sync: async (options: SyncOptions) => {
+      const { stream, tables, skipTables, skipDependentTables, deterministicCQId } = options;
+      const filtered = filterTables(allTables, tables, skipTables, skipDependentTables);
+      return await sync(memdbClient, filtered, stream, { deterministicCQId });
+    },
+  };
+
+  return newPlugin('memdb', '0.0.1', () => Promise.resolve(pluginClient));
+};
diff --git a/src/plugin/plugin.ts b/src/plugin/plugin.ts
index 1af03e4..8f21762 100644
--- a/src/plugin/plugin.ts
+++ b/src/plugin/plugin.ts
@@ -1,7 +1,8 @@
-import { Readable, Writable } from 'node:stream';
-
 import { Logger } from 'winston';

+import { SyncStream, ReadStream, WriteStream } from '../grpc/plugin.js';
+import { Table } from '../schema/table.js';
+
 export type BackendOptions = {
   tableName: string;
   connection: string;
@@ -19,7 +20,7 @@ export type SyncOptions = {
   skipDependentTables: boolean;
   deterministicCQId: boolean;
   backendOptions: BackendOptions;
-  stream: Writable;
+  stream: SyncStream;
 };

 export type NewClientOptions = {
@@ -29,68 +30,61 @@ export type NewClientOptions = {
 export type NewClientFunction = (logger: Logger, spec: string, options: NewClientOptions) => Promise<Client>;

 export interface SourceClient {
-  close: () => Promise<void>;
-  tables: (options: TableOptions) => Promise<string[]>;
+  tables: (options: TableOptions) => Promise<Table[]>;
   sync: (options: SyncOptions) => void;
 }

 export interface DestinationClient {
-  close: () => Promise<void>;
-  read: (stream: Writable) => void;
-  write: (stream: Readable) => void;
+  read: (stream: ReadStream) => void;
+  write: (stream: WriteStream) => void;
 }

-export interface Client extends SourceClient, DestinationClient {}
+export interface Client extends SourceClient, DestinationClient {
+  init: (spec: string, options: NewClientOptions) => Promise<void>;
+  close: () => Promise<void>;
+}

-export interface Plugin {
+export interface Plugin extends Client {
+  setLogger: (logger: Logger) => void;
   name: () => string;
   version: () => string;
-  write: (stream: Readable) => void;
-  read: (stream: Writable) => void;
-  setLogger: (logger: Logger) => void;
-  sync: (options: SyncOptions) => void;
-  tables: (options: TableOptions) => Promise<string[]>;
-  init: (spec: string, options: NewClientOptions) => Promise<void>;
-  close: () => Promise<void>;
 }

-export const newUnimplementedSourceClient = (): SourceClient => {
+export const newUnimplementedSource = (): SourceClient => {
   return {
-    close: () => Promise.reject(new Error('unimplemented')),
     tables: () => Promise.reject(new Error('unimplemented')),
     sync: () => Promise.reject(new Error('unimplemented')),
   };
 };

-export const newUnimplementedDestinationClient = (): DestinationClient => {
+export const newUnimplementedDestination = (): DestinationClient => {
   return {
-    close: () => Promise.reject(new Error('unimplemented')),
     read: () => Promise.reject(new Error('unimplemented')),
     write: () => Promise.reject(new Error('unimplemented')),
   };
 };

-export const newUnimplementedClient: NewClientFunction = (logger: Logger, spec: string, options: NewClientOptions) => {
-  return Promise.resolve({
-    ...newUnimplementedSourceClient(),
-    ...newUnimplementedDestinationClient(),
-  });
-};
-
 export const newPlugin = (name: string, version: string, newClient: NewClientFunction): Plugin => {
   const plugin = {
     client: undefined as Client | undefined,
     logger: undefined as Logger | undefined,
     name: () => name,
     version: () => version,
-    write: (stream: Readable) => plugin.client?.write(stream) ?? new Error('client not initialized'),
-    read: (stream: Writable) => plugin.client?.read(stream) ?? new Error('client not initialized'),
+    write: (stream: WriteStream) => {
+      return plugin.client?.write(stream) ?? new Error('client not initialized');
+    },
+    read: (stream: ReadStream) => {
+      return plugin.client?.read(stream) ?? new Error('client not initialized');
+    },
     setLogger: (logger: Logger) => {
       plugin.logger = logger;
     },
-    sync: (options: SyncOptions) => plugin.client?.sync(options) ?? new Error('client not initialized'),
-    tables: (options: TableOptions) =>
-      plugin.client?.tables(options) ?? Promise.reject(new Error('client not initialized')),
+    sync: (options: SyncOptions) => {
+      return plugin.client?.sync(options) ?? new Error('client not initialized');
+    },
+    tables: (options: TableOptions) => {
+      return plugin.client?.tables(options) ?? Promise.reject(new Error('client not initialized'));
+    },
     init: async (spec: string, options: NewClientOptions) => {
       plugin.client = await newClient(plugin.logger!, spec, options);
     },
diff --git a/src/plugin/serve.test.ts b/src/plugin/serve.test.ts
index 64a7353..18dd9af 100644
--- a/src/plugin/serve.test.ts
+++ b/src/plugin/serve.test.ts
@@ -1,9 +1,10 @@
 import test from 'ava';

-import { newPlugin, newUnimplementedClient } from './plugin.js';
+import { newMemDBPlugin } from '../memdb/memdb.js';
+
 import { createServeCommand, ServeArguments } from './serve.js';

-const serve = createServeCommand(newPlugin('test', 'v1.0.0', newUnimplementedClient)).exitProcess(false);
+const serve = createServeCommand(newMemDBPlugin()).exitProcess(false);

 test('should return error without command', (t) => {
   t.throws(() => serve.parse([]), { message: 'Specify a command to run' });
 });
diff --git a/src/scheduler/scheduler.ts b/src/scheduler/scheduler.ts
new file mode 100644
index 0000000..345795c
--- /dev/null
+++ b/src/scheduler/scheduler.ts
@@ -0,0 +1,17 @@
+import { SyncStream, SyncResponse, MigrateTable } from '../grpc/plugin.js';
+import { ClientMeta } from '../schema/meta.js';
+import { Table } from '../schema/table.js';
+
+export type Options = {
+  deterministicCQId: boolean;
+};
+
+export const sync = async (client: ClientMeta, tables: Table[], stream: SyncStream, options: Options) => {
+  for (const { name } of tables) {
+    const table = new TextEncoder().encode(name);
+    // eslint-disable-next-line @typescript-eslint/naming-convention
+    stream.write(new SyncResponse({ migrate_table: new MigrateTable({ table }) }));
+  }
+
+  return await Promise.resolve();
+};
diff --git a/src/schema/arrow.ts b/src/schema/arrow.ts
index c36b03a..1c63e23 100644
--- a/src/schema/arrow.ts
+++ b/src/schema/arrow.ts
@@ -6,4 +6,6 @@ export const METADATA_INCREMENTAL = 'cq:extension:incremental';
 export const METADATA_TRUE = 'true';
 export const METADATA_FALSE = 'false';
 export const METADATA_TABLE_NAME = 'cq:table_name';
+export const METADATA_TABLE_TITLE = 'cq:table_title';
 export const METADATA_TABLE_DESCRIPTION = 'cq:table_description';
+export const METADATA_TABLE_DEPENDS_ON = 'cq:table_depends_on';
diff --git a/src/schema/table.test.ts b/src/schema/table.test.ts
new file mode 100644
index 0000000..dca5b66
--- /dev/null
+++ b/src/schema/table.test.ts
@@ -0,0 +1,84 @@
+import test from 'ava';
+
+import { filterTables, createTable, flattenTables } from './table.js';
+
+const tableA = createTable({ name: 'a' });
+const tableC = createTable({ name: 'c' });
+const tableB = createTable({ name: 'b', relations: [tableC] });
+
+const allTables = [tableA, tableB];
+
+const testCases = [
+  {
+    name: 'should return all tables when * is specified',
+    allTables,
+    tables: ['*'],
+    skipTables: [],
+    skipDependentTables: false,
+    expected: [tableA, tableB, tableC],
+  },
+  {
+    name: 'should skip tables when * and skipTables are specified',
+    allTables,
+    tables: ['*'],
+    skipTables: ['a'],
+    skipDependentTables: false,
+    expected: [tableB, tableC],
+  },
+  {
+    name: 'should skip tables when skipTables is specified',
+    allTables,
+    tables: ['a'],
+    skipTables: ['b'],
+    skipDependentTables: false,
+    expected: [tableA],
+  },
+  {
+    name: 'should return only specified tables',
+    allTables,
+    tables: ['a'],
+    skipTables: [],
+    skipDependentTables: false,
+    expected: [tableA],
+  },
+  {
+    name: 'should return parent and child if child is specified',
+    allTables,
+    tables: ['c'],
+    skipTables: [],
+    skipDependentTables: false,
+    expected: [tableB, tableC],
+  },
+  {
+    name: 'should skip dependent tables when skipDependentTables is specified',
+    allTables,
+    tables: ['*'],
+    skipTables: [],
+    skipDependentTables: true,
+    expected: [tableA, tableB],
+  },
+  {
+    name: 'should error when child is included, but parent skipped',
+    allTables,
+    tables: ['c'],
+    skipTables: ['b'],
+    skipDependentTables: false,
+    expected: [],
+    expectedError: `Can't skip parent table when child table is included. Skipped parents are: b`,
+  },
+];
+
+testCases.forEach((testCase) => {
+  test(`filterTables - ${testCase.name}`, (t) => {
+    const { allTables, tables, skipTables, skipDependentTables, expected, expectedError } = testCase;
+    if (expectedError) {
+      t.throws(() => filterTables(allTables, tables, skipTables, skipDependentTables), { message: expectedError });
+      return;
+    }
+    const actual = filterTables(allTables, tables, skipTables, skipDependentTables);
+    t.deepEqual(
+      actual.map(({ name }) => name),
+      expected.map(({ name }) => name),
+    );
+  });
+});
diff --git a/src/schema/table.ts b/src/schema/table.ts
index fe5b43e..7128397 100644
--- a/src/schema/table.ts
+++ b/src/schema/table.ts
@@ -1,6 +1,10 @@
 import { Writable } from 'node:stream';

-import { Column } from './column.js';
+import { Table as ArrowTable, tableToIPC, Schema } from '@apache-arrow/esnext-esm';
+import { isMatch } from 'matcher';
+
+import * as arrow from './arrow.js';
+import { Column, toArrowField } from './column.js';
 import { ClientMeta } from './meta.js';
 import { Resource } from './resource.js';
 import { Nullable } from './types.js';
@@ -27,6 +31,38 @@ export type Table = {
   pkConstraintName: string;
 };

+export const createTable = ({
+  name = '',
+  title = '',
+  description = '',
+  columns = [],
+  relations = [],
+  transform = () => {},
+  resolver = () => {},
+  multiplexer = () => [],
+  postResourceResolver = () => {},
+  preResourceResolver = () => {},
+  isIncremental = false,
+  ignoreInTests = false,
+  parent = null,
+  pkConstraintName = '',
+}: Partial<Table> = {}): Table => ({
+  name,
+  title,
+  description,
+  columns,
+  relations,
+  transform,
+  resolver,
+  multiplexer,
+  postResourceResolver,
+  preResourceResolver,
+  isIncremental,
+  ignoreInTests,
+  parent,
+  pkConstraintName,
+});
+
 export const getTablesNames = (tables: Table[]) => tables.map((table) => table.name);
 export const getTopLevelTableByName = (tables: Table[], name: string): Table | undefined =>
   tables.find((table) => table.name === name);
@@ -43,3 +79,72 @@ export const getTableByName = (tables: Table[], name: string): Table | undefined
     }
   }
 };
+
+export const flattenTables = (tables: Table[]): Table[] => {
+  return tables.flatMap((table) => [table, ...flattenTables(table.relations.map((c) => ({ ...c, parent: table })))]);
+};
+
+export const getAllParents = (table: Table): Table[] => {
+  if (table.parent === null) {
+    return [];
+  }
+  return [table.parent, ...getAllParents(table.parent)];
+};
+
+export const filterTables = (
+  tables: Table[],
+  include: string[],
+  skip: string[],
+  skipDependantTables: boolean,
+): Table[] => {
+  const flattened = flattenTables(tables);
+
+  const withIncludes = flattened.filter((table) => {
+    return isMatch(table.name, include) || getAllParents(table).some((parent) => isMatch(parent.name, include));
+  });
+  // If a child was included, include the parent as well
+  const withParents = withIncludes
+    .flatMap((table) => [...getAllParents(table), table])
+    .filter((value, index, array) => array.indexOf(value) === index);
+
+  const withSkipped = withParents.filter((table) => {
+    return !isMatch(table.name, skip) && !getAllParents(table).some((parent) => isMatch(parent.name, skip));
+  });
+
+  const withSkipDependant = withSkipped.filter((table) => table.parent === null || !skipDependantTables);
+
+  const skippedParents = withParents
+    .filter((table) => table.parent && !withSkipDependant.includes(table.parent))
+    .map((table) => table.parent!.name);
+
+  if (skippedParents.length > 0) {
+    throw new Error(
+      `Can't skip parent table when child table is included. Skipped parents are: ${skippedParents.join(', ')}`,
+    );
+  }
+
+  return withSkipDependant;
+};
+
+export const toArrowSchema = (table: Table) => {
+  const metadata = new Map<string, string>();
+  metadata.set(arrow.METADATA_TABLE_NAME, table.name);
+  metadata.set(arrow.METADATA_TABLE_DESCRIPTION, table.description);
+  metadata.set(arrow.METADATA_TABLE_TITLE, table.title);
+  metadata.set(arrow.METADATA_CONSTRAINT_NAME, table.pkConstraintName);
+  if (table.isIncremental) {
+    metadata.set(arrow.METADATA_INCREMENTAL, arrow.METADATA_TRUE);
+  }
+  if (table.parent) {
+    metadata.set(arrow.METADATA_TABLE_DEPENDS_ON, table.parent.name);
+  }
+  const fields = table.columns.map((c) => toArrowField(c));
+  return new Schema(fields, metadata);
+};
+
+export const encode = (tables: Table[]): Uint8Array[] => {
+  const schemas = tables.map((table) => toArrowSchema(table));
+  const arrowTables = schemas.map((schema) => new ArrowTable(schema));
+  const bytes = arrowTables.map((table) => tableToIPC(table));
+  return bytes;
+};

From f67600bc9c978848ee4f6a11b320201ed7530d2d Mon Sep 17 00:00:00 2001
From: erezrokah
Date: Wed, 9 Aug 2023 19:34:35 +0200
Subject: [PATCH 2/2] refactor: linting

---
 src/schema/table.test.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/schema/table.test.ts b/src/schema/table.test.ts
index dca5b66..0181468 100644
--- a/src/schema/table.test.ts
+++ b/src/schema/table.test.ts
@@ -1,6 +1,6 @@
 import test from 'ava';

-import { filterTables, createTable, flattenTables } from './table.js';
+import { filterTables, createTable } from './table.js';

 const tableA = createTable({ name: 'a' });
 const tableC = createTable({ name: 'c' });
 const tableB = createTable({ name: 'b', relations: [tableC] });