
How do I specify a table name for reading data from an RDBMS in a Spark JDBC application?

    Metadata  ·  asked 6 years ago

    I am trying to read a table from a Greenplum database with Spark, as follows:

    val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.table where period_year=2017 and period_num=12"
    val yearDF = spark.read
        .format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
        .option("url", connectionUrl)
        .option("dbtable", s"(${execQuery}) as year2016")
        .option("user", devUserName)
        .option("password", devPassword)
        .option("partitionColumn", "header_id")
        .option("lowerBound", 16550)
        .option("upperBound", 1152921481695656862L)
        .option("numPartitions", 450)
        .load()
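
    For comparison, Spark's built-in JDBC source does accept a parenthesized, aliased subquery as dbtable. A minimal sketch of the same read through the generic source, assuming the JDBC URL in connectionUrl is Postgres-compatible and that allColumns, flagCol, devUserName and devPassword are defined as above (the alias year2017 is illustrative):

    val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.table where period_year=2017 and period_num=12"
    // The generic JDBC source wraps dbtable in its own SELECT, so a
    // parenthesized subquery with an alias is treated as a derived table.
    val yearDF = spark.read
        .format("jdbc")
        .option("url", connectionUrl)
        .option("dbtable", s"(${execQuery}) as year2017")
        .option("user", devUserName)
        .option("password", devPassword)
        .option("partitionColumn", "header_id")
        .option("lowerBound", 16550)
        .option("upperBound", 1152921481695656862L)
        .option("numPartitions", 450)
        .load()

    This sidesteps whatever connector-specific parsing produced the error, at the cost of losing the Greenplum connector's parallel transfer path.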
    

    When I run the code with spark-submit, I get this exception:

    Exception in thread "main" org.postgresql.util.PSQLException: ERROR: relation "public.(select je_header_id,source_system_name,je_line_num,last_update" does not exist
      Position: 15
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2310)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2023)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:217)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:421)
        at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
        at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:281)
        at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111)
        at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java)
        at io.pivotal.greenplum.spark.jdbc.Jdbc$.resolveTable(Jdbc.scala:301)
        at io.pivotal.greenplum.spark.GreenplumRelationProvider.createRelation(GreenplumRelationProvider.scala:29)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
        at com.partition.source.YearPartition$.prepareFinalDF$1(YearPartition.scala:141)
        at com.partition.source.YearPartition$.main(YearPartition.scala:164)
        at com.partition.source.YearPartition.main(YearPartition.scala)
    

    In execQuery I can see that the schema name and table name are formed correctly. Yet when I submit the job, it reports relation "public.(select je_header_id,source_system_name,je_line_num,last_update" does not exist. I don't understand why "public" is being taken as the schema name and "(select je_header_id,source_system_name,je_line_num,last_update" as the table name.

    Could anyone tell me what mistake I am making here and how to fix it?

    1 Answer  |  6 years ago

    Hari  ·  answered 6 years ago

    If you are using Spark JDBC, you can wrap the query and pass it to the dbtable parameter. If the Pivotal connector behaves like any other JDBC source, this should work.

    val query = """
      (select a.id, b.id, a.name
         from a
         left outer join b on a.id = b.id
        limit 100) foo
    """

    val df = sqlContext.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/local_content")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("useUnicode", "true")
      .option("continueBatchOnError", "true")
      .option("useSSL", "false")
      .option("user", "root")
      .option("password", "")
      .option("dbtable", query)
      .load()
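
    However, the stack trace in the question suggests the Greenplum connector resolves dbtable as a literal relation name rather than wrapping it like the generic JDBC source does. If that is the case, one workaround (a sketch reusing the names from the question, not verified against the connector) is to pass a plain table name and apply the projection and filter in Spark afterwards:

    val yearDF = spark.read
        .format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
        .option("url", connectionUrl)
        .option("dbtable", "schema.table")   // plain table name, no subquery
        .option("user", devUserName)
        .option("password", devPassword)
        .option("partitionColumn", "header_id")
        .load()
        .where("period_year = 2017 and period_num = 12")

    Note that the filter only avoids a full transfer if the connector supports predicate pushdown; otherwise the whole table is read before Spark filters it.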