Apache Kafka et le streaming avec Spark Streaming

Bonjour, Habr! Aujourd'hui, nous allons construire un système qui utilisera Apark Kafka pour traiter les flux de messages à l'aide de Spark Streaming et écrire le résultat du traitement dans la base de données cloud AWS RDS.

Imaginez qu'un certain établissement de crédit nous ait confié la tâche de traiter à la volée les transactions entrantes dans toutes ses succursales. Cela peut être fait afin de calculer rapidement la position de change ouverte pour le Trésor, les limites ou les résultats financiers des transactions, etc.

Comment mettre en œuvre ce cas sans utiliser de magie et de sorts magiques - nous lisons sous la coupe! C'est parti!


(Source de l'image)

Présentation


Bien sûr, le traitement d'un grand tableau de données en temps réel offre de nombreuses possibilités d'utilisation dans les systèmes modernes. L'une des combinaisons les plus populaires pour cela est le tandem Apache Kafka et Spark Streaming, où Kafka crée un flux de paquets de messages entrants, et Spark Streaming traite ces paquets à un intervalle de temps spécifié.

Pour augmenter la tolérance aux pannes de l'application, nous utiliserons des points de contrôle - points de contrôle. En utilisant ce mécanisme, lorsque le module Spark Streaming a besoin de récupérer des données perdues, il n'a qu'à retourner au dernier point de contrôle et reprendre ses calculs.

Architecture du système en cours de développement




Composants utilisés:

  • Apache Kafka est un système de messagerie distribuĂ© avec publication et abonnement. Convient Ă  la fois pour la consommation de messages hors ligne et en ligne. Pour Ă©viter la perte de donnĂ©es, les messages Kafka sont stockĂ©s sur disque et rĂ©pliquĂ©s dans le cluster. Le système Kafka est construit au-dessus du service de synchronisation ZooKeeper;
  • Apache Spark Streaming - Composant Spark pour le traitement des donnĂ©es en streaming. Le module Spark Streaming est construit Ă  l'aide de l'architecture micro-batch, lorsque le flux de donnĂ©es est interprĂ©tĂ© comme une sĂ©quence continue de petits paquets de donnĂ©es. Spark Streaming reçoit des donnĂ©es de diverses sources et les combine en petits paquets. De nouveaux packages sont créés Ă  intervalles rĂ©guliers. Au dĂ©but de chaque intervalle de temps, un nouveau paquet est créé et toutes les donnĂ©es reçues pendant cet intervalle sont incluses dans le paquet. Ă€ la fin de l'intervalle, la croissance des paquets s'arrĂŞte. La taille de l'intervalle est dĂ©terminĂ©e par un paramètre appelĂ© intervalle de lot;
  • Apache Spark SQL - Combine le traitement relationnel avec la programmation fonctionnelle Spark. Les donnĂ©es structurĂ©es font rĂ©fĂ©rence aux donnĂ©es qui ont un schĂ©ma, c'est-Ă -dire un ensemble unique de champs pour tous les enregistrements. Spark SQL prend en charge la saisie Ă  partir d'une variĂ©tĂ© de sources de donnĂ©es structurĂ©es et, grâce Ă  la disponibilitĂ© des informations sur le schĂ©ma, il ne peut rĂ©cupĂ©rer efficacement que les champs d'enregistrement requis et fournit Ă©galement des API DataFrame;
  • AWS RDS est une base de donnĂ©es relationnelle basĂ©e sur le cloud relativement peu coĂ»teuse, un service Web qui simplifie la configuration, le fonctionnement et la mise Ă  l'Ă©chelle, et est directement administrĂ© par Amazon.

Installer et démarrer le serveur Kafka


Avant d'utiliser directement Kafka, vous devez vous assurer que Java est disponible, car JVM est utilisé pour le travail:

sudo apt-get update sudo apt-get install default-jre java -version 

Créez un nouvel utilisateur pour travailler avec Kafka:

 sudo useradd kafka -m sudo passwd kafka sudo adduser kafka sudo 

Ensuite, téléchargez la distribution sur le site officiel d'Apache Kafka:

 wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz" 

Décompressez l'archive téléchargée:
 tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka 

L'étape suivante est facultative. Le fait est que les paramètres par défaut ne permettent pas d'utiliser pleinement toutes les fonctionnalités d'Apache Kafka. Par exemple, supprimez un sujet, une catégorie, un groupe dans lequel les messages peuvent être publiés. Pour changer cela, modifiez le fichier de configuration:

 vim ~/kafka/config/server.properties 

Ajoutez ce qui suit Ă  la fin du fichier:

 delete.topic.enable = true 

Avant de démarrer le serveur Kafka, vous devez démarrer le serveur ZooKeeper, nous utiliserons le script auxiliaire fourni avec la distribution Kafka:

 Cd ~/kafka bin/zookeeper-server-start.sh config/zookeeper.properties 

Après le démarrage réussi de ZooKeeper, dans un terminal séparé, nous lançons le serveur Kafka:

 bin/kafka-server-start.sh config/server.properties 

Créez un nouveau sujet appelé Transaction:

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction 

Assurez-vous que la rubrique avec le bon nombre de partitions et de réplication a été créée:

 bin/kafka-topics.sh --describe --zookeeper localhost:2181 



Nous manquerons les moments de tester le producteur et le consommateur pour le sujet nouvellement créé. Pour plus de détails sur la façon de tester l'envoi et la réception de messages, consultez la documentation officielle - Envoyer des messages . Eh bien, nous passons à l'écriture d'un producteur en Python à l'aide de l'API KafkaProducer.

Écriture du producteur


Le producteur générera des données aléatoires - 100 messages par seconde. Par données aléatoires, nous entendons un dictionnaire composé de trois champs:

  • Succursale - nom du point de vente de l'Ă©tablissement de crĂ©dit;
  • Devise - devise de transaction;
  • Montant - montant de la transaction. Le montant sera un nombre positif s'il s'agit d'un achat de devises par la Banque et nĂ©gatif s'il s'agit d'une vente.

Le code du producteur est le suivant:

 from numpy.random import choice, randint def get_random_value(): new_dict = {} branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut'] currency_list = ['RUB', 'USD', 'EUR', 'GBP'] new_dict['branch'] = choice(branch_list) new_dict['currency'] = choice(currency_list) new_dict['amount'] = randint(-100, 100) return new_dict 

Ensuite, en utilisant la méthode d'envoi, nous envoyons un message au serveur, dans la rubrique dont nous avons besoin, au format JSON:

 from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x:dumps(x).encode('utf-8'), compression_type='gzip') my_topic = 'transaction' data = get_random_value() try: future = producer.send(topic = my_topic, value = data) record_metadata = future.get(timeout=10) print('--> The message has been sent to a topic: \ {}, partition: {}, offset: {}' \ .format(record_metadata.topic, record_metadata.partition, record_metadata.offset )) except Exception as e: print('--> It seems an Error occurred: {}'.format(e)) finally: producer.flush() 

