要限制它,以便只有Dataflow作业可以访问它,您可以利用以下事实:Dataflow的线束VM是使用
dataflow
标签否则,您可以在特定网络/子网络上分配GCE实例和DF Worker。
例如,创建一个带有网络标记的GCE实例,如
mariadb
因此,它可以作为将防火墙规则应用于和/或选择特定专有网络/子网络的目标。安装MariaDB(另一个选项是通过云启动器使用初始化脚本或预安装的解决方案)。
对于防火墙规则,您需要在端口上访问数据库
tcp:3306
。对于GCE实例(目标标记
马里亚布
)您需要允许来自任一源标记的入口流量
数据流
或来自上述端口上的子网内。考虑到这一点,对于后一个选项,您还需要允许子网内DF工作者之间的内部通信。
现在,在数据流方面,添加
JdbcIO
和
马里亚布
连接器对的依赖关系
pom.xml
文件:
<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-jdbc -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-jdbc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.mariadb.jdbc/mariadb-java-client -->
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>1.1.7</version>
</dependency>
要连接的示例数据流片段(如果使用子网方法,请在JDBC连接字符串中使用内部IP):
public class MariaDB {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<String> account = p.apply(JdbcIO.<String>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("org.mariadb.jdbc.Driver", "jdbc:mariadb://INTERNAL_IP:3306/database").withUsername("root").withPassword("pwd"))
.withQuery("SELECT ⦠FROM table")
.withCoder(SerializableCoder.of(String.class))
.withRowMapper(new JdbcIO.RowMapper<String>() {
public String mapRow(ResultSet rs) throws Exception {
...
}}
));
p.run();
}
}
并启动作业,指定子网和匹配区域(如果需要):
mvn compile exec:java \
-Dexec.mainClass=com.example.MariaDB \
-Dexec.args="--project=PROJECT_ID \
--stagingLocation=gs://BUCKET_NAME/mariadb/staging/ \
--output=gs://BUCKET_NAME/mariadb/output \
--network="dataflow-network" \
--subnetwork="regions/europe-west1/subnetworks/subnet-europe-west" \
--zone="europe-west1" \
--runner=DataflowRunner"