如果我没记错的话,你总是在看最新的记录。这是通过
ShardIteratorType: 'Latest'
.
根据
documentation
上面写着
LATEST-在碎片中最近的记录之后开始读取,以便始终读取碎片中最新的数据。
这只应用于获取第一个迭代器,然后需要获取下一个迭代器,该迭代器的起始位置与最后一个迭代器的结束位置完全相同。
因此,您可以使用
NextShardIterator
的
GetIterator
如果存在,请要求跟踪comping记录。看见
doc
.
目前,您在每个间隔之后都会丢弃迭代器,并从最末尾再次开始。
实例
我拿走了你的密码
setInterval
仅重复
getRecords
请求下一个迭代器
function getRecord() {
kinesis.describeStream({ StreamName: 'test-stream1'}, function ( err, streamData ) {
if ( err ) {
console.log( err, err.stack ); // an error occurred
} else {
// console.log( streamData ); // successful response
streamData.StreamDescription.Shards.forEach( shard => {
kinesis.getShardIterator({
ShardId: shard.ShardId,
ShardIteratorType: 'LATEST',
StreamName: 'test-stream1'
}, function ( err, shardIteratordata ) {
if ( err ) {
console.log( err, err.stack ); // an error occurred
} else {
var shardIterator = shardIteratordata.ShardIterator;
setInterval(function() {
kinesis.getRecords({ ShardIterator: shardIterator }, function ( err, recordsData ) {
if ( err ) {
console.log( err, err.stack ); // an error occurred
} else {
// console.log( JSON.stringify( recordsData ) ); // successful response
recordsData.Records.forEach(record => {
console.log( record.Data.toString(), shard.ShardId );
});
shardIterator = iterator = recordsData.NextShardIterator;
}
});
}, 1000 * 1 );
}
});
});
}
});
}