|
| 1 | +const _ = require('lodash') |
| 2 | +const ion = require('ion-js') |
| 3 | +const AWS = require('aws-sdk') |
| 4 | +const QLDB = require('amazon-qldb-driver-nodejs') |
| 5 | +const logger = require('../../src/common/logger') |
| 6 | + |
| 7 | +// Create S3 service object |
| 8 | +const s3 = new AWS.S3({ |
| 9 | + apiVersion: '2006-03-01', |
| 10 | + region: process.env.AWS_REGION, |
| 11 | + accessKeyId: process.env.AWS_ACCESS_KEY_ID, |
| 12 | + secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY |
| 13 | +}) |
| 14 | + |
| 15 | +/** |
| 16 | + * Get the statement table. |
| 17 | + * @param {String} statement the execute statement |
| 18 | + */ |
| 19 | +function getTable (statement) { |
| 20 | + if (statement.startsWith('create index on') || statement.startsWith('CREATE INDEX ON')) { |
| 21 | + return statement.substring(16, statement.indexOf(' ', 16)) |
| 22 | + } |
| 23 | + if (statement.startsWith('create table')) { |
| 24 | + return statement.substring(13) |
| 25 | + } |
| 26 | +} |
| 27 | + |
| 28 | +/** |
| 29 | + * import data from s3 to qldb. |
| 30 | + */ |
| 31 | +async function main () { |
| 32 | + const qldbInstance = new QLDB.QldbDriver(process.env.QLDB_NAME, { |
| 33 | + region: process.env.AWS_REGION, |
| 34 | + accessKeyId: process.env.AWS_ACCESS_KEY_ID, |
| 35 | + secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY |
| 36 | + }) |
| 37 | + const tables = await qldbInstance.getTableNames() |
| 38 | + const bucket = process.env.BUCKET_NAME |
| 39 | + |
| 40 | + try { |
| 41 | + const list = await s3.listObjects({ Bucket: bucket }).promise() |
| 42 | + // get all data file key |
| 43 | + const keys = _(list.Contents).map('Key').filter(key => _.endsWith(key, '.ion')) |
| 44 | + .sortBy(key => _.parseInt(_.split(_.split(key, '.')[1], '-')[0])).value() |
| 45 | + for (const key of keys) { |
| 46 | + // get data of each file |
| 47 | + const data = await s3.getObject({ Bucket: bucket, Key: key }).promise() |
| 48 | + const contents = ion.loadAll(data.Body) |
| 49 | + for (const content of contents) { |
| 50 | + // get content of each transaction |
| 51 | + const statement = content.transactionInfo.statements.get(0).statement.stringValue() |
| 52 | + // process create statement |
| 53 | + if (statement.startsWith('create') || statement.startsWith('CREATE')) { |
| 54 | + if (!_.includes(tables, getTable(statement))) { |
| 55 | + logger.info(`start executing: ${statement}`) |
| 56 | + await qldbInstance.executeLambda(t => t.execute(statement)) |
| 57 | + } else { |
| 58 | + logger.info(`exists, skip ${statement}`) |
| 59 | + } |
| 60 | + } |
| 61 | + // process insert and update statement |
| 62 | + if (statement.startsWith('INSERT') || statement.startsWith('UPDATE')) { |
| 63 | + logger.info(`start executing: ${statement}`) |
| 64 | + try { |
| 65 | + await qldbInstance.executeLambda(t => t.execute(statement, content.revisions.get(0).data)) |
| 66 | + } catch (e) { |
| 67 | + logger.error(e) |
| 68 | + logger.warn(`execute ${statement} failed`) |
| 69 | + } |
| 70 | + } |
| 71 | + // process delete statement |
| 72 | + if (statement.startsWith('DELETE')) { |
| 73 | + logger.info(`start executing: ${statement}`) |
| 74 | + try { |
| 75 | + await qldbInstance.executeLambda(t => t.execute(statement)) |
| 76 | + } catch (e) { |
| 77 | + logger.error(e) |
| 78 | + logger.warn(`execute ${statement} failed`) |
| 79 | + } |
| 80 | + } |
| 81 | + } |
| 82 | + } |
| 83 | + } finally { |
| 84 | + qldbInstance.close() |
| 85 | + } |
| 86 | +} |
| 87 | + |
| 88 | +(async () => { |
| 89 | + main().catch(err => console.error(err)) |
| 90 | +})() |
0 commit comments