Skip to content

Commit 8f21f52

Browse files
disqerezrokah
andauthored
feat: MemDB writes (#42)
Co-authored-by: Erez Rokah <erezrokah@users.noreply.github.com> Co-authored-by: Kemal Hadimli <disq@users.noreply.github.com>
1 parent f6413d2 commit 8f21f52

File tree

3 files changed

+155
-7
lines changed

3 files changed

+155
-7
lines changed

src/grpc/plugin.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { encodeTables } from '../schema/table.js';
77
export class MigrateTable extends pluginV3.cloudquery.plugin.v3.Sync.MessageMigrateTable {}
88
export class SyncResponse extends pluginV3.cloudquery.plugin.v3.Sync.Response {}
99
export class ReadResponse extends pluginV3.cloudquery.plugin.v3.Read.Response {}
10+
export class WriteRequest extends pluginV3.cloudquery.plugin.v3.Write.Request {}
1011
export class WriteResponse extends pluginV3.cloudquery.plugin.v3.Write.Response {}
1112

1213
export type SyncStream = grpc.ServerWritableStream<

src/memdb/memdb.ts

Lines changed: 123 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
import { StructRowProxy } from '@apache-arrow/esnext-esm';
2+
import { pluginV3 } from '@cloudquery/plugin-pb-javascript';
3+
4+
import { WriteRequest, WriteStream } from '../grpc/plugin.js';
15
import {
26
Plugin,
37
newUnimplementedDestination,
@@ -7,20 +11,87 @@ import {
711
NewClientOptions,
812
} from '../plugin/plugin.js';
913
import { sync } from '../scheduler/scheduler.js';
10-
import { Table, createTable, filterTables } from '../schema/table.js';
14+
import { Table, createTable, filterTables, decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js';
1115

1216
export const createMemDBClient = () => {
13-
return { id: () => 'memdb' };
17+
//eslint-disable-next-line @typescript-eslint/no-explicit-any
18+
const memoryDB: Record<string, any[]> = {};
19+
const tables: Record<string, Table> = {};
20+
return {
21+
id: () => 'memdb',
22+
memoryDB,
23+
tables,
24+
};
1425
};
1526

1627
export const newMemDBPlugin = (): Plugin => {
1728
const memdbClient = createMemDBClient();
29+
const memoryDB = memdbClient.memoryDB;
30+
const tables = memdbClient.tables;
1831

1932
const allTables: Table[] = [
2033
createTable({ name: 'table1', title: 'Table 1', description: 'Table 1 description' }),
2134
createTable({ name: 'table2', title: 'Table 2', description: 'Table 2 description' }),
2235
];
2336

37+
const memdb: { inserts: unknown[]; [key: string]: unknown } = {
38+
inserts: [],
39+
...memoryDB,
40+
};
41+
42+
//eslint-disable-next-line @typescript-eslint/no-explicit-any
43+
const overwrite = (table: Table, primaryKeys: string[], record: StructRowProxy<any>) => {
44+
const tableData = memoryDB[table.name] || [];
45+
46+
if (primaryKeys.length === 0) {
47+
// If there are no primary keys, simply append the data
48+
tableData.push(record);
49+
memoryDB[table.name] = tableData;
50+
return;
51+
}
52+
53+
// Otherwise, perform an upsert based on the primary keys
54+
const recordIndex = tableData.findIndex((existingRecord) => {
55+
return primaryKeys.every((key) => existingRecord[key] === record[key]);
56+
});
57+
58+
if (recordIndex > -1) {
59+
// If record exists, update (overwrite) it
60+
tableData[recordIndex] = record;
61+
} else {
62+
// If record doesn't exist, insert it
63+
tableData.push(record);
64+
}
65+
66+
memoryDB[table.name] = tableData; // Update the memoryDB with the modified table data
67+
};
68+
69+
const deleteStale = (message: pluginV3.cloudquery.plugin.v3.Write.MessageDeleteStale): void => {
70+
const tableName = message.table_name;
71+
72+
// Filter the table based on the provided criteria
73+
const filteredTable = memoryDB[tableName].filter((row) => {
74+
const sc = row.Schema();
75+
76+
const sourceColIndex = sc.FieldIndices('source_name_column');
77+
const syncColIndex = sc.FieldIndices('sync_time_column');
78+
79+
// Ensure both columns are present
80+
if (sourceColIndex === undefined || syncColIndex === undefined) {
81+
return true; // Keep the record if either column is missing
82+
}
83+
84+
const rowSourceName = row.Column(sourceColIndex).Value(0);
85+
const rowSyncTime = row.Column(syncColIndex).Value(0); // Assuming it returns a Date object
86+
87+
// If source names match and the record's sync time is not before the given sync time, keep the record
88+
return rowSourceName === message.source_name && !rowSyncTime.before(message.sync_time);
89+
});
90+
91+
// Update the memory database with the filtered table
92+
memoryDB[tableName] = filteredTable;
93+
};
94+
2495
const pluginClient = {
2596
...newUnimplementedDestination(),
2697
init: (spec: string, options: NewClientOptions) => Promise.resolve(),
@@ -35,6 +106,56 @@ export const newMemDBPlugin = (): Plugin => {
35106
const filtered = filterTables(allTables, tables, skipTables, skipDependentTables);
36107
return await sync(memdbClient, filtered, stream, { deterministicCQId });
37108
},
109+
write(stream: WriteStream): Promise<void> {
110+
return new Promise((resolve, reject) => {
111+
stream.on('data', (request: WriteRequest) => {
112+
switch (request.message) {
113+
case 'migrate_table': {
114+
// Update table schema in the `tables` map
115+
const table = decodeTable(request.migrate_table.table);
116+
tables[table.name] = table;
117+
break;
118+
}
119+
120+
case 'insert': {
121+
const [tableName, batches] = decodeRecord(request.insert.record);
122+
123+
if (!memoryDB[tableName]) {
124+
memoryDB[tableName] = [];
125+
}
126+
127+
const tableSchema = tables[tableName];
128+
const pks = getPrimaryKeys(tableSchema);
129+
130+
for (const batch of batches) {
131+
//eslint-disable-next-line unicorn/no-array-for-each
132+
for (const record of batch) {
133+
overwrite(tableSchema, pks, record);
134+
}
135+
}
136+
break;
137+
}
138+
139+
case 'delete': {
140+
deleteStale(request.delete);
141+
break;
142+
}
143+
144+
default: {
145+
throw new Error(`Unknown request message type: ${request.message}`);
146+
}
147+
}
148+
});
149+
150+
stream.on('finish', () => {
151+
resolve();
152+
});
153+
154+
stream.on('error', (error) => {
155+
reject(error);
156+
});
157+
});
158+
},
38159
};
39160

40161
return newPlugin('memdb', '0.0.1', () => Promise.resolve(pluginClient));

src/schema/table.ts

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { Writable } from 'node:stream';
22

3-
import { Table as ArrowTable, tableToIPC, Schema } from '@apache-arrow/esnext-esm';
3+
import { Table as ArrowTable, tableFromIPC, tableToIPC, Schema, RecordBatch } from '@apache-arrow/esnext-esm';
44
import { isMatch } from 'matcher';
55

66
import * as arrow from './arrow.js';
7-
import { Column, toArrowField } from './column.js';
7+
import { Column, fromArrowField, toArrowField } from './column.js';
88
import { ClientMeta } from './meta.js';
99
import { Resource } from './resource.js';
1010
import { Nullable } from './types.js';
@@ -80,6 +80,10 @@ export const getTableByName = (tables: Table[], name: string): Table | undefined
8080
}
8181
};
8282

83+
export const getPrimaryKeys = (table: Table): string[] => {
84+
return table.columns.filter((column) => column.primaryKey).map((column) => column.name);
85+
};
86+
8387
export const flattenTables = (tables: Table[]): Table[] => {
8488
return tables.flatMap((table) => [table, ...flattenTables(table.relations.map((c) => ({ ...c, parent: table })))]);
8589
};
@@ -126,7 +130,7 @@ export const filterTables = (
126130
return withSkipDependant;
127131
};
128132

129-
export const toArrowSchema = (table: Table) => {
133+
export const toArrowSchema = (table: Table): Schema => {
130134
const metadata = new Map<string, string>();
131135
metadata.set(arrow.METADATA_TABLE_NAME, table.name);
132136
metadata.set(arrow.METADATA_TABLE_DESCRIPTION, table.description);
@@ -142,13 +146,35 @@ export const toArrowSchema = (table: Table) => {
142146
return new Schema(fields, metadata);
143147
};
144148

149+
export const fromArrowSchema = (schema: Schema): Table => {
150+
return createTable({
151+
name: schema.metadata.get(arrow.METADATA_TABLE_NAME) || '',
152+
title: schema.metadata.get(arrow.METADATA_TABLE_TITLE) || '',
153+
description: schema.metadata.get(arrow.METADATA_TABLE_DESCRIPTION) || '',
154+
pkConstraintName: schema.metadata.get(arrow.METADATA_CONSTRAINT_NAME) || '',
155+
isIncremental: schema.metadata.get(arrow.METADATA_INCREMENTAL) === arrow.METADATA_TRUE,
156+
// dependencies: schema.metadata.get(arrow.METADATA_TABLE_DEPENDS_ON) || '',
157+
columns: schema.fields.map((f) => fromArrowField(f)),
158+
});
159+
};
160+
145161
export const encodeTable = (table: Table): Uint8Array => {
146162
const schema = toArrowSchema(table);
147163
const arrowTable = new ArrowTable(schema);
148-
const bytes = tableToIPC(arrowTable);
149-
return bytes;
164+
return tableToIPC(arrowTable);
150165
};
151166

152167
export const encodeTables = (tables: Table[]): Uint8Array[] => {
153168
return tables.map((table) => encodeTable(table));
154169
};
170+
171+
export const decodeTable = (bytes: Uint8Array): Table => {
172+
const arrowTable = tableFromIPC(bytes);
173+
return fromArrowSchema(arrowTable.schema);
174+
};
175+
176+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
177+
export const decodeRecord = (bytes: Uint8Array): [string, RecordBatch<any>[]] => {
178+
const arrowTable = tableFromIPC(bytes);
179+
return [(arrowTable.schema.metadata.get(arrow.METADATA_TABLE_NAME) || '')!, arrowTable.batches];
180+
};

0 commit comments

Comments
 (0)