Skip to content

Commit 288e97f

Browse files
committed
feat: Scaffold plugin server
1 parent 244f6a4 commit 288e97f

File tree

12 files changed

+313
-121
lines changed

12 files changed

+313
-121
lines changed

.eslintrc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"root": true,
33
"parser": "@typescript-eslint/parser",
4-
"plugins": ["@typescript-eslint", "prettier", "unicorn"],
4+
"plugins": ["@typescript-eslint", "prettier", "unicorn", "unused-imports"],
55
"parserOptions": {
66
"project": "./tsconfig.json"
77
},
@@ -19,6 +19,7 @@
1919
"plugin:you-dont-need-lodash-underscore/all"
2020
],
2121
"rules": {
22+
"unused-imports/no-unused-imports": "error",
2223
"no-console": "error",
2324
"@typescript-eslint/no-unused-vars": 0,
2425
"require-await": "off",

package-lock.json

Lines changed: 35 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
"eslint-plugin-prettier": "^5.0.0",
6060
"eslint-plugin-promise": "^6.1.1",
6161
"eslint-plugin-unicorn": "^48.0.1",
62+
"eslint-plugin-unused-imports": "^3.0.0",
6263
"eslint-plugin-you-dont-need-lodash-underscore": "^6.12.0",
6364
"prettier": "^3.0.1",
6465
"ts-node": "^10.9.1",
@@ -78,7 +79,7 @@
7879
},
7980
"dependencies": {
8081
"@apache-arrow/esnext-esm": "^12.0.1",
81-
"@cloudquery/plugin-pb-javascript": "^0.0.6",
82+
"@cloudquery/plugin-pb-javascript": "^0.0.7",
8283
"boolean": "^3.2.0",
8384
"winston": "^3.10.0",
8485
"yargs": "^17.7.2"

src/grpc/discovery.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { discovery1 } from '@cloudquery/plugin-pb-javascript';
22
import grpc = require('@grpc/grpc-js');
3-
import winston from 'winston';
43

54
const SUPPORTED_VERSIONS = [3];
65

src/grpc/plugin.ts

Lines changed: 73 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,25 @@
11
import { pluginV3 } from '@cloudquery/plugin-pb-javascript';
22
import grpc = require('@grpc/grpc-js');
3-
import winston from 'winston';
3+
4+
import { Plugin } from '../plugin/plugin.js';
45

56
export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPluginService {
7+
// Needed due to some TypeScript nonsense
8+
private plugin: Plugin & grpc.UntypedHandleCall;
9+
10+
constructor(plugin: Plugin) {
11+
super();
12+
this.plugin = plugin as Plugin & grpc.UntypedHandleCall;
13+
}
14+
615
GetName(
716
call: grpc.ServerUnaryCall<
817
pluginV3.cloudquery.plugin.v3.GetName.Request,
918
pluginV3.cloudquery.plugin.v3.GetName.Response
1019
>,
1120
callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.GetName.Response>,
1221
): void {
13-
throw new Error('Method not implemented.');
22+
return callback(null, new pluginV3.cloudquery.plugin.v3.GetName.Response({ name: this.plugin.name() }));
1423
}
1524
GetVersion(
1625
call: grpc.ServerUnaryCall<
@@ -19,13 +28,27 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
1928
>,
2029
callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.GetVersion.Response>,
2130
): void {
22-
throw new Error('Method not implemented.');
31+
return callback(null, new pluginV3.cloudquery.plugin.v3.GetVersion.Response({ version: this.plugin.version() }));
2332
}
2433
Init(
2534
call: grpc.ServerUnaryCall<pluginV3.cloudquery.plugin.v3.Init.Request, pluginV3.cloudquery.plugin.v3.Init.Response>,
2635
callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Init.Response>,
2736
): void {
28-
throw new Error('Method not implemented.');
37+
const {
38+
request: { spec, no_connection: noConnection },
39+
} = call;
40+
41+
const stringSpec = new TextDecoder().decode(spec);
42+
this.plugin
43+
.init(stringSpec, { noConnection })
44+
.then(() => {
45+
// eslint-disable-next-line promise/no-callback-in-promise
46+
return callback(null, new pluginV3.cloudquery.plugin.v3.Init.Response());
47+
})
48+
.catch((error) => {
49+
// eslint-disable-next-line promise/no-callback-in-promise
50+
return callback(error, null);
51+
});
2952
}
3053
GetTables(
3154
call: grpc.ServerUnaryCall<
@@ -34,23 +57,54 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
3457
>,
3558
callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.GetTables.Response>,
3659
): void {
37-
throw new Error('Method not implemented.');
60+
const {
61+
request: { tables, skip_tables: skipTables, skip_dependent_tables: skipDependentTables },
62+
} = call;
63+
64+
this.plugin
65+
.tables({ tables, skipTables, skipDependentTables })
66+
.then((tables) => {
67+
const encodedTables = tables.map((table) => new TextEncoder().encode(table));
68+
// eslint-disable-next-line promise/no-callback-in-promise
69+
return callback(null, new pluginV3.cloudquery.plugin.v3.GetTables.Response({ tables: encodedTables }));
70+
})
71+
.catch((error) => {
72+
// eslint-disable-next-line promise/no-callback-in-promise
73+
return callback(error, null);
74+
});
3875
}
3976
Sync(
4077
call: grpc.ServerWritableStream<
4178
pluginV3.cloudquery.plugin.v3.Sync.Request,
4279
pluginV3.cloudquery.plugin.v3.Sync.Response
4380
>,
4481
): void {
45-
throw new Error('Method not implemented.');
82+
const {
83+
request: {
84+
tables,
85+
skip_tables: skipTables,
86+
skip_dependent_tables: skipDependentTables,
87+
deterministic_cq_id: deterministicCQId,
88+
backend: { connection, table_name: tableName },
89+
},
90+
} = call;
91+
92+
this.plugin.sync({
93+
tables,
94+
skipTables,
95+
skipDependentTables,
96+
deterministicCQId,
97+
backendOptions: { connection, tableName },
98+
stream: call,
99+
});
46100
}
47101
Read(
48102
call: grpc.ServerWritableStream<
49103
pluginV3.cloudquery.plugin.v3.Read.Request,
50104
pluginV3.cloudquery.plugin.v3.Read.Response
51105
>,
52106
): void {
53-
throw new Error('Method not implemented.');
107+
this.plugin.read(call);
54108
}
55109
Write(
56110
call: grpc.ServerReadableStream<
@@ -59,7 +113,8 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
59113
>,
60114
callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Write.Response>,
61115
): void {
62-
throw new Error('Method not implemented.');
116+
this.plugin.write(call);
117+
callback(null, new pluginV3.cloudquery.plugin.v3.Write.Response());
63118
}
64119
Close(
65120
call: grpc.ServerUnaryCall<
@@ -68,6 +123,15 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
68123
>,
69124
callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Close.Response>,
70125
): void {
71-
throw new Error('Method not implemented.');
126+
this.plugin
127+
.close()
128+
.then(() => {
129+
// eslint-disable-next-line promise/no-callback-in-promise
130+
return callback(null, new pluginV3.cloudquery.plugin.v3.Close.Response());
131+
})
132+
.catch((error) => {
133+
// eslint-disable-next-line promise/no-callback-in-promise
134+
return callback(error, null);
135+
});
72136
}
73137
}

src/grpc/server.test.ts

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/grpc/server.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,16 @@ import { pluginV3, discovery1 } from '@cloudquery/plugin-pb-javascript';
22
import grpc = require('@grpc/grpc-js');
33
import winston from 'winston';
44

5+
import { Plugin } from '../plugin/plugin.js';
6+
57
import { DiscoveryServer } from './discovery.js';
68
import { PluginServer } from './plugin.js';
79

8-
export const getServer = () => {
10+
export const startServer = (logger: winston.Logger, address: string, plugin: Plugin) => {
911
const server = new grpc.Server();
10-
server.addService(pluginV3.cloudquery.plugin.v3.UnimplementedPluginService.definition, new PluginServer());
12+
server.addService(pluginV3.cloudquery.plugin.v3.UnimplementedPluginService.definition, new PluginServer(plugin));
1113
server.addService(discovery1.cloudquery.discovery.v1.UnimplementedDiscoveryService.definition, new DiscoveryServer());
1214

13-
return server;
14-
};
15-
16-
export const startServer = (logger: winston.Logger, address: string) => {
17-
const server = getServer();
1815
server.bindAsync(address, grpc.ServerCredentials.createInsecure(), (error, port) => {
1916
if (error) {
2017
logger.error(error);

src/main.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1-
import { serve } from './serve/serve.js';
1+
import { newPlugin, newUnimplementedClient } from './plugin/plugin.js';
2+
import { createServeCommand } from './plugin/serve.js';
23

3-
serve.parse();
4+
createServeCommand(newPlugin('test', 'v1.0.0', newUnimplementedClient)).parse();

src/plugin/plugin.ts

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import { Readable, Writable } from 'node:stream';
2+
3+
import { Logger } from 'winston';
4+
5+
export type BackendOptions = {
6+
tableName: string;
7+
connection: string;
8+
};
9+
10+
export type TableOptions = {
11+
tables: string[];
12+
skipTables: string[];
13+
skipDependentTables: boolean;
14+
};
15+
16+
export type SyncOptions = {
17+
tables: string[];
18+
skipTables: string[];
19+
skipDependentTables: boolean;
20+
deterministicCQId: boolean;
21+
backendOptions: BackendOptions;
22+
stream: Writable;
23+
};
24+
25+
export type NewClientOptions = {
26+
noConnection: boolean;
27+
};
28+
29+
export type NewClientFunction = (logger: Logger, spec: string, options: NewClientOptions) => Promise<Client>;
30+
31+
export interface SourceClient {
32+
close: () => Promise<void>;
33+
tables: (options: TableOptions) => Promise<string[]>;
34+
sync: (options: SyncOptions) => void;
35+
}
36+
37+
export interface DestinationClient {
38+
close: () => Promise<void>;
39+
read: (stream: Writable) => void;
40+
write: (stream: Readable) => void;
41+
}
42+
43+
export interface Client extends SourceClient, DestinationClient {}
44+
45+
export interface Plugin {
46+
name: () => string;
47+
version: () => string;
48+
write: (stream: Readable) => void;
49+
read: (stream: Writable) => void;
50+
setLogger: (logger: Logger) => void;
51+
sync: (options: SyncOptions) => void;
52+
tables: (options: TableOptions) => Promise<string[]>;
53+
init: (spec: string, options: NewClientOptions) => Promise<void>;
54+
close: () => Promise<void>;
55+
}
56+
57+
export const newUnimplementedSourceClient = (): SourceClient => {
58+
return {
59+
close: () => Promise.reject(new Error('unimplemented')),
60+
tables: () => Promise.reject(new Error('unimplemented')),
61+
sync: () => Promise.reject(new Error('unimplemented')),
62+
};
63+
};
64+
65+
export const newUnimplementedDestinationClient = (): DestinationClient => {
66+
return {
67+
close: () => Promise.reject(new Error('unimplemented')),
68+
read: () => Promise.reject(new Error('unimplemented')),
69+
write: () => Promise.reject(new Error('unimplemented')),
70+
};
71+
};
72+
73+
export const newUnimplementedClient: NewClientFunction = (logger: Logger, spec: string, options: NewClientOptions) => {
74+
return Promise.resolve({
75+
...newUnimplementedSourceClient(),
76+
...newUnimplementedDestinationClient(),
77+
});
78+
};
79+
80+
export const newPlugin = (name: string, version: string, newClient: NewClientFunction): Plugin => {
81+
const plugin = {
82+
client: undefined as Client | undefined,
83+
logger: undefined as Logger | undefined,
84+
name: () => name,
85+
version: () => version,
86+
write: (stream: Readable) => plugin.client?.write(stream) ?? new Error('client not initialized'),
87+
read: (stream: Writable) => plugin.client?.read(stream) ?? new Error('client not initialized'),
88+
setLogger: (logger: Logger) => {
89+
plugin.logger = logger;
90+
},
91+
sync: (options: SyncOptions) => plugin.client?.sync(options) ?? new Error('client not initialized'),
92+
tables: (options: TableOptions) =>
93+
plugin.client?.tables(options) ?? Promise.reject(new Error('client not initialized')),
94+
init: async (spec: string, options: NewClientOptions) => {
95+
plugin.client = await newClient(plugin.logger!, spec, options);
96+
},
97+
close: () => plugin.client?.close() ?? Promise.reject(new Error('client not initialized')),
98+
};
99+
100+
return plugin;
101+
};

0 commit comments

Comments
 (0)