我通过在主程序之外声明 Source、Flow 和 Sink 解决了这个问题:
/** Stream stages that generate random CDRs, serialise them to JSON and bulk-insert them
  * into a MongoDB collection. Each stage is exposed as its own method so it can be
  * materialised and tested independently of `main`.
  */
object CdrToMongoReactiveStream {

  /** Endless source of freshly constructed [[RandomCdr]] values.
    *
    * @param msisdnLength first argument forwarded to `RandomCdr(...)`
    * @param timeRange    second argument forwarded to `RandomCdr(...)`
    * @param throughput   maximum number of elements emitted per second
    * @return a source throttled in shaping mode with a maximum burst of 1
    */
  def randomCdrThrottledSource(msisdnLength: Int, timeRange: Int, throughput: Int): Source[RandomCdr, NotUsed] = {
    Source
      // A new RandomCdr is built for every element pulled from the iterator.
      .fromIterator(() => Iterator.continually(RandomCdr(msisdnLength, timeRange)))
      .throttle(throughput, 1.second, 1, ThrottleMode.shaping)
      .named("randomCdrThrottledSource")
  }

  /** Flow that renders every [[RandomCdr]] as a JSON string, using the implicit
    * formats imported from `RandomCdrJsonProtocol`.
    */
  def cdrJsonParseFlow: Flow[RandomCdr, String, NotUsed] = {
    import RandomCdrJsonProtocol._
    Flow[RandomCdr]
      .map((cdr: RandomCdr) => cdr.toJson.toString())
      .named("cdrJsonParseFlow")
  }

  /** Sink that parses incoming JSON strings into documents and inserts them in batches.
    *
    * @param collection target collection; one `bulkWrite` call is issued per batch
    * @param bulkSize   maximum number of insert models grouped into one batch
    * @return a sink whose materialised value is `NotUsed`: the bulk-write results are
    *         discarded by `Sink.ignore`, and `.to` keeps the upstream materialised value,
    *         so callers cannot observe completion through the returned value
    */
  def mongodbBulkSink(collection: MongoCollection[Document], bulkSize: Int): Sink[String, NotUsed] = {
    Flow[String]
      .map((json: String) => Document.parse(json))
      .map((doc: Document) => new InsertOneModel[Document](doc))
      .grouped(bulkSize)
      // Batches are written one after another; each publisher is drained before the next starts.
      .flatMapConcat { (docs: Seq[InsertOneModel[Document]]) =>
        Source.fromPublisher(collection.bulkWrite(docs.toList.asJava))
      }
      .to(Sink.ignore)
      .named("mongodbBulkSink")
  }

  /** Wires source, JSON flow and MongoDB sink together and starts the stream.
    *
    * NOTE(review): `msisdnLength`, `timeRange`, `throughput`, `collection`, `bulkSize`,
    * `logger` and the implicit materializer required by `runWith` are not defined in this
    * object as shown - presumably they come from configuration/setup code; confirm.
    */
  def main(args: Array[String]): Unit = {
    // runWith only starts the stream; the log line below is emitted right after
    // materialisation, not after the data has been written.
    randomCdrThrottledSource(msisdnLength, timeRange, throughput)
      .via(cdrJsonParseFlow)
      .runWith(mongodbBulkSink(collection, bulkSize))
    logger.info("Generated random data")
  }
}
下面是对应的测试文件:
/** Unit tests for the stream stages exposed by `CdrToMongoReactiveStream`. */
class CdrToMongoReactiveStreamSpec extends WordSpec with Matchers {

  import CdrToMongoReactiveStream._
  import RandomCdrJsonProtocol._

  // Actor system and materializer shared by every stream run in this spec.
  implicit val system: ActorSystem = ActorSystem("cdr-data-generator")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Fixtures: a collection obtained from a Fongo instance and a fixed sample CDR.
  val collection = new Fongo("mongo test server").getDB("cdrDB").getCollection("cdr")
  val randomCdr = RandomCdr("+33612345678", 1511448336402L, "+33612345678", "SMS", "OUT", 0, 0, 0)

  "randomCdrThrottledSource" should {
    "generate RandomCdr elements only" in {
      // Take just the first element of the endless source and wait for it.
      val firstElement = randomCdrThrottledSource(8, 86400000, 1).runWith(Sink.head)
      val cdr = Await.result(firstElement, 5.seconds)
      cdr shouldBe a [RandomCdr]
    }
  }
}