Skip to content

Commit 00a842a

Browse files
authored
feat: Add multiplexer, round-robin (#48)
1 parent 4a5f9e8 commit 00a842a

File tree

3 files changed

+132
-6
lines changed

3 files changed

+132
-6
lines changed

src/scheduler/scheduler.test.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import test from 'ava';
2+
3+
import { createTable } from '../schema/table.js';
4+
5+
import { getRoundRobinTableClients } from './scheduler.js';
6+
7+
test('getRoundRobinTableClients', (t): void => {
8+
const client = { id: () => 'client_0' };
9+
const tables = [
10+
createTable({
11+
name: 'table1',
12+
multiplexer: (client) => {
13+
return Array.from({ length: 2 }).map((_, index) => ({
14+
id: () => `client_${index}`,
15+
}));
16+
},
17+
}),
18+
createTable({
19+
name: 'table2',
20+
multiplexer: (client) => {
21+
return Array.from({ length: 4 }).map((_, index) => ({
22+
id: () => `client_${index}`,
23+
}));
24+
},
25+
}),
26+
createTable({
27+
name: 'table3',
28+
multiplexer: (client) => {
29+
return Array.from({ length: 1 }).map((_, index) => ({
30+
id: () => `client_${index}`,
31+
}));
32+
},
33+
}),
34+
createTable({
35+
name: 'table4',
36+
multiplexer: (client) => {
37+
return [];
38+
},
39+
}),
40+
];
41+
42+
const tableClients = getRoundRobinTableClients(tables, client);
43+
t.is(tableClients.length, 7);
44+
t.is(tableClients[0].table.name, 'table1');
45+
t.is(tableClients[0].client.id(), 'client_0');
46+
t.is(tableClients[1].table.name, 'table2');
47+
t.is(tableClients[1].client.id(), 'client_0');
48+
t.is(tableClients[2].table.name, 'table3');
49+
t.is(tableClients[2].client.id(), 'client_0');
50+
t.is(tableClients[3].table.name, 'table1');
51+
t.is(tableClients[3].client.id(), 'client_1');
52+
t.is(tableClients[4].table.name, 'table2');
53+
t.is(tableClients[4].client.id(), 'client_1');
54+
t.is(tableClients[5].table.name, 'table2');
55+
t.is(tableClients[5].client.id(), 'client_2');
56+
t.is(tableClients[6].table.name, 'table2');
57+
t.is(tableClients[6].client.id(), 'client_3');
58+
});

src/scheduler/scheduler.ts

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,14 @@ export type Options = {
1818
stream: SyncStream;
1919
deterministicCQId: boolean;
2020
concurrency: number;
21+
strategy?: Strategy;
2122
};
2223

24+
export enum Strategy {
25+
dfs = 'dfs',
26+
roundRobin = 'round-robin',
27+
}
28+
2329
class TableResolverStream extends Duplex {
2430
queue: unknown[] = [];
2531

@@ -100,19 +106,81 @@ const resolveTable = async (
100106

101107
syncStream.write(new SyncResponse({ insert: new Insert({ record: encodeResource(resource) }) }));
102108

103-
await Promise.all(table.relations.map((child) => resolveTable(logger, client, child, resource, syncStream)));
109+
await pMap(table.relations, (child) => resolveTable(logger, client, child, resource, syncStream));
104110
}
105111
};
106112

107-
export const sync = async ({ logger, client, tables, stream: syncStream, concurrency }: Options) => {
113+
const syncDfs = async ({ logger, client, tables, stream: syncStream, concurrency }: Omit<Options, 'strategy'>) => {
114+
const tableClients = tables.flatMap((table) => {
115+
const clients = table.multiplexer(client);
116+
return clients.map((client) => ({ table, client }));
117+
});
118+
119+
await pMap(tableClients, ({ table, client }) => resolveTable(logger, client, table, null, syncStream), {
120+
concurrency,
121+
});
122+
};
123+
124+
export const getRoundRobinTableClients = (tables: Table[], client: ClientMeta) => {
125+
let tablesWithClients = tables
126+
.map((table) => ({ table, clients: table.multiplexer(client) }))
127+
.filter(({ clients }) => clients.length > 0);
128+
129+
const tableClients: { table: Table; client: ClientMeta }[] = [];
130+
while (tablesWithClients.length > 0) {
131+
for (const { table, clients } of tablesWithClients) {
132+
tableClients.push({ table, client: clients.shift() as ClientMeta });
133+
}
134+
tablesWithClients = tablesWithClients.filter(({ clients }) => clients.length > 0);
135+
}
136+
137+
return tableClients;
138+
};
139+
140+
const syncRoundRobin = async ({
141+
logger,
142+
client,
143+
tables,
144+
stream: syncStream,
145+
concurrency,
146+
}: Omit<Options, 'strategy'>) => {
147+
const tableClients = getRoundRobinTableClients(tables, client);
148+
await pMap(tableClients, ({ table, client }) => resolveTable(logger, client, table, null, syncStream), {
149+
concurrency,
150+
});
151+
};
152+
153+
export const sync = async ({
154+
logger,
155+
client,
156+
tables,
157+
stream,
158+
concurrency,
159+
strategy = Strategy.dfs,
160+
deterministicCQId,
161+
}: Options) => {
108162
for (const table of tables) {
109163
logger.info(`sending migrate message for table ${table.name}`);
110164
// eslint-disable-next-line @typescript-eslint/naming-convention
111-
syncStream.write(new SyncResponse({ migrate_table: new MigrateTable({ table: encodeTable(table) }) }));
165+
stream.write(new SyncResponse({ migrate_table: new MigrateTable({ table: encodeTable(table) }) }));
112166
}
113167

114-
await pMap(tables, (table) => resolveTable(logger, client, table, null, syncStream), { concurrency });
168+
switch (strategy) {
169+
case Strategy.dfs: {
170+
logger.debug(`using dfs strategy`);
171+
await syncDfs({ logger, client, tables, stream, concurrency, deterministicCQId });
172+
break;
173+
}
174+
case Strategy.roundRobin: {
175+
logger.debug(`using round-robin strategy`);
176+
await syncRoundRobin({ logger, client, tables, stream, concurrency, deterministicCQId });
177+
break;
178+
}
179+
default: {
180+
throw new Error(`unknown strategy ${strategy}`);
181+
}
182+
}
115183

116-
syncStream.end();
184+
stream.end();
117185
return await Promise.resolve();
118186
};

src/schema/table.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export const createTable = ({
3939
relations = [],
4040
transform = () => {},
4141
resolver = () => Promise.resolve(),
42-
multiplexer = () => [],
42+
multiplexer = (client) => [client],
4343
postResourceResolver = () => Promise.resolve(),
4444
preResourceResolver = () => Promise.resolve(),
4545
isIncremental = false,

0 commit comments

Comments
 (0)