Lors de l'exécution du script, nous recevons les messages suivants dans le terminal:


Cela signifie que tout fonctionne comme nous le voulions - le producteur génère et envoie des messages au sujet dont nous avons besoin.

L'étape suivante consiste à installer Spark et à traiter ce flux de messages.

Installer Apache Spark


Apache Spark est une plate-forme informatique en grappe polyvalente et hautes performances.

En termes de performances, Spark surpasse les implémentations populaires du modèle MapReduce, fournissant simultanément la prise en charge d'un plus large éventail de types de calculs, y compris les requêtes interactives et le traitement des flux. La vitesse joue un rôle important dans le traitement de grandes quantités de données, car c'est la vitesse qui vous permet de travailler de manière interactive sans passer des minutes ou des heures à attendre. L'une des plus grandes forces de Spark à une vitesse aussi élevée est sa capacité à effectuer des calculs en mémoire.

Ce framework est écrit en Scala, vous devez donc d'abord l'installer:

 sudo apt-get install scala 

Téléchargez la distribution Spark sur le site officiel:

 wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz" 

Décompressez l'archive:

 sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark 

Ajoutez le chemin d'accès à Spark dans le fichier bash:

 vim ~/.bashrc 

Ajoutez les lignes suivantes via l'éditeur:

 SPARK_HOME=/usr/local/spark export PATH=$SPARK_HOME/bin:$PATH 

