From bdc8773ec0ef43e6098f16aff3d66f5199f8edb6 Mon Sep 17 00:00:00 2001 From: Kemal Hadimli Date: Thu, 10 Aug 2023 14:02:51 +0100 Subject: [PATCH] feat: MemDB read --- src/grpc/plugin.ts | 17 +++++------------ src/memdb/memdb.ts | 38 ++++++++++++++++++++++++++++---------- 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/src/grpc/plugin.ts b/src/grpc/plugin.ts index f5ba0d5..810ebef 100644 --- a/src/grpc/plugin.ts +++ b/src/grpc/plugin.ts @@ -5,25 +5,18 @@ import { Plugin } from '../plugin/plugin.js'; import { encodeTables } from '../schema/table.js'; export class MigrateTable extends pluginV3.cloudquery.plugin.v3.Sync.MessageMigrateTable {} +export class SyncRequest extends pluginV3.cloudquery.plugin.v3.Sync.Request {} export class SyncResponse extends pluginV3.cloudquery.plugin.v3.Sync.Response {} +export class ReadRequest extends pluginV3.cloudquery.plugin.v3.Read.Request {} export class ReadResponse extends pluginV3.cloudquery.plugin.v3.Read.Response {} export class WriteRequest extends pluginV3.cloudquery.plugin.v3.Write.Request {} export class WriteResponse extends pluginV3.cloudquery.plugin.v3.Write.Response {} -export type SyncStream = grpc.ServerWritableStream< - pluginV3.cloudquery.plugin.v3.Sync.Request, - pluginV3.cloudquery.plugin.v3.Sync.Response ->; +export type SyncStream = grpc.ServerWritableStream; -export type ReadStream = grpc.ServerWritableStream< - pluginV3.cloudquery.plugin.v3.Read.Request, - pluginV3.cloudquery.plugin.v3.Read.Response ->; +export type ReadStream = grpc.ServerWritableStream; -export type WriteStream = grpc.ServerReadableStream< - pluginV3.cloudquery.plugin.v3.Write.Request, - pluginV3.cloudquery.plugin.v3.Write.Response ->; +export type WriteStream = grpc.ServerReadableStream; export class PluginServer extends pluginV3.cloudquery.plugin.v3.UnimplementedPluginService { // Needed due to some TypeScript nonsense diff --git a/src/memdb/memdb.ts b/src/memdb/memdb.ts index 8bc3326..de89a41 100644 --- a/src/memdb/memdb.ts +++ b/src/memdb/memdb.ts @@ -1,15 +1,8 @@ import { StructRowProxy } from '@apache-arrow/esnext-esm'; import { pluginV3 } from '@cloudquery/plugin-pb-javascript'; -import { WriteRequest, WriteStream } from '../grpc/plugin.js'; -import { - Plugin, - newUnimplementedDestination, - newPlugin, - SyncOptions, - TableOptions, - NewClientOptions, -} from '../plugin/plugin.js'; +import { WriteRequest, WriteStream, ReadStream, ReadRequest } from '../grpc/plugin.js'; +import { Plugin, newPlugin, SyncOptions, TableOptions, NewClientOptions } from '../plugin/plugin.js'; import { sync } from '../scheduler/scheduler.js'; import { Table, createTable, filterTables, decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js'; @@ -93,7 +86,6 @@ export const newMemDBPlugin = (): Plugin => { }; const pluginClient = { - ...newUnimplementedDestination(), init: (spec: string, options: NewClientOptions) => Promise.resolve(), close: () => Promise.resolve(), tables: (options: TableOptions) => { @@ -151,6 +143,32 @@ export const newMemDBPlugin = (): Plugin => { resolve(); }); + stream.on('error', (error) => { + reject(error); + }); + }); + }, + read(stream: ReadStream): Promise { + return new Promise((resolve, reject) => { + stream.on('data', (request: ReadRequest) => { + const table = decodeTable(request.table); + + try { + const rows = memoryDB[table.name] || []; + + // We iterate over records in reverse here because we don't set an expectation + // of ordering on plugins, and we want to make sure that the tests are not + // dependent on the order of insertion either. + for (let index = rows.length - 1; index >= 0; index--) { + stream.write(rows[index]); + } + stream.end(); + resolve(); + } catch (error) { + reject(error); + } + }); + stream.on('error', (error) => { reject(error); });