Skip to content

feat: Add initial sync, scheduler #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ npm run build
npm test
```

### Start a local memory based plugin server

```bash
npm run dev
```

### Formatting and Linting

```bash
Expand Down
3 changes: 1 addition & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
"@types/luxon": "^3.3.1",
"boolean": "^3.2.0",
"luxon": "^3.4.0",
"matcher": "^5.0.0",
"winston": "^3.10.0",
"yargs": "^17.7.2"
}
Expand Down
70 changes: 36 additions & 34 deletions src/grpc/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,27 @@ import { pluginV3 } from '@cloudquery/plugin-pb-javascript';
import grpc = require('@grpc/grpc-js');

import { Plugin } from '../plugin/plugin.js';
import { encode as encodeTables } from '../schema/table.js';

export class MigrateTable extends pluginV3.cloudquery.plugin.v3.Sync.MessageMigrateTable {}
export class SyncResponse extends pluginV3.cloudquery.plugin.v3.Sync.Response {}
export class ReadResponse extends pluginV3.cloudquery.plugin.v3.Read.Response {}
export class WriteResponse extends pluginV3.cloudquery.plugin.v3.Write.Response {}

export type SyncStream = grpc.ServerWritableStream<
pluginV3.cloudquery.plugin.v3.Sync.Request,
pluginV3.cloudquery.plugin.v3.Sync.Response
>;

export type ReadStream = grpc.ServerWritableStream<
pluginV3.cloudquery.plugin.v3.Read.Request,
pluginV3.cloudquery.plugin.v3.Read.Response
>;

export type WriteStream = grpc.ServerReadableStream<
pluginV3.cloudquery.plugin.v3.Write.Request,
pluginV3.cloudquery.plugin.v3.Write.Response
>;

export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPluginService {
// Needed due to some TypeScript nonsense
Expand Down Expand Up @@ -34,9 +55,7 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
call: grpc.ServerUnaryCall<pluginV3.cloudquery.plugin.v3.Init.Request, pluginV3.cloudquery.plugin.v3.Init.Response>,
callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Init.Response>,
): void {
const {
request: { spec, no_connection: noConnection },
} = call;
const { spec = new Uint8Array(), no_connection: noConnection = false } = call.request.toObject();

const stringSpec = new TextDecoder().decode(spec);
this.plugin
Expand All @@ -58,36 +77,30 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.GetTables.Response>,
): void {
const {
request: { tables, skip_tables: skipTables, skip_dependent_tables: skipDependentTables },
} = call;
tables = [],
skip_tables: skipTables = [],
skip_dependent_tables: skipDependentTables = false,
} = call.request.toObject();

this.plugin
.tables({ tables, skipTables, skipDependentTables })
.then((tables) => {
const encodedTables = tables.map((table) => new TextEncoder().encode(table));
// eslint-disable-next-line promise/no-callback-in-promise
return callback(null, new pluginV3.cloudquery.plugin.v3.GetTables.Response({ tables: encodedTables }));
return callback(null, new pluginV3.cloudquery.plugin.v3.GetTables.Response({ tables: encodeTables(tables) }));
})
.catch((error) => {
// eslint-disable-next-line promise/no-callback-in-promise
return callback(error, null);
});
}
Sync(
call: grpc.ServerWritableStream<
pluginV3.cloudquery.plugin.v3.Sync.Request,
pluginV3.cloudquery.plugin.v3.Sync.Response
>,
): void {
Sync(call: SyncStream): void {
const {
request: {
tables,
skip_tables: skipTables,
skip_dependent_tables: skipDependentTables,
deterministic_cq_id: deterministicCQId,
backend: { connection, table_name: tableName },
},
} = call;
tables = [],
skip_tables: skipTables = [],
skip_dependent_tables: skipDependentTables = false,
deterministic_cq_id: deterministicCQId = false,
backend: { connection = '', table_name: tableName = '' } = {},
} = call.request.toObject();

this.plugin.sync({
tables,
Expand All @@ -98,21 +111,10 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
stream: call,
});
}
Read(
call: grpc.ServerWritableStream<
pluginV3.cloudquery.plugin.v3.Read.Request,
pluginV3.cloudquery.plugin.v3.Read.Response
>,
): void {
Read(call: ReadStream): void {
this.plugin.read(call);
}
Write(
call: grpc.ServerReadableStream<
pluginV3.cloudquery.plugin.v3.Write.Request,
pluginV3.cloudquery.plugin.v3.Write.Response
>,
callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Write.Response>,
): void {
Write(call: WriteStream, callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Write.Response>): void {
this.plugin.write(call);
callback(null, new pluginV3.cloudquery.plugin.v3.Write.Response());
}
Expand Down
4 changes: 2 additions & 2 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { newPlugin, newUnimplementedClient } from './plugin/plugin.js';
import { newMemDBPlugin } from './memdb/memdb.js';
import { createServeCommand } from './plugin/serve.js';

createServeCommand(newPlugin('test', 'v1.0.0', newUnimplementedClient)).parse();
createServeCommand(newMemDBPlugin()).parse();
41 changes: 41 additions & 0 deletions src/memdb/memdb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import {
Plugin,
newUnimplementedDestination,
newPlugin,
SyncOptions,
TableOptions,
NewClientOptions,
} from '../plugin/plugin.js';
import { sync } from '../scheduler/scheduler.js';
import { Table, createTable, filterTables } from '../schema/table.js';

export const createMemDBClient = () => {
return { id: () => 'memdb' };
};

export const newMemDBPlugin = (): Plugin => {
const memdbClient = createMemDBClient();

const allTables: Table[] = [
createTable({ name: 'table1', title: 'Table 1', description: 'Table 1 description' }),
createTable({ name: 'table2', title: 'Table 2', description: 'Table 2 description' }),
];

const pluginClient = {
...newUnimplementedDestination(),
init: (spec: string, options: NewClientOptions) => Promise.resolve(),
close: () => Promise.resolve(),
tables: (options: TableOptions) => {
const { tables, skipTables, skipDependentTables } = options;
const filtered = filterTables(allTables, tables, skipTables, skipDependentTables);
return Promise.resolve(filtered);
},
sync: async (options: SyncOptions) => {
const { stream, tables, skipTables, skipDependentTables, deterministicCQId } = options;
const filtered = filterTables(allTables, tables, skipTables, skipDependentTables);
return await sync(memdbClient, filtered, stream, { deterministicCQId });
},
};

return newPlugin('memdb', '0.0.1', () => Promise.resolve(pluginClient));
};
60 changes: 27 additions & 33 deletions src/plugin/plugin.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Readable, Writable } from 'node:stream';

import { Logger } from 'winston';

import { SyncStream, ReadStream, WriteStream } from '../grpc/plugin.js';
import { Table } from '../schema/table.js';

export type BackendOptions = {
tableName: string;
connection: string;
Expand All @@ -19,7 +20,7 @@ export type SyncOptions = {
skipDependentTables: boolean;
deterministicCQId: boolean;
backendOptions: BackendOptions;
stream: Writable;
stream: SyncStream;
};

export type NewClientOptions = {
Expand All @@ -29,68 +30,61 @@ export type NewClientOptions = {
export type NewClientFunction = (logger: Logger, spec: string, options: NewClientOptions) => Promise<Client>;

export interface SourceClient {
close: () => Promise<void>;
tables: (options: TableOptions) => Promise<string[]>;
tables: (options: TableOptions) => Promise<Table[]>;
sync: (options: SyncOptions) => void;
}

export interface DestinationClient {
close: () => Promise<void>;
read: (stream: Writable) => void;
write: (stream: Readable) => void;
read: (stream: ReadStream) => void;
write: (stream: WriteStream) => void;
}

export interface Client extends SourceClient, DestinationClient {}
export interface Client extends SourceClient, DestinationClient {
init: (spec: string, options: NewClientOptions) => Promise<void>;
close: () => Promise<void>;
}

export interface Plugin {
export interface Plugin extends Client {
setLogger: (logger: Logger) => void;
name: () => string;
version: () => string;
write: (stream: Readable) => void;
read: (stream: Writable) => void;
setLogger: (logger: Logger) => void;
sync: (options: SyncOptions) => void;
tables: (options: TableOptions) => Promise<string[]>;
init: (spec: string, options: NewClientOptions) => Promise<void>;
close: () => Promise<void>;
}

export const newUnimplementedSourceClient = (): SourceClient => {
export const newUnimplementedSource = (): SourceClient => {
return {
close: () => Promise.reject(new Error('unimplemented')),
tables: () => Promise.reject(new Error('unimplemented')),
sync: () => Promise.reject(new Error('unimplemented')),
};
};

export const newUnimplementedDestinationClient = (): DestinationClient => {
export const newUnimplementedDestination = (): DestinationClient => {
return {
close: () => Promise.reject(new Error('unimplemented')),
read: () => Promise.reject(new Error('unimplemented')),
write: () => Promise.reject(new Error('unimplemented')),
};
};

export const newUnimplementedClient: NewClientFunction = (logger: Logger, spec: string, options: NewClientOptions) => {
return Promise.resolve({
...newUnimplementedSourceClient(),
...newUnimplementedDestinationClient(),
});
};

export const newPlugin = (name: string, version: string, newClient: NewClientFunction): Plugin => {
const plugin = {
client: undefined as Client | undefined,
logger: undefined as Logger | undefined,
name: () => name,
version: () => version,
write: (stream: Readable) => plugin.client?.write(stream) ?? new Error('client not initialized'),
read: (stream: Writable) => plugin.client?.read(stream) ?? new Error('client not initialized'),
write: (stream: WriteStream) => {
return plugin.client?.write(stream) ?? new Error('client not initialized');
},
read: (stream: ReadStream) => {
return plugin.client?.read(stream) ?? new Error('client not initialized');
},
setLogger: (logger: Logger) => {
plugin.logger = logger;
},
sync: (options: SyncOptions) => plugin.client?.sync(options) ?? new Error('client not initialized'),
tables: (options: TableOptions) =>
plugin.client?.tables(options) ?? Promise.reject(new Error('client not initialized')),
sync: (options: SyncOptions) => {
return plugin.client?.sync(options) ?? new Error('client not initialized');
},
tables: (options: TableOptions) => {
return plugin.client?.tables(options) ?? Promise.reject(new Error('client not initialized'));
},
init: async (spec: string, options: NewClientOptions) => {
plugin.client = await newClient(plugin.logger!, spec, options);
},
Expand Down
5 changes: 3 additions & 2 deletions src/plugin/serve.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import test from 'ava';

import { newPlugin, newUnimplementedClient } from './plugin.js';
import { newMemDBPlugin } from '../memdb/memdb.js';

import { createServeCommand, ServeArguments } from './serve.js';

const serve = createServeCommand(newPlugin('test', 'v1.0.0', newUnimplementedClient)).exitProcess(false);
const serve = createServeCommand(newMemDBPlugin()).exitProcess(false);

test('should return error without command', (t) => {
t.throws(() => serve.parse([]), { message: 'Specify a command to run' });
Expand Down
17 changes: 17 additions & 0 deletions src/scheduler/scheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { SyncStream, SyncResponse, MigrateTable } from '../grpc/plugin.js';
import { ClientMeta } from '../schema/meta.js';
import { Table } from '../schema/table.js';

export type Options = {
deterministicCQId: boolean;
};

export const sync = async (client: ClientMeta, tables: Table[], stream: SyncStream, options: Options) => {
for (const { name } of tables) {
const table = new TextEncoder().encode(name);
// eslint-disable-next-line @typescript-eslint/naming-convention
stream.write(new SyncResponse({ migrate_table: new MigrateTable({ table }) }));
}

return await Promise.resolve();
};
2 changes: 2 additions & 0 deletions src/schema/arrow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ export const METADATA_INCREMENTAL = 'cq:extension:incremental';
export const METADATA_TRUE = 'true';
export const METADATA_FALSE = 'false';
export const METADATA_TABLE_NAME = 'cq:table_name';
export const METADATA_TABLE_TITLE = 'cq:table_title';
export const METADATA_TABLE_DESCRIPTION = 'cq:table_description';
export const METADATA_TABLE_DEPENDS_ON = 'cq:table_depends_on';
Loading