我建议你用斯卡拉的
case classes
用于数据建模。
final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)
现在你可以
List
属于
Product
val inMemoryRecords: List[Product] = List(
Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
)
这个
structured streaming API
使用广为人知的
Dataset[T]
抽象。粗略地说,你只需要担心三件事:
-
Source
:源可以生成输入数据流,我们可以将其表示为
Dataset[Input]
Input
它将被追加到这个无限的数据集中。您可以随心所欲地操作数据(例如。
数据集[输入]
Dataset[Output]
).
-
StreamingQueries
和
Sink
:查询生成一个结果表,该表在每个触发间隔从源更新。更改被写入称为接收器的外部存储器。
-
Output modes
:有不同的模式可以将数据写入接收器:完整模式、附加模式和更新模式。
假设你想知道分子量大于200单位的产物。
正如您所说,使用批处理API相当简单和直接:
// Create an static dataset using the in-memory data
val staticData: Dataset[Product] = spark.createDataset(inMemoryRecords)
// Processing...
val result: Dataset[Product] = staticData.filter(_.weight > 200)
// Print results!
result.show()
使用流式API时,只需定义
source
以及
sink
作为额外的一步。在这个例子中,我们可以使用
MemoryStream
以及
console
// Create an streaming dataset using the in-memory data (memory source)
val productSource = MemoryStream[Product]
productSource.addData(inMemoryRecords)
val streamingData: Dataset[Product] = productSource.toDS()
// Processing...
val result: Dataset[Product] = streamingData.filter(_.weight > 200)
// Print results by using the console sink.
val query: StreamingQuery = result.writeStream.format("console").start()
// Stop streaming
query.awaitTermination(timeoutMs=5000)
query.stop()
请注意
staticData
streamingData
具有准确的类型签名(即。,
Dataset[Product]
). 这允许我们应用相同的处理步骤,而不必使用批处理或流式API。您还可以考虑实现泛型方法
def processing[In, Out](inputData: Dataset[In]): Dataset[Out] = ???
完整代码示例:
object ExMemoryStream extends App {
// Boilerplate code...
val spark: SparkSession = SparkSession.builder
.appName("ExMemoryStreaming")
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext
// Define your data models
final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)
// Create some in-memory instances
val inMemoryRecords: List[Product] = List(
Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
)
// Defining processing step
def processing(inputData: Dataset[Product]): Dataset[Product] =
inputData.filter(_.weight > 200)
// STATIC DATASET
val datasetStatic: Dataset[Product] = spark.createDataset(inMemoryRecords)
println("This is the static dataset:")
processing(datasetStatic).show()
// STREAMING DATASET
val productSource = MemoryStream[Product]
productSource.addData(inMemoryRecords)
val datasetStreaming: Dataset[Product] = productSource.toDS()
println("This is the streaming dataset:")
val query: StreamingQuery = processing(datasetStreaming).writeStream.format("console").start()
query.awaitTermination(timeoutMs=5000)
// Stop query and close Spark
query.stop()
spark.close()
}