我正在尝试为ServiceNow设置Apache Kafka源和接收器连接器。接收器连接器工作得非常完美,我可以使用下面的转换将嵌套的JSON有效负载扁平化为ServiceNow列名。
Reference
"transforms": "FlattenSchema,RenameFields"
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "."
基本上,转换会在JSON以下变平
"payload": {
"prjtId": "123"
}
到
payload.prjtId
现在我正在尝试源连接器,即使用Apache Kafka poll ServiceNow表查找新记录,并将其作为Kafka消息进行流式传输。流消息工作得很好,但我在将表记录转换为预期的嵌套JSON结构时遇到了问题。
我的ServiceNow表有以下列:
messageType
messageName
version
payload.projectId
payload.responseCode
当我使用ServiceNow连接器将此记录作为Kafka消息检索时,我希望它具有以下预期结构,
预期结果:
{
"messageType": "InventoryRecordUpdate",
"messageName": "Validated",
"version": "v1",
"payload": {
"projectId": "123456",
"responseCode" : "XXX"
}
}
但当我尝试以下配置时,我会得到如下所示的result1或result2:
具有
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
Result1:它基本上在消息中抛出架构,并将表列封装在“payload”下:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "u_messagetype"
},
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": true,
"field": "u_messagename"
},
{
"type": "string",
"optional": true,
"field": "u_payload_projectid"
},
{
"type": "string",
"optional": true,
"field": "u_payload_responsecode"
}
],
"optional": false
},
"payload": {
"u_messagetype": "InventoryRecordUpdate",
"u_version": "v2",
"u_messagename": "Validated",
"u_payload_projectid": "123456",
"u_payload_responsecode": "200"
}
}
通过以下配置,
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
结果2:它排除了kafka消息体中的发送模式,还删除了名为“payload”的字段,并在没有嵌套的情况下按原样发送字段,如下所示:
{
"u_messagetype": "InventoryRecordUpdate",
"u_version": "v2",
"u_messagename": "Validated",
"u_payload_projectid": "123456",
"u_payload_responsecode": "200"
}
我没有找到关于将键“u_payload_projectid”反扁平化为嵌套结构的文档。如果有人能给我指明正确的方向,我将不胜感激。