几个月前,我开始研究Spark,在某个时候,我面临着将结构化流计算保存在Cassandra数据库中的问题。
在本文中,我给出了一个简单的示例,该示例创建和使用Cassandra Sink进行Spark结构化流传输。 我希望该帖子对最近开始使用Spark Structured Streaming并想知道如何将计算结果上载到数据库的人员有用。
该应用程序的想法非常简单-接收和解析来自Kafka的消息,成对执行简单的转换并将结果保存在cassandra中。
结构化流的优点
您可以在
文档中阅读有关结构化流的更多信息。 简而言之,结构化流是基于Spark SQL引擎的可扩展的流信息处理引擎。 它允许您使用Dataset / DataFrame聚合数据,计算窗口函数,连接等。也就是说,结构化流允许您使用良好的旧SQL处理数据流。
怎么了
Spark结构化流的稳定版本于2017年发布。 也就是说,这是一个相当新的API,可实现基本功能,但某些事情必须由我们自己完成。 例如,结构化流传输具有用于将输出写入文件,图块,控制台或内存的标准功能,但为了将数据保存到数据库,您必须使用结构化流传输中可用的
foreach接收器并实现
ForeachWriter接口。
从Spark 2.3.1开始,此功能只能在Scala和Java中实现 。
我假定读者已经知道结构化流一般如何工作,知道如何实现必要的转换,并且现在准备将结果上传到数据库。 如果上述步骤中的某些步骤不清楚,则官方文档可以作为学习结构化流媒体的一个很好的起点。 在本文中,当您需要将结果保存到数据库中时,我想着重介绍最后一步。
下面,我将描述用于结构化流的Cassandra接收器的示例实现,并说明如何在集群中运行它。 完整的代码
在这里 。
当我第一次遇到上述问题时,
这个项目非常有用。 但是,如果读者刚刚开始使用结构化流并且正在寻找有关如何将数据上传到cassandra的简单示例,则似乎有点复杂。 另外,该项目被编写为以本地模式工作,并且需要进行一些更改才能在集群中运行。
我还想举例说明如何使用
JDBC将数据保存到
MongoDB和任何其他数据库。
简单的解决方案
要将数据上传到外部系统,必须使用
foreach接收器。
在这里阅读更多有关此的内容。 简而言之,必须实现
ForeachWriter接口。 也就是说,有必要确定如何打开连接,如何处理每条数据以及如何在处理结束时关闭连接。 源代码如下:
class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
我稍后将描述
CassandraDriver的定义和输出表的结构,但是现在,让我们仔细看看上面的代码是如何工作的。 为了从Spark连接到Kasandra,我创建了一个
CassandraDriver对象,该对象提供对
CassandraConnector (由
DataStax开发的连接器)的访问。 CassandraConnector负责打开和关闭与数据库的连接,因此我仅在
CassandraSinkForeach类的
open和
close方法中显示调试消息。
从主应用程序调用以上代码,如下所示:
val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start()
为数据的每一行创建
CassandraSinkForeach ,因此每个工作节点都将其部分行插入数据库中。 也就是说,每个工作节点都执行
val cassandraDriver = new CassandraDriver(); 这是CassandraDriver的样子:
class CassandraDriver extends SparkSessionBuilder {
让我们仔细看看
spark对象。
SparkSessionBuilder的代码如下:
class SparkSessionBuilder extends Serializable {
在每个工作节点上,
SparkSessionBuilder提供对在驱动程序上创建的
SparkSession的访问。 为了使这种访问成为可能,必须序列化
SparkSessionBuilder并使用
瞬态 lazy val ,它允许序列化系统在程序初始化时忽略
conf和
spark对象,直到访问对象为止。 因此,在程序启动时,
buildSparkSession被序列化并发送到每个工作节点,但是仅当工作节点正在访问
conf和
spark对象时,才允许
conf和
spark对象。
现在让我们看一下主要的应用程序代码:
object KafkaToCassandra extends SparkSessionBuilder {
当发送应用程序执行时,
buildSparkSession被序列化并发送到工作节点,但是
conf和
spark对象仍未解析。 然后,驱动程序在
KafkaToCassandra内部创建一个spark对象,并在工作节点之间分配工作。 每个工作节点都从Kafka读取数据,对记录的接收部分进行简单的转换,当工作节点准备好将结果写入数据库时,它允许
conf和
spark对象,从而获得对在驱动程序上创建的
SparkSession的访问权限。
如何构建和运行应用程序?
当我从PySpark迁移到Scala时,花了我一段时间才弄清楚如何构建应用程序。 因此,我在项目中包含了Maven
pom.xml 。 读者可以通过运行
mvn package命令使用Maven构建应用程序。 可以将应用程序发送给执行后
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar
为了构建和运行应用程序,有必要用您自己的AWS机器名称替换(即替换所有类似ec2-xx-xxx-xx-xx.compute-1.amazonaws.com的名称)。
火花和结构化流特别是对我来说是一个新主题,因此,我将非常感谢读者提出意见,讨论和更正。