Apache Kafka und Streaming mit Spark Streaming

Hallo Habr! Heute werden wir ein System erstellen, das Apark Kafka verwendet, um Nachrichtenströme mit Spark Streaming zu verarbeiten und das Verarbeitungsergebnis in die AWS RDS-Cloud-Datenbank zu schreiben.

Stellen Sie sich vor, ein bestimmtes Kreditinstitut hat uns die Aufgabe gestellt, eingehende Transaktionen in allen Filialen im laufenden Betrieb zu verarbeiten. Dies kann erfolgen, um schnell die offene Währungsposition für das Treasury, Limits oder Finanzergebnisse für Transaktionen usw. zu berechnen.

Wie man diesen Fall ohne den Einsatz von Magie und Zaubersprüchen umsetzt - wir lesen unter dem Schnitt! Lass uns gehen!


(Bildquelle)

Einführung


Natürlich bietet die Verarbeitung eines großen Datenarrays in Echtzeit zahlreiche Möglichkeiten für den Einsatz in modernen Systemen. Eine der beliebtesten Kombinationen hierfür ist das Apache Kafka- und Spark Streaming-Tandem, bei dem Kafka einen Stream eingehender Nachrichtenpakete erstellt und Spark Streaming diese Pakete in einem bestimmten Zeitintervall verarbeitet.

Um die Fehlertoleranz der Anwendung zu erhöhen, verwenden wir Checkpoints - Checkpoints. Wenn das Spark-Streaming-Modul mithilfe dieses Mechanismus verlorene Daten wiederherstellen muss, muss es nur zum letzten Kontrollpunkt zurückkehren und die Berechnungen von diesem fortsetzen.

Architektur des in Entwicklung befindlichen Systems




Verwendete Komponenten:

  • Apache Kafka ist ein verteiltes Nachrichtensystem mit Publish und Subscribe. Geeignet für den Offline- und Online-Nachrichtenverbrauch. Um Datenverlust zu vermeiden, werden Kafka-Nachrichten auf der Festplatte gespeichert und im Cluster repliziert. Das Kafka-System basiert auf dem ZooKeeper-Synchronisierungsdienst.
  • Apache Spark Streaming - Spark-Komponente zur Verarbeitung von Streaming-Daten. Das Spark-Streaming-Modul wird unter Verwendung der Micro-Batch-Architektur erstellt, wenn der Datenstrom als fortlaufende Folge kleiner Datenpakete interpretiert wird. Spark Streaming empfängt Daten aus verschiedenen Quellen und kombiniert sie zu kleinen Paketen. In regelmäßigen Abständen werden neue Pakete erstellt. Zu Beginn jedes Zeitintervalls wird ein neues Paket erstellt, und alle während dieses Intervalls empfangenen Daten werden in das Paket aufgenommen. Am Ende des Intervalls stoppt das Paketwachstum. Die Größe des Intervalls wird durch einen Parameter bestimmt, der als Stapelintervall bezeichnet wird.
  • Apache Spark SQL - Kombiniert relationale Verarbeitung mit funktionsfähiger Spark-Programmierung. Strukturierte Daten beziehen sich auf Daten mit einem Schema, dh einem einzelnen Satz von Feldern für alle Datensätze. Spark SQL unterstützt Eingaben aus einer Vielzahl strukturierter Datenquellen und kann dank der Verfügbarkeit von Schemainformationen nur die erforderlichen Datensatzfelder effizient abrufen und bietet außerdem DataFrame-APIs
  • AWS RDS ist eine relativ kostengünstige Cloud-basierte relationale Datenbank, ein Webdienst, der Konfiguration, Betrieb und Skalierung vereinfacht und direkt von Amazon verwaltet wird.

Installieren und starten Sie den Kafka-Server


Bevor Sie Kafka direkt verwenden, müssen Sie sicherstellen, dass Java verfügbar ist JVM wird für die Arbeit verwendet:

sudo apt-get update sudo apt-get install default-jre java -version 

Erstellen Sie einen neuen Benutzer für die Arbeit mit Kafka:

 sudo useradd kafka -m sudo passwd kafka sudo adduser kafka sudo 

Laden Sie als Nächstes die Distribution von der offiziellen Apache Kafka-Website herunter:

 wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz" 

Entpacken Sie das heruntergeladene Archiv:
 tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka 

Der nächste Schritt ist optional. Tatsache ist, dass die Standardeinstellungen nicht die vollständige Nutzung aller Funktionen von Apache Kafka ermöglichen. Löschen Sie beispielsweise ein Thema, eine Kategorie oder eine Gruppe, für die Nachrichten veröffentlicht werden können. Um dies zu ändern, bearbeiten Sie die Konfigurationsdatei:

 vim ~/kafka/config/server.properties 

