代码之家  ›  专栏  ›  技术社区  ›  ChiMo

在Spark中选择独特的Cassandra

  •  1
  • ChiMo  · 技术社区  · 6 年前

    我需要一个列出唯一 复合分区键 火花内部。
    CASSANDRA中的查询: SELECT DISTINCT key1, key2, key3 FROM schema.table; 速度相当快,但是在RDD或spark中放置相同类型的数据过滤器。相比之下,sql检索结果的速度非常慢。

    e、 g。

    ---- SPARK ----
    var t1 = sc.cassandraTable("schema","table").select("key1", "key2", "key3").distinct()
    var t2 = spark.sql("SELECT DISTINCT key1, key2, key3 FROM schema.table")
    
    t1.count // takes 20 minutes
    t2.count // takes 20 minutes
    
    ---- CASSANDRA ----
    // takes < 1 minute while also printing out all results
    SELECT DISTINCT key1, key2, key3 FROM schema.table; 
    

    其中表格格式如下:

    CREATE TABLE schema.table (
        key1 text,
        key2 text,
        key3 text,
        ckey1 text,
        ckey2 text,
        v1 int,
        PRIMARY KEY ((key1, key2, key3), ckey1, ckey2)
    );
    

    spark在其“查询”中不使用cassandra优化吗?
    如何有效地检索这些信息?

    3 回复  |  直到 6 年前
        1
  •  6
  •   Community arnoo    4 年前

    快速回答

    spark在其“查询”中不使用cassandra优化吗?

    对但使用SparkSQL时,只需要对列进行修剪和谓词下推。在RDDs中,它是手动的。

    如何有效地检索这些信息?

    由于您的请求返回得足够快,因此我将直接使用Java驱动程序来获取此结果集。


    冗长的答案

    虽然Spark SQL可以提供一些基于C*的优化,但在使用DataFrame接口时,这些优化通常仅限于谓词下推。这是因为框架只向数据源提供有限的信息。我们可以通过执行 解释 在您编写的查询上。

    让我们从SparkSQL示例开始

    scala> spark.sql("SELECT DISTINCT key1, key2, key3 FROM test.tab").explain
    == Physical Plan ==
    *HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
    +- Exchange hashpartitioning(key1#30, key2#31, key3#32, 200)
       +- *HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
          +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation test.tab[key1#30,key2#31,key3#32] ReadSchema: struct<key1:string,key2:string,key3:string>
    

    因此,您的Spark示例实际上将分为几个步骤。

    1. 扫描:读取此表中的所有数据。这意味着要序列化C中的每个值 机器到Spark执行器JVM,换句话说,需要大量的工作。
    2. *HashAggregate/Exchange/Hash Aggregate:从每个执行器获取值,在本地对其进行散列,然后在机器之间交换数据并再次进行散列,以确保唯一性。用外行的话说,这意味着创建大型哈希结构,序列化它们,运行复杂的分布式排序,然后运行 再次散列。(昂贵)

    为什么这一切都不能降到C*?这是因为 Datasource (本例中为CassandraSourceRelation)未提供 不同的 查询的一部分。这只是Spark目前工作方式的一部分。 Docs on what is pushable

    那么RDD版本呢?

    使用RDD,我们可以直接为Spark提供一组指令。这意味着如果你想把东西推下去 manually specified .让我们看看RDD请求的调试输出

    scala> sc.cassandraTable("test","tab").distinct.toDebugString
    res2: String =
    (13) MapPartitionsRDD[7] at distinct at <console>:45 []
     |   ShuffledRDD[6] at distinct at <console>:45 []
     +-(13) MapPartitionsRDD[5] at distinct at <console>:45 []
        |   CassandraTableScanRDD[4] at RDD at CassandraRDD.scala:19 []
    

    这里的问题是,您的“distinct”调用是对 RDD 也不是卡桑德拉特有的。由于RDD要求所有优化都是显式的(您键入的就是您得到的),Cassandra从来没有听说过对“Distinct”的需求,我们得到的计划几乎与我们的Spark SQL版本相同。进行完整扫描,将所有数据从Cassandra序列化到Spark。执行随机播放,然后返回结果。

    那么我们能做些什么呢?

    使用SparkSQL,在不向 Catalyst (SparkSQL/Dataframes优化器),让它知道Cassandra可以处理一些 不同的 在服务器级别调用。然后需要为CassandraRDD子类实现它。

    对于RDD,我们需要添加一个类似于现有函数的函数 where ,则, select limit ,调用Cassandra RDD。新的 Distinct 可以添加呼叫 here 虽然只有在特定情况下才允许。这是一个目前在SCC中不存在的函数,但可以相对容易地添加,因为它所做的只是预先结束 DISTINCT requests 并可能添加一些检查以确保它是 不同的 这是有道理的。

    在不修改底层连接器的情况下,我们现在可以做什么?

    由于我们知道要发出的确切CQL请求,因此始终可以直接使用Cassandra驱动程序来获取此信息。Spark Cassandra连接器提供了一个我们可以使用的驱动程序池,或者我们可以直接使用Java驱动程序。要使用游泳池,我们可以

    import com.datastax.spark.connector.cql.CassandraConnector
    CassandraConnector(sc.getConf).withSessionDo{ session => 
      session.execute("SELECT DISTINCT key1, key2, key3 FROM test.tab;").all()
    }
    

    然后,如果需要进一步的Spark工作,则将结果并行化。如果我们真的想发布它,那么很可能需要像我上面描述的那样将该功能添加到Spark Cassandra连接器中。

        2
  •  2
  •   Loki    5 年前

    只要选择分区键,就可以使用 .perPartitionLimit CassandraRDD的功能:

    val partition_keys = sc.cassandraTable("schema","table").select("key1", "key2", "key3").perPartitionLimit(1)
    

    这是因为 SPARKC-436

    select key from some_table per partition limit 1

    给出的结果与

    select distinct key from some_table

    spark cassandra连接器中引入了此功能 2.0.0-RC1 并且至少需要 C* 3.6

        3
  •  0
  •   Snedecor II    6 年前

    Distinct的性能很差。 这里有一个很好的答案,有一些备选方案: How to efficiently select distinct rows on an RDD based on a subset of its columns`

    您可以利用toDebugString来了解代码洗牌了多少数据。