Skip to content

Commit 773a0e5

Browse files
authored
fix: Write gRPC call, use for await on write readble stream (#52)
1 parent dd71c60 commit 773a0e5

File tree

3 files changed

+43
-44
lines changed

3 files changed

+43
-44
lines changed

src/grpc/plugin.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,16 @@ export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPlu
111111
this.plugin.read(call);
112112
}
113113
Write(call: WriteStream, callback: grpc.sendUnaryData<pluginV3.cloudquery.plugin.v3.Write.Response>): void {
114-
this.plugin.write(call);
115-
callback(null, new pluginV3.cloudquery.plugin.v3.Write.Response());
114+
this.plugin
115+
.write(call)
116+
.then(() => {
117+
// eslint-disable-next-line promise/no-callback-in-promise
118+
return callback(null, new pluginV3.cloudquery.plugin.v3.Write.Response());
119+
})
120+
.catch((error) => {
121+
// eslint-disable-next-line promise/no-callback-in-promise
122+
return callback(error, null);
123+
});
116124
}
117125
Close(
118126
call: grpc.ServerUnaryCall<

src/memdb/write.ts

Lines changed: 31 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -11,54 +11,45 @@ export const createWrite = (
1111
overwrite: OverwriteFunction,
1212
deleteStale: DeleteStaleFunction,
1313
) => {
14-
return (stream: WriteStream): Promise<void> => {
15-
return new Promise((resolve, reject) => {
16-
stream.on('data', (request: WriteRequest) => {
17-
switch (request.message) {
18-
case 'migrate_table': {
19-
// Update table schema in the `tables` map
20-
const table = decodeTable(request.migrate_table.table);
21-
tables[table.name] = table;
22-
break;
23-
}
24-
25-
case 'insert': {
26-
const [tableName, batches] = decodeRecord(request.insert.record);
27-
28-
if (!memoryDB[tableName]) {
29-
memoryDB[tableName] = [];
30-
}
14+
return async (stream: WriteStream) => {
15+
for await (const data of stream) {
16+
const request = data as WriteRequest;
17+
switch (request.message) {
18+
case 'migrate_table': {
19+
// Update table schema in the `tables` map
20+
const table = decodeTable(request.migrate_table.table);
21+
tables[table.name] = table;
22+
break;
23+
}
3124

32-
const tableSchema = tables[tableName];
33-
const pks = getPrimaryKeys(tableSchema);
25+
case 'insert': {
26+
const [tableName, batches] = decodeRecord(request.insert.record);
3427

35-
for (const batch of batches) {
36-
//eslint-disable-next-line unicorn/no-array-for-each
37-
for (const record of batch) {
38-
overwrite(tableSchema, pks, record);
39-
}
40-
}
41-
break;
28+
if (!memoryDB[tableName]) {
29+
memoryDB[tableName] = [];
4230
}
4331

44-
case 'delete': {
45-
deleteStale(request.delete);
46-
break;
47-
}
32+
const tableSchema = tables[tableName];
33+
const pks = getPrimaryKeys(tableSchema);
4834

49-
default: {
50-
throw new Error(`Unknown request message type: ${request.message}`);
35+
for (const batch of batches) {
36+
//eslint-disable-next-line unicorn/no-array-for-each
37+
for (const record of batch) {
38+
overwrite(tableSchema, pks, record);
39+
}
5140
}
41+
break;
5242
}
53-
});
5443

55-
stream.on('finish', () => {
56-
resolve();
57-
});
44+
case 'delete': {
45+
deleteStale(request.delete);
46+
break;
47+
}
5848

59-
stream.on('error', (error) => {
60-
reject(error);
61-
});
62-
});
49+
default: {
50+
throw new Error(`Unknown request message type: ${request.message}`);
51+
}
52+
}
53+
}
6354
};
6455
};

src/plugin/plugin.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ export interface SourceClient {
3636

3737
export interface DestinationClient {
3838
read: (stream: ReadStream) => void;
39-
write: (stream: WriteStream) => void;
39+
write: (stream: WriteStream) => Promise<void>;
4040
}
4141

4242
export interface Client extends SourceClient, DestinationClient {
@@ -72,7 +72,7 @@ export const newPlugin = (name: string, version: string, newClient: NewClientFun
7272
name: () => name,
7373
version: () => version,
7474
write: (stream: WriteStream) => {
75-
return plugin.client?.write(stream) ?? new Error('client not initialized');
75+
return plugin.client?.write(stream) ?? Promise.reject(new Error('client not initialized'));
7676
},
7777
read: (stream: ReadStream) => {
7878
return plugin.client?.read(stream) ?? new Error('client not initialized');

0 commit comments

Comments
 (0)