代码之家  ›  专栏  ›  技术社区  ›  Mornor

无数据创建的节点-es索引

  •  0
  • Mornor  · 技术社区  · 6 年前

    我一直在研究一个AWS lambda函数,每当这个函数从CloudFront接收到日志时,它会对S3桶发送的通知作出反应。 我可以解压缩日志包并用 Cloudfront Parser .

    但是,我仍然无法将它们发送到ElasticSearch端点。我主要和 winston-elasticsearch s3-to-logstore 但他们两个都没用。

    这是我的代码:

    exports.handler = function(event, context, callback) {
        var srcBucket = event.Records[0].s3.bucket.name;
        var srcKey = event.Records[0].s3.object.key;
    
        async.waterfall([
            function fetchLogFromS3(next){
                console.log('Fetching compressed log from S3...');
                s3.getObject({
                   Bucket: srcBucket,
                   Key: srcKey
                },
                next);
            },
            function uncompressLog(response, next){
                console.log("Uncompressing log...");
                zlib.gunzip(response.Body, next);
            },
            function publishNotifications(jsonBuffer, next) {
                console.log('Filtering log...');
                var json = jsonBuffer.toString();
                console.log('CloudFront JSON from S3:', json);
    
                var records;
                CloudFrontParser.parse(json, { format: 'web' }, function (err, accesses) {
                  if(err){
                    console.log(err);
                  } else {
                    records = accesses;
                  }
                });
    
                // Here, how to send the parsed data? 
    
                console.log('CloudFront parsed:', records);
            }
        ], function (err) {
            if (err) {
                console.error('Failed to send data: ', err);
            } else {
                console.log('Successfully send data.');
            }
            callback(null,"message");
        });
    };
    

    有没有一种简单的方法可以将数据发送到ES?
    比如:

    var client = new elasticsearch.Client({
      host: process.env.ES_HOST,
      log: 'trace',
      keepAlive: false
    });
    
    client.index({
            index: 'cloudfront_index',
            type: 'log',
            body: records
            }, function(err, resp, status) {
            console.log(resp);
    });
    

    有效,但不发送数据:

    GET cloudfront_index/_search
    {
      "took": 0,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 0,
        "max_score": null,
        "hits": []
      }
    }
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   Val    6 年前

    你就快到了。你需要使用 bulk method 为了实现你想要的:

    var client = new elasticsearch.Client({
      host: process.env.ES_HOST,
      log: 'trace',
      keepAlive: false
    });
    
    var bulk = [];
    records.forEach(function(record) {
        bulk.push({"index": {}})
        bulk.push(record);
    });
    client.bulk({
            index: 'cloudfront_index',
            type: 'log',
            body: bulk
    }, function(err, resp, status) {
            console.log(resp);
    });