Exécutez la commande ci-dessous après avoir apporté des modifications à bashrc:

 source ~/.bashrc 

Déploiement d'AWS PostgreSQL


Il reste à déployer la base de données, où nous téléchargerons les informations traitées à partir des flux. Pour cela, nous utiliserons le service AWS RDS.

Accédez à la console AWS -> AWS RDS -> Bases de données -> Créer une base de données:


Sélectionnez PostgreSQL et cliquez sur le bouton Suivant:


Parce que Cet exemple est compris uniquement à des fins pédagogiques, nous utiliserons un serveur gratuit «au minimum» (Free Tier):


Ensuite, cochez le bloc Free Tier, et après cela, on nous proposera automatiquement une instance de la classe t2.micro - bien que faible, elle est gratuite et tout à fait adaptée à notre tâche:

Des choses très importantes suivent: le nom de l'instance de base de données, le nom de l'utilisateur principal et son mot de passe. Nommons l'instance: myHabrTest, l'utilisateur principal: habr , le mot de passe: habr12345 et cliquez sur le bouton Suivant:



La page suivante contient les paramètres responsables de la disponibilité de notre serveur de base de données de l'extérieur (accessibilité publique) et de la disponibilité des ports:


Créons une nouvelle configuration pour le groupe de sécurité VPC, qui nous permettra d'accéder à notre serveur de base de données de l'extérieur via le port 5432 (PostgreSQL).

Dans une fenêtre de navigateur distincte, accédez à la console AWS dans le tableau de bord VPC -> Groupes de sécurité -> Créer un groupe de sécurité:

Définissez le nom du groupe Sécurité - PostgreSQL, une description, indiquez à quel VPC ce groupe doit être associé et cliquez sur le bouton Créer:


Remplissez le groupe de règles entrantes nouvellement créé pour le port 5432, comme indiqué dans l'image ci-dessous. Vous n'avez pas besoin de spécifier un port manuel, mais sélectionnez PostgreSQL dans la liste déroulante Type.

À strictement parler, la valeur :: / 0 signifie la disponibilité du trafic entrant pour un serveur de partout dans le monde, ce qui n'est canoniquement pas tout à fait vrai, mais pour l'analyse de l'exemple, utilisons cette approche:


Nous revenons à la page du navigateur, où nous avons «Configurer les paramètres avancés» ouvert et sélectionnez dans la section Groupes de sécurité VPC -> Choisissez les groupes de sécurité VPC existants -> PostgreSQL:


Ensuite, dans la section Options de la base de données -> Nom de la base de données -> définissez le nom - habrDB .

Nous pouvons laisser le reste des paramètres, à l'exception de la désactivation de la sauvegarde (période de rétention de la sauvegarde - 0 jour), de la surveillance et de Performance Insights, par défaut. Cliquez sur le bouton Créer une base de données :


Gestionnaire de flux


La dernière étape sera le développement de travaux Spark, qui traiteront toutes les deux secondes les nouvelles données provenant de Kafka et entreront le résultat dans la base de données.

Comme indiqué ci-dessus, les points de contrôle sont le principal mécanisme de SparkStreaming qui doit être configuré pour fournir une tolérance aux pannes. Nous utiliserons des points de contrôle et, en cas de chute d'une procédure, le module Spark Streaming n'aura qu'à retourner au dernier point de contrôle et reprendre ses calculs pour restaurer les données perdues.

Vous pouvez activer le point d'arrêt en définissant le répertoire dans un système de fichiers fiable et tolérant aux pannes (par exemple, HDFS, S3, etc.), dans lequel les informations sur le point d'arrêt seront enregistrées. Cela se fait en utilisant, par exemple:

 streamingContext.checkpoint(checkpointDirectory) 

Dans notre exemple, nous utiliserons l'approche suivante, à savoir, si checkpointDirectory existe, le contexte sera recréé à partir des données du point de contrôle. Si le répertoire n'existe pas (c'est-à-dire qu'il est exécuté pour la première fois), la fonction functionToCreateContext est appelée pour créer un nouveau contexte et configurer DStreams:

 from pyspark.streaming import StreamingContext context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext) 

