代码之家  ›  专栏  ›  技术社区  ›  user1870400

如何在2.2.0中获得给定Apache Spark数据帧的Cassandra cql字符串?

  •  1
  • user1870400  · 技术社区  · 6 年前

    我正在尝试获取给定数据帧的cql字符串。我遇到了这个 function

    在那里我可以做这样的事情

    TableDef.fromDataFrame(df, "test", "hello", ProtocolVersion.NEWEST_SUPPORTED).cql()
    

    在我看来,库使用第一列作为分区键,而不关心集群键,那么我如何指定使用数据帧的特定列集作为分区键,使用特定列集作为集群键呢?

    看起来我可以创建一个新的TableDef,但是我必须自己完成整个映射,在某些情况下,必要的函数(如ColumnType)在Java中是无法访问的。例如,我尝试创建一个新的ColumnDef,如下所示

    new ColumnDef("col5", new PartitionKeyColumn(), ColumnType is not accessible in Java)
    

    目标: 从Spark数据帧获取CQL create语句。

    输入 我的dataframe可以有任意数量的列及其各自的Spark类型。假设我有一个有100列的Spark数据框,其中我的数据框的col8、col9对应于cassandra partitionKey列,而我的column10对应于cassandra clustering Key列

    col1| col2| ...|col100
    

    现在,我想使用spark cassandra连接器库为我提供一个CQL create table语句,给出上述信息。

    所需输出

    create table if not exists test.hello (
       col1 bigint, (whatever column1 type is from my dataframe I just picked bigint randomly)
       col2 varchar,
       col3 double,
       ...
       ...
       col100 bigint,
       primary key(col8,col9)
    ) WITH CLUSTERING ORDER BY (col10 DESC);
    
    1 回复  |  直到 4 年前
        1
  •  1
  •   Alex Ott    6 年前

    因为需要组件( PartitionKeyColumn &的实例 ColumnType )是Scala中的对象,您需要使用以下语法来访问其Intance:

    // imports
    import com.datastax.spark.connector.cql.ColumnDef;
    import com.datastax.spark.connector.cql.PartitionKeyColumn$;
    import com.datastax.spark.connector.types.TextType$;
    
    // actual code
    ColumnDef a = new ColumnDef("col5",  
          PartitionKeyColumn$.MODULE$, TextType$.MODULE$);
    

    请参见代码 ColumnRole & PrimitiveTypes 查找对象/类名称的完整列表。

    附加要求后更新 :代码很长,但应该可以工作。。。

    SparkSession spark = SparkSession.builder()
                    .appName("Java Spark SQL example").getOrCreate();
    
    Set<String> partitionKeys = new TreeSet<String>() {{
                    add("col1");
                    add("col2");
            }};
    Map<String, Integer> clustereingKeys = new TreeMap<String, Integer>() {{
                    put("col8", 0);
                    put("col9", 1);
            }};
    
    Dataset<Row> df = spark.read().json("my-test-file.json");
    TableDef td = TableDef.fromDataFrame(df, "test", "hello", 
                    ProtocolVersion.NEWEST_SUPPORTED);
    
    List<ColumnDef> partKeyList = new ArrayList<ColumnDef>();
    List<ColumnDef> clusterColumnList = new ArrayList<ColumnDef>();
    List<ColumnDef> regColulmnList = new ArrayList<ColumnDef>();
    
    scala.collection.Iterator<ColumnDef> iter = td.allColumns().iterator();
    while (iter.hasNext()) {
            ColumnDef col = iter.next();
            String colName = col.columnName();
            if (partitionKeys.contains(colName)) {
                    partKeyList.add(new ColumnDef(colName, 
                                    PartitionKeyColumn$.MODULE$, col.columnType()));
            } else if (clustereingKeys.containsKey(colName)) {
                    int idx = clustereingKeys.get(colName);
                    clusterColumnList.add(new ColumnDef(colName, 
                                    new ClusteringColumn(idx), col.columnType()));
            } else {
                    regColulmnList.add(new ColumnDef(colName, 
                                    RegularColumn$.MODULE$, col.columnType()));
            }
    }
    
    TableDef newTd = new TableDef(td.keyspaceName(), td.tableName(), 
                    (scala.collection.Seq<ColumnDef>) partKeyList,
                    (scala.collection.Seq<ColumnDef>) clusterColumnList, 
                    (scala.collection.Seq<ColumnDef>) regColulmnList,
                    td.indexes(), td.isView());
    String cql = newTd.cql();
    System.out.println(cql);