diff --git a/.eslintrc b/.eslintrc index 87f12c7..cba2c3a 100644 --- a/.eslintrc +++ b/.eslintrc @@ -22,7 +22,6 @@ "unicorn/no-null": 0, "unused-imports/no-unused-imports": "error", "no-console": "error", - "@typescript-eslint/no-unused-vars": 0, "require-await": "off", "@typescript-eslint/require-await": "error", "@typescript-eslint/naming-convention": "error", diff --git a/src/arrow/index.test.ts b/src/arrow/index.test.ts deleted file mode 100644 index 256d21a..0000000 --- a/src/arrow/index.test.ts +++ /dev/null @@ -1,8 +0,0 @@ -import test from 'ava'; - -import { testArrow } from './index.js'; - -test('testArrow', (t) => { - const vectors = testArrow(); - t.not(vectors, undefined); -}); diff --git a/src/arrow/index.ts b/src/arrow/index.ts deleted file mode 100644 index 1ff71fb..0000000 --- a/src/arrow/index.ts +++ /dev/null @@ -1,19 +0,0 @@ -import { makeVector, vectorFromArray, Dictionary, Uint8, Utf8 } from '@apache-arrow/esnext-esm'; - -export const testArrow = () => { - const uft8Vector = vectorFromArray(['foo', 'bar', 'baz'], new Utf8()); - - const dictionaryVector1 = vectorFromArray(['foo', 'bar', 'baz', 'foo', 'bar']); - - const dictionaryVector2 = makeVector({ - data: [0, 1, 2, 0, 1], // indexes into the dictionary - dictionary: uft8Vector, - type: new Dictionary(new Utf8(), new Uint8()), - }); - - return { - uft8Vector, - dictionaryVector1, - dictionaryVector2, - }; -}; diff --git a/src/grpc/plugin.ts b/src/grpc/plugin.ts index 8fc4208..964318f 100644 --- a/src/grpc/plugin.ts +++ b/src/grpc/plugin.ts @@ -5,6 +5,7 @@ import { Plugin } from '../plugin/plugin.js'; import { encodeTables } from '../schema/table.js'; export class MigrateTable extends pluginV3.cloudquery.plugin.v3.Sync.MessageMigrateTable {} +export class DeleteStale extends pluginV3.cloudquery.plugin.v3.Write.MessageDeleteStale {} export class SyncRequest extends pluginV3.cloudquery.plugin.v3.Sync.Request {} export class Insert extends pluginV3.cloudquery.plugin.v3.Sync.MessageInsert {} export class SyncResponse extends pluginV3.cloudquery.plugin.v3.Sync.Response {} diff --git a/src/memdb/delete-stale.ts b/src/memdb/delete-stale.ts new file mode 100644 index 0000000..7b7b84a --- /dev/null +++ b/src/memdb/delete-stale.ts @@ -0,0 +1,32 @@ +import { DeleteStale } from '../grpc/plugin.js'; + +export type DeleteStaleFunction = (message: DeleteStale) => void; + +//eslint-disable-next-line @typescript-eslint/no-explicit-any +export const createDeleteStale = (memoryDB: Record) => { + return (message: DeleteStale) => { + const tableName = message.table_name; + + // Filter the table based on the provided criteria + const filteredTable = memoryDB[tableName].filter((row) => { + const sc = row.Schema(); + + const sourceColIndex = sc.FieldIndices('source_name_column'); + const syncColIndex = sc.FieldIndices('sync_time_column'); + + // Ensure both columns are present + if (sourceColIndex === undefined || syncColIndex === undefined) { + return true; // Keep the record if either column is missing + } + + const rowSourceName = row.Column(sourceColIndex).Value(0); + const rowSyncTime = row.Column(syncColIndex).Value(0); // Assuming it returns a Date object + + // If source names match and the record's sync time is not before the given sync time, keep the record + return rowSourceName === message.source_name && !rowSyncTime.before(message.sync_time); + }); + + // Update the memory database with the filtered table + memoryDB[tableName] = filteredTable; + }; +}; diff --git a/src/memdb/memdb.ts b/src/memdb/memdb.ts index 895eb2f..414e9af 100644 --- a/src/memdb/memdb.ts +++ b/src/memdb/memdb.ts @@ -1,13 +1,14 @@ -import { StructRowProxy, Utf8 } from '@apache-arrow/esnext-esm'; -import { pluginV3 } from '@cloudquery/plugin-pb-javascript'; import { default as Ajv } from 'ajv'; -import { WriteRequest, WriteStream, ReadStream, ReadRequest } from '../grpc/plugin.js'; import { Plugin, newPlugin, SyncOptions, TableOptions, NewClientFunction } from '../plugin/plugin.js'; import { sync } from '../scheduler/scheduler.js'; -import { createColumn } from '../schema/column.js'; -import { pathResolver } from '../schema/resolvers.js'; -import { Table, createTable, filterTables, decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js'; +import { Table, filterTables } from '../schema/table.js'; + +import { createDeleteStale } from './delete-stale.js'; +import { createOverwrite } from './overwrite.js'; +import { createRead } from './read.js'; +import { createTables } from './tables.js'; +import { createWrite } from './write.js'; export const createMemDBClient = () => { //eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -39,100 +40,12 @@ export const newMemDBPlugin = (): Plugin => { const memoryDB = memdbClient.memoryDB; const tables = memdbClient.tables; - const allTables: Table[] = [ - createTable({ - name: 'table1', - title: 'Table 1', - description: 'Table 1 description', - resolver: (clientMeta, parent, stream) => { - stream.write({ id: 'table1-name1' }); - stream.write({ id: 'table1-name2' }); - return Promise.resolve(); - }, - columns: [ - createColumn({ - name: 'id', - type: new Utf8(), - resolver: pathResolver('id'), - }), - ], - }), - createTable({ - name: 'table2', - title: 'Table 2', - description: 'Table 2 description', - resolver: (clientMeta, parent, stream) => { - stream.write({ name: 'table2-name1' }); - stream.write({ name: 'table2-name2' }); - return Promise.resolve(); - }, - columns: [ - createColumn({ - name: 'name', - type: new Utf8(), - resolver: pathResolver('name'), - }), - ], - }), - ]; - - const memdb: { inserts: unknown[]; [key: string]: unknown } = { - inserts: [], - ...memoryDB, - }; - - //eslint-disable-next-line @typescript-eslint/no-explicit-any - const overwrite = (table: Table, primaryKeys: string[], record: StructRowProxy) => { - const tableData = memoryDB[table.name] || []; - - if (primaryKeys.length === 0) { - // If there are no primary keys, simply append the data - tableData.push(record); - memoryDB[table.name] = tableData; - return; - } - - // Otherwise, perform an upsert based on the primary keys - const recordIndex = tableData.findIndex((existingRecord) => { - return primaryKeys.every((key) => existingRecord[key] === record[key]); - }); - - if (recordIndex > -1) { - // If record exists, update (overwrite) it - tableData[recordIndex] = record; - } else { - // If record doesn't exist, insert it - tableData.push(record); - } - - memoryDB[table.name] = tableData; // Update the memoryDB with the modified table data - }; - - const deleteStale = (message: pluginV3.cloudquery.plugin.v3.Write.MessageDeleteStale): void => { - const tableName = message.table_name; - - // Filter the table based on the provided criteria - const filteredTable = memoryDB[tableName].filter((row) => { - const sc = row.Schema(); - - const sourceColIndex = sc.FieldIndices('source_name_column'); - const syncColIndex = sc.FieldIndices('sync_time_column'); - - // Ensure both columns are present - if (sourceColIndex === undefined || syncColIndex === undefined) { - return true; // Keep the record if either column is missing - } - - const rowSourceName = row.Column(sourceColIndex).Value(0); - const rowSyncTime = row.Column(syncColIndex).Value(0); // Assuming it returns a Date object + const overwrite = createOverwrite(memoryDB); + const deleteStale = createDeleteStale(memoryDB); + const write = createWrite(memoryDB, tables, overwrite, deleteStale); + const read = createRead(memoryDB); - // If source names match and the record's sync time is not before the given sync time, keep the record - return rowSourceName === message.source_name && !rowSyncTime.before(message.sync_time); - }); - - // Update the memory database with the filtered table - memoryDB[tableName] = filteredTable; - }; + const allTables = createTables(); const pluginClient = { plugin: null as unknown as Plugin, @@ -160,85 +73,11 @@ export const newMemDBPlugin = (): Plugin => { concurrency, }); }, - write(stream: WriteStream): Promise { - return new Promise((resolve, reject) => { - stream.on('data', (request: WriteRequest) => { - switch (request.message) { - case 'migrate_table': { - // Update table schema in the `tables` map - const table = decodeTable(request.migrate_table.table); - tables[table.name] = table; - break; - } - - case 'insert': { - const [tableName, batches] = decodeRecord(request.insert.record); - - if (!memoryDB[tableName]) { - memoryDB[tableName] = []; - } - - const tableSchema = tables[tableName]; - const pks = getPrimaryKeys(tableSchema); - - for (const batch of batches) { - //eslint-disable-next-line unicorn/no-array-for-each - for (const record of batch) { - overwrite(tableSchema, pks, record); - } - } - break; - } - - case 'delete': { - deleteStale(request.delete); - break; - } - - default: { - throw new Error(`Unknown request message type: ${request.message}`); - } - } - }); - - stream.on('finish', () => { - resolve(); - }); - - stream.on('error', (error) => { - reject(error); - }); - }); - }, - read(stream: ReadStream): Promise { - return new Promise((resolve, reject) => { - stream.on('data', (request: ReadRequest) => { - const table = decodeTable(request.table); - - try { - const rows = memoryDB[table.name] || []; - - // We iterate over records in reverse here because we don't set an expectation - // of ordering on plugins, and we want to make sure that the tests are not - // dependent on the order of insertion either. - for (let index = rows.length - 1; index >= 0; index--) { - stream.write(rows[index]); - } - stream.end(); - resolve(); - } catch (error) { - reject(error); - } - }); - - stream.on('error', (error) => { - reject(error); - }); - }); - }, + write, + read, }; - const newClient: NewClientFunction = (logger, spec, options) => { + const newClient: NewClientFunction = (logger, spec /* options */) => { const parsedSpec = JSON.parse(spec) as Partial; const validSchema = validate(parsedSpec); if (!validSchema) { diff --git a/src/memdb/overwrite.ts b/src/memdb/overwrite.ts new file mode 100644 index 0000000..fd32d12 --- /dev/null +++ b/src/memdb/overwrite.ts @@ -0,0 +1,36 @@ +import { StructRowProxy } from '@apache-arrow/esnext-esm'; + +import { Table } from '../schema/table.js'; + +//eslint-disable-next-line @typescript-eslint/no-explicit-any +export type OverwriteFunction = (table: Table, primaryKeys: string[], record: StructRowProxy) => void; + +//eslint-disable-next-line @typescript-eslint/no-explicit-any +export const createOverwrite = (memoryDB: Record): OverwriteFunction => { + //eslint-disable-next-line @typescript-eslint/no-explicit-any + return (table: Table, primaryKeys: string[], record: StructRowProxy) => { + const tableData = memoryDB[table.name] || []; + + if (primaryKeys.length === 0) { + // If there are no primary keys, simply append the data + tableData.push(record); + memoryDB[table.name] = tableData; + return; + } + + // Otherwise, perform an upsert based on the primary keys + const recordIndex = tableData.findIndex((existingRecord) => { + return primaryKeys.every((key) => existingRecord[key] === record[key]); + }); + + if (recordIndex > -1) { + // If record exists, update (overwrite) it + tableData[recordIndex] = record; + } else { + // If record doesn't exist, insert it + tableData.push(record); + } + + memoryDB[table.name] = tableData; // Update the memoryDB with the modified table data + }; +}; diff --git a/src/memdb/read.ts b/src/memdb/read.ts new file mode 100644 index 0000000..0a1b9e9 --- /dev/null +++ b/src/memdb/read.ts @@ -0,0 +1,32 @@ +import { ReadStream, ReadRequest } from '../grpc/plugin.js'; +import { decodeTable } from '../schema/table.js'; + +//eslint-disable-next-line @typescript-eslint/no-explicit-any +export const createRead = (memoryDB: Record) => { + return (stream: ReadStream): Promise => { + return new Promise((resolve, reject) => { + stream.on('data', (request: ReadRequest) => { + const table = decodeTable(request.table); + + try { + const rows = memoryDB[table.name] || []; + + // We iterate over records in reverse here because we don't set an expectation + // of ordering on plugins, and we want to make sure that the tests are not + // dependent on the order of insertion either. + for (let index = rows.length - 1; index >= 0; index--) { + stream.write(rows[index]); + } + stream.end(); + resolve(); + } catch (error) { + reject(error); + } + }); + + stream.on('error', (error) => { + reject(error); + }); + }); + }; +}; diff --git a/src/memdb/tables.ts b/src/memdb/tables.ts new file mode 100644 index 0000000..f1202f5 --- /dev/null +++ b/src/memdb/tables.ts @@ -0,0 +1,48 @@ +import { Utf8 } from '@apache-arrow/esnext-esm'; + +import { createColumn } from '../schema/column.js'; +import { addCQIDsColumns } from '../schema/meta.js'; +import { pathResolver } from '../schema/resolvers.js'; +import { createTable } from '../schema/table.js'; + +export const createTables = () => { + const allTables = [ + createTable({ + name: 'table1', + title: 'Table 1', + description: 'Table 1 description', + resolver: (clientMeta, parent, stream) => { + stream.write({ id: 'table1-name1' }); + stream.write({ id: 'table1-name2' }); + return Promise.resolve(); + }, + columns: [ + createColumn({ + name: 'id', + type: new Utf8(), + resolver: pathResolver('id'), + }), + ], + }), + createTable({ + name: 'table2', + title: 'Table 2', + description: 'Table 2 description', + resolver: (clientMeta, parent, stream) => { + stream.write({ name: 'table2-name1' }); + stream.write({ name: 'table2-name2' }); + return Promise.resolve(); + }, + columns: [ + createColumn({ + name: 'name', + type: new Utf8(), + resolver: pathResolver('name'), + }), + ], + }), + ]; + + const tableWithCQIDs = allTables.map((table) => addCQIDsColumns(table)); + return tableWithCQIDs; +}; diff --git a/src/memdb/write.ts b/src/memdb/write.ts new file mode 100644 index 0000000..6bd4366 --- /dev/null +++ b/src/memdb/write.ts @@ -0,0 +1,64 @@ +import { WriteStream, WriteRequest } from '../grpc/plugin.js'; +import { Table, decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js'; + +import { DeleteStaleFunction } from './delete-stale.js'; +import { OverwriteFunction } from './overwrite.js'; + +export const createWrite = ( + //eslint-disable-next-line @typescript-eslint/no-explicit-any + memoryDB: Record, + tables: Record, + overwrite: OverwriteFunction, + deleteStale: DeleteStaleFunction, +) => { + return (stream: WriteStream): Promise => { + return new Promise((resolve, reject) => { + stream.on('data', (request: WriteRequest) => { + switch (request.message) { + case 'migrate_table': { + // Update table schema in the `tables` map + const table = decodeTable(request.migrate_table.table); + tables[table.name] = table; + break; + } + + case 'insert': { + const [tableName, batches] = decodeRecord(request.insert.record); + + if (!memoryDB[tableName]) { + memoryDB[tableName] = []; + } + + const tableSchema = tables[tableName]; + const pks = getPrimaryKeys(tableSchema); + + for (const batch of batches) { + //eslint-disable-next-line unicorn/no-array-for-each + for (const record of batch) { + overwrite(tableSchema, pks, record); + } + } + break; + } + + case 'delete': { + deleteStale(request.delete); + break; + } + + default: { + throw new Error(`Unknown request message type: ${request.message}`); + } + } + }); + + stream.on('finish', () => { + resolve(); + }); + + stream.on('error', (error) => { + reject(error); + }); + }); + }; +}; diff --git a/src/scheduler/scheduler.test.ts b/src/scheduler/scheduler.test.ts index 564c08a..7170924 100644 --- a/src/scheduler/scheduler.test.ts +++ b/src/scheduler/scheduler.test.ts @@ -9,7 +9,7 @@ test('getRoundRobinTableClients', (t): void => { const tables = [ createTable({ name: 'table1', - multiplexer: (client) => { + multiplexer: () => { return Array.from({ length: 2 }).map((_, index) => ({ id: () => `client_${index}`, })); @@ -17,7 +17,7 @@ test('getRoundRobinTableClients', (t): void => { }), createTable({ name: 'table2', - multiplexer: (client) => { + multiplexer: () => { return Array.from({ length: 4 }).map((_, index) => ({ id: () => `client_${index}`, })); @@ -25,7 +25,7 @@ test('getRoundRobinTableClients', (t): void => { }), createTable({ name: 'table3', - multiplexer: (client) => { + multiplexer: () => { return Array.from({ length: 1 }).map((_, index) => ({ id: () => `client_${index}`, })); @@ -33,7 +33,7 @@ test('getRoundRobinTableClients', (t): void => { }), createTable({ name: 'table4', - multiplexer: (client) => { + multiplexer: () => { return []; }, }), diff --git a/src/schema/meta.test.ts b/src/schema/meta.test.ts new file mode 100644 index 0000000..9db4cb3 --- /dev/null +++ b/src/schema/meta.test.ts @@ -0,0 +1,65 @@ +import test from 'ava'; + +import { createColumn } from './column.js'; +import { addCQIDsColumns, cqIDColumn, cqParentIDColumn } from './meta.js'; +import { createTable } from './table.js'; + +test('addCQIDsColumns', (t) => { + const table = createTable({ + name: 'table1', + columns: [ + createColumn({ name: 'column1' }), + createColumn({ name: 'column2' }), + createColumn({ name: 'column3' }), + createColumn({ name: 'column4' }), + ], + relations: [ + createTable({ + name: 'table1-child1', + columns: [createColumn({ name: 'column1' }), createColumn({ name: 'column2' })], + }), + createTable({ name: 'table1-child2', columns: [createColumn({ name: 'column1' })] }), + createTable({ name: 'table1-child3', columns: [createColumn({ name: 'column1' })] }), + createTable({ + name: 'table1-child4', + columns: [createColumn({ name: 'column1' })], + relations: [ + createTable({ + name: 'table1-child4-child1', + columns: [ + createColumn({ name: 'column1' }), + createColumn({ name: 'column2' }), + createColumn({ name: 'column3' }), + ], + }), + ], + }), + ], + }); + + const tableWithCQIDs = addCQIDsColumns(table); + + t.is(tableWithCQIDs.columns.length, 6); + t.is(tableWithCQIDs.columns[0], cqIDColumn); + t.is(tableWithCQIDs.columns[1], cqParentIDColumn); + + t.is(tableWithCQIDs.relations[0].columns.length, 4); + t.is(tableWithCQIDs.relations[0].columns[0], cqIDColumn); + t.is(tableWithCQIDs.relations[0].columns[1], cqParentIDColumn); + + t.is(tableWithCQIDs.relations[1].columns.length, 3); + t.is(tableWithCQIDs.relations[1].columns[0], cqIDColumn); + t.is(tableWithCQIDs.relations[1].columns[1], cqParentIDColumn); + + t.is(tableWithCQIDs.relations[2].columns.length, 3); + t.is(tableWithCQIDs.relations[2].columns[0], cqIDColumn); + t.is(tableWithCQIDs.relations[2].columns[1], cqParentIDColumn); + + t.is(tableWithCQIDs.relations[3].columns.length, 3); + t.is(tableWithCQIDs.relations[3].columns[0], cqIDColumn); + t.is(tableWithCQIDs.relations[3].columns[1], cqParentIDColumn); + + t.is(tableWithCQIDs.relations[3].relations[0].columns.length, 5); + t.is(tableWithCQIDs.relations[3].relations[0].columns[0], cqIDColumn); + t.is(tableWithCQIDs.relations[3].relations[0].columns[1], cqParentIDColumn); +}); diff --git a/src/schema/meta.ts b/src/schema/meta.ts index e610073..d60962f 100644 --- a/src/schema/meta.ts +++ b/src/schema/meta.ts @@ -4,6 +4,7 @@ import { UUIDType } from '../types/uuid.js'; import { Column, createColumn, ColumnResolver } from './column.js'; import { Resource } from './resource.js'; +import { Table } from './table.js'; export type ClientMeta = { id: () => string; @@ -22,6 +23,7 @@ export const parentCqUUIDResolver = (): ColumnResolver => { }; }; +// These columns are managed and populated by the source plugins export const cqIDColumn = createColumn({ name: '_cq_id', type: new UUIDType(), @@ -36,17 +38,25 @@ export const cqParentIDColumn = createColumn({ resolver: parentCqUUIDResolver(), ignoreInTests: true, }); + +// These columns are managed and populated by the destination plugin export const cqSyncTimeColumn = createColumn({ name: '_cq_sync_time', type: new TimeNanosecond(), description: 'Internal CQ row of when sync was started (this will be the same for all rows in a single fetch)', - resolver: parentCqUUIDResolver(), ignoreInTests: true, }); export const cqSourceNameColumn = createColumn({ name: '_cq_source_name', type: new Binary(), description: 'Internal CQ row that references the source plugin name data was retrieved', - resolver: parentCqUUIDResolver(), ignoreInTests: true, }); + +export const addCQIDsColumns = (table: Table): Table => { + return { + ...table, + columns: [cqIDColumn, cqParentIDColumn, ...table.columns], + relations: table.relations.map((relation) => addCQIDsColumns(relation)), + }; +}; diff --git a/src/transformers/transform.test.ts b/src/transformers/transform.test.ts index 0e96c21..258dcc9 100644 --- a/src/transformers/transform.test.ts +++ b/src/transformers/transform.test.ts @@ -63,7 +63,7 @@ test('should parse object with custom types', (t) => { float: 1, }, { - getTypeFromValue: function (key: string, value: unknown): DataType | null | undefined { + getTypeFromValue: function (key: string): DataType | null | undefined { if (key === 'float') return new Float64(); return undefined; }, @@ -86,7 +86,7 @@ test('should parse object with custom types and allow skip columns in type trans skip: 'test', }, { - getTypeFromValue: function (key: string, value: unknown): DataType | null | undefined { + getTypeFromValue: function (key: string): DataType | null | undefined { return key === 'skip' ? null : undefined; }, }, diff --git a/src/types/json.ts b/src/types/json.ts index 3a001f9..0f48a3c 100644 --- a/src/types/json.ts +++ b/src/types/json.ts @@ -1,4 +1,4 @@ -import { DataType, Binary, Type } from '@apache-arrow/esnext-esm'; +import { DataType, Type } from '@apache-arrow/esnext-esm'; export class JSONType extends DataType { readonly extensionName: string = 'json'; @@ -14,7 +14,7 @@ export class JSONType extends DataType { return new TextEncoder().encode('json-serialized').buffer; } - static deserialize(storageType: Binary, serialized: ArrayBuffer): JSONType { + static deserialize(/*storageType: Binary, serialized: ArrayBuffer*/): JSONType { // Implement your deserialization logic here. return new JSONType(); } diff --git a/src/types/uuid.ts b/src/types/uuid.ts index 923f10d..ba58a19 100644 --- a/src/types/uuid.ts +++ b/src/types/uuid.ts @@ -1,4 +1,4 @@ -import { DataType, Binary, Type } from '@apache-arrow/esnext-esm'; +import { DataType, Type } from '@apache-arrow/esnext-esm'; export class UUIDType extends DataType { readonly extensionName: string = 'uuid'; @@ -15,7 +15,7 @@ export class UUIDType extends DataType { return new TextEncoder().encode('uuid-serialized').buffer; } - static deserialize(storageType: Binary, serialized: ArrayBuffer): UUIDType { + static deserialize(/*storageType: Binary, serialized: ArrayBuffer*/): UUIDType { // Implement your deserialization logic here. return new UUIDType(); }