你失踪了
表.name.format
https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_config_options.html
(数据映射部分)
下面是一个工作示例:
{
"name": "test-0005",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics.regex": "CID1122.(.*)",
"connection.user": "kafka",
"table.name.format": "${topic}",
"connection.password": "kafka",
"connection.url": "jdbc:mysql://databasehost:3306/dbname",
"auto.create": "true",
"transforms": "route",
"transforms.t1.replacement": "$2",
"transforms.route.regex": "([^.]+)\\.([^.]+)",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter"
}
发生了什么事?
-
table.name.format格式:
定义用于写入事件的目标表,在这种情况下,我使用
占位符,表示将使用主题名称。
-
将从与此模式匹配的每个主题中获取数据
如你所见,我补充道
RegexRouter transformation
([^.]+)\.([^.]+)
是为了配合我们的
CID1122.[事件名称]
然后我只提取了组2(事件名称)。
-
变换:
“路线”
-
transforms.t1.regex格式:
"([^.]+)\.([^.]+)"
-
1.t1.替换:
"$2"
最后这个小组
$2
将传递给
,然后您可以连接到数据库并检查通过的数据。