Évier Cassandra pour le streaming structuré Spark

Il y a quelques mois, j'ai commencé à étudier Spark, et à un moment donné, j'ai été confronté au problème de l'enregistrement des calculs de Structured Streaming dans la base de données Cassandra.

Dans cet article, je donne un exemple simple de création et d'utilisation de Cassandra Sink pour Spark Structured Streaming. J'espère que le message sera utile à ceux qui ont récemment commencé à travailler avec Spark Structured Streaming et se demandent comment télécharger les résultats des calculs dans la base de données.

L'idée de l'application est très simple - pour recevoir et analyser des messages de Kafka, effectuer des transformations simples dans une paire et enregistrer les résultats dans cassandra.

Avantages du streaming structuré


Vous pouvez en savoir plus sur Structured Streaming dans la documentation . En bref, Structured Streaming est un moteur de traitement d'informations de streaming bien évolutif basé sur le moteur Spark SQL. Il vous permet d'utiliser Dataset / DataFrame pour agréger des données, calculer des fonctions de fenêtre, des connexions, etc. C'est-à-dire que Structured Streaming vous permet d'utiliser le bon vieux SQL pour travailler avec des flux de données.

Quel est le problème?


La version stable de Spark Structured Streaming a été publiée en 2017. Autrement dit, il s'agit d'une API assez nouvelle qui implémente les fonctionnalités de base, mais certaines choses devront être faites par nous-mêmes. Par exemple, Structured Streaming a des fonctions standard pour écrire la sortie dans un fichier, une tuile, une console ou une mémoire, mais pour enregistrer les données dans la base de données, vous devez utiliser le récepteur foreach disponible dans Structured Streaming et implémenter l'interface ForeachWriter . Depuis Spark 2.3.1, cette fonctionnalité ne peut être implémentée que dans Scala et Java .

Je suppose que le lecteur sait déjà comment le streaming structuré fonctionne en termes généraux, sait comment mettre en œuvre les transformations nécessaires et est maintenant prêt à télécharger les résultats dans la base de données. Si certaines des étapes ci-dessus ne sont pas claires, la documentation officielle peut servir de bon point de départ pour l'apprentissage du streaming structuré. Dans cet article, je voudrais me concentrer sur la dernière étape lorsque vous devez enregistrer les résultats dans une base de données.

Ci-dessous, je vais décrire un exemple d'implémentation du récepteur Cassandra pour le streaming structuré et expliquer comment l'exécuter dans un cluster. Le code complet est disponible ici .

Lorsque j'ai rencontré le problème ci-dessus pour la première fois, ce projet s'est avéré très utile. Cependant, cela peut sembler un peu compliqué si le lecteur vient de commencer à travailler avec le streaming structuré et cherche un exemple simple de la façon de télécharger des données sur cassandra. En outre, le projet est écrit pour fonctionner en mode local et nécessite certaines modifications pour s'exécuter dans le cluster.

Je veux également donner des exemples de la façon de sauvegarder des données sur MongoDB et toute autre base de données en utilisant JDBC .

Solution simple


