
Kafka to pandas DataFrame without Spark

  •  5
  • dportman  · 6 years ago

    I'm reading streaming data from a Kafka topic, and I want to store some parts of it in a pandas DataFrame.

    from confluent_kafka import Consumer, KafkaError
    
    c = Consumer({
        'bootstrap.servers': "###",
        'group.id': '###',
        'default.topic.config': {
            'auto.offset.reset': 'latest'
        }
    })
    
    c.subscribe(['scorestore'])
    
    while True:
        msg = c.poll(1.0)
    
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(msg.error())
                break
    
        print('Received message: {}'.format(msg.value().decode('utf-8')))
    
    c.close()
    

    The received messages are JSON:

    {
      "messageHeader" : {
        "messageId" : "4b604b33-7256-47b6-89d6-eb1d92a282e6",
        "timestamp" : 152520000,
        "sourceHost" : "test",
        "sourceLocation" : "test",
        "tags" : [ ],
        "version" : "1.0"
      },
      "id_value" : {
        "id" : "1234",
        "value" : "333.0"
      }
    }
    

    For example, I'm trying to create a DataFrame with timestamp, id, and value columns:

        timestamp   id  value
    0   152520000   1234    333.0
    

    Is there a way to do this without parsing the JSON message and appending the required values to the DataFrame row by row?

    1 answer
  •  2
  •   migjimen    6 years ago

    The solution I propose may be a bit tricky. Suppose your JSON message is in a string named `msg_str`:

    from io import StringIO
    
    import pandas as pd
    
    msg_str = '{  "messageHeader" : { "messageId" : "4b604b33-7256-47b6-89d6-eb1d92a282e6",    "timestamp" : 152520000,    "sourceHost" : "test",    "sourceLocation" : "test",    "tags" : [ ],    "version" : "1.0"  },  "id_value" : {    "id" : "1234",    "value" : "333.0"  }}'
    
    
    # First create a DataFrame with read_json. Wrapping the string in StringIO
    # avoids the deprecation of passing a literal JSON string in newer pandas.
    p = pd.read_json(StringIO(msg_str))
    # Now you have a DataFrame with two columns. Where one column has a value, the
    # other has NaN. Create a new column holding only the values that are not NaN.
    p['fusion'] = p['id_value'].fillna(p['messageHeader'])
    # Drop 'id_value' and 'messageHeader', as you don't need them anymore
    p = p[['fusion']].reset_index()
    # Create a temporary column that only serves as the index for the pivot
    p['tmp'] = 0
    # Pivot to convert rows into columns
    p = p.pivot(index='tmp', values='fusion', columns='index')
    # Finally, select the columns that you are interested in
    p = p.reset_index()[['timestamp', 'id', 'value']]
    
    print(p)
    

    Result:

    index  timestamp    id value
    0      152520000  1234   333
    

    You can then append this DataFrame to the DataFrame in which you are accumulating the results.
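
A minimal sketch of that accumulation step, assuming each message has already been reduced to a one-row DataFrame like `p`. The helper name `accumulate` and the second sample row are made up for illustration. `pd.concat` is used here because `DataFrame.append` was removed in pandas 2.0.

```python
import pandas as pd


def accumulate(frames):
    """Concatenate a list of one-row DataFrames into one DataFrame (hypothetical helper)."""
    # Concatenating once at the end avoids copying a growing frame on every message.
    return pd.concat(frames, ignore_index=True)


# Hypothetical one-row frames standing in for the per-message result `p`.
frames = [
    pd.DataFrame({"timestamp": [152520000], "id": ["1234"], "value": ["333.0"]}),
    pd.DataFrame({"timestamp": [152520001], "id": ["1235"], "value": ["334.5"]}),
]

result = accumulate(frames)
print(result)
```

In the consumer loop you would collect each per-message frame in a list and call the helper when you actually need the combined DataFrame.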

    There may be a simpler solution, but if not, I hope this helps.
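
One possibly simpler route, offered as a sketch and not as part of the answer above: flatten the nested message with `pd.json_normalize` and keep only the three columns of interest. This still parses the JSON with `json.loads`, but it does not pick the values out by hand. The helper name `message_to_frame` is an assumption for illustration, and the sample string is a shortened copy of the message from the question.

```python
import json

import pandas as pd

# Shortened copy of the sample message from the question.
msg_str = (
    '{"messageHeader": {"messageId": "4b604b33-7256-47b6-89d6-eb1d92a282e6", '
    '"timestamp": 152520000, "sourceHost": "test", "sourceLocation": "test", '
    '"tags": [], "version": "1.0"}, '
    '"id_value": {"id": "1234", "value": "333.0"}}'
)


def message_to_frame(raw):
    """Turn one JSON message into a one-row DataFrame (hypothetical helper)."""
    # json_normalize flattens nested keys into dotted column names,
    # e.g. "messageHeader.timestamp" and "id_value.id".
    flat = pd.json_normalize(json.loads(raw))
    wanted = {
        "messageHeader.timestamp": "timestamp",
        "id_value.id": "id",
        "id_value.value": "value",
    }
    return flat[list(wanted)].rename(columns=wanted)


df = message_to_frame(msg_str)
print(df)
```

Inside the consumer loop, the argument would be `msg.value().decode('utf-8')`. Note that `id` and `value` stay strings here, since that is how they appear in the message.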