Il y a quelques mois, j'ai commencé à étudier Spark, et à un moment donné, j'ai été confronté au problème de l'enregistrement des calculs de Structured Streaming dans la base de données Cassandra.
Dans cet article, je donne un exemple simple de création et d'utilisation de Cassandra Sink pour Spark Structured Streaming. J'espère que le message sera utile à ceux qui ont récemment commencé à travailler avec Spark Structured Streaming et se demandent comment télécharger les résultats des calculs dans la base de données.
L'idée de l'application est très simple - pour recevoir et analyser des messages de Kafka, effectuer des transformations simples dans une paire et enregistrer les résultats dans cassandra.
Avantages du streaming structuré
Vous pouvez en savoir plus sur Structured Streaming dans la
documentation . En bref, Structured Streaming est un moteur de traitement d'informations de streaming bien évolutif basé sur le moteur Spark SQL. Il vous permet d'utiliser Dataset / DataFrame pour agréger des données, calculer des fonctions de fenêtre, des connexions, etc. C'est-à-dire que Structured Streaming vous permet d'utiliser le bon vieux SQL pour travailler avec des flux de données.
Quel est le problème?
La version stable de Spark Structured Streaming a été publiée en 2017. Autrement dit, il s'agit d'une API assez nouvelle qui implémente les fonctionnalités de base, mais certaines choses devront être faites par nous-mêmes. Par exemple, Structured Streaming a des fonctions standard pour écrire la sortie dans un fichier, une tuile, une console ou une mémoire, mais pour enregistrer les données dans la base de données, vous devez utiliser le récepteur
foreach disponible dans Structured Streaming et implémenter l'interface
ForeachWriter .
Depuis Spark 2.3.1, cette fonctionnalité ne peut être implémentée que dans Scala et Java .
Je suppose que le lecteur sait déjà comment le streaming structuré fonctionne en termes généraux, sait comment mettre en œuvre les transformations nécessaires et est maintenant prêt à télécharger les résultats dans la base de données. Si certaines des étapes ci-dessus ne sont pas claires, la documentation officielle peut servir de bon point de départ pour l'apprentissage du streaming structuré. Dans cet article, je voudrais me concentrer sur la dernière étape lorsque vous devez enregistrer les résultats dans une base de données.
Ci-dessous, je vais décrire un exemple d'implémentation du récepteur Cassandra pour le streaming structuré et expliquer comment l'exécuter dans un cluster. Le code complet est disponible
ici .
Lorsque j'ai rencontré le problème ci-dessus pour la première fois,
ce projet s'est avéré très utile. Cependant, cela peut sembler un peu compliqué si le lecteur vient de commencer à travailler avec le streaming structuré et cherche un exemple simple de la façon de télécharger des données sur cassandra. En outre, le projet est écrit pour fonctionner en mode local et nécessite certaines modifications pour s'exécuter dans le cluster.
Je veux également donner des exemples de la façon de sauvegarder des données sur
MongoDB et toute autre base de données en utilisant
JDBC .
Solution simple
Pour télécharger des données vers un système externe, vous devez utiliser le récepteur
foreach . En savoir plus à ce sujet
ici . En bref, l'interface
ForeachWriter doit être implémentée. Autrement dit, il est nécessaire de déterminer comment ouvrir la connexion, comment traiter chaque élément de données et comment fermer la connexion à la fin du traitement. Le code source est le suivant:
class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
La définition de
CassandraDriver et la structure du tableau de sortie que je décrirai plus tard, mais pour l'instant, examinons de plus près comment fonctionne le code ci-dessus. Pour me connecter à Kasandra depuis Spark, je crée un objet
CassandraDriver qui donne accès à
CassandraConnector , un connecteur développé par
DataStax . Le CassandraConnector est responsable de l'ouverture et de la fermeture de la connexion à la base de données, donc j'affiche simplement les messages de débogage dans les méthodes
open et
close de la classe
CassandraSinkForeach .
Le code ci-dessus est appelé depuis l'application principale comme suit:
val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start()
CassandraSinkForeach est créé pour chaque ligne de données, de sorte que chaque nœud de travail insère sa partie des lignes dans la base de données. Autrement dit, chaque nœud de travail exécute
val cassandraDriver = new CassandraDriver (); Voici à quoi ressemble CassandraDriver:
class CassandraDriver extends SparkSessionBuilder {
Examinons de plus près l'objet
étincelle . Le code de
SparkSessionBuilder est le suivant:
class SparkSessionBuilder extends Serializable {
Sur chaque nœud de travail,
SparkSessionBuilder fournit un accès à la
SparkSession qui a été créée sur le pilote. Pour rendre un tel accès possible, il est nécessaire de sérialiser
SparkSessionBuilder et d'utiliser la valeur
transitoire paresseuse , ce qui permet au système de sérialisation d'ignorer
les objets
conf et
spark lorsque le programme est initialisé et jusqu'à ce que les objets soient accessibles. Ainsi, lorsque le programme
buildSparkSession démarre, il est sérialisé et envoyé à chaque nœud de travail, mais
les objets
conf et
spark ne sont autorisés que lorsque le nœud de travail y accède.
Examinons maintenant le code d'application principal:
object KafkaToCassandra extends SparkSessionBuilder {
Lorsque l'application est envoyée pour exécution,
buildSparkSession est sérialisé et envoyé aux nœuds de travail, cependant,
les objets
conf et
spark restent non résolus. Ensuite, le pilote crée un objet étincelle à l'intérieur de
KafkaToCassandra et répartit le travail entre les nœuds de travail. Chaque nœud de travail lit les données de Kafka, effectue des transformations simples sur la partie reçue des enregistrements et lorsque le nœud de travail est prêt à écrire les résultats dans la base de données, il autorise
les objets
conf et
spark , obtenant ainsi l'accès à la
SparkSession créée sur le pilote.
Comment construire et exécuter l'application?
Lorsque j'ai déménagé de PySpark à Scala, il m'a fallu un certain temps pour comprendre comment créer l'application. Par conséquent, j'ai inclus Maven
pom.xml dans mon projet. Le lecteur peut créer l'application à l'aide de Maven en exécutant la commande de
package mvn . Après que l'application peut être envoyée pour exécution en utilisant
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar
Pour créer et exécuter l'application, il est nécessaire de remplacer les noms de mes machines AWS par les vôtres (c'est-à-dire remplacer tout ce qui ressemble à ec2-xx-xxx-xx-xx.compute-1.amazonaws.com).
Le Spark et le Streaming Structuré en particulier est un nouveau sujet pour moi, donc je serai très reconnaissant aux lecteurs pour leurs commentaires, discussions et corrections.