diff --git a/README.md b/README.md
index fbe0ed7..1d8f26b 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
 Example of [how to Migrate 1M items from MongoDB to Postgres in just a few minutes](https://youtu.be/EnK8-x8L9TY) using Node.js child process

 **First leave your star in the repo 🌟**
-![Increasing data processing speed by 999x with Node](https://github.com/ErickWendel/parallelizing-nodejs-ops/assets/8060102/6974de93-7848-477a-9198-9d99dedc18f3)
+![Increasing data processing speed by 999x with Node](./assets/benchmarking.png)

 ## Running

diff --git a/assets/benchmarking.png b/assets/benchmarking.png
new file mode 100644
index 0000000..1e0d9b2
Binary files /dev/null and b/assets/benchmarking.png differ
diff --git a/package-lock.json b/package-lock.json
index 8ad697b..e61eb40 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -13,6 +13,7 @@
         "draftlog": "^1.0.13",
         "mongodb": "^6.5.0",
         "pg": "^8.11.5",
+        "pg-format": "^1.0.4",
         "sqlite3": "^5.1.7"
       },
       "devDependencies": {
@@ -1066,6 +1067,14 @@
       "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.6.4.tgz",
       "integrity": "sha512-v+Z7W/0EO707aNMaAEfiGnGL9sxxumwLl2fJvCQtMn9Fxsg+lPpPkdcyBSv/KFgpGdYkMfn+EI1Or2EHjpgLCA=="
     },
+    "node_modules/pg-format": {
+      "version": "1.0.4",
+      "resolved": "https://registry.npmjs.org/pg-format/-/pg-format-1.0.4.tgz",
+      "integrity": "sha512-YyKEF78pEA6wwTAqOUaHIN/rWpfzzIuMh9KdAhc3rSLQ/7zkRFcCgYBAEGatDstLyZw4g0s9SNICmaTGnBVeyw==",
+      "engines": {
+        "node": ">=4.0"
+      }
+    },
     "node_modules/pg-int8": {
       "version": "1.0.1",
       "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz",
diff --git a/package.json b/package.json
index f07edea..7fac7e4 100644
--- a/package.json
+++ b/package.json
@@ -24,6 +24,7 @@
     "draftlog": "^1.0.13",
     "mongodb": "^6.5.0",
     "pg": "^8.11.5",
+    "pg-format": "^1.0.4",
     "sqlite3": "^5.1.7"
   }
-}
\ No newline at end of file
+}
diff --git a/src/background-task.js b/src/background-task.js
index 0069dd1..8387b74 100644
--- a/src/background-task.js
+++ b/src/background-task.js
@@ -5,13 +5,11 @@ const db = await getPostgresConnection()

 process.on('message', (items) => {
     // console.log(` ${process.pid} received ${items.length} items`,);
-    for (const item of items) {
-        db.students.insert(item)
-            .then(() => {
-                process.send('item-done');
-            })
-            .catch((error) => {
-                console.error(error);
-            });
-    }
+    db.students.insertMany(items) // one bulk INSERT per batch instead of one query per item
+        .then(() => {
+            process.send(items.length); // report the batch size so the parent can track progress
+        })
+        .catch((error) => {
+            console.error(error);
+        });
 });
diff --git a/src/cluster.js b/src/cluster.js
index ade430e..6c01088 100644
--- a/src/cluster.js
+++ b/src/cluster.js
@@ -26,7 +26,6 @@ function initializeCluster({ backgroundTaskFile, clusterSize, onMessage }) {
         })

         child.on('message', (message) => {
-            if (message !== 'item-done') return
             onMessage(message)
         })

diff --git a/src/data-streaming.js b/src/data-streaming.js
new file mode 100644
index 0000000..a66455e
--- /dev/null
+++ b/src/data-streaming.js
@@ -0,0 +1,12 @@
+import { StreamCache } from './stream-cache.js'
+import { getMongoConnection } from './db.js'
+
+const ITEMS_PER_PAGE = 4000
+
+const mongoDB = await getMongoConnection()
+const stream = mongoDB.students.find().stream()
+const cache = new StreamCache(stream, ITEMS_PER_PAGE)
+
+cache.stream().on('data', (data) => {
+    process.send(JSON.parse(data));
+});
diff --git a/src/db.js b/src/db.js
index aa37a0c..23f4073 100644
--- a/src/db.js
+++ b/src/db.js
@@ -1,5 +1,6 @@
 import { MongoClient } from 'mongodb';
 import pg from 'pg';
+import format from 'pg-format';
 const { Client } = pg;

 // Connection URL for MongoDB
@@ -47,6 +48,15 @@ async function getPostgresConnection() {

             await client.query(query, values);
+        },
+        async insertMany(persons) {
+            const query = format(
+                'INSERT INTO students (name, email, age, registered_at) VALUES %L',
+                persons.map((person) => [person.name, person.email, person.age, person.registered_at])
+            );
+
+            await client.query(query);
+        },

         async list(limit = 100) {
             const query = 'SELECT * FROM students LIMIT $1';

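A note on the `insertMany` added to src/db.js above: pg-format's `%L` placeholder escapes each nested array as a quoted, parenthesized tuple, so a single `format()` call yields one multi-row INSERT. Batching 4000 rows into one statement, rather than one round-trip per row, is where most of this commit's speedup comes from. A minimal sketch of the expansion (the sample rows and printed output are illustrative, not taken from the repo):

```js
// How pg-format expands %L into a multi-row VALUES list.
// Assumes `npm install pg-format`; the data below is made up.
import format from 'pg-format';

const persons = [
    { name: 'Ana', email: 'ana@example.com', age: 30, registered_at: '2024-01-01' },
    { name: 'Bob', email: 'bob@example.com', age: 25, registered_at: '2024-02-01' },
];

const query = format(
    'INSERT INTO students (name, email, age, registered_at) VALUES %L',
    persons.map((p) => [p.name, p.email, p.age, p.registered_at])
);

console.log(query);
// prints one statement, roughly:
// INSERT INTO students (name, email, age, registered_at)
//   VALUES ('Ana', 'ana@example.com', '30', '2024-01-01'), ('Bob', 'bob@example.com', '25', '2024-02-01')
```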
diff --git a/src/index.js b/src/index.js
index e954588..1ea7814 100644
--- a/src/index.js
+++ b/src/index.js
@@ -1,27 +1,18 @@
 import { initialize } from "./cluster.js"
 import { getMongoConnection, getPostgresConnection } from './db.js'
 import cliProgress from 'cli-progress'
-import { setTimeout } from 'node:timers/promises'
+import os from 'node:os';
+
 const mongoDB = await getMongoConnection()
 const postgresDB = await getPostgresConnection()

-const ITEMS_PER_PAGE = 4000
-const CLUSTER_SIZE = 99
+// const ITEMS_PER_PAGE = 4000
+const CLUSTER_SIZE = os.cpus().length
 const TASK_FILE = new URL('./background-task.js', import.meta.url).pathname
+const DATA_STREAMING_FILE = new URL('./data-streaming.js', import.meta.url).pathname

 // console.log(`there was ${await postgresDB.students.count()} items on Postgres, deleting all...`)
 await postgresDB.students.deleteAll()

-async function* getAllPagedData(itemsPerPage, page = 0) {
-
-    const data = mongoDB.students.find().skip(page).limit(itemsPerPage)
-    const items = await data.toArray()
-    if (!items.length) return
-
-    yield items
-
-    yield* getAllPagedData(itemsPerPage, page += itemsPerPage)
-}
-
 const total = await mongoDB.students.countDocuments()
 // console.log(`total items on DB: ${total}`)
@@ -37,25 +28,30 @@
 const cp = initialize(
     {
         backgroundTaskFile: TASK_FILE,
         clusterSize: CLUSTER_SIZE,
         amountToBeProcessed: total,
-        async onMessage(message) {
-            progress.increment()
+        async onMessage(processedCount) {
+            totalProcessed += processedCount;
+            progress.update(totalProcessed);

-            if (++totalProcessed !== total) return
+            if (totalProcessed !== total) return
             // console.log(`all ${amountToBeProcessed} processed! Exiting...`)
             progress.stop()
             cp.killAll()

-            const insertedOnSQLite = await postgresDB.students.count()
-            console.log(`total on MongoDB ${total} and total on PostGres ${insertedOnSQLite}`)
-            console.log(`are the same? ${total === insertedOnSQLite ? 'yes' : 'no'}`)
+            const insertedOnPostgres = await postgresDB.students.count()
+            console.log(`total on MongoDB ${total} and total on Postgres ${insertedOnPostgres}`)
+            console.log(`are the same? ${total === insertedOnPostgres ? 'yes' : 'no'}`)
             process.exit()
         }
     }
 )
-await setTimeout(1000)
-
-for await (const data of getAllPagedData(ITEMS_PER_PAGE)) {
-    cp.sendToChild(data)
-}
+initialize(
+    {
+        backgroundTaskFile: DATA_STREAMING_FILE,
+        clusterSize: 1,
+        async onMessage(message) {
+            cp.sendToChild(message)
+        }
+    }
+)
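The reworked src/index.js above replaces the paginated `skip()`/`limit()` generator with a two-stage pipeline: one forked producer (data-streaming.js) streams MongoDB and emits batches, and the parent relays each batch to a pool of `os.cpus().length` consumers (background-task.js). The cluster.js internals are not part of this diff, so here is a hedged sketch of the equivalent wiring with plain `child_process.fork`; the round-robin dispatch is an assumption, and the repo's `sendToChild` may distribute batches differently:

```js
// Single producer fanning batches out to one consumer per CPU core.
import { fork } from 'node:child_process';
import os from 'node:os';

const workers = Array.from(
    { length: os.cpus().length },
    () => fork(new URL('./background-task.js', import.meta.url).pathname)
);

let next = 0;
function sendToChild(batch) {
    workers[next].send(batch);            // hand the batch to one worker
    next = (next + 1) % workers.length;   // rotate through the pool (assumed strategy)
}

const producer = fork(new URL('./data-streaming.js', import.meta.url).pathname);
producer.on('message', sendToChild);      // each message is one batch of documents
```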
diff --git a/src/stream-cache.js b/src/stream-cache.js
new file mode 100644
index 0000000..027a089
--- /dev/null
+++ b/src/stream-cache.js
@@ -0,0 +1,33 @@
+import { Readable } from 'node:stream';
+
+export class StreamCache {
+    constructor(inputStream, cacheThreshold = 4000) {
+        this.cacheStream = new Readable({
+            read() { }
+        });
+        this.cache = [];
+        this.cacheThreshold = cacheThreshold;
+        inputStream.on('data', this._addDataToCache);
+        inputStream.on('end', () => {
+            this._emitCache(); // flush whatever is left below the threshold
+            this.cacheStream.push(null); // push(null) is the correct EOF signal; emitting 'end' by hand bypasses the stream state machine
+        });
+    }
+
+    _addDataToCache = (data) => {
+        this.cache.push(data);
+        if (this.cache.length >= this.cacheThreshold) {
+            this._emitCache();
+        }
+    }
+
+    _emitCache() {
+        if (!this.cache.length) return
+        this.cacheStream.push(JSON.stringify(this.cache));
+        this.cache = [];
+    }
+
+    stream() {
+        return this.cacheStream;
+    }
+}
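For reference, a usage sketch of `StreamCache` in isolation: it buffers an object stream into JSON-encoded batches, which is exactly how data-streaming.js consumes it. The sample source stream and batch size below are made up:

```js
// Batch a 5-item object stream into JSON chunks of up to 2 items.
import { Readable } from 'node:stream';
import { StreamCache } from './stream-cache.js';

const source = Readable.from([1, 2, 3, 4, 5].map((id) => ({ id })));
const cache = new StreamCache(source, 2);

cache.stream().on('data', (chunk) => {
    const batch = JSON.parse(chunk); // each chunk is a JSON array string
    console.log(`batch of ${batch.length}`, batch);
});
// expected: two batches of 2, then a final batch of 1 flushed on 'end'
```

One caveat worth flagging: the internal `new Readable({ read() { } })` never exerts backpressure, so if MongoDB produces batches faster than the workers can insert them, the buffered chunks simply accumulate in memory.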