Cassandra Sink para Spark Structured Streaming

Hace un par de meses comencé a estudiar Spark, y en algún momento me encontré con el problema de guardar los cálculos de Streaming Estructurado en la base de datos de Cassandra.

En esta publicación, doy un ejemplo simple de creación y uso de Cassandra Sink para Spark Structured Streaming. Espero que la publicación sea útil para aquellos que recientemente comenzaron a trabajar con Spark Structured Streaming y se preguntan cómo cargar los resultados del cálculo en la base de datos.

La idea de la aplicación es muy simple: recibir y analizar mensajes de Kafka, realizar transformaciones simples en un par y guardar los resultados en cassandra.

Ventajas de la transmisión estructurada


Puede leer más sobre la transmisión estructurada en la documentación . En resumen, Structured Streaming es un motor de procesamiento de información de transmisión bien escalable que se basa en el motor Spark SQL. Le permite usar un conjunto de datos / marco de datos para agregar datos, calcular funciones de ventana, conexiones, etc. Es decir, la transmisión estructurada le permite usar el buen SQL antiguo para trabajar con flujos de datos.

Cual es el problema


El lanzamiento estable de Spark Structured Streaming se lanzó en 2017. Es decir, esta es una API bastante nueva que implementa la funcionalidad básica, pero algunas cosas deberán ser realizadas por nosotros mismos. Por ejemplo, Structured Streaming tiene funciones estándar para escribir resultados en un archivo, mosaico, consola o memoria, pero para guardar los datos en la base de datos, debe usar el receptor foreach disponible en Structured Streaming e implementar la interfaz ForeachWriter . A partir de Spark 2.3.1, esta funcionalidad solo se puede implementar en Scala y Java .

Supongo que el lector ya sabe cómo funciona Structured Streaming en términos generales, sabe cómo implementar las transformaciones necesarias y ahora está listo para cargar los resultados en la base de datos. Si alguno de los pasos anteriores no está claro, la documentación oficial puede servir como un buen punto de partida para aprender la Transmisión Estructurada. En este artículo, me gustaría centrarme en el último paso cuando necesite guardar los resultados en una base de datos.

A continuación, describiré un ejemplo de implementación del sumidero Cassandra para la transmisión estructurada y explicaré cómo ejecutarlo en un clúster. El código completo está disponible aquí .

Cuando encontré por primera vez el problema anterior, este proyecto resultó ser muy útil. Sin embargo, puede parecer un poco complicado si el lector acaba de comenzar a trabajar con Structured Streaming y está buscando un ejemplo simple de cómo cargar datos en Cassandra. Además, el proyecto está escrito para funcionar en modo local y requiere algunos cambios para ejecutarse en el clúster.

También quiero dar ejemplos de cómo guardar datos en MongoDB y cualquier otra base de datos usando JDBC .

Solución simple


