从Kafka主题读取处理数据并使用scala和spark写回Kafka主题
嗨,我正在阅读 kafka 主题,我想处理从 kafka 收到的数据,例如标记化、过滤掉不必要的数据、删除停用词,最后我想写回另一个 Kafka 主题
// read from kafka
val readStream = existingSparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
.load()
val df = readStream.selectExpr("CAST(value AS STRING)" )
df.show(false)
val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))
// perform some data processing actions such as tokenization etc and return cleanedDataframe as the final result
// write back to kafka
val writeStream = cleanedDataframe
.writeStream
.outputMode("append")
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("topic", "writing.val")
.start()
writeStream.awaitTermination()
然后我收到以下错误
线程“main” org.apache.spark.sql.AnalysisException 中的异常:必须使用 writeStream.start(); 执行具有流源的查询;
然后我编辑了我的代码,如下所示从 kafka 读取并写入控制台
// read from kafka
val readStream = existingSparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
.load()
// write to console
val df = readStream.selectExpr("CAST(value AS STRING)" )
val query = df.writeStream
.outputMode("append")
.format("console")
.start().awaitTermination();
// then perform the data processing part as mentioned in the first half
使用第二种方法,在控制台中连续显示数据,但它从未运行过数据处理部分。我可以知道如何从 kafka 主题中读取数据,然后对接收到的数据执行一些操作(标记化、删除停用词),最后写回新的 kafka 主题吗?
编辑
堆栈跟踪在错误期间指向上述代码中的 df.show(false)
回答
您当前的实现中有两个常见问题:
show在流上下文中应用- 之后的代码
awaitTermination不会被执行
到 1。
该方法show是对数据帧的操作(而不是转换)。当您处理流式数据帧时,这将导致错误,因为需要启动流式查询start(正如 Excpetion 文本告诉您的那样)。
到 2。
该方法awaitTermination是一种阻塞方法,这意味着后续代码不会在每个微批处理中执行。
整体解决方案
如果您想读取和写入 Kafka 并且在两者之间想要通过在控制台中显示数据来了解正在处理的数据,您可以执行以下操作:
// read from kafka
val readStream = existingSparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
.load()
// write to console
val df = readStream.selectExpr("CAST(value AS STRING)" )
df.writeStream
.outputMode("append")
.format("console")
.start()
val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))
// perform some data processing actions such as tokenization etc and return cleanedDataframe as the final result
// write back to kafka
// the columns `key` and `value` of the DataFrame `cleanedDataframe` will be used for producing the message into the Kafka topic.
val writeStreamKafka = cleanedDataframe
.writeStream
.outputMode("append")
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("topic", "writing.val")
.start()
existingSparkSession.awaitAnyTermination()
注意existingSparkSession.awaitAnyTermination()代码最后的 ,不要awaitTermination在 之后直接使用start。另外,请记住,DataFrame的列key和将用于将消息生成到 Kafka 主题中。但是,不需要列,另请参见此处valuecleanedDataframekey
此外,如果您使用检查点(推荐),那么您需要设置两个不同的位置:一个用于控制台流,另一个用于 kafka 输出流。重要的是要记住那些流查询独立运行。