Fügen Sie am Ende der Datei Folgendes hinzu:

 delete.topic.enable = true 

Bevor Sie den Kafka-Server starten, müssen Sie den ZooKeeper-Server starten. Wir verwenden das Hilfsskript, das mit der Kafka-Distribution geliefert wird:

 Cd ~/kafka bin/zookeeper-server-start.sh config/zookeeper.properties 

Nachdem ZooKeeper erfolgreich gestartet wurde, starten wir in einem separaten Terminal den Kafka-Server:

 bin/kafka-server-start.sh config/server.properties 

Erstellen Sie ein neues Thema mit dem Namen "Transaktion":

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction 

Stellen Sie sicher, dass das Thema mit der richtigen Anzahl von Partitionen und Replikationen erstellt wurde:

 bin/kafka-topics.sh --describe --zookeeper localhost:2181 



Wir werden die Momente verpassen, in denen der Produzent und der Konsument auf das neu geschaffene Thema getestet werden. Weitere Informationen zum Testen des Sendens und Empfangens von Nachrichten finden Sie in der offiziellen Dokumentation - Senden einiger Nachrichten . Nun schreiben wir einen Produzenten in Python mit der KafkaProducer-API.

Produzent schreiben


Der Produzent generiert zufällige Daten - 100 Nachrichten pro Sekunde. Mit zufälligen Daten meinen wir ein Wörterbuch, das aus drei Feldern besteht:

  • Filiale - Name der Verkaufsstelle des Kreditinstituts;
  • Währung - Transaktionswährung;
  • Betrag - Transaktionsbetrag. Der Betrag ist eine positive Zahl, wenn es sich um einen Kauf einer Währung durch die Bank handelt, und eine negative Zahl, wenn es sich um einen Verkauf handelt.

Der Code für den Hersteller lautet wie folgt:

 from numpy.random import choice, randint def get_random_value(): new_dict = {} branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut'] currency_list = ['RUB', 'USD', 'EUR', 'GBP'] new_dict['branch'] = choice(branch_list) new_dict['currency'] = choice(currency_list) new_dict['amount'] = randint(-100, 100) return new_dict 

Als Nächstes senden wir mithilfe der send-Methode eine Nachricht im gewünschten Thema im JSON-Format an den Server:

 from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x:dumps(x).encode('utf-8'), compression_type='gzip') my_topic = 'transaction' data = get_random_value() try: future = producer.send(topic = my_topic, value = data) record_metadata = future.get(timeout=10) print('--> The message has been sent to a topic: \ {}, partition: {}, offset: {}' \ .format(record_metadata.topic, record_metadata.partition, record_metadata.offset )) except Exception as e: print('--> It seems an Error occurred: {}'.format(e)) finally: producer.flush() 

Beim Ausführen des Skripts erhalten wir folgende Meldungen im Terminal:


Dies bedeutet, dass alles so funktioniert, wie wir es wollten - der Produzent generiert und sendet Nachrichten an das Thema, das wir benötigen.

Der nächste Schritt besteht darin, Spark zu installieren und diesen Nachrichtenfluss zu verarbeiten.

Installieren Sie Apache Spark


Apache Spark ist eine vielseitige und leistungsstarke Cluster-Computing-Plattform.

In Bezug auf die Leistung übertrifft Spark die gängigen Implementierungen des MapReduce-Modells und bietet gleichzeitig Unterstützung für eine größere Bandbreite von Berechnungstypen, einschließlich interaktiver Abfragen und Stream-Verarbeitung. Geschwindigkeit spielt eine wichtige Rolle bei der Verarbeitung großer Datenmengen, da Sie mit dieser Geschwindigkeit interaktiv arbeiten können, ohne Minuten oder Stunden warten zu müssen. Eine der größten Stärken von Spark bei einer so hohen Geschwindigkeit ist die Fähigkeit, In-Memory-Berechnungen durchzuführen.

Dieses Framework ist in Scala geschrieben, daher müssen Sie es zuerst installieren:

 sudo apt-get install scala 

Laden Sie die Spark-Distribution von der offiziellen Website herunter:

 wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz" 

Packen Sie das Archiv aus:

 sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark 

Fügen Sie den Pfad zum Spark in der Bash-Datei hinzu:

 vim ~/.bashrc 

Fügen Sie die folgenden Zeilen über den Editor hinzu:

 SPARK_HOME=/usr/local/spark export PATH=$SPARK_HOME/bin:$PATH 