Para cargar datos a un sistema externo, debe usar el receptor foreach . Lea más sobre esto aquí . En resumen, se debe implementar la interfaz ForeachWriter . Es decir, es necesario determinar cómo abrir la conexión, cómo procesar cada dato y cómo cerrar la conexión al final del procesamiento. El código fuente es el siguiente:

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] { // This class implements the interface ForeachWriter, which has methods that get called // whenever there is a sequence of rows generated as output val cassandraDriver = new CassandraDriver(); def open(partitionId: Long, version: Long): Boolean = { // open connection println(s"Open connection") true } def process(record: org.apache.spark.sql.Row) = { println(s"Process new $record") cassandraDriver.connector.withSessionDo(session => session.execute(s""" insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt) values('${record(0)}', '${record(1)}', '${record(2)}')""") ) } def close(errorOrNull: Throwable): Unit = { // close the connection println(s"Close connection") } } 

La definición de CassandraDriver y la estructura de la tabla de salida que describiré más adelante, pero por ahora, echemos un vistazo más de cerca a cómo funciona el código anterior. Para conectarme a Kasandra desde Spark, creo un objeto CassandraDriver que proporciona acceso a CassandraConnector , un conector desarrollado por DataStax . CassandraConnector es responsable de abrir y cerrar la conexión a la base de datos, por lo que simplemente visualizo mensajes de depuración en los métodos de apertura y cierre de la clase CassandraSinkForeach .

El código anterior se llama desde la aplicación principal de la siguiente manera:

 val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() 

CassandraSinkForeach se crea para cada fila de datos, por lo que cada nodo de trabajo inserta su parte de las filas en la base de datos. Es decir, cada nodo de trabajo ejecuta val cassandraDriver = new CassandraDriver (); Así es como se ve CassandraDriver:

 class CassandraDriver extends SparkSessionBuilder { // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor. // It extends SparkSessionBuilder so to use the same SparkSession on each node. val spark = buildSparkSession import spark.implicits._ val connector = CassandraConnector(spark.sparkContext.getConf) // Define Cassandra's table which will be used as a sink /* For this app I used the following table: CREATE TABLE fx.spark_struct_stream_sink ( fx_marker text, timestamp_ms timestamp, timestamp_dt date, primary key (fx_marker)); */ val namespace = "fx" val foreachTableSink = "spark_struct_stream_sink" } 

Echemos un vistazo más de cerca al objeto de chispa . El código para SparkSessionBuilder es el siguiente:

 class SparkSessionBuilder extends Serializable { // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. // Note here the usage of @transient lazy val def buildSparkSession: SparkSession = { @transient lazy val conf: SparkConf = new SparkConf() .setAppName("Structured Streaming from Kafka to Cassandra") .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com") .set("spark.sql.streaming.checkpointLocation", "checkpoint") @transient lazy val spark = SparkSession .builder() .config(conf) .getOrCreate() spark } } 

En cada nodo de trabajo, SparkSessionBuilder proporciona acceso a la SparkSession que se creó en el controlador. Para hacer posible dicho acceso, es necesario serializar SparkSessionBuilder y usar el valor vago transitorio , que permite que el sistema de serialización ignore los objetos conf y spark cuando el programa se inicializa y hasta que se accede a los objetos. Por lo tanto, cuando se inicia el programa, buildSparkSession se serializa y se envía a cada nodo de trabajo, pero los objetos conf y spark solo se permiten cuando el nodo de trabajo está accediendo a ellos.

Ahora veamos el código de la aplicación principal:

 object KafkaToCassandra extends SparkSessionBuilder { // Main body of the app. It also extends SparkSessionBuilder. def main(args: Array[String]) { val spark = buildSparkSession import spark.implicits._ // Define location of Kafka brokers: val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092" /*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by \n {"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"} {"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"} {"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"} {"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"} */ // Read incoming stream val dfraw = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", broker) .option("subscribe", "currency_exchange") .load() val schema = StructType( Seq( StructField("fx_marker", StringType, false), StructField("timestamp_ms", StringType, false) ) ) val df = dfraw .selectExpr("CAST(value AS STRING)").as[String] .flatMap(_.split("\n")) val jsons = df.select(from_json($"value", schema) as "data").select("data.*") // Process data. Create a new date column val parsed = jsons .withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS"))) .filter("fx_marker != ''") // Output results into a database val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() sink.awaitTermination() } } 

Cuando la aplicación se envía para su ejecución, buildSparkSession se serializa y se envía a los nodos de trabajo, sin embargo, los objetos conf y spark permanecen sin resolver. Luego, el controlador crea un objeto de chispa dentro de KafkaToCassandra y distribuye el trabajo entre los nodos de trabajo. Cada nodo de trabajo lee datos de Kafka, realiza transformaciones simples en la parte recibida de los registros, y cuando el nodo de trabajo está listo para escribir los resultados en la base de datos, permite conf y objetos de chispa , obteniendo así acceso a la SparkSession creada en el controlador.

¿Cómo construir y ejecutar la aplicación?


Cuando me mudé de PySpark a Scala, me llevó un tiempo descubrir cómo construir la aplicación. Por lo tanto, incluí Maven pom.xml en mi proyecto. El lector puede construir la aplicación usando Maven ejecutando el comando mvn package . Después de que la aplicación se puede enviar para su ejecución usando

 ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar 

Para construir y ejecutar la aplicación, es necesario reemplazar los nombres de mis máquinas AWS con los suyos (es decir, reemplazar todo lo que parece ec2-xx-xxx-xx-xx.compute-1.amazonaws.com).

Spark and Structured Streaming en particular es un tema nuevo para mí, por lo que agradeceré mucho a los lectores por sus comentarios, discusiones y correcciones.

Source: https://habr.com/ru/post/es425503/


All Articles