Créez un objet DirectStream pour vous connecter à la rubrique "transaction" à l'aide de la méthode createDirectStream de la bibliothèque KafkaUtils:

 from pyspark.streaming.kafka import KafkaUtils sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 2) broker_list = 'localhost:9092' topic = 'transaction' directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": broker_list}) 

Analyse des données entrantes au format JSON:

 rowRdd = rdd.map(lambda w: Row(branch=w['branch'], currency=w['currency'], amount=w['amount'])) testDataFrame = spark.createDataFrame(rowRdd) testDataFrame.createOrReplaceTempView("treasury_stream") 

En utilisant Spark SQL, nous faisons un regroupement simple et imprimons le résultat sur la console:

 select from_unixtime(unix_timestamp()) as curr_time, t.branch as branch_name, t.currency as currency_code, sum(amount) as batch_value from treasury_stream t group by t.branch, t.currency 

Obtenir le texte de la requête et l'exécuter via Spark SQL:

 sql_query = get_sql_query() testResultDataFrame = spark.sql(sql_query) testResultDataFrame.show(n=5) 

Et puis nous enregistrons les données agrégées reçues dans une table dans AWS RDS. Pour enregistrer les résultats d'agrégation dans une table de base de données, nous utiliserons la méthode d'écriture de l'objet DataFrame:

 testResultDataFrame.write \ .format("jdbc") \ .mode("append") \ .option("driver", 'org.postgresql.Driver') \ .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \ .option("dbtable", "transaction_flow") \ .option("user", "habr") \ .option("password", "habr12345") \ .save() 

Quelques mots sur la configuration d'une connexion à AWS RDS. Nous avons créé l'utilisateur et le mot de passe pour celui-ci à l'étape «Déploiement d'AWS PostgreSQL». Pour l'URL du serveur de base de données, utilisez Endpoint, qui s'affiche dans la section Connectivité et sécurité:


Afin de connecter correctement Spark et Kafka, vous devez exécuter le travail via smark-submit en utilisant l' artefact spark-streaming-kafka-0-8_2.11 . De plus, nous appliquons également l'artefact pour interagir avec la base de données PostgreSQL, nous les transférerons via --packages.

Pour la flexibilité du script, nous supprimons également le nom du serveur de messages et le sujet à partir duquel nous voulons recevoir des données comme paramètres d'entrée.

Il est donc temps de démarrer et de tester le système:

 spark-submit \ --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\ org.postgresql:postgresql:9.4.1207 \ spark_job.py localhost:9092 transaction 

Tout a fonctionné! Comme vous pouvez le voir dans l'image ci-dessous, pendant le fonctionnement de l'application, de nouveaux résultats d'agrégation sont affichés toutes les 2 secondes, car nous avons défini l'intervalle de traitement par lots à 2 secondes lorsque nous avons créé l'objet StreamingContext:


Ensuite, nous effectuons une simple requête dans la base de données pour vérifier les enregistrements dans la table transaction_flow :


Conclusion


Cet article a examiné un exemple de traitement d'informations en continu à l'aide de Spark Streaming en conjonction avec Apache Kafka et PostgreSQL. Avec la croissance des données provenant de diverses sources, il est difficile de surestimer la valeur pratique de Spark Streaming pour créer des applications de streaming et des applications fonctionnant en temps réel.

Vous pouvez trouver le code source complet dans mon référentiel sur GitHub .

Je suis prêt à discuter de cet article avec plaisir, j'attends vos commentaires avec impatience et j'espère également des critiques constructives de tous les lecteurs concernés.

Je vous souhaite du succès!

PS Il était initialement prévu d'utiliser une base de données PostgreSQL locale, mais étant donné mon amour pour AWS, j'ai décidé de mettre la base de données dans le cloud. Dans le prochain article sur ce sujet, je montrerai comment implémenter l'ensemble du système décrit ci-dessus dans AWS à l'aide d'AWS Kinesis et d'AWS EMR. Suivez l'actualité!

Source: https://habr.com/ru/post/fr451160/


All Articles