代码之家  ›  专栏  ›  技术社区  ›  vick_4444

Apache kafka变换对扁平化的有效负载进行去扁平化

  •  0
  • vick_4444  · 技术社区  · 1 年前

    我正在尝试为ServiceNow设置Apache Kafka源和接收器连接器。接收器连接器工作得非常完美,我可以使用下面的转换将嵌套的JSON有效负载扁平化为ServiceNow列名。 Reference

    "transforms": "FlattenSchema,RenameFields"
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "."
    

    基本上,转换会在JSON以下变平

    "payload": {
        "prjtId": "123"
    }
    

    payload.prjtId
    

    现在我正在尝试源连接器,即使用Apache Kafka poll ServiceNow表查找新记录,并将其作为Kafka消息进行流式传输。流消息工作得很好,但我在将表记录转换为预期的嵌套JSON结构时遇到了问题。

    我的ServiceNow表有以下列:

    messageType
    messageName
    version
    payload.projectId
    payload.responseCode
    

    当我使用ServiceNow连接器将此记录作为Kafka消息检索时,我希望它具有以下预期结构,

    预期结果:

    {
    
        "messageType": "InventoryRecordUpdate",
        "messageName": "Validated", 
        "version": "v1",
        "payload": {
            "projectId": "123456", 
            "responseCode" : "XXX"
        }
    }
    

    但当我尝试以下配置时,我会得到如下所示的result1或result2:

    具有

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    

    Result1:它基本上在消息中抛出架构,并将表列封装在“payload”下:

    {
        "schema": {
            "type": "struct",
            "fields": [
                {
                    "type": "string",
                    "optional": true,
                    "field": "u_messagetype"
                },
                {
                    "type": "string",
                    "optional": true,
                    "field": "version"
                },
                {
                    "type": "string",
                    "optional": true,
                    "field": "u_messagename"
                },
                {
                    "type": "string",
                    "optional": true,
                    "field": "u_payload_projectid"
                },
                {
                    "type": "string",
                    "optional": true,
                    "field": "u_payload_responsecode"
                }
            ],
            "optional": false
        },
        "payload": {
            "u_messagetype": "InventoryRecordUpdate",
            "u_version": "v2",
            "u_messagename": "Validated",
            "u_payload_projectid": "123456",
            "u_payload_responsecode": "200"
        }
    }
    

    通过以下配置,

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    

    结果2:它排除了kafka消息体中的发送模式,还删除了名为“payload”的字段,并在没有嵌套的情况下按原样发送字段,如下所示:

    {
        "u_messagetype": "InventoryRecordUpdate",
        "u_version": "v2",
        "u_messagename": "Validated",
        "u_payload_projectid": "123456",
        "u_payload_responsecode": "200"
    }
    

    我没有找到关于将键“u_payload_projectid”反扁平化为嵌套结构的文档。如果有人能给我指明正确的方向,我将不胜感激。

    0 回复  |  直到 1 年前
        1
  •  1
  •   OneCricketeer Gabriele Mariotti    1 年前

    不存在“去扁平化”的转换。您获得的数据在很大程度上取决于连接器本身。由于架构返回了两个顶级字符串字段,而不是一个记录/结构,因此如果不修改连接器的源代码,就无法从连接端真正完成任何操作

    因此,您需要编写一些流处理器来检测具有共享前缀的字段,并自行重新映射数据