From 31fea9ede16f0bb44c2abb765dffb2dbf783030c Mon Sep 17 00:00:00 2001 From: Jan Hecking Date: Fri, 2 Jul 2021 18:24:31 +0800 Subject: [PATCH] Fetch all shards by paginating results --- index.js | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/index.js b/index.js index 6830249..47b1930 100644 --- a/index.js +++ b/index.js @@ -53,23 +53,32 @@ class DynamoDBStream extends EventEmitter { StreamArn: this._streamArn } - const { StreamDescription } = await this._ddbStreams.describeStream(params) - - const shards = StreamDescription.Shards const newShardIds = [] + let lastShardId = null + + do { + if (lastShardId) { + debug('lastShardId: %s', lastShardId) + params.ExclusiveStartShardId = lastShardId + } + const { StreamDescription } = await this._ddbStreams.describeStream(params) - // collect all the new shards of this stream - for (const newShardEntry of shards) { - const existingShardEntry = this._shards.get(newShardEntry.ShardId) + const shards = StreamDescription.Shards + lastShardId = StreamDescription.LastEvaluatedShardId - if (!existingShardEntry) { - this._shards.set(newShardEntry.ShardId, { - shardId: newShardEntry.ShardId - }) + // collect all the new shards of this stream + for (const newShardEntry of shards) { + const existingShardEntry = this._shards.get(newShardEntry.ShardId) - newShardIds.push(newShardEntry.ShardId) + if (!existingShardEntry) { + this._shards.set(newShardEntry.ShardId, { + shardId: newShardEntry.ShardId + }) + + newShardIds.push(newShardEntry.ShardId) + } } - } + } while (lastShardId) if (newShardIds.length > 0) { debug('Added %d new shards', newShardIds.length)