How can I speed up pushing data into Kafka after fetching it from MongoDB?

  • hasherBaba · Tech Community · 6 years ago

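    I am fetching data from MongoDB and publishing it to Kafka. Fetching and publishing together currently run at about 357 responses/second, and I would like to make this faster.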

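    import sys

    from bson import json_util  # installed with pymongo; JSON-encodes ObjectId, datetime, ...
    from kafka import KafkaProducer
    from pymongo import MongoClient

    TOPIC = 'collect-production-response'
    LIMIT = 100000

    try:
        client = MongoClient('my_uri')
        db = client["xxx-dev"]
    except Exception as e:
        sys.exit(e)  # without a database handle there is nothing to read

    # Create the producer once: a second KafkaProducer(...) call replaces the
    # first one and silently drops its bootstrap_servers setting.
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'], retries=5)

    count = 0
    for response in db.Response.find():
        try:
            # Send the document as JSON bytes; bytes(response) does not produce
            # JSON (and raises TypeError on Python 3).
            producer.send(TOPIC, json_util.dumps(response).encode('utf-8'))
        except Exception as e:
            print(e)
        count += 1
        if count >= LIMIT:
            print("Done 100k")
            break

    # Block until every buffered message has been delivered, even when the
    # collection holds fewer than LIMIT documents.
    producer.flush()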
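The post has no answer. As one possible starting point, here is a minimal sketch (not the author's code) that separates serialization from sending, flushes once at the end, and reports records per second, so any change can be measured against the 357 responses/second baseline. `serialize_document` and `publish_all` are hypothetical helpers, and falling back to `str()` for values such as `ObjectId` or `datetime` is an assumption; an in-memory list stands in for Kafka so the sketch runs without a broker.

```python
import itertools
import json
import time
from datetime import datetime
from typing import Any, Callable, Iterable, Mapping, Tuple


def serialize_document(doc: Mapping[str, Any]) -> bytes:
    # Assumption: str() is an acceptable fallback for values the json module
    # cannot encode natively (e.g. ObjectId or datetime in MongoDB documents).
    return json.dumps(doc, default=str, separators=(",", ":")).encode("utf-8")


def publish_all(
    docs: Iterable[Mapping[str, Any]],
    send: Callable[[bytes], Any],
    flush: Callable[[], Any],
    limit: int = 100_000,
) -> Tuple[int, float]:
    """Send at most `limit` documents, flush once, return (count, records/s)."""
    start = time.perf_counter()
    count = 0
    # islice stops after `limit` items without pulling an extra document.
    for doc in itertools.islice(docs, limit):
        send(serialize_document(doc))  # a real KafkaProducer.send() is asynchronous
        count += 1
    flush()  # one flush at the end instead of waiting on each message
    elapsed = time.perf_counter() - start
    rate = count / elapsed if elapsed > 0 else float("inf")
    return count, rate


if __name__ == "__main__":
    # In-memory stand-in for Kafka: collect the encoded messages in a list.
    sent = []
    fake_docs = ({"_id": i, "at": datetime(2018, 1, 1)} for i in range(5))
    count, rate = publish_all(fake_docs, sent.append, lambda: None, limit=3)
    print(count, sent[0])  # → 3 b'{"_id":0,"at":"2018-01-01 00:00:00"}'
```

With kafka-python, `send` could be `lambda v: producer.send('collect-production-response', v)` and `flush` could be `producer.flush`. `KafkaProducer` options such as `linger_ms`, `batch_size` and `compression_type`, together with pymongo's `find(batch_size=...)`, are the usual settings to experiment with; which values help here is an assumption to verify by measurement, not something the post establishes.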
    
    0 回复  |  直到 6 年前