Führen Sie den folgenden Befehl aus, nachdem Sie Änderungen an bashrc vorgenommen haben:

 source ~/.bashrc 

AWS PostgreSQL-Bereitstellung


Es bleibt die Bereitstellung der Datenbank, in die die verarbeiteten Informationen aus den Streams hochgeladen werden. Hierfür verwenden wir den AWS RDS-Service.

Gehen Sie zur Konsole AWS -> AWS RDS -> Datenbanken -> Datenbank erstellen:


Wählen Sie PostgreSQL und klicken Sie auf die Schaltfläche Weiter:


Weil Dieses Beispiel dient ausschließlich Bildungszwecken. Wir verwenden einen kostenlosen Server "mindestens" (Free Tier):


Setzen Sie als Nächstes ein Häkchen in den Free Tier-Block, und danach wird uns automatisch eine Instanz der t2.micro-Klasse angeboten - obwohl schwach, ist sie kostenlos und für unsere Aufgabe gut geeignet:

Es folgen sehr wichtige Dinge: der Name der Datenbankinstanz, der Name des Hauptbenutzers und sein Passwort. Nennen wir die Instanz: myHabrTest, den Hauptbenutzer : habr , das Passwort: habr12345 und klicken Sie auf die Schaltfläche Weiter:



Die nächste Seite enthält die Parameter, die für die Verfügbarkeit unseres Datenbankservers von außen (öffentliche Zugänglichkeit) und die Verfügbarkeit von Ports verantwortlich sind:


Erstellen wir eine neue Konfiguration für die VPC-Sicherheitsgruppe, mit der wir von außen über Port 5432 (PostgreSQL) auf unseren Datenbankserver zugreifen können.

Wechseln Sie in einem separaten Browserfenster zur AWS-Konsole im Abschnitt VPC-Dashboard -> Sicherheitsgruppen -> Sicherheitsgruppe erstellen:

Legen Sie den Namen für die Sicherheitsgruppe fest - PostgreSQL, eine Beschreibung, geben Sie an, welcher VPC diese Gruppe zugeordnet werden soll, und klicken Sie auf die Schaltfläche Erstellen:


Füllen Sie die neu erstellte Gruppe Eingehende Regeln für Port 5432 aus (siehe Abbildung unten). Sie müssen keinen manuellen Port angeben, sondern wählen PostgreSQL aus der Dropdown-Liste Typ aus.

Streng genommen bedeutet der Wert :: / 0 die Verfügbarkeit von eingehendem Datenverkehr für einen Server aus der ganzen Welt, was kanonisch nicht ganz richtig ist, aber um das Beispiel zu analysieren, verwenden wir diesen Ansatz:


Wir kehren zur Browserseite zurück, auf der "Erweiterte Einstellungen konfigurieren" geöffnet ist und wählen Sie im Abschnitt VPC-Sicherheitsgruppen -> Vorhandene VPC-Sicherheitsgruppen auswählen -> PostgreSQL:


Als nächstes im Abschnitt Datenbankoptionen -> Datenbankname -> den Namen festlegen - habrDB .

Wir können den Rest der Parameter belassen, mit Ausnahme des Deaktivierens der Sicherung (Sicherungsaufbewahrungszeitraum - 0 Tage), der Überwachung und von Performance Insights. Klicken Sie auf die Schaltfläche Datenbank erstellen :


Stream-Handler


Der letzte Schritt wird die Entwicklung von Spark-Jobs sein, die alle zwei Sekunden neue Daten von Kafka verarbeiten und das Ergebnis in die Datenbank eingeben.

Wie oben erwähnt, sind Prüfpunkte der Hauptmechanismus in SparkStreaming, der konfiguriert werden muss, um Fehlertoleranz bereitzustellen. Wir werden Kontrollpunkte verwenden, und im Falle eines Prozedurabbruchs muss das Spark-Streaming-Modul nur zum letzten Kontrollpunkt zurückkehren und die Berechnungen von diesem fortsetzen, um die verlorenen Daten wiederherzustellen.

Sie können den Haltepunkt aktivieren, indem Sie das Verzeichnis in einem fehlertoleranten, zuverlässigen Dateisystem (z. B. HDFS, S3 usw.) festlegen, in dem die Haltepunktinformationen gespeichert werden. Dies geschieht zum Beispiel mit:

 streamingContext.checkpoint(checkpointDirectory) 

In unserem Beispiel verwenden wir den folgenden Ansatz: Wenn checkpointDirectory vorhanden ist, wird der Kontext aus den Kontrollpunktdaten neu erstellt. Wenn das Verzeichnis nicht vorhanden ist (d. H. Zum ersten Mal ausgeführt wird), wird die Funktion functionToCreateContext aufgerufen, um einen neuen Kontext zu erstellen und DStreams zu konfigurieren:

 from pyspark.streaming import StreamingContext context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext) 

