代码之家  ›  专栏  ›  技术社区  ›  Suhail Gupta

运动流中的数据丢失。这可能是什么原因?

  •  1
  • Suhail Gupta  · 技术社区  · 6 年前

    我面临一个问题。我正在测试3个消费者和1个生产者。从制作者正在制作的所有按键中,消费者无法接收制作者发送的所有数据。这可能是什么原因?

    在下面的屏幕截图中,制片人发送了 a , b , c d 但仅限于 d 已收到。 enter image description here

    右下角是生产者,其他3个是收听同一流的消费者。正如我们所看到的,只有左下角的一个消费者收到了 d 其他数据已丢失。

    我正在测试的代码:

    生产商:

    var stdin = process.openStdin();
    
    function insert( input ) {
    
        var params = {
        Data: input,
        PartitionKey: 'users',
        StreamName: 'test-stream1'
        };
        kinesis.putRecord( params, function ( err, data ) {
        if ( err ) console.log( err, err.stack ); // an error occurred
        else console.log( data );           // successful response
        } );
    }
    
    
    
    stdin.addListener( "data", function ( d ) {
        // PRODUCING THE KEY STROKES
        // TYPED BY USER INPUT
        insert( d.toString().trim() );
    } );
    

    消费者:

        function getRecord() {
            kinesis.describeStream( {
            StreamName: 'test-stream1'
            }, function ( err, streamData ) {
            if ( err ) {
                console.log( err, err.stack ); // an error occurred
            } else {
                // console.log( streamData ); // successful response
                streamData.StreamDescription.Shards.forEach( shard => {
                kinesis.getShardIterator( {
                    ShardId: shard.ShardId,
                    ShardIteratorType: 'LATEST',
                    StreamName: 'test-stream1'
                }, function ( err, shardIteratordata ) {
                    if ( err ) {
                        // console.log( err, err.stack ); // an error occurred
                    } else {
                        //console.log(shardIteratordata); // successful response
                        kinesis.getRecords( {
                            ShardIterator: shardIteratordata.ShardIterator
                        }, function ( err, recordsData ) {
                            if ( err ) {
                                // console.log( err, err.stack ); // an error occurred
                            } else {
                                // console.log( JSON.stringify( recordsData ) ); // successful response
                                recordsData.Records.forEach( record => {
                                    console.log( record.Data.toString(), shard.ShardId );
                                } );
                            }
                        } );
                    }
                } );
                } );
            }
            } );
        }
    
        setInterval( getRecord, 1000 * 1 );
    

    我使用迭代器类型作为 LATEST 这样每个消费者都可以从生产者那里获得最新的数据。

    1 回复  |  直到 6 年前
        1
  •  1
  •   Fionn    6 年前

    如果我没记错的话,你总是在看最新的记录。这是通过 ShardIteratorType: 'Latest' . 根据 documentation 上面写着

    LATEST-在碎片中最近的记录之后开始读取,以便始终读取碎片中最新的数据。

    这只应用于获取第一个迭代器,然后需要获取下一个迭代器,该迭代器的起始位置与最后一个迭代器的结束位置完全相同。

    因此,您可以使用 NextShardIterator GetIterator 如果存在,请要求跟踪comping记录。看见 doc .

    目前,您在每个间隔之后都会丢弃迭代器,并从最末尾再次开始。

    实例

    我拿走了你的密码 setInterval 仅重复 getRecords 请求下一个迭代器

    function getRecord() {
      kinesis.describeStream({ StreamName: 'test-stream1'}, function ( err, streamData ) {
        if ( err ) {
          console.log( err, err.stack ); // an error occurred
        } else {
          // console.log( streamData ); // successful response
          streamData.StreamDescription.Shards.forEach( shard => {
            kinesis.getShardIterator({
              ShardId: shard.ShardId,
              ShardIteratorType: 'LATEST',
              StreamName: 'test-stream1'
            }, function ( err, shardIteratordata ) {
              if ( err ) {
                console.log( err, err.stack ); // an error occurred
              } else {
                var shardIterator = shardIteratordata.ShardIterator;
    
                setInterval(function() {
                  kinesis.getRecords({ ShardIterator: shardIterator }, function ( err, recordsData ) {
                    if ( err ) {
                      console.log( err, err.stack ); // an error occurred
                    } else {
                      // console.log( JSON.stringify( recordsData ) ); // successful response
                      recordsData.Records.forEach(record => {
                        console.log( record.Data.toString(), shard.ShardId );
                      });
                      shardIterator = iterator = recordsData.NextShardIterator;
                    }
                  });
                }, 1000 * 1 );
    
              }
            });
          });
        }
      });
    }
    
    推荐文章