Writing to a Mongo replica set from Spark (Scala)

  • kmh  · 6 years ago

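    I'm trying to write from a Spark RDD to MongoDB using the MongoSpark connector.

    I'm facing two problems:

    • [Main problem] I can't connect to Mongo if I define the host according to the documentation (using all instances in the Mongo replica set)
    • [Secondary/related problem] If I connect to the primary only, I can write... but I typically crash the primary while writing the first collection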

    Environment:

    • mongo-spark-connector 1.1
    • Spark 1.6
    • Scala 2.10.5

    First, I'll set up a dummy example to demonstrate...

    import org.bson.Document 
    import com.mongodb.spark.MongoSpark 
    import com.mongodb.spark.config.WriteConfig
    
    import org.apache.spark.rdd.RDD
    
    /** 
      * fake json data
      */
    
    val recs: List[String] = List(
      """{"a": 123, "b": 456, "c": "apple"}""",
      """{"a": 345, "b":  72, "c": "banana"}""",
      """{"a": 456, "b": 754, "c": "cat"}""",
      """{"a": 876, "b":  43, "c": "donut"}""",
      """{"a": 432, "b": 234, "c": "existential"}"""
    )
    
    val rdd_json_str: RDD[String] = sc.parallelize(recs, 5)
    val rdd_hex_bson: RDD[Document] = rdd_json_str.map(json_str => Document.parse(json_str))
    

    Some values that don't change...

    // credentials
    val user = ???
    val pwd  = ???
    
    // fixed values
    val db              = "db_name"
    val replset         = "replset_name"
    val collection_name = "collection_name"
    

    The following is what doesn't work... In this case, "url" looks like machine.unix.domain.org and "ip" looks like... an IP address.

    val host = "url1:27017,url2:27017,url3:27017"                      // variant 1: host names
    val host = "ip_address1:27017,ip_address2:27017,ip_address3:27017" // variant 2: IP addresses
    

    I can't get either one to work, with any of the URI permutations I can think of...

    val uri = s"mongodb://${user}:${pwd}@${host}/${db}?replicaSet=${replset}"
    val uri = s"mongodb://${user}:${pwd}@${host}/?replicaSet=${replset}"
    val uri = s"mongodb://${user}:${pwd}@${replset}/${host}/${db}"
    val uri = s"mongodb://${user}:${pwd}@${replset}/${host}/${db}.${collection_name}"
    val uri = s"mongodb://${user}:${pwd}@${host}"       // setting db, collection, replica set in WriteConfig
    val uri = s"mongodb://${user}:${pwd}@${host}/${db}" // this works IF HOST IS PRIMARY ONLY; not for hosts as defined above
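
    For reference, the first permutation above (comma-separated hosts, then the database, then replicaSet as a query option) matches the standard MongoDB connection string layout. A minimal sketch of assembling it follows; replicaSetUri is a hypothetical helper, not part of the connector, and the percent-encoding step is an assumption about one thing that can go wrong (special characters in the credentials), not a diagnosis of this particular failure.

```scala
import java.net.URLEncoder

// Hypothetical helper (not part of mongo-spark-connector):
// assemble a replica-set connection string from its parts.
def replicaSetUri(user: String,
                  pwd: String,
                  hosts: Seq[String],
                  db: String,
                  options: Seq[(String, String)]): String = {
  // Percent-encode credentials so characters such as '@', ':' or '/'
  // cannot be mistaken for URI delimiters. URLEncoder writes spaces
  // as '+', so those are rewritten to %20.
  def enc(s: String): String = URLEncoder.encode(s, "UTF-8").replace("+", "%20")

  val base  = s"mongodb://${enc(user)}:${enc(pwd)}@${hosts.mkString(",")}/$db"
  val query = options.map { case (k, v) => s"$k=$v" }.mkString("&")
  if (query.isEmpty) base else s"$base?$query"
}

// Example values are placeholders.
val exampleUri = replicaSetUri(
  "user", "p@ss:word",
  Seq("h1:27017", "h2:27017", "h3:27017"),
  "db_name",
  Seq("replicaSet" -> "rs0"))
```

    Building the string in one place makes it easier to compare the variants tried above, since only the options list changes between attempts.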
    

    EDIT: additional information on the error messages. The errors take the following forms...

    Form 1

    Typically includes java.net.UnknownHostException: machine.unix.domain.org

    com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting 
    for a server that matches WritableServerSelector. Client view of cluster 
    state is {type=REPLICA_SET, servers=[{address=machine.unix.domain.org:27017, 
    type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException: 
    machine.unix.domain.org}, caused by {java.net.UnknownHostException: 
    machine.unix.domain.org}}, {address=machine.unix.domain.org:27017, 
    type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException: 
    machine.unix.domain.org}, caused by {java.net.UnknownHostException: 
    machine.unix.domain.org}}, {address=machine.unix.domain.org:27017, 
    type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException: 
    machine.unix.domain.org}, caused by {java.net.UnknownHostException: 
    machine.unix.domain.org}}]
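
    An UnknownHostException means the JVM could not resolve the name to an address. A minimal sketch for checking resolution on a given machine is shown here; resolveAll is a hypothetical helper, and it would need to be run on the driver and on every executor host, since each of them opens its own connections.

```scala
import java.net.InetAddress
import scala.util.{Failure, Success, Try}

// Hypothetical helper: try to resolve each host name and report,
// per host, either the resolved address (Right) or the error (Left).
def resolveAll(hosts: Seq[String]): Map[String, Either[String, String]] =
  hosts.map { h =>
    val result: Either[String, String] =
      Try(InetAddress.getByName(h).getHostAddress) match {
        case Success(ip) => Right(ip)
        case Failure(e)  => Left(e.toString)
      }
    h -> result
  }.toMap

// Placeholder host list; substitute the replica set members.
val report = resolveAll(Seq("127.0.0.1"))
report.foreach { case (host, outcome) => println(s"$host -> $outcome") }
```

    A Left entry for any replica set member on any machine would be consistent with the Form 1 error.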
    

    Form 2

    (Authentication error... even though connecting to the primary only, with the same credentials, works fine)

    com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting 
    for a server that matches WritableServerSelector. Client view of cluster  
    state is {type=REPLICA_SET, servers=[{address=xx.xx.xx.xx:27017,  
    type=UNKNOWN, state=CONNECTING, exception= 
    {com.mongodb.MongoSecurityException: Exception authenticating  
    MongoCredential{mechanism=null, userName='xx', source='admin', password= 
    <hidden>, mechanismProperties={}}}, caused by  
    {com.mongodb.MongoCommandException: Command failed with error 18:  
    'Authentication failed.' on server xx.xx.xx.xx:27017. The full response is {  
    "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" :  
    "AuthenticationFailed", "operationTime" : { "$timestamp" : { "t" :  
    1534459121, "i" : 1 } }, "$clusterTime" : { "clusterTime" : { "$timestamp" :  
    { "t" : 1534459121, "i" : 1 } }, "signature" : { "hash" : { "$binary" :  
    "xxx=", "$type" : "0" }, "keyId" : { "$numberLong" : "123456" } } } }}}...
    

    END EDIT

    The following is what does work... but only on the dummy data... more on that below...

    val host = s"${primary_ip_address}:27017" // primary only
    val uri = s"mongodb://${user}:${pwd}@${host}/${db}"
    
    val writeConfig: WriteConfig = 
      WriteConfig(Map(
        "uri"        -> uri, 
        "database"   -> db, 
        "collection" -> collection_name, 
        "replicaSet" -> replset))
    
    // write data to mongo
    MongoSpark.save(rdd_hex_bson, writeConfig)
    

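    I figured I'd have better luck if I repartitioned so that there are fewer connections... but I'm guessing this is also related to writing only to the primary rather than spreading the writes across all instances.

    I've read everything I could find on this... but most examples are for single-instance connections... https://docs.mongodb.com/spark-connector/v1.1/configuration/#output-configuration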

    1 Answer

  • kmh  · 6 years ago

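    It turns out there were two problems here. In the original question, these were the errors referred to as "Form 1" and "Form 2".

    Solution to the "Form 1" error

    The crux of the problem was a bug in the mongo-spark-connector. It turns out that it can't connect to a replica set using IP addresses... it requires URIs (host names). Since the DNS servers in our cloud don't have these lookups, I got it working by modifying /etc/hosts on each executor and then using a connection string of the following form: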

    val host = "URI1:27017,URI2:27017,URI3:27017"
    
    val uri  = s"mongodb://${user}:${pwd}@${host}/${db}?replicaSet=${replset}&authSource=${db}"
    
    val writeConfig: WriteConfig = 
      WriteConfig(Map(
        "uri"            -> uri, 
        "database"       -> db, 
        "collection"     -> collection_name, // the value defined in the question
        "replicaSet"     -> replset, 
        "writeConcern.w" -> "majority"))
    

    This required first adding the following to /etc/hosts on each machine:

    IP1 URI1
    IP2 URI2
    IP3 URI3
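
    Since the same entries have to be present on every machine, it helps if the update can be re-run safely. A minimal sketch of computing the updated file content follows; withHostEntries is a hypothetical helper, the addresses in the example are placeholders, and the sketch only builds the new text (actually writing /etc/hosts needs root and is left out).

```scala
// Hypothetical helper: return hosts-file content with any missing
// "IP name" entries appended. Names that are already mapped are skipped,
// so running it twice yields the same result.
def withHostEntries(existing: String, entries: Seq[(String, String)]): String = {
  // Collect the names already present: drop comments, then treat the
  // first token of each line as the IP and the remaining tokens as names.
  val known: Set[String] = existing.split("\n").toSeq
    .map(_.takeWhile(_ != '#').trim)
    .filter(_.nonEmpty)
    .flatMap(_.split("\\s+").drop(1))
    .toSet

  val missing = entries.collect { case (ip, name) if !known(name) => s"$ip $name" }
  if (missing.isEmpty) existing
  else existing.stripSuffix("\n") + "\n" + missing.mkString("\n") + "\n"
}

// Placeholder addresses standing in for IP1..IP3.
val current = "127.0.0.1 localhost\n"
val updated = withHostEntries(current, Seq("10.0.0.1" -> "URI1", "10.0.0.2" -> "URI2"))
```

    Applying the function to its own output returns the text unchanged, which is the property a bootstrap step would rely on.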
    

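    Of course, now I can't figure out how to use a bootstrap action in AWS EMR to update /etc/hosts when the cluster spins up. But that's another question. (See: AWS EMR bootstrap action as sudo)

    Solution to the "Form 2" error

    Adding &authSource=${db} to the URI resolved this issue.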
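
    For context on why this option matters: in a MongoDB connection string, the authentication database is taken from authSource if present, otherwise from the database in the URI path, otherwise admin. The sketch below illustrates that lookup order with plain string handling; effectiveAuthSource is a hypothetical helper, not a driver API, and it assumes the credentials are already percent-encoded. The source='admin' in the Form 2 error would be consistent with a URI that had neither a database nor authSource, though that is an assumption about which permutation produced it.

```scala
// Hypothetical helper: work out which database a connection string
// would authenticate against, following the order
// authSource option > database in the path > "admin".
def effectiveAuthSource(uri: String): String = {
  val rest = uri.stripPrefix("mongodb://")

  // Everything after the first '/' is "<database>?<options>".
  val pathAndQuery = rest.indexOf('/') match {
    case -1 => ""
    case i  => rest.substring(i + 1)
  }
  val (dbPart, query) = pathAndQuery.split("\\?", 2) match {
    case Array(d, q) => (d, q)
    case Array(d)    => (d, "")
  }

  val options: Map[String, String] =
    query.split("&").filter(_.contains("=")).map { kv =>
      val Array(k, v) = kv.split("=", 2)
      k -> v
    }.toMap

  // The Spark connector also accepts "db.collection" in the path,
  // so keep only the part before the first '.'.
  val database = dbPart.takeWhile(_ != '.')

  options.get("authSource")
    .orElse(Some(database).filter(_.nonEmpty))
    .getOrElse("admin")
}
```

    With this order, a URI ending in /?replicaSet=... falls back to admin, while one carrying authSource authenticates against the named database regardless of the path.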