Erstellen Sie ein DirectStream-Objekt, um mithilfe der createDirectStream-Methode der KafkaUtils-Bibliothek eine Verbindung zum Thema "Transaktion" herzustellen:

 from pyspark.streaming.kafka import KafkaUtils sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 2) broker_list = 'localhost:9092' topic = 'transaction' directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": broker_list}) 

Analysieren eingehender Daten im JSON-Format:

 rowRdd = rdd.map(lambda w: Row(branch=w['branch'], currency=w['currency'], amount=w['amount'])) testDataFrame = spark.createDataFrame(rowRdd) testDataFrame.createOrReplaceTempView("treasury_stream") 

Mit Spark SQL erstellen wir eine einfache Gruppierung und drucken das Ergebnis auf der Konsole aus:

 select from_unixtime(unix_timestamp()) as curr_time, t.branch as branch_name, t.currency as currency_code, sum(amount) as batch_value from treasury_stream t group by t.branch, t.currency 

Abfragetext abrufen und über Spark SQL ausführen:

 sql_query = get_sql_query() testResultDataFrame = spark.sql(sql_query) testResultDataFrame.show(n=5) 

Anschließend speichern wir die empfangenen aggregierten Daten in einer Tabelle in AWS RDS. Um die Aggregationsergebnisse in einer Datenbanktabelle zu speichern, verwenden wir die Schreibmethode des DataFrame-Objekts:

 testResultDataFrame.write \ .format("jdbc") \ .mode("append") \ .option("driver", 'org.postgresql.Driver') \ .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \ .option("dbtable", "transaction_flow") \ .option("user", "habr") \ .option("password", "habr12345") \ .save() 

Ein paar Worte zum Einrichten einer Verbindung zu AWS RDS. Wir haben den Benutzer und das Kennwort dafür im Schritt "Bereitstellen von AWS PostgreSQL" erstellt. Verwenden Sie für die Datenbankserver-URL Endpoint, der im Abschnitt Konnektivität und Sicherheit angezeigt wird:


Um Spark und Kafka korrekt zu verbinden, sollten Sie den Job über smark-submit mit dem Artefakt spark-Streaming-kafka-0-8_2.11 ausführen . Darüber hinaus wenden wir das Artefakt auch auf die Interaktion mit der PostgreSQL-Datenbank an und übertragen es über --packages.

Für die Flexibilität des Skripts nehmen wir auch den Namen des Nachrichtenservers und das Thema heraus, von dem wir Daten als Eingabeparameter empfangen möchten.

Es ist also Zeit, das System zu starten und zu testen:

 spark-submit \ --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\ org.postgresql:postgresql:9.4.1207 \ spark_job.py localhost:9092 transaction 

Alles hat geklappt! Wie Sie in der Abbildung unten sehen können, werden während des Betriebs der Anwendung alle 2 Sekunden neue Aggregationsergebnisse angezeigt, da wir das Stapelintervall beim Erstellen des StreamingContext-Objekts auf 2 Sekunden festgelegt haben:


Als Nächstes führen wir eine einfache Abfrage an die Datenbank durch, um nach Datensätzen in der Transaktion_Fluss- Tabelle zu suchen:


Fazit


In diesem Artikel wurde ein Beispiel für die Verarbeitung von Streaming-Informationen mithilfe von Spark Streaming in Verbindung mit Apache Kafka und PostgreSQL untersucht. Mit dem Wachstum von Daten aus verschiedenen Quellen ist es schwierig, den praktischen Wert von Spark Streaming für die Erstellung von Streaming-Anwendungen und Anwendungen, die in Echtzeit arbeiten, zu überschätzen.

Den vollständigen Quellcode finden Sie in meinem Repository auf GitHub .

Ich bin bereit, diesen Artikel gerne zu diskutieren, freue mich auf Ihre Kommentare und hoffe auf konstruktive Kritik aller betroffenen Leser.

Ich wünsche Ihnen viel Erfolg!

PS Es war ursprünglich geplant, eine lokale PostgreSQL-Datenbank zu verwenden, aber aufgrund meiner Vorliebe für AWS habe ich beschlossen, die Datenbank in die Cloud zu stellen. Im nächsten Artikel zu diesem Thema werde ich zeigen, wie das gesamte oben in AWS beschriebene System mit AWS Kinesis und AWS EMR implementiert wird. Folgen Sie den Nachrichten!

Source: https://habr.com/ru/post/de451160/


All Articles