From 836f14ef2f58116c56633589d9da3d7809c9b7e8 Mon Sep 17 00:00:00 2001 From: erezrokah Date: Thu, 10 Aug 2023 20:53:03 +0200 Subject: [PATCH] feat: Add multiplexer, round-robin --- src/scheduler/scheduler.test.ts | 58 ++++++++++++++++++++++++ src/scheduler/scheduler.ts | 78 ++++++++++++++++++++++++++++++--- src/schema/table.ts | 2 +- 3 files changed, 132 insertions(+), 6 deletions(-) create mode 100644 src/scheduler/scheduler.test.ts diff --git a/src/scheduler/scheduler.test.ts b/src/scheduler/scheduler.test.ts new file mode 100644 index 0000000..564c08a --- /dev/null +++ b/src/scheduler/scheduler.test.ts @@ -0,0 +1,58 @@ +import test from 'ava'; + +import { createTable } from '../schema/table.js'; + +import { getRoundRobinTableClients } from './scheduler.js'; + +test('getRoundRobinTableClients', (t): void => { + const client = { id: () => 'client_0' }; + const tables = [ + createTable({ + name: 'table1', + multiplexer: (client) => { + return Array.from({ length: 2 }).map((_, index) => ({ + id: () => `client_${index}`, + })); + }, + }), + createTable({ + name: 'table2', + multiplexer: (client) => { + return Array.from({ length: 4 }).map((_, index) => ({ + id: () => `client_${index}`, + })); + }, + }), + createTable({ + name: 'table3', + multiplexer: (client) => { + return Array.from({ length: 1 }).map((_, index) => ({ + id: () => `client_${index}`, + })); + }, + }), + createTable({ + name: 'table4', + multiplexer: (client) => { + return []; + }, + }), + ]; + + const tableClients = getRoundRobinTableClients(tables, client); + t.is(tableClients.length, 7); + t.is(tableClients[0].table.name, 'table1'); + t.is(tableClients[0].client.id(), 'client_0'); + t.is(tableClients[1].table.name, 'table2'); + t.is(tableClients[1].client.id(), 'client_0'); + t.is(tableClients[2].table.name, 'table3'); + t.is(tableClients[2].client.id(), 'client_0'); + t.is(tableClients[3].table.name, 'table1'); + t.is(tableClients[3].client.id(), 'client_1'); + t.is(tableClients[4].table.name, 'table2'); + t.is(tableClients[4].client.id(), 'client_1'); + t.is(tableClients[5].table.name, 'table2'); + t.is(tableClients[5].client.id(), 'client_2'); + t.is(tableClients[6].table.name, 'table2'); + t.is(tableClients[6].client.id(), 'client_3'); +}); diff --git a/src/scheduler/scheduler.ts b/src/scheduler/scheduler.ts index eb5f5d2..49b2e38 100644 --- a/src/scheduler/scheduler.ts +++ b/src/scheduler/scheduler.ts @@ -18,8 +18,14 @@ export type Options = { stream: SyncStream; deterministicCQId: boolean; concurrency: number; + strategy?: Strategy; }; +export enum Strategy { + dfs = 'dfs', + roundRobin = 'round-robin', +} + class TableResolverStream extends Duplex { queue: unknown[] = []; @@ -100,19 +106,81 @@ const resolveTable = async ( syncStream.write(new SyncResponse({ insert: new Insert({ record: encodeResource(resource) }) })); - await Promise.all(table.relations.map((child) => resolveTable(logger, client, child, resource, syncStream))); + await pMap(table.relations, (child) => resolveTable(logger, client, child, resource, syncStream)); } }; -export const sync = async ({ logger, client, tables, stream: syncStream, concurrency }: Options) => { +const syncDfs = async ({ logger, client, tables, stream: syncStream, concurrency }: Omit) => { + const tableClients = tables.flatMap((table) => { + const clients = table.multiplexer(client); + return clients.map((client) => ({ table, client })); + }); + + await pMap(tableClients, ({ table, client }) => resolveTable(logger, client, table, null, syncStream), { + concurrency, + }); +}; + +export const getRoundRobinTableClients = (tables: Table[], client: ClientMeta) => { + let tablesWithClients = tables + .map((table) => ({ table, clients: table.multiplexer(client) })) + .filter(({ clients }) => clients.length > 0); + + const tableClients: { table: Table; client: ClientMeta }[] = []; + while (tablesWithClients.length > 0) { + for (const { table, clients } of tablesWithClients) { + tableClients.push({ table, client: clients.shift() as ClientMeta }); + } + tablesWithClients = tablesWithClients.filter(({ clients }) => clients.length > 0); + } + + return tableClients; +}; + +const syncRoundRobin = async ({ + logger, + client, + tables, + stream: syncStream, + concurrency, +}: Omit) => { + const tableClients = getRoundRobinTableClients(tables, client); + await pMap(tableClients, ({ table, client }) => resolveTable(logger, client, table, null, syncStream), { + concurrency, + }); +}; + +export const sync = async ({ + logger, + client, + tables, + stream, + concurrency, + strategy = Strategy.dfs, + deterministicCQId, +}: Options) => { for (const table of tables) { logger.info(`sending migrate message for table ${table.name}`); // eslint-disable-next-line @typescript-eslint/naming-convention - syncStream.write(new SyncResponse({ migrate_table: new MigrateTable({ table: encodeTable(table) }) })); + stream.write(new SyncResponse({ migrate_table: new MigrateTable({ table: encodeTable(table) }) })); } - await pMap(tables, (table) => resolveTable(logger, client, table, null, syncStream), { concurrency }); + switch (strategy) { + case Strategy.dfs: { + logger.debug(`using dfs strategy`); + await syncDfs({ logger, client, tables, stream, concurrency, deterministicCQId }); + break; + } + case Strategy.roundRobin: { + logger.debug(`using round-robin strategy`); + await syncRoundRobin({ logger, client, tables, stream, concurrency, deterministicCQId }); + break; + } + default: { + throw new Error(`unknown strategy ${strategy}`); + } + } - syncStream.end(); + stream.end(); return await Promise.resolve(); }; diff --git a/src/schema/table.ts b/src/schema/table.ts index 182ae17..3c63175 100644 --- a/src/schema/table.ts +++ b/src/schema/table.ts @@ -39,7 +39,7 @@ export const createTable = ({ relations = [], transform = () => {}, resolver = () => Promise.resolve(), - multiplexer = () => [], + multiplexer = (client) => [client], postResourceResolver = () => Promise.resolve(), preResourceResolver = () => Promise.resolve(), isIncremental = false,