Pour télécharger des données vers un système externe, vous devez utiliser le récepteur foreach . En savoir plus à ce sujet ici . En bref, l'interface ForeachWriter doit être implémentée. Autrement dit, il est nécessaire de déterminer comment ouvrir la connexion, comment traiter chaque élément de données et comment fermer la connexion à la fin du traitement. Le code source est le suivant:

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] { // This class implements the interface ForeachWriter, which has methods that get called // whenever there is a sequence of rows generated as output val cassandraDriver = new CassandraDriver(); def open(partitionId: Long, version: Long): Boolean = { // open connection println(s"Open connection") true } def process(record: org.apache.spark.sql.Row) = { println(s"Process new $record") cassandraDriver.connector.withSessionDo(session => session.execute(s""" insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt) values('${record(0)}', '${record(1)}', '${record(2)}')""") ) } def close(errorOrNull: Throwable): Unit = { // close the connection println(s"Close connection") } } 

La définition de CassandraDriver et la structure du tableau de sortie que je décrirai plus tard, mais pour l'instant, examinons de plus près comment fonctionne le code ci-dessus. Pour me connecter à Kasandra depuis Spark, je crée un objet CassandraDriver qui donne accès à CassandraConnector , un connecteur développé par DataStax . Le CassandraConnector est responsable de l'ouverture et de la fermeture de la connexion à la base de données, donc j'affiche simplement les messages de débogage dans les méthodes open et close de la classe CassandraSinkForeach .

Le code ci-dessus est appelé depuis l'application principale comme suit:

 val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() 

CassandraSinkForeach est créé pour chaque ligne de données, de sorte que chaque nœud de travail insère sa partie des lignes dans la base de données. Autrement dit, chaque nœud de travail exécute val cassandraDriver = new CassandraDriver (); Voici à quoi ressemble CassandraDriver:

 class CassandraDriver extends SparkSessionBuilder { // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor. // It extends SparkSessionBuilder so to use the same SparkSession on each node. val spark = buildSparkSession import spark.implicits._ val connector = CassandraConnector(spark.sparkContext.getConf) // Define Cassandra's table which will be used as a sink /* For this app I used the following table: CREATE TABLE fx.spark_struct_stream_sink ( fx_marker text, timestamp_ms timestamp, timestamp_dt date, primary key (fx_marker)); */ val namespace = "fx" val foreachTableSink = "spark_struct_stream_sink" } 

Examinons de plus près l'objet étincelle . Le code de SparkSessionBuilder est le suivant:

 class SparkSessionBuilder extends Serializable { // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. // Note here the usage of @transient lazy val def buildSparkSession: SparkSession = { @transient lazy val conf: SparkConf = new SparkConf() .setAppName("Structured Streaming from Kafka to Cassandra") .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com") .set("spark.sql.streaming.checkpointLocation", "checkpoint") @transient lazy val spark = SparkSession .builder() .config(conf) .getOrCreate() spark } } 

Sur chaque nœud de travail, SparkSessionBuilder fournit un accès à la SparkSession qui a été créée sur le pilote. Pour rendre un tel accès possible, il est nécessaire de sérialiser SparkSessionBuilder et d'utiliser la valeur transitoire paresseuse , ce qui permet au système de sérialisation d'ignorer les objets conf et spark lorsque le programme est initialisé et jusqu'à ce que les objets soient accessibles. Ainsi, lorsque le programme buildSparkSession démarre, il est sérialisé et envoyé à chaque nœud de travail, mais les objets conf et spark ne sont autorisés que lorsque le nœud de travail y accède.

Examinons maintenant le code d'application principal:

 object KafkaToCassandra extends SparkSessionBuilder { // Main body of the app. It also extends SparkSessionBuilder. def main(args: Array[String]) { val spark = buildSparkSession import spark.implicits._ // Define location of Kafka brokers: val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092" /*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by \n {"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"} {"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"} {"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"} {"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"} */ // Read incoming stream val dfraw = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", broker) .option("subscribe", "currency_exchange") .load() val schema = StructType( Seq( StructField("fx_marker", StringType, false), StructField("timestamp_ms", StringType, false) ) ) val df = dfraw .selectExpr("CAST(value AS STRING)").as[String] .flatMap(_.split("\n")) val jsons = df.select(from_json($"value", schema) as "data").select("data.*") // Process data. Create a new date column val parsed = jsons .withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS"))) .filter("fx_marker != ''") // Output results into a database val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() sink.awaitTermination() } } 

Lorsque l'application est envoyée pour exécution, buildSparkSession est sérialisé et envoyé aux nœuds de travail, cependant, les objets conf et spark restent non résolus. Ensuite, le pilote crée un objet étincelle à l'intérieur de KafkaToCassandra et répartit le travail entre les nœuds de travail. Chaque nœud de travail lit les données de Kafka, effectue des transformations simples sur la partie reçue des enregistrements et lorsque le nœud de travail est prêt à écrire les résultats dans la base de données, il autorise les objets conf et spark , obtenant ainsi l'accès à la SparkSession créée sur le pilote.

Comment construire et exécuter l'application?


Lorsque j'ai déménagé de PySpark à Scala, il m'a fallu un certain temps pour comprendre comment créer l'application. Par conséquent, j'ai inclus Maven pom.xml dans mon projet. Le lecteur peut créer l'application à l'aide de Maven en exécutant la commande de package mvn . Après que l'application peut être envoyée pour exécution en utilisant

 ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar 

Pour créer et exécuter l'application, il est nécessaire de remplacer les noms de mes machines AWS par les vôtres (c'est-à-dire remplacer tout ce qui ressemble à ec2-xx-xxx-xx-xx.compute-1.amazonaws.com).

Le Spark et le Streaming Structuré en particulier est un nouveau sujet pour moi, donc je serai très reconnaissant aux lecteurs pour leurs commentaires, discussions et corrections.

Source: https://habr.com/ru/post/fr425503/


All Articles