I'm trying to write to MongoDB from a Spark RDD using the MongoSpark connector.
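I'm facing two problems:
- [Main issue] I can't connect to Mongo if I define the host the way the documentation describes (using all instances in the Mongo replica set).
- [Secondary/related issue] If I connect to the primary only, I can write... but writing the first collection usually crashes the primary.

Versions:
- mongo-spark-connector 1.1
- Spark 1.6
- Scala 2.10.5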
First, I'll set up a dummy example to demonstrate...
import org.bson.Document
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.rdd.RDD
/**
* fake json data
*/
val recs: List[String] = List(
"""{"a": 123, "b": 456, "c": "apple"}""",
"""{"a": 345, "b": 72, "c": "banana"}""",
"""{"a": 456, "b": 754, "c": "cat"}""",
"""{"a": 876, "b": 43, "c": "donut"}""",
"""{"a": 432, "b": 234, "c": "existential"}"""
)
val rdd_json_str: RDD[String] = sc.parallelize(recs, 5)
val rdd_hex_bson: RDD[Document] = rdd_json_str.map(json_str => Document.parse(json_str))
Some values that won't change...
val user = ???
val pwd = ???
val db = "db_name"
val replset = "replset_name"
val collection_name = "collection_name"
Here's what doesn't work... In this case, "url" looks like machine.unix.domain.org and "ip" looks like... an IP address.
val host = "url1:27017,url2:27017,url3:27017"
val host = "ip_address1:27017,ip_address2:27017,ip_address3:27017"
I can't get either of them to work, using every URI permutation I could think of...
val uri = s"mongodb://${user}:${pwd}@${host}/${db}?replicaSet=${replset}"
val uri = s"mongodb://${user}:${pwd}@${host}/?replicaSet=${replset}"
val uri = s"mongodb://${user}:${pwd}@${replset}/${host}/${db}"
val uri = s"mongodb://${user}:${pwd}@${replset}/${host}/${db}.${collection_name}"
val uri = s"mongodb://${user}:${pwd}@${host}" // setting db, collection, replica set in WriteConfig
val uri = s"mongodb://${user}:${pwd}@${host}/${db}" // this works IF HOST IS PRIMARY ONLY; not for hosts as defined above
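For reference, the standard MongoDB connection string format is `mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]`. The third and fourth permutations put the replica set name where the host list belongs, which that format doesn't allow. Below is a minimal sketch of a URI that keeps the whole seed list, the replica set name and an explicit `authSource` together. `ReplicaSetUri` and `build` are hypothetical helper names, and passing `db_name` as the auth database is an assumption (it is where the primary-only URI authenticates, so presumably where the user was created).

```scala
import java.net.URLEncoder

// Hypothetical helper: builds a replica-set connection string with an explicit auth database.
object ReplicaSetUri {
  // Percent-encode credentials so characters such as '@' or ':' in a password
  // don't break URI parsing (the Java driver URL-decodes the user info).
  private def enc(s: String): String = URLEncoder.encode(s, "UTF-8")

  def build(user: String, pwd: String, hosts: Seq[String], db: String,
            replSet: String, authSource: String): String = {
    require(hosts.nonEmpty, "at least one seed host is required")
    val seedList = hosts.mkString(",") // e.g. "url1:27017,url2:27017,url3:27017"
    s"mongodb://${enc(user)}:${enc(pwd)}@$seedList/$db?replicaSet=$replSet&authSource=$authSource"
  }

  def main(args: Array[String]): Unit = {
    // Placeholder values taken from the question; substitute real ones.
    val uri = build("user", "pwd", Seq("url1:27017", "url2:27017", "url3:27017"),
      "db_name", "replset_name", "db_name")
    println(uri)
  }
}
```

Keeping `replicaSet` in the URI rather than as a separate `WriteConfig` key is deliberate: as far as I can tell, `replicaSet` is not one of the output options listed on the v1.1 configuration page, so it may simply be ignored there.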
EDIT
More detail on the error messages... they come in two forms.
Type 1
Usually includes:
java.net.UnknownHostException: machine.unix.domain.org
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting
for a server that matches WritableServerSelector. Client view of cluster
state is {type=REPLICA_SET, servers=[{address=machine.unix.domain.org:27017,
type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException:
machine.unix.domain.org}, caused by {java.net.UnknownHostException:
machine.unix.domain.org}}, {address=machine.unix.domain.org:27017,
type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException:
machine.unix.domain.org}, caused by {java.net.UnknownHostException:
machine.unix.domain.org}}, {address=machine.unix.domain.org:27017,
type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException:
machine.unix.domain.org}, caused by {java.net.UnknownHostException:
machine.unix.domain.org}}]
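Type 1 is a name-resolution failure: `java.net.UnknownHostException` means the JVM could not resolve the host name. Because `MongoSpark.save` opens its connections inside Spark tasks, the lookup presumably has to succeed on every executor machine, not only on the driver. Also, in replica-set mode the Java driver connects to the member addresses the replica set reports about itself (the names in its replica set configuration), so those names may need to resolve even when the seed list uses IP addresses. A small check using only `java.net.InetAddress` can show which names fail on a given JVM; `HostCheck` and `resolveAll` are hypothetical names for this sketch.

```scala
import java.net.InetAddress
import scala.util.Try

// Hypothetical diagnostic: resolve each "host:port" entry on the current JVM.
object HostCheck {
  // Returns host -> Some(ip) if resolution succeeded, None if it failed.
  // Splitting on ':' assumes host names or IPv4 addresses, not IPv6 literals.
  def resolveAll(hostPorts: Seq[String]): Map[String, Option[String]] =
    hostPorts.map { hp =>
      val host = hp.split(':').head
      host -> Try(InetAddress.getByName(host).getHostAddress).toOption
    }.toMap

  def main(args: Array[String]): Unit =
    resolveAll(Seq("url1:27017", "url2:27017", "url3:27017")).foreach { case (h, ip) =>
      println(s"$h -> ${ip.getOrElse("UNRESOLVED")}")
    }
}
```

To run the check where the writes actually happen, the same function can be called inside a task, e.g. `sc.parallelize(1 to 10).map(_ => (java.net.InetAddress.getLocalHost.getHostName, HostCheck.resolveAll(hosts))).collect()`, which reports results per executor host (assuming `HostCheck` is available on the executors' classpath).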
Type 2
(Authentication error... even though the same credentials work when connecting to the primary alone)
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting
for a server that matches WritableServerSelector. Client view of cluster
state is {type=REPLICA_SET, servers=[{address=xx.xx.xx.xx:27017,
type=UNKNOWN, state=CONNECTING, exception=
{com.mongodb.MongoSecurityException: Exception authenticating
MongoCredential{mechanism=null, userName='xx', source='admin', password=
<hidden>, mechanismProperties={}}}, caused by
{com.mongodb.MongoCommandException: Command failed with error 18:
'Authentication failed.' on server xx.xx.xx.xx:27017. The full response is {
"ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" :
"AuthenticationFailed", "operationTime" : { "$timestamp" : { "t" :
1534459121, "i" : 1 } }, "$clusterTime" : { "clusterTime" : { "$timestamp" :
{ "t" : 1534459121, "i" : 1 } }, "signature" : { "hash" : { "$binary" :
"xxx=", "$type" : "0" }, "keyId" : { "$numberLong" : "123456" } } } }}}...
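In Type 2 the credential shows `source='admin'`. Per the documented connection-string rules for the default auth mechanism, the auth database is the `authSource` option if present, otherwise the database in the URI path, otherwise `admin`. That would fit the permutations that leave the database out of the URI path (`.../?replicaSet=...` and the one that sets the database only in `WriteConfig`), although which URI produced this exact error isn't stated. The sketch below is a simplified, hypothetical model of that rule (`AuthSource.effective` is not a driver API) that makes the difference between the permutations visible:

```scala
// Hypothetical model of the connection-string auth-database rule:
// explicit authSource option, else the database in the URI path, else "admin".
// Simplifications: assumes a well-formed "mongodb://" URI whose credentials
// contain no unescaped '/' or '?', and matches option names case-sensitively
// (the real driver ignores case).
object AuthSource {
  def effective(uri: String): String = {
    val rest = uri.stripPrefix("mongodb://")
    val q = rest.indexOf('?')
    val (beforeQuery, query) =
      if (q == -1) (rest, "") else (rest.substring(0, q), rest.substring(q + 1))

    // Parse "k1=v1&k2=v2" into a map, skipping empty or malformed pairs.
    val options: Map[String, String] = query
      .split("&")
      .filter(_.contains("="))
      .map { kv =>
        val parts = kv.split("=", 2)
        parts(0) -> parts(1)
      }
      .toMap

    // The path database is whatever follows the first '/' after the host list.
    val slash = beforeQuery.indexOf('/')
    val pathDb: Option[String] =
      if (slash == -1) None
      else Some(beforeQuery.substring(slash + 1)).filter(_.nonEmpty)

    options.get("authSource").orElse(pathDb).getOrElse("admin")
  }
}
```

Under this rule, `mongodb://user:pwd@url1:27017,url2:27017/?replicaSet=rs` authenticates against `admin`, while `mongodb://user:pwd@url1:27017/db_name` authenticates against `db_name`, which may explain why the same credentials behave differently. As far as I understand, the `"database"` key in `WriteConfig` only selects the target database and does not change the auth database.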
END EDIT
Here's what does work... but only on the dummy data... more on that below...
val host = s"${primary_ip_address}:27017" // primary only
val uri = s"mongodb://${user}:${pwd}@${host}/${db}"
val writeConfig: WriteConfig =
WriteConfig(Map(
"uri" -> uri,
"database" -> db,
"collection" -> collection_name,
"replicaSet" -> replset))
// write data to mongo
MongoSpark.save(rdd_hex_bson, writeConfig)
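I thought I'd have better luck if I repartitioned to reduce the number of connections... but I suspect the crash is also related to writing only to the primary instead of spreading the load across all the instances.
I've read everything I could find on here... but most of the examples are for single-instance connections...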
https://docs.mongodb.com/spark-connector/v1.1/configuration/#output-configuration