Skip to content

feat: Add addCQIDsColumns util, split MemDB into files #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
"unicorn/no-null": 0,
"unused-imports/no-unused-imports": "error",
"no-console": "error",
"@typescript-eslint/no-unused-vars": 0,
"require-await": "off",
"@typescript-eslint/require-await": "error",
"@typescript-eslint/naming-convention": "error",
Expand Down
8 changes: 0 additions & 8 deletions src/arrow/index.test.ts

This file was deleted.

19 changes: 0 additions & 19 deletions src/arrow/index.ts

This file was deleted.

1 change: 1 addition & 0 deletions src/grpc/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Plugin } from '../plugin/plugin.js';
import { encodeTables } from '../schema/table.js';

// Empty subclasses that give short, local names to message classes reached through
// `pluginV3.cloudquery.plugin.v3`. They add no fields or methods of their own.
export class MigrateTable extends pluginV3.cloudquery.plugin.v3.Sync.MessageMigrateTable {}
export class DeleteStale extends pluginV3.cloudquery.plugin.v3.Write.MessageDeleteStale {}
export class SyncRequest extends pluginV3.cloudquery.plugin.v3.Sync.Request {}
export class Insert extends pluginV3.cloudquery.plugin.v3.Sync.MessageInsert {}
export class SyncResponse extends pluginV3.cloudquery.plugin.v3.Sync.Response {}
Expand Down
32 changes: 32 additions & 0 deletions src/memdb/delete-stale.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { DeleteStale } from '../grpc/plugin.js';

/** Signature of the handler that applies a `DeleteStale` message to the in-memory store. */
export type DeleteStaleFunction = (message: DeleteStale) => void;

/**
 * Builds a delete-stale handler bound to the given in-memory database.
 *
 * The returned handler replaces `memoryDB[message.table_name]` with the subset of
 * rows that pass the filter below. If the table has no entry in `memoryDB`, the
 * handler is a no-op instead of throwing.
 *
 * @param memoryDB - Map from table name to the rows stored for that table; mutated in place.
 * @returns A handler that prunes the table named by the message.
 */
//eslint-disable-next-line @typescript-eslint/no-explicit-any
export const createDeleteStale = (memoryDB: Record<string, any[]>): DeleteStaleFunction => {
  return (message: DeleteStale) => {
    const tableName = message.table_name;
    const rows = memoryDB[tableName];

    // Nothing was ever written to this table, so there is nothing to prune.
    // Without this guard `.filter` would be called on `undefined`.
    if (!rows) {
      return;
    }

    // Filter the table based on the provided criteria
    // NOTE(review): rows are accessed through `Schema()`, `FieldIndices()` and
    // `Column().Value()`; confirm the stored row objects actually expose this API.
    const filteredTable = rows.filter((row) => {
      const sc = row.Schema();

      const sourceColIndex = sc.FieldIndices('source_name_column');
      const syncColIndex = sc.FieldIndices('sync_time_column');

      // Ensure both columns are present
      if (sourceColIndex === undefined || syncColIndex === undefined) {
        return true; // Keep the record if either column is missing
      }

      const rowSourceName = row.Column(sourceColIndex).Value(0);
      const rowSyncTime = row.Column(syncColIndex).Value(0); // Assumes the value exposes `before()` — TODO confirm

      // Keep the record only if source names match and its sync time is not before the given sync time.
      // NOTE(review): this also drops rows whose source name differs from the message's — confirm that is intended.
      return rowSourceName === message.source_name && !rowSyncTime.before(message.sync_time);
    });

    // Update the memory database with the filtered table
    memoryDB[tableName] = filteredTable;
  };
};
191 changes: 15 additions & 176 deletions src/memdb/memdb.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { StructRowProxy, Utf8 } from '@apache-arrow/esnext-esm';
import { pluginV3 } from '@cloudquery/plugin-pb-javascript';
import { default as Ajv } from 'ajv';

import { WriteRequest, WriteStream, ReadStream, ReadRequest } from '../grpc/plugin.js';
import { Plugin, newPlugin, SyncOptions, TableOptions, NewClientFunction } from '../plugin/plugin.js';
import { sync } from '../scheduler/scheduler.js';
import { createColumn } from '../schema/column.js';
import { pathResolver } from '../schema/resolvers.js';
import { Table, createTable, filterTables, decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js';
import { Table, filterTables } from '../schema/table.js';

import { createDeleteStale } from './delete-stale.js';
import { createOverwrite } from './overwrite.js';
import { createRead } from './read.js';
import { createTables } from './tables.js';
import { createWrite } from './write.js';

export const createMemDBClient = () => {
//eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -39,100 +40,12 @@ export const newMemDBPlugin = (): Plugin => {
const memoryDB = memdbClient.memoryDB;
const tables = memdbClient.tables;

const allTables: Table[] = [
createTable({
name: 'table1',
title: 'Table 1',
description: 'Table 1 description',
resolver: (clientMeta, parent, stream) => {
stream.write({ id: 'table1-name1' });
stream.write({ id: 'table1-name2' });
return Promise.resolve();
},
columns: [
createColumn({
name: 'id',
type: new Utf8(),
resolver: pathResolver('id'),
}),
],
}),
createTable({
name: 'table2',
title: 'Table 2',
description: 'Table 2 description',
resolver: (clientMeta, parent, stream) => {
stream.write({ name: 'table2-name1' });
stream.write({ name: 'table2-name2' });
return Promise.resolve();
},
columns: [
createColumn({
name: 'name',
type: new Utf8(),
resolver: pathResolver('name'),
}),
],
}),
];

const memdb: { inserts: unknown[]; [key: string]: unknown } = {
inserts: [],
...memoryDB,
};

//eslint-disable-next-line @typescript-eslint/no-explicit-any
const overwrite = (table: Table, primaryKeys: string[], record: StructRowProxy<any>) => {
const tableData = memoryDB[table.name] || [];

if (primaryKeys.length === 0) {
// If there are no primary keys, simply append the data
tableData.push(record);
memoryDB[table.name] = tableData;
return;
}

// Otherwise, perform an upsert based on the primary keys
const recordIndex = tableData.findIndex((existingRecord) => {
return primaryKeys.every((key) => existingRecord[key] === record[key]);
});

if (recordIndex > -1) {
// If record exists, update (overwrite) it
tableData[recordIndex] = record;
} else {
// If record doesn't exist, insert it
tableData.push(record);
}

memoryDB[table.name] = tableData; // Update the memoryDB with the modified table data
};

const deleteStale = (message: pluginV3.cloudquery.plugin.v3.Write.MessageDeleteStale): void => {
const tableName = message.table_name;

// Filter the table based on the provided criteria
const filteredTable = memoryDB[tableName].filter((row) => {
const sc = row.Schema();

const sourceColIndex = sc.FieldIndices('source_name_column');
const syncColIndex = sc.FieldIndices('sync_time_column');

// Ensure both columns are present
if (sourceColIndex === undefined || syncColIndex === undefined) {
return true; // Keep the record if either column is missing
}

const rowSourceName = row.Column(sourceColIndex).Value(0);
const rowSyncTime = row.Column(syncColIndex).Value(0); // Assuming it returns a Date object
const overwrite = createOverwrite(memoryDB);
const deleteStale = createDeleteStale(memoryDB);
const write = createWrite(memoryDB, tables, overwrite, deleteStale);
const read = createRead(memoryDB);

// If source names match and the record's sync time is not before the given sync time, keep the record
return rowSourceName === message.source_name && !rowSyncTime.before(message.sync_time);
});

// Update the memory database with the filtered table
memoryDB[tableName] = filteredTable;
};
const allTables = createTables();

const pluginClient = {
plugin: null as unknown as Plugin,
Expand Down Expand Up @@ -160,85 +73,11 @@ export const newMemDBPlugin = (): Plugin => {
concurrency,
});
},
write(stream: WriteStream): Promise<void> {
return new Promise((resolve, reject) => {
stream.on('data', (request: WriteRequest) => {
switch (request.message) {
case 'migrate_table': {
// Update table schema in the `tables` map
const table = decodeTable(request.migrate_table.table);
tables[table.name] = table;
break;
}

case 'insert': {
const [tableName, batches] = decodeRecord(request.insert.record);

if (!memoryDB[tableName]) {
memoryDB[tableName] = [];
}

const tableSchema = tables[tableName];
const pks = getPrimaryKeys(tableSchema);

for (const batch of batches) {
//eslint-disable-next-line unicorn/no-array-for-each
for (const record of batch) {
overwrite(tableSchema, pks, record);
}
}
break;
}

case 'delete': {
deleteStale(request.delete);
break;
}

default: {
throw new Error(`Unknown request message type: ${request.message}`);
}
}
});

stream.on('finish', () => {
resolve();
});

stream.on('error', (error) => {
reject(error);
});
});
},
read(stream: ReadStream): Promise<void> {
return new Promise((resolve, reject) => {
stream.on('data', (request: ReadRequest) => {
const table = decodeTable(request.table);

try {
const rows = memoryDB[table.name] || [];

// We iterate over records in reverse here because we don't set an expectation
// of ordering on plugins, and we want to make sure that the tests are not
// dependent on the order of insertion either.
for (let index = rows.length - 1; index >= 0; index--) {
stream.write(rows[index]);
}
stream.end();
resolve();
} catch (error) {
reject(error);
}
});

stream.on('error', (error) => {
reject(error);
});
});
},
write,
read,
};

const newClient: NewClientFunction = (logger, spec, options) => {
const newClient: NewClientFunction = (logger, spec /* options */) => {
const parsedSpec = JSON.parse(spec) as Partial<Spec>;
const validSchema = validate(parsedSpec);
if (!validSchema) {
Expand Down
36 changes: 36 additions & 0 deletions src/memdb/overwrite.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { StructRowProxy } from '@apache-arrow/esnext-esm';

import { Table } from '../schema/table.js';

/** Signature of the upsert handler: stores `record` into `table`, keyed by `primaryKeys`. */
//eslint-disable-next-line @typescript-eslint/no-explicit-any
export type OverwriteFunction = (table: Table, primaryKeys: string[], record: StructRowProxy<any>) => void;

/**
 * Builds an upsert handler bound to the given in-memory database.
 *
 * With no primary keys the record is always appended. Otherwise the first stored
 * row whose value for every primary key is strictly equal (`===`) to the incoming
 * record's is replaced; if no row matches, the record is appended.
 *
 * @param memoryDB - Map from table name to stored rows; mutated in place.
 * @returns The upsert handler.
 */
//eslint-disable-next-line @typescript-eslint/no-explicit-any
export const createOverwrite = (memoryDB: Record<string, any[]>): OverwriteFunction => {
  //eslint-disable-next-line @typescript-eslint/no-explicit-any
  return (table: Table, primaryKeys: string[], record: StructRowProxy<any>) => {
    const rows = memoryDB[table.name] || [];

    // Default target is one past the end, i.e. an append.
    let targetIndex = rows.length;

    if (primaryKeys.length > 0) {
      // Look for an existing row that agrees with the record on every primary key.
      const matchIndex = rows.findIndex((candidate) => primaryKeys.every((key) => candidate[key] === record[key]));
      if (matchIndex !== -1) {
        targetIndex = matchIndex;
      }
    }

    rows[targetIndex] = record;

    // Store the array back so a table created on the fly becomes visible in memoryDB.
    memoryDB[table.name] = rows;
  };
};
32 changes: 32 additions & 0 deletions src/memdb/read.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { ReadStream, ReadRequest } from '../grpc/plugin.js';
import { decodeTable } from '../schema/table.js';

/**
 * Builds the read handler bound to the given in-memory database.
 *
 * On each `'data'` event the handler decodes the requested table, writes that
 * table's stored rows to the stream in reverse insertion order, ends the stream
 * and resolves. Any failure while handling the request, including a failure to
 * decode the table, rejects the returned promise, as does a stream `'error'` event.
 *
 * @param memoryDB - Map from table name to stored rows; only read here.
 * @returns A function that serves a read stream and settles when the request is handled.
 */
//eslint-disable-next-line @typescript-eslint/no-explicit-any
export const createRead = (memoryDB: Record<string, any[]>) => {
  return (stream: ReadStream): Promise<void> => {
    return new Promise((resolve, reject) => {
      stream.on('data', (request: ReadRequest) => {
        try {
          // Decode inside the try block: a throw here would otherwise escape the
          // event listener and leave the promise pending forever.
          const table = decodeTable(request.table);
          const rows = memoryDB[table.name] || [];

          // We iterate over records in reverse here because we don't set an expectation
          // of ordering on plugins, and we want to make sure that the tests are not
          // dependent on the order of insertion either.
          for (let index = rows.length - 1; index >= 0; index--) {
            stream.write(rows[index]);
          }
          stream.end();
          resolve();
        } catch (error) {
          reject(error);
        }
      });

      stream.on('error', (error: unknown) => {
        reject(error);
      });
    });
  };
};
Loading