代码之家  ›  专栏  ›  技术社区  ›  vgkowski

使用testkit和scalatest的Akka流测试

  •  1
  • vgkowski  · 技术社区  · 6 年前

    我在用 ScalaTest 测试 Akka Streams 应用程序时遇到了 NullPointerException,但不明白原因……我可能遗漏了 Akka Streams 的某些知识,因为我才刚刚开始接触它。

    我使用 Scala 2.12.4 和 sbt 1.0.3,并采用 ScalaTest 的常规代码结构。这是我的应用程序:

    // Streaming application: generates random CDR records, serializes them to JSON and
    // bulk-writes them to a MongoDB collection.
    //
    // NOTE(review): this object extends `App`. In Scala 2 the body of an `App` object is
    // deferred until `main` runs, so the vals below are still null when another class (for
    // example a test) reads them without `main` having been invoked. That is presumably the
    // cause of the NullPointerException reported for the spec that accesses these members.
    //
    // `msisdnLength`, `timeRange`, `throughput`, `bulkSize` and `collection` are not defined in
    // this snippet; they are assumed to be declared elsewhere — confirm.
    object CdrToMongoReactiveStream extends App {
    
      // Actor system, materializer and execution context shared implicitly by the stream stages.
      implicit val system = ActorSystem("cdr-data-generator")
      implicit val materializer = ActorMaterializer()
      implicit val executionContext=materializer.executionContext
      // Brings the JSON format used by `toJson` below into scope.
      import RandomCdrJsonProtocol._
    
      // Endless source of freshly built RandomCdr values, throttled to `throughput` elements per
      // second (burst of 1, shaping mode).
      val randomCdrThrottledSource : Source[RandomCdr,NotUsed]= Source
        .fromIterator(() => Iterator.continually(RandomCdr(msisdnLength,timeRange)))
        .throttle(throughput,1.second,1,ThrottleMode.shaping)
        .named("randomCdrThrottledSource")
    
      // Converts every RandomCdr into its JSON string representation.
      val cdrJsonParseFlow : Flow[RandomCdr,String,NotUsed]= Flow[RandomCdr]
        .map((cdr: RandomCdr) => cdr.toJson.toString())
        .named("cdrJsonParseFlow")
    
      // Sink that parses each JSON string into a Document, wraps it in an insert model, groups
      // the models into batches of `bulkSize` and issues one bulk write per batch. The write
      // results are discarded and the sink materializes NotUsed, so completion of the writes
      // cannot be observed through its materialized value.
      val mongodbBulkSink : Sink[String,NotUsed] = Flow[String]
        .map((json: String) => Document.parse(json))
        .map((doc: Document) => new InsertOneModel[Document](doc))
        .grouped(bulkSize)
        .flatMapConcat { (docs: Seq[InsertOneModel[Document]]) ⇒
          Source.fromPublisher(collection.bulkWrite(docs.toList.asJava))
        }
        .to(Sink.ignore)
    
      // Wires source -> JSON flow -> Mongo sink and starts the stream.
      val f = randomCdrThrottledSource.via(cdrJsonParseFlow).runWith(mongodbBulkSink)
    }
    

    和我的测试文件

    /**
      * Tests for the stream stages exposed by `CdrToMongoReactiveStream`.
      *
      * NOTE(review): the stages are read straight from the `CdrToMongoReactiveStream` object.
      * If that object extends `App`, its vals are only initialized once its `main` has run, so
      * they are presumably still null here — which would explain the NullPointerException
      * raised at the line marked below.
      */
    class CdrToMongoReactiveStreamSpec extends WordSpec with Matchers {
    
      // JSON format needed by `toJson` in the flow test.
      import RandomCdrJsonProtocol._
    
      "randomCdrThrottledSource" should {
        "generate RandomCdr elements only" in {
          // Take the first element emitted by the source, using the application's materializer.
          val future = CdrToMongoReactiveStream.randomCdrThrottledSource
            // line 30 in the error
            .runWith(Sink.head)(CdrToMongoReactiveStream.materializer)
    
          val cdr = Await.result(future,10.second)
          cdr shouldBe a [RandomCdr]
        }
      }
      "cdrJsonParseFlow" should {
        "parse RandomCdr to correct json format" in {
          val randomCdr = RandomCdr("+33612345678",1511448336402L,"+33612345678","SMS","OUT",0,0,0)
          // Drive the flow by hand with a test publisher and a test subscriber.
          // NOTE(review): `run()` needs an implicit Materializer; none is declared in this spec
          // as shown — confirm where it comes from.
          val (pub,sub) = TestSource.probe[RandomCdr]
            .via(CdrToMongoReactiveStream.cdrJsonParseFlow)
            .toMat(TestSink.probe[String])(Keep.both)
            .run()
    
          sub.request(1)
          pub.sendNext(randomCdr)
          // Compare with the expected JSON itself: `shouldBe equal(...)` would compare the
          // received string with a matcher object and could never succeed.
          sub.expectNext() shouldBe randomCdr.toJson.toString()
        }
      }
    }
    

    以及错误信息

    java.lang.NullPointerException was thrown.
    java.lang.NullPointerException
        at CdrToMongoReactiveStreamSpec.$anonfun$new$2(CdrToMongoReactiveStreamSpec.scala:30)
        at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
        at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
        at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
        at org.scalatest.WordSpec.withFixture(WordSpec.scala:1881)
        at org.scalatest.WordSpecLike.invokeWithFixture$1(WordSpecLike.scala:1076)
        at org.scalatest.WordSpecLike.$anonfun$runTest$1(WordSpecLike.scala:1088)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
        at org.scalatest.WordSpecLike.runTest(WordSpecLike.scala:1088)
        at org.scalatest.WordSpecLike.runTest$(WordSpecLike.scala:1070)
        at org.scalatest.WordSpec.runTest(WordSpec.scala:1881)
        at org.scalatest.WordSpecLike.$anonfun$runTests$1(WordSpecLike.scala:1147)
        at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
        at scala.collection.immutable.List.foreach(List.scala:389)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
        at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:373)
        at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:410)
        at scala.collection.immutable.List.foreach(List.scala:389)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
        at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
        at org.scalatest.WordSpecLike.runTests(WordSpecLike.scala:1147)
        at org.scalatest.WordSpecLike.runTests$(WordSpecLike.scala:1146)
        at org.scalatest.WordSpec.runTests(WordSpec.scala:1881)
        at org.scalatest.Suite.run(Suite.scala:1147)
        at org.scalatest.Suite.run$(Suite.scala:1129)
        at org.scalatest.WordSpec.org$scalatest$WordSpecLike$$super$run(WordSpec.scala:1881)
        at org.scalatest.WordSpecLike.$anonfun$run$1(WordSpecLike.scala:1192)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
        at org.scalatest.WordSpecLike.run(WordSpecLike.scala:1192)
        at org.scalatest.WordSpecLike.run$(WordSpecLike.scala:1190)
        at org.scalatest.WordSpec.run(WordSpec.scala:1881)
        at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
        at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1340)
        at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1334)
        at scala.collection.immutable.List.foreach(List.scala:389)
        at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
        at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
        at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
        at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
        at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
        at org.scalatest.tools.Runner$.run(Runner.scala:850)
        at org.scalatest.tools.Runner.run(Runner.scala)
        at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
        at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
    
    1 回复  |  直到 6 年前
        1
  •  2
  •   vgkowski    6 年前

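    我通过在主程序之外声明 Source、Flow 和 Sink 解决了这个问题。原因在于:`extends App` 会把对象体推迟到 `main` 中执行,测试直接访问这些 val 时它们尚未初始化(为 null),因此抛出 NullPointerException。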

    /**
      * Building blocks of the CDR-to-MongoDB stream, exposed as plain methods so that they can
      * be used without running `main`.
      *
      * `msisdnLength`, `timeRange`, `throughput`, `collection`, `bulkSize` and `logger` used in
      * `main`, as well as the implicit materializer required by `runWith`, are not shown in this
      * snippet and are assumed to be in scope — confirm.
      */
    object CdrToMongoReactiveStream {

      /**
        * Endless source of random CDR records.
        *
        * @param msisdnLength forwarded to the `RandomCdr` factory
        * @param timeRange    forwarded to the `RandomCdr` factory
        * @param throughput   number of elements let through per second
        * @return a source named "randomCdrThrottledSource"
        */
      def randomCdrThrottledSource(msisdnLength: Int, timeRange: Int, throughput: Int): Source[RandomCdr, NotUsed] = {
        val endlessCdrs = Source.fromIterator(() => Iterator.continually(RandomCdr(msisdnLength, timeRange)))

        endlessCdrs
          .throttle(throughput, 1.second, 1, ThrottleMode.shaping)
          .named("randomCdrThrottledSource")
      }

      /**
        * Flow turning each `RandomCdr` into its JSON string.
        *
        * @return a flow named "cdrJsonParseFlow"
        */
      def cdrJsonParseFlow: Flow[RandomCdr, String, NotUsed] = {
        import RandomCdrJsonProtocol._

        val toJsonString: RandomCdr => String = _.toJson.toString()

        Flow[RandomCdr]
          .map(toJsonString)
          .named("cdrJsonParseFlow")
      }

      /**
        * Sink that parses JSON strings into documents and inserts them in batches.
        *
        * @param collection target collection of the bulk writes
        * @param bulkSize   maximum number of inserts sent in one bulk write
        * @return a sink that discards the write results and materializes `NotUsed`
        */
      def mongodbBulkSink(collection: MongoCollection[Document], bulkSize: Int): Sink[String, NotUsed] = {
        // One bulk write per batch, exposed as a source of its result.
        def writeBatch(batch: Seq[InsertOneModel[Document]]) =
          Source.fromPublisher(collection.bulkWrite(batch.toList.asJava))

        Flow[String]
          .map(json => Document.parse(json))
          .map(doc => new InsertOneModel[Document](doc))
          .grouped(bulkSize)
          .flatMapConcat(batch => writeBatch(batch))
          .to(Sink.ignore)
      }

      /** Entry point: connects source, JSON flow and Mongo sink, then starts the stream. */
      def main(args: Array[String]): Unit = {
        val jsonCdrs = randomCdrThrottledSource(msisdnLength, timeRange, throughput).via(cdrJsonParseFlow)
        jsonCdrs.runWith(mongodbBulkSink(collection, bulkSize))

        // Logged right after the stream has been started; the sink materializes NotUsed, so
        // this does not wait for any write to finish.
        logger.info("Generated random data")
      }
    }
    

    和测试文件

    /**
      * Tests for the stream stages defined in `CdrToMongoReactiveStream`.
      *
      * The spec creates its own ActorSystem and materializer, and terminates the system once
      * all tests have run so that no actor threads outlive the suite.
      */
    class CdrToMongoReactiveStreamSpec extends WordSpec with Matchers with org.scalatest.BeforeAndAfterAll {

      import CdrToMongoReactiveStream._
      import RandomCdrJsonProtocol._

      // Explicit types keep implicit resolution predictable.
      implicit val system: ActorSystem = ActorSystem("cdr-data-generator")
      implicit val materializer: ActorMaterializer = ActorMaterializer()

      // Fixtures: a Fongo-backed collection and a fixed sample record. Neither is used by the
      // test shown here; presumably they serve tests outside this snippet — confirm.
      val collection = new Fongo("mongo test server").getDB("cdrDB").getCollection("cdr")
      val randomCdr = RandomCdr("+33612345678",1511448336402L,"+33612345678","SMS","OUT",0,0,0)

      /** Shuts the ActorSystem down after the last test; waits up to 10 seconds for it. */
      override protected def afterAll(): Unit = {
        Await.ready(system.terminate(), 10.seconds)
        super.afterAll()
      }

      "randomCdrThrottledSource" should {
        "generate RandomCdr elements only" in {
          // Take only the first element: the source itself never completes.
          val future = CdrToMongoReactiveStream
            .randomCdrThrottledSource(msisdnLength = 8, timeRange = 86400000, throughput = 1)
            .runWith(Sink.head)

          val cdr = Await.result(future, 5.seconds)
          cdr shouldBe a [RandomCdr]
        }
      }
    }
    
    推荐文章