代码之家  ›  专栏  ›  技术社区  ›  Henry Chen

使用数据流从计算引擎读取数据

  •  0
  • Henry Chen  · 技术社区  · 6 年前

    我想在Google Compute Engine上读取MariaDB的数据,并通过DataFlow将数据写入BigQuery,但当我在DataFlowRunner上运行DataFlow程序时,总会遇到如下异常。

    Java语言lang.RuntimeException:组织。阿帕奇。梁sdk。util。UserCodeException:java。sql。SQLException:无法创建PoolableConnectionFactory(无法连接到地址=(主机=xxx.xxx.xxx.xxx)(端口=3306)(类型=master):连接超时)

    1. 我可以通过DBeaver成功访问MariaDB。
    2. 我可以在DirectRunner上成功运行数据流程序。

    能给我一些理想,谢谢。

    1 回复  |  直到 6 年前
        1
  •  2
  •   Guillem Xercavins    6 年前

    要限制它,以便只有Dataflow作业可以访问它,您可以利用以下事实:Dataflow的线束VM是使用 dataflow 标签否则,您可以在特定网络/子网络上分配GCE实例和DF Worker。

    例如,创建一个带有网络标记的GCE实例,如 mariadb 因此,它可以作为将防火墙规则应用于和/或选择特定专有网络/子网络的目标。安装MariaDB(另一个选项是通过云启动器使用初始化脚本或预安装的解决方案)。

    对于防火墙规则,您需要在端口上访问数据库 tcp:3306 。对于GCE实例(目标标记 马里亚布 )您需要允许来自任一源标记的入口流量 数据流 或来自上述端口上的子网内。考虑到这一点,对于后一个选项,您还需要允许子网内DF工作者之间的内部通信。

    现在,在数据流方面,添加 JdbcIO 马里亚布 连接器对的依赖关系 pom.xml 文件:

    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-jdbc -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-jdbc</artifactId>
        <version>2.3.0</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.mariadb.jdbc/mariadb-java-client -->
    <dependency>
        <groupId>org.mariadb.jdbc</groupId>
        <artifactId>mariadb-java-client</artifactId>
        <version>1.1.7</version>
    </dependency>
    

    要连接的示例数据流片段(如果使用子网方法,请在JDBC连接字符串中使用内部IP):

    public class MariaDB {
    
      public static void main(String[] args) { 
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
        Pipeline p = Pipeline.create(options);
    
        PCollection<String> account = p.apply(JdbcIO.<String>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("org.mariadb.jdbc.Driver", "jdbc:mariadb://INTERNAL_IP:3306/database").withUsername("root").withPassword("pwd"))
        .withQuery("SELECT … FROM table")
            .withCoder(SerializableCoder.of(String.class))
        .withRowMapper(new JdbcIO.RowMapper<String>() {
                    public String mapRow(ResultSet rs) throws Exception {
    ...
                    }}
        ));
    
      p.run();
      }
    }
    

    并启动作业,指定子网和匹配区域(如果需要):

    mvn compile exec:java \
          -Dexec.mainClass=com.example.MariaDB \
          -Dexec.args="--project=PROJECT_ID \
          --stagingLocation=gs://BUCKET_NAME/mariadb/staging/ \
          --output=gs://BUCKET_NAME/mariadb/output \
          --network="dataflow-network" \
          --subnetwork="regions/europe-west1/subnetworks/subnet-europe-west" \
          --zone="europe-west1" \
          --runner=DataflowRunner"