Skip to content

Commit f037753

Browse files
committed
feat: Add init sync, scheduler
1 parent c14f017 commit f037753

File tree

11 files changed

+320
-74
lines changed

11 files changed

+320
-74
lines changed

package-lock.json

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
"@apache-arrow/esnext-esm": "^12.0.1",
8383
"@cloudquery/plugin-pb-javascript": "^0.0.7",
8484
"boolean": "^3.2.0",
85+
"matcher": "^5.0.0",
8586
"winston": "^3.10.0",
8687
"yargs": "^17.7.2"
8788
}

src/grpc/plugin.ts

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,27 @@ import { pluginV3 } from '@cloudquery/plugin-pb-javascript';
22
import grpc = require('@grpc/grpc-js');
33

44
import { Plugin } from '../plugin/plugin.js';
5+
import { encode as encodeTables } from '../schema/table.js';
6+
7+
export class MigrateTable extends pluginV3.cloudquery.plugin.v3.Sync.MessageMigrateTable {}
8+
export class SyncResponse extends pluginV3.cloudquery.plugin.v3.Sync.Response {}
9+
export class ReadResponse extends pluginV3.cloudquery.plugin.v3.Read.Response {}
10+
export class WriteResponse extends pluginV3.cloudquery.plugin.v3.Write.Response {}
11+
12+
export type SyncStream = grpc.ServerWritableStream<
13+
pluginV3.cloudquery.plugin.v3.Sync.Request,
14+
pluginV3.cloudquery.plugin.v3.Sync.Response
15+
>;
16+
17+
export type ReadStream = grpc.ServerWritableStream<
18+
pluginV3.cloudquery.plugin.v3.Read.Request,
19+
pluginV3.cloudquery.plugin.v3.Read.Response
20+
>;
21+
22+
export type WriteStream = grpc.ServerReadableStream<
23+
pluginV3.cloudquery.plugin.v3.Write.Request,
24+
pluginV3.cloudquery.plugin.v3.Write.Response
25+
>;
526

627
export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPluginService {
728
// Needed due to some TypeScript nonsense
@@ -34,9 +55,7 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
3455
call: grpc.ServerUnaryCall<pluginV3.cloudquery.plugin.v3.Init.Request, pluginV3.cloudquery.plugin.v3.Init.Response>,
3556
callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Init.Response>,
3657
): void {
37-
const {
38-
request: { spec, no_connection: noConnection },
39-
} = call;
58+
const { spec = new Uint8Array(), no_connection: noConnection = false } = call.request.toObject();
4059

4160
const stringSpec = new TextDecoder().decode(spec);
4261
this.plugin
@@ -58,36 +77,30 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
5877
callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.GetTables.Response>,
5978
): void {
6079
const {
61-
request: { tables, skip_tables: skipTables, skip_dependent_tables: skipDependentTables },
62-
} = call;
80+
tables = [],
81+
skip_tables: skipTables = [],
82+
skip_dependent_tables: skipDependentTables = false,
83+
} = call.request.toObject();
6384

6485
this.plugin
6586
.tables({ tables, skipTables, skipDependentTables })
6687
.then((tables) => {
67-
const encodedTables = tables.map((table) => new TextEncoder().encode(table));
6888
// eslint-disable-next-line promise/no-callback-in-promise
69-
return callback(null, new pluginV3.cloudquery.plugin.v3.GetTables.Response({ tables: encodedTables }));
89+
return callback(null, new pluginV3.cloudquery.plugin.v3.GetTables.Response({ tables: encodeTables(tables) }));
7090
})
7191
.catch((error) => {
7292
// eslint-disable-next-line promise/no-callback-in-promise
7393
return callback(error, null);
7494
});
7595
}
76-
Sync(
77-
call: grpc.ServerWritableStream<
78-
pluginV3.cloudquery.plugin.v3.Sync.Request,
79-
pluginV3.cloudquery.plugin.v3.Sync.Response
80-
>,
81-
): void {
96+
Sync(call: SyncStream): void {
8297
const {
83-
request: {
84-
tables,
85-
skip_tables: skipTables,
86-
skip_dependent_tables: skipDependentTables,
87-
deterministic_cq_id: deterministicCQId,
88-
backend: { connection, table_name: tableName },
89-
},
90-
} = call;
98+
tables = [],
99+
skip_tables: skipTables = [],
100+
skip_dependent_tables: skipDependentTables = false,
101+
deterministic_cq_id: deterministicCQId = false,
102+
backend: { connection = '', table_name: tableName = '' } = {},
103+
} = call.request.toObject();
91104

92105
this.plugin.sync({
93106
tables,
@@ -98,21 +111,10 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
98111
stream: call,
99112
});
100113
}
101-
Read(
102-
call: grpc.ServerWritableStream<
103-
pluginV3.cloudquery.plugin.v3.Read.Request,
104-
pluginV3.cloudquery.plugin.v3.Read.Response
105-
>,
106-
): void {
114+
Read(call: ReadStream): void {
107115
this.plugin.read(call);
108116
}
109-
Write(
110-
call: grpc.ServerReadableStream<
111-
pluginV3.cloudquery.plugin.v3.Write.Request,
112-
pluginV3.cloudquery.plugin.v3.Write.Response
113-
>,
114-
callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Write.Response>,
115-
): void {
117+
Write(call: WriteStream, callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Write.Response>): void {
116118
this.plugin.write(call);
117119
callback(null, new pluginV3.cloudquery.plugin.v3.Write.Response());
118120
}

src/main.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { newPlugin, newUnimplementedClient } from './plugin/plugin.js';
1+
import { newMemDBPlugin } from './memdb/memdb.js';
22
import { createServeCommand } from './plugin/serve.js';
33

4-
createServeCommand(newPlugin('test', 'v1.0.0', newUnimplementedClient)).parse();
4+
createServeCommand(newMemDBPlugin()).parse();

src/memdb/memdb.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import {
2+
Plugin,
3+
newUnimplementedDestination,
4+
newPlugin,
5+
SyncOptions,
6+
TableOptions,
7+
NewClientOptions,
8+
} from '../plugin/plugin.js';
9+
import { sync } from '../scheduler/scheduler.js';
10+
import { Table, createTable, filterTables } from '../schema/table.js';
11+
12+
export const createMemDBClient = () => {
13+
return { id: () => 'memdb' };
14+
};
15+
16+
export const newMemDBPlugin = (): Plugin => {
17+
const memdbClient = createMemDBClient();
18+
19+
const allTables: Table[] = [
20+
createTable({ name: 'table1', title: 'Table 1', description: 'Table 1 description' }),
21+
createTable({ name: 'table2', title: 'Table 2', description: 'Table 2 description' }),
22+
];
23+
24+
const pluginClient = {
25+
...newUnimplementedDestination(),
26+
init: (spec: string, options: NewClientOptions) => Promise.resolve(),
27+
close: () => Promise.resolve(),
28+
tables: (options: TableOptions) => {
29+
const { tables, skipTables, skipDependentTables } = options;
30+
const filtered = filterTables(allTables, tables, skipTables, skipDependentTables);
31+
return Promise.resolve(filtered);
32+
},
33+
sync: async (options: SyncOptions) => {
34+
const { stream, tables, skipTables, skipDependentTables, deterministicCQId } = options;
35+
const filtered = filterTables(allTables, tables, skipTables, skipDependentTables);
36+
return await sync(memdbClient, filtered, stream, { deterministicCQId });
37+
},
38+
};
39+
40+
return newPlugin('memdb', '0.0.1', () => Promise.resolve(pluginClient));
41+
};

src/plugin/plugin.ts

Lines changed: 27 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
import { Readable, Writable } from 'node:stream';
2-
31
import { Logger } from 'winston';
42

3+
import { SyncStream, ReadStream, WriteStream } from '../grpc/plugin.js';
4+
import { Table } from '../schema/table.js';
5+
56
export type BackendOptions = {
67
tableName: string;
78
connection: string;
@@ -19,7 +20,7 @@ export type SyncOptions = {
1920
skipDependentTables: boolean;
2021
deterministicCQId: boolean;
2122
backendOptions: BackendOptions;
22-
stream: Writable;
23+
stream: SyncStream;
2324
};
2425

2526
export type NewClientOptions = {
@@ -29,68 +30,61 @@ export type NewClientOptions = {
2930
export type NewClientFunction = (logger: Logger, spec: string, options: NewClientOptions) => Promise<Client>;
3031

3132
export interface SourceClient {
32-
close: () => Promise<void>;
33-
tables: (options: TableOptions) => Promise<string[]>;
33+
tables: (options: TableOptions) => Promise<Table[]>;
3434
sync: (options: SyncOptions) => void;
3535
}
3636

3737
export interface DestinationClient {
38-
close: () => Promise<void>;
39-
read: (stream: Writable) => void;
40-
write: (stream: Readable) => void;
38+
read: (stream: ReadStream) => void;
39+
write: (stream: WriteStream) => void;
4140
}
4241

43-
export interface Client extends SourceClient, DestinationClient {}
42+
export interface Client extends SourceClient, DestinationClient {
43+
init: (spec: string, options: NewClientOptions) => Promise<void>;
44+
close: () => Promise<void>;
45+
}
4446

45-
export interface Plugin {
47+
export interface Plugin extends Client {
48+
setLogger: (logger: Logger) => void;
4649
name: () => string;
4750
version: () => string;
48-
write: (stream: Readable) => void;
49-
read: (stream: Writable) => void;
50-
setLogger: (logger: Logger) => void;
51-
sync: (options: SyncOptions) => void;
52-
tables: (options: TableOptions) => Promise<string[]>;
53-
init: (spec: string, options: NewClientOptions) => Promise<void>;
54-
close: () => Promise<void>;
5551
}
5652

57-
export const newUnimplementedSourceClient = (): SourceClient => {
53+
export const newUnimplementedSource = (): SourceClient => {
5854
return {
59-
close: () => Promise.reject(new Error('unimplemented')),
6055
tables: () => Promise.reject(new Error('unimplemented')),
6156
sync: () => Promise.reject(new Error('unimplemented')),
6257
};
6358
};
6459

65-
export const newUnimplementedDestinationClient = (): DestinationClient => {
60+
export const newUnimplementedDestination = (): DestinationClient => {
6661
return {
67-
close: () => Promise.reject(new Error('unimplemented')),
6862
read: () => Promise.reject(new Error('unimplemented')),
6963
write: () => Promise.reject(new Error('unimplemented')),
7064
};
7165
};
7266

73-
export const newUnimplementedClient: NewClientFunction = (logger: Logger, spec: string, options: NewClientOptions) => {
74-
return Promise.resolve({
75-
...newUnimplementedSourceClient(),
76-
...newUnimplementedDestinationClient(),
77-
});
78-
};
79-
8067
export const newPlugin = (name: string, version: string, newClient: NewClientFunction): Plugin => {
8168
const plugin = {
8269
client: undefined as Client | undefined,
8370
logger: undefined as Logger | undefined,
8471
name: () => name,
8572
version: () => version,
86-
write: (stream: Readable) => plugin.client?.write(stream) ?? new Error('client not initialized'),
87-
read: (stream: Writable) => plugin.client?.read(stream) ?? new Error('client not initialized'),
73+
write: (stream: WriteStream) => {
74+
return plugin.client?.write(stream) ?? new Error('client not initialized');
75+
},
76+
read: (stream: ReadStream) => {
77+
return plugin.client?.read(stream) ?? new Error('client not initialized');
78+
},
8879
setLogger: (logger: Logger) => {
8980
plugin.logger = logger;
9081
},
91-
sync: (options: SyncOptions) => plugin.client?.sync(options) ?? new Error('client not initialized'),
92-
tables: (options: TableOptions) =>
93-
plugin.client?.tables(options) ?? Promise.reject(new Error('client not initialized')),
82+
sync: (options: SyncOptions) => {
83+
return plugin.client?.sync(options) ?? new Error('client not initialized');
84+
},
85+
tables: (options: TableOptions) => {
86+
return plugin.client?.tables(options) ?? Promise.reject(new Error('client not initialized'));
87+
},
9488
init: async (spec: string, options: NewClientOptions) => {
9589
plugin.client = await newClient(plugin.logger!, spec, options);
9690
},

src/plugin/serve.test.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import test from 'ava';
22

3-
import { newPlugin, newUnimplementedClient } from './plugin.js';
3+
import { newMemDBPlugin } from '../memdb/memdb.js';
4+
45
import { createServeCommand, ServeArguments } from './serve.js';
56

6-
const serve = createServeCommand(newPlugin('test', 'v1.0.0', newUnimplementedClient)).exitProcess(false);
7+
const serve = createServeCommand(newMemDBPlugin()).exitProcess(false);
78

89
test('should return error without command', (t) => {
910
t.throws(() => serve.parse([]), { message: 'Specify a command to run' });

src/scheduler/scheduler.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { SyncStream, SyncResponse, MigrateTable } from '../grpc/plugin.js';
2+
import { ClientMeta } from '../schema/meta.js';
3+
import { Table } from '../schema/table.js';
4+
5+
export type Options = {
6+
deterministicCQId: boolean;
7+
};
8+
9+
export const sync = async (client: ClientMeta, tables: Table[], stream: SyncStream, options: Options) => {
10+
for (const { name } of tables) {
11+
const table = new TextEncoder().encode(name);
12+
// eslint-disable-next-line @typescript-eslint/naming-convention
13+
stream.write(new SyncResponse({ migrate_table: new MigrateTable({ table }) }));
14+
}
15+
16+
return await Promise.resolve();
17+
};

src/schema/arrow.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ export const METADATA_INCREMENTAL = 'cq:extension:incremental';
66
export const METADATA_TRUE = 'true';
77
export const METADATA_FALSE = 'false';
88
export const METADATA_TABLE_NAME = 'cq:table_name';
9+
export const METADATA_TABLE_TITLE = 'cq:table_title';
910
export const METADATA_TABLE_DESCRIPTION = 'cq:table_description';
11+
export const METADATA_TABLE_DEPENDS_ON = 'cq:table_depends_on';

0 commit comments

Comments
 (0)