Cassandra Sink用于Spark结构化流

几个月前,我开始研究Spark,在某个时候,我面临着将结构化流计算保存在Cassandra数据库中的问题。

在本文中,我给出了一个简单的示例,该示例创建和使用Cassandra Sink进行Spark结构化流传输。 我希望该帖子对最近开始使用Spark Structured Streaming并想知道如何将计算结果上载到数据库的人员有用。

该应用程序的想法非常简单-接收和解析来自Kafka的消息,成对执行简单的转换并将结果保存在cassandra中。

结构化流的优点


您可以在文档中阅读有关结构化流的更多信息。 简而言之,结构化流是基于Spark SQL引擎的可扩展的流信息处理引擎。 它允许您使用Dataset / DataFrame聚合数据,计算窗口函数,连接等。也就是说,结构化流允许您使用良好的旧SQL处理数据流。

怎么了


Spark结构化流的稳定版本于2017年发布。 也就是说,这是一个相当新的API,可实现基本功能,但某些事情必须由我们自己完成。 例如,结构化流传输具有用于将输出写入文件,图块,控制台或内存的标准功能,但为了将数据保存到数据库,您必须使用结构化流传输中可用的foreach接收器并实现ForeachWriter接口。 从Spark 2.3.1开始,此功能只能在Scala和Java中实现

我假定读者已经知道结构化流一般如何工作,知道如何实现必要的转换,并且现在准备将结果上传到数据库。 如果上述步骤中的某些步骤不清楚,则官方文档可以作为学习结构化流媒体的一个很好的起点。 在本文中,当您需要将结果保存到数据库中时,我想着重介绍最后一步。

下面,我将描述用于结构化流的Cassandra接收器的示例实现,并说明如何在集群中运行它。 完整的代码在这里

当我第一次遇到上述问题时, 这个项目非常有用。 但是,如果读者刚刚开始使用结构化流并且正在寻找有关如何将数据上传到cassandra的简单示例,则似乎有点复杂。 另外,该项目被编写为以本地模式工作,并且需要进行一些更改才能在集群中运行。

我还想举例说明如何使用JDBC将数据保存到MongoDB和任何其他数据库。

简单的解决方案


要将数据上传到外部系统,必须使用foreach接收器。 在这里阅读更多有关此的内容。 简而言之,必须实现ForeachWriter接口。 也就是说,有必要确定如何打开连接,如何处理每条数据以及如何在处理结束时关闭连接。 源代码如下:

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] { // This class implements the interface ForeachWriter, which has methods that get called // whenever there is a sequence of rows generated as output val cassandraDriver = new CassandraDriver(); def open(partitionId: Long, version: Long): Boolean = { // open connection println(s"Open connection") true } def process(record: org.apache.spark.sql.Row) = { println(s"Process new $record") cassandraDriver.connector.withSessionDo(session => session.execute(s""" insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt) values('${record(0)}', '${record(1)}', '${record(2)}')""") ) } def close(errorOrNull: Throwable): Unit = { // close the connection println(s"Close connection") } } 

我稍后将描述CassandraDriver的定义和输出表的结构,但是现在,让我们仔细看看上面的代码是如何工作的。 为了从Spark连接到Kasandra,我创建了一个CassandraDriver对象,该对象提供对CassandraConnector (由DataStax开发的连接器)的访问。 CassandraConnector负责打开和关闭与数据库的连接,因此我仅在CassandraSinkForeach类的openclose方法中显示调试消息。

从主应用程序调用以上代码,如下所示:

 val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() 

为数据的每一行创建CassandraSinkForeach ,因此每个工作节点都将其部分行插入数据库中。 也就是说,每个工作节点都执行val cassandraDriver = new CassandraDriver(); 这是CassandraDriver的样子:

 class CassandraDriver extends SparkSessionBuilder { // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor. // It extends SparkSessionBuilder so to use the same SparkSession on each node. val spark = buildSparkSession import spark.implicits._ val connector = CassandraConnector(spark.sparkContext.getConf) // Define Cassandra's table which will be used as a sink /* For this app I used the following table: CREATE TABLE fx.spark_struct_stream_sink ( fx_marker text, timestamp_ms timestamp, timestamp_dt date, primary key (fx_marker)); */ val namespace = "fx" val foreachTableSink = "spark_struct_stream_sink" } 

让我们仔细看看spark对象。 SparkSessionBuilder的代码如下:

 class SparkSessionBuilder extends Serializable { // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. // Note here the usage of @transient lazy val def buildSparkSession: SparkSession = { @transient lazy val conf: SparkConf = new SparkConf() .setAppName("Structured Streaming from Kafka to Cassandra") .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com") .set("spark.sql.streaming.checkpointLocation", "checkpoint") @transient lazy val spark = SparkSession .builder() .config(conf) .getOrCreate() spark } } 

在每个工作节点上, SparkSessionBuilder提供对在驱动程序上创建的SparkSession的访问。 为了使这种访问成为可能,必须序列化SparkSessionBuilder并使用瞬态 lazy val ,它允许序列化系统在程序初始化时忽略confspark对象,直到访问对象为止。 因此,在程序启动时, buildSparkSession被序列化并发送到每个工作节点,但是仅当工作节点正在访问confspark对象时,才允许confspark对象。

现在让我们看一下主要的应用程序代码:

 object KafkaToCassandra extends SparkSessionBuilder { // Main body of the app. It also extends SparkSessionBuilder. def main(args: Array[String]) { val spark = buildSparkSession import spark.implicits._ // Define location of Kafka brokers: val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092" /*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by \n {"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"} {"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"} {"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"} {"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"} */ // Read incoming stream val dfraw = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", broker) .option("subscribe", "currency_exchange") .load() val schema = StructType( Seq( StructField("fx_marker", StringType, false), StructField("timestamp_ms", StringType, false) ) ) val df = dfraw .selectExpr("CAST(value AS STRING)").as[String] .flatMap(_.split("\n")) val jsons = df.select(from_json($"value", schema) as "data").select("data.*") // Process data. Create a new date column val parsed = jsons .withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS"))) .filter("fx_marker != ''") // Output results into a database val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() sink.awaitTermination() } } 

当发送应用程序执行时, buildSparkSession被序列化并发送到工作节点,但是confspark对象仍未解析。 然后,驱动程序在KafkaToCassandra内部创建一个spark对象,并在工作节点之间分配工作。 每个工作节点都从Kafka读取数据,对记录的接收部分进行简单的转换,当工作节点准备好将结果写入数据库时​​,它允许confspark对象,从而获得对在驱动程序上创建的SparkSession的访问权限。

如何构建和运行应用程序?


当我从PySpark迁移到Scala时,花了我一段时间才弄清楚如何构建应用程序。 因此,我在项目中包含了Maven pom.xml 。 读者可以通过运行mvn package命令使用Maven构建应用程序。 可以将应用程序发送给执行后

 ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar 

为了构建和运行应用程序,有必要用您自己的AWS机器名称替换(即替换所有类似ec2-xx-xxx-xx-xx.compute-1.amazonaws.com的名称)。

火花和结构化流特别是对我来说是一个新主题,因此,我将非常感谢读者提出意见,讨论和更正。

Source: https://habr.com/ru/post/zh-CN425503/


All Articles