代码之家  ›  专栏  ›  技术社区  ›  Ahmad Osama

将数据从Apache Storm插入Azure Cosmos DB

  •  0
  • Ahmad Osama  · 技术社区  · 7 年前

    我正在尝试将storm中的数据插入cosmos db-Mongo db

        MongoClient mongoClient = null;
    
        mongoClient = new MongoClient(new MongoClientURI("mongodb uri from azure portal"));
    
       // Get database
       MongoDatabase database = mongoClient.getDatabase("toystore");
    
       // Get collection
       MongoCollection<Document> collection = database.getCollection("order");
       this.productid = tuple.getIntegerByField("productid");
       this.quantity = tuple.getIntegerByField("quantity");
       this.sales = tuple.getIntegerByField("sales");
       this.refund = tuple.getIntegerByField("refund");
       this.orderdate = tuple.getStringByField("orderdate");
    
       // Insert documents
       Document document = new Document();
       document.append("productid", this.productid);
       document.append("quantity", this.quantity);
       document.append("sales", this.sales);
       document.append("refund", this.refund);
       document.append("orderdate", this.orderdate);
    
       collection.insertOne(document);
    

    数据应插入Cosmos数据库。我能够使用相同的代码从storm以外的单独JAVA程序插入cosmos db。

    2017-12-05 03:45:03.345 o.a.s.d.executor [INFO] Opened spout eventhub-spout:(4)
    2017-12-05 03:45:03.346 o.a.s.d.executor [INFO] Activating spout eventhub-spout:(4)
    2017-12-05 03:45:09.618 o.m.d.cluster [INFO] Cluster created with settings {hosts=[toystore.documents.azure.com:10255], mode=MULTIPLE, requiredClusterType=REPLICA_SET, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500, requiredReplicaSetName='globaldb'}
    2017-12-05 03:45:09.619 o.m.d.cluster [INFO] Adding discovered server toystore.documents.azure.com:10255 to client view of cluster
    2017-12-05 03:45:09.629 o.a.s.util [ERROR] Async loop died!
    java.lang.ExceptionInInitializerError: null
        at com.mongodb.connection.InternalStreamConnectionFactory.<init>(InternalStreamConnectionFactory.java:41) ~[stormjar.jar:?]
        at com.mongodb.connection.DefaultClusterableServerFactory.create(DefaultClusterableServerFactory.java:68) ~[stormjar.jar:?]
        at com.mongodb.connection.BaseCluster.createServer(BaseCluster.java:360) ~[stormjar.jar:?]
        at com.mongodb.connection.MultiServerCluster.addServer(MultiServerCluster.java:305) ~[stormjar.jar:?]
        at com.mongodb.connection.MultiServerCluster.<init>(MultiServerCluster.java:83) ~[stormjar.jar:?]
        at com.mongodb.connection.DefaultClusterFactory.create(DefaultClusterFactory.java:116) ~[stormjar.jar:?]
        at com.mongodb.Mongo.createCluster(Mongo.java:744) ~[stormjar.jar:?]
        at com.mongodb.Mongo.createCluster(Mongo.java:728) ~[stormjar.jar:?]
        at com.mongodb.Mongo.createCluster(Mongo.java:702) ~[stormjar.jar:?]
        at com.mongodb.Mongo.<init>(Mongo.java:310) ~[stormjar.jar:?]
        at com.mongodb.Mongo.<init>(Mongo.java:306) ~[stormjar.jar:?]
        at com.mongodb.MongoClient.<init>(MongoClient.java:284) ~[stormjar.jar:?]
        at com.microsoft.example.CosmosDBBolt.execute(CosmosDBBolt.java:108) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__9841$tuple_action_fn__9843.invoke(executor.clj:730) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__9762.invoke(executor.clj:462) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
        at org.apache.storm.disruptor$clojure_handler$reify__874.onEvent(disruptor.clj:40) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:451) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
        at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
        at org.apache.storm.daemon.executor$fn__9841$fn__9854$fn__9907.invoke(executor.clj:849) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
        at org.apache.storm.util$async_loop$fn__558.invoke(util.clj:484) [storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
    Caused by: java.lang.NullPointerException
        at com.mongodb.connection.ClientMetadataHelper.getDriverVersion(ClientMetadataHelper.java:118) ~[stormjar.jar:?]
        at com.mongodb.connection.ClientMetadataHelper.getDriverInformation(ClientMetadataHelper.java:201) ~[stormjar.jar:?]
        at com.mongodb.connection.ClientMetadataHelper.addDriverInformation(ClientMetadataHelper.java:182) ~[stormjar.jar:?]
        at com.mongodb.connection.ClientMetadataHelper.<clinit>(ClientMetadataHelper.java:64) ~[stormjar.jar:?]
        ... 23 more
    

    它连接到cosmos db,但连接随后中断。

    谢谢 艾哈迈德