Skip to content

Commit dd71c60

Browse files
authored
feat: Add addCQIDsColumns util, split MemDB into files (#51)
1 parent 00a842a commit dd71c60

File tree

16 files changed

+315
-216
lines changed

16 files changed

+315
-216
lines changed

.eslintrc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
"unicorn/no-null": 0,
2323
"unused-imports/no-unused-imports": "error",
2424
"no-console": "error",
25-
"@typescript-eslint/no-unused-vars": 0,
2625
"require-await": "off",
2726
"@typescript-eslint/require-await": "error",
2827
"@typescript-eslint/naming-convention": "error",

src/arrow/index.test.ts

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/arrow/index.ts

Lines changed: 0 additions & 19 deletions
This file was deleted.

src/grpc/plugin.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { Plugin } from '../plugin/plugin.js';
55
import { encodeTables } from '../schema/table.js';
66

77
export class MigrateTable extends pluginV3.cloudquery.plugin.v3.Sync.MessageMigrateTable {}
8+
export class DeleteStale extends pluginV3.cloudquery.plugin.v3.Write.MessageDeleteStale {}
89
export class SyncRequest extends pluginV3.cloudquery.plugin.v3.Sync.Request {}
910
export class Insert extends pluginV3.cloudquery.plugin.v3.Sync.MessageInsert {}
1011
export class SyncResponse extends pluginV3.cloudquery.plugin.v3.Sync.Response {}

src/memdb/delete-stale.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { DeleteStale } from '../grpc/plugin.js';
2+
3+
export type DeleteStaleFunction = (message: DeleteStale) => void;
4+
5+
//eslint-disable-next-line @typescript-eslint/no-explicit-any
6+
export const createDeleteStale = (memoryDB: Record<string, any[]>) => {
7+
return (message: DeleteStale) => {
8+
const tableName = message.table_name;
9+
10+
// Filter the table based on the provided criteria
11+
const filteredTable = memoryDB[tableName].filter((row) => {
12+
const sc = row.Schema();
13+
14+
const sourceColIndex = sc.FieldIndices('source_name_column');
15+
const syncColIndex = sc.FieldIndices('sync_time_column');
16+
17+
// Ensure both columns are present
18+
if (sourceColIndex === undefined || syncColIndex === undefined) {
19+
return true; // Keep the record if either column is missing
20+
}
21+
22+
const rowSourceName = row.Column(sourceColIndex).Value(0);
23+
const rowSyncTime = row.Column(syncColIndex).Value(0); // Assuming it returns a Date object
24+
25+
// If source names match and the record's sync time is not before the given sync time, keep the record
26+
return rowSourceName === message.source_name && !rowSyncTime.before(message.sync_time);
27+
});
28+
29+
// Update the memory database with the filtered table
30+
memoryDB[tableName] = filteredTable;
31+
};
32+
};

src/memdb/memdb.ts

Lines changed: 15 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
import { StructRowProxy, Utf8 } from '@apache-arrow/esnext-esm';
2-
import { pluginV3 } from '@cloudquery/plugin-pb-javascript';
31
import { default as Ajv } from 'ajv';
42

5-
import { WriteRequest, WriteStream, ReadStream, ReadRequest } from '../grpc/plugin.js';
63
import { Plugin, newPlugin, SyncOptions, TableOptions, NewClientFunction } from '../plugin/plugin.js';
74
import { sync } from '../scheduler/scheduler.js';
8-
import { createColumn } from '../schema/column.js';
9-
import { pathResolver } from '../schema/resolvers.js';
10-
import { Table, createTable, filterTables, decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js';
5+
import { Table, filterTables } from '../schema/table.js';
6+
7+
import { createDeleteStale } from './delete-stale.js';
8+
import { createOverwrite } from './overwrite.js';
9+
import { createRead } from './read.js';
10+
import { createTables } from './tables.js';
11+
import { createWrite } from './write.js';
1112

1213
export const createMemDBClient = () => {
1314
//eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -39,100 +40,12 @@ export const newMemDBPlugin = (): Plugin => {
3940
const memoryDB = memdbClient.memoryDB;
4041
const tables = memdbClient.tables;
4142

42-
const allTables: Table[] = [
43-
createTable({
44-
name: 'table1',
45-
title: 'Table 1',
46-
description: 'Table 1 description',
47-
resolver: (clientMeta, parent, stream) => {
48-
stream.write({ id: 'table1-name1' });
49-
stream.write({ id: 'table1-name2' });
50-
return Promise.resolve();
51-
},
52-
columns: [
53-
createColumn({
54-
name: 'id',
55-
type: new Utf8(),
56-
resolver: pathResolver('id'),
57-
}),
58-
],
59-
}),
60-
createTable({
61-
name: 'table2',
62-
title: 'Table 2',
63-
description: 'Table 2 description',
64-
resolver: (clientMeta, parent, stream) => {
65-
stream.write({ name: 'table2-name1' });
66-
stream.write({ name: 'table2-name2' });
67-
return Promise.resolve();
68-
},
69-
columns: [
70-
createColumn({
71-
name: 'name',
72-
type: new Utf8(),
73-
resolver: pathResolver('name'),
74-
}),
75-
],
76-
}),
77-
];
78-
79-
const memdb: { inserts: unknown[]; [key: string]: unknown } = {
80-
inserts: [],
81-
...memoryDB,
82-
};
83-
84-
//eslint-disable-next-line @typescript-eslint/no-explicit-any
85-
const overwrite = (table: Table, primaryKeys: string[], record: StructRowProxy<any>) => {
86-
const tableData = memoryDB[table.name] || [];
87-
88-
if (primaryKeys.length === 0) {
89-
// If there are no primary keys, simply append the data
90-
tableData.push(record);
91-
memoryDB[table.name] = tableData;
92-
return;
93-
}
94-
95-
// Otherwise, perform an upsert based on the primary keys
96-
const recordIndex = tableData.findIndex((existingRecord) => {
97-
return primaryKeys.every((key) => existingRecord[key] === record[key]);
98-
});
99-
100-
if (recordIndex > -1) {
101-
// If record exists, update (overwrite) it
102-
tableData[recordIndex] = record;
103-
} else {
104-
// If record doesn't exist, insert it
105-
tableData.push(record);
106-
}
107-
108-
memoryDB[table.name] = tableData; // Update the memoryDB with the modified table data
109-
};
110-
111-
const deleteStale = (message: pluginV3.cloudquery.plugin.v3.Write.MessageDeleteStale): void => {
112-
const tableName = message.table_name;
113-
114-
// Filter the table based on the provided criteria
115-
const filteredTable = memoryDB[tableName].filter((row) => {
116-
const sc = row.Schema();
117-
118-
const sourceColIndex = sc.FieldIndices('source_name_column');
119-
const syncColIndex = sc.FieldIndices('sync_time_column');
120-
121-
// Ensure both columns are present
122-
if (sourceColIndex === undefined || syncColIndex === undefined) {
123-
return true; // Keep the record if either column is missing
124-
}
125-
126-
const rowSourceName = row.Column(sourceColIndex).Value(0);
127-
const rowSyncTime = row.Column(syncColIndex).Value(0); // Assuming it returns a Date object
43+
const overwrite = createOverwrite(memoryDB);
44+
const deleteStale = createDeleteStale(memoryDB);
45+
const write = createWrite(memoryDB, tables, overwrite, deleteStale);
46+
const read = createRead(memoryDB);
12847

129-
// If source names match and the record's sync time is not before the given sync time, keep the record
130-
return rowSourceName === message.source_name && !rowSyncTime.before(message.sync_time);
131-
});
132-
133-
// Update the memory database with the filtered table
134-
memoryDB[tableName] = filteredTable;
135-
};
48+
const allTables = createTables();
13649

13750
const pluginClient = {
13851
plugin: null as unknown as Plugin,
@@ -160,85 +73,11 @@ export const newMemDBPlugin = (): Plugin => {
16073
concurrency,
16174
});
16275
},
163-
write(stream: WriteStream): Promise<void> {
164-
return new Promise((resolve, reject) => {
165-
stream.on('data', (request: WriteRequest) => {
166-
switch (request.message) {
167-
case 'migrate_table': {
168-
// Update table schema in the `tables` map
169-
const table = decodeTable(request.migrate_table.table);
170-
tables[table.name] = table;
171-
break;
172-
}
173-
174-
case 'insert': {
175-
const [tableName, batches] = decodeRecord(request.insert.record);
176-
177-
if (!memoryDB[tableName]) {
178-
memoryDB[tableName] = [];
179-
}
180-
181-
const tableSchema = tables[tableName];
182-
const pks = getPrimaryKeys(tableSchema);
183-
184-
for (const batch of batches) {
185-
//eslint-disable-next-line unicorn/no-array-for-each
186-
for (const record of batch) {
187-
overwrite(tableSchema, pks, record);
188-
}
189-
}
190-
break;
191-
}
192-
193-
case 'delete': {
194-
deleteStale(request.delete);
195-
break;
196-
}
197-
198-
default: {
199-
throw new Error(`Unknown request message type: ${request.message}`);
200-
}
201-
}
202-
});
203-
204-
stream.on('finish', () => {
205-
resolve();
206-
});
207-
208-
stream.on('error', (error) => {
209-
reject(error);
210-
});
211-
});
212-
},
213-
read(stream: ReadStream): Promise<void> {
214-
return new Promise((resolve, reject) => {
215-
stream.on('data', (request: ReadRequest) => {
216-
const table = decodeTable(request.table);
217-
218-
try {
219-
const rows = memoryDB[table.name] || [];
220-
221-
// We iterate over records in reverse here because we don't set an expectation
222-
// of ordering on plugins, and we want to make sure that the tests are not
223-
// dependent on the order of insertion either.
224-
for (let index = rows.length - 1; index >= 0; index--) {
225-
stream.write(rows[index]);
226-
}
227-
stream.end();
228-
resolve();
229-
} catch (error) {
230-
reject(error);
231-
}
232-
});
233-
234-
stream.on('error', (error) => {
235-
reject(error);
236-
});
237-
});
238-
},
76+
write,
77+
read,
23978
};
24079

241-
const newClient: NewClientFunction = (logger, spec, options) => {
80+
const newClient: NewClientFunction = (logger, spec /* options */) => {
24281
const parsedSpec = JSON.parse(spec) as Partial<Spec>;
24382
const validSchema = validate(parsedSpec);
24483
if (!validSchema) {

src/memdb/overwrite.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { StructRowProxy } from '@apache-arrow/esnext-esm';
2+
3+
import { Table } from '../schema/table.js';
4+
5+
//eslint-disable-next-line @typescript-eslint/no-explicit-any
6+
export type OverwriteFunction = (table: Table, primaryKeys: string[], record: StructRowProxy<any>) => void;
7+
8+
//eslint-disable-next-line @typescript-eslint/no-explicit-any
9+
export const createOverwrite = (memoryDB: Record<string, any[]>): OverwriteFunction => {
10+
//eslint-disable-next-line @typescript-eslint/no-explicit-any
11+
return (table: Table, primaryKeys: string[], record: StructRowProxy<any>) => {
12+
const tableData = memoryDB[table.name] || [];
13+
14+
if (primaryKeys.length === 0) {
15+
// If there are no primary keys, simply append the data
16+
tableData.push(record);
17+
memoryDB[table.name] = tableData;
18+
return;
19+
}
20+
21+
// Otherwise, perform an upsert based on the primary keys
22+
const recordIndex = tableData.findIndex((existingRecord) => {
23+
return primaryKeys.every((key) => existingRecord[key] === record[key]);
24+
});
25+
26+
if (recordIndex > -1) {
27+
// If record exists, update (overwrite) it
28+
tableData[recordIndex] = record;
29+
} else {
30+
// If record doesn't exist, insert it
31+
tableData.push(record);
32+
}
33+
34+
memoryDB[table.name] = tableData; // Update the memoryDB with the modified table data
35+
};
36+
};

src/memdb/read.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { ReadStream, ReadRequest } from '../grpc/plugin.js';
2+
import { decodeTable } from '../schema/table.js';
3+
4+
//eslint-disable-next-line @typescript-eslint/no-explicit-any
5+
export const createRead = (memoryDB: Record<string, any[]>) => {
6+
return (stream: ReadStream): Promise<void> => {
7+
return new Promise((resolve, reject) => {
8+
stream.on('data', (request: ReadRequest) => {
9+
const table = decodeTable(request.table);
10+
11+
try {
12+
const rows = memoryDB[table.name] || [];
13+
14+
// We iterate over records in reverse here because we don't set an expectation
15+
// of ordering on plugins, and we want to make sure that the tests are not
16+
// dependent on the order of insertion either.
17+
for (let index = rows.length - 1; index >= 0; index--) {
18+
stream.write(rows[index]);
19+
}
20+
stream.end();
21+
resolve();
22+
} catch (error) {
23+
reject(error);
24+
}
25+
});
26+
27+
stream.on('error', (error) => {
28+
reject(error);
29+
});
30+
});
31+
};
32+
};

0 commit comments

Comments
 (0)