Hallo an alle. Freunde, wir teilen Ihnen eine Übersetzung eines Artikels mit, der speziell für Studenten des
Data Engineer- Kurses erstellt wurde. Lass uns gehen!

Apache Beam und DataFlow für Echtzeit-Pipelines
Der heutige Beitrag basiert auf einer Aufgabe, an der ich kürzlich bei der Arbeit gearbeitet habe. Ich war sehr glücklich, es zu implementieren und die geleistete Arbeit im Format eines Blogposts zu beschreiben, da es mir die Möglichkeit gab, an der Datenentwicklung zu arbeiten und etwas zu tun, das für mein Team sehr nützlich wäre. Vor nicht allzu langer Zeit habe ich festgestellt, dass unsere Systeme eine relativ große Anzahl von Benutzerprotokollen speichern, die sich auf eines unserer Produkte für die Arbeit mit Daten beziehen. Es stellte sich heraus, dass niemand diese Daten verwendet hatte, und ich interessierte mich sofort für das, was wir herausfinden konnten, wenn wir anfingen, sie regelmäßig zu analysieren. Es gab jedoch einige Probleme auf dem Weg. Das erste Problem war, dass die Daten in vielen verschiedenen Textdateien gespeichert wurden, die für eine sofortige Analyse nicht verfügbar waren. Das zweite Problem war, dass sie in einem geschlossenen System gespeichert waren, sodass ich keines meiner bevorzugten Datenanalysetools verwenden konnte.
Ich musste entscheiden, wie wir den Zugriff erleichtern und zumindest einen Mehrwert schaffen können, indem wir diese Datenquelle in einige unserer Benutzerinteraktionslösungen einbetten. Nachdem ich eine Weile nachgedacht hatte, beschloss ich, eine Pipeline zu erstellen, um diese Daten in die Cloud-Datenbank zu übertragen, damit ich und das Team darauf zugreifen und Schlussfolgerungen ziehen können. Nachdem ich vor einiger Zeit meine Spezialisierung in Data Engineering bei Coursera abgeschlossen hatte, war ich bestrebt, einige der Kurswerkzeuge im Projekt zu verwenden.
Das Einfügen von Daten in eine Cloud-Datenbank schien also eine clevere Möglichkeit zu sein, mein erstes Problem zu lösen. Aber was könnte ich mit Problem Nummer 2 tun? Glücklicherweise gab es eine Möglichkeit, diese Daten in eine Umgebung zu übertragen, in der ich auf Tools wie Python und die Google Cloud Platform (GCP) zugreifen konnte. Es war jedoch ein langer Prozess, daher musste ich etwas tun, das es mir ermöglichte, die Entwicklung fortzusetzen, während ich auf das Ende der Datenübertragung wartete. Die Lösung bestand darin, gefälschte Daten mithilfe der
Faker- Bibliothek in Python zu erstellen. Ich hatte diese Bibliothek noch nie benutzt, erkannte aber schnell, wie nützlich sie war. Mit diesem Ansatz konnte ich Code schreiben und die Pipeline ohne tatsächliche Daten testen.
Auf der Grundlage des Vorstehenden werde ich Ihnen in diesem Beitrag erläutern, wie ich die oben beschriebene Pipeline mit einigen der in GCP verfügbaren Technologien erstellt habe. Insbesondere werde ich
Apache Beam (Version für Python), Dataflow, Pub / Sub und Big Query verwenden , um Benutzerprotokolle zu sammeln, Daten zu konvertieren und zur weiteren Analyse in eine Datenbank zu übertragen. In meinem Fall benötigte ich nur die Batch-Funktionalität von Beam, da meine Daten nicht in Echtzeit eintrafen und Pub / Sub nicht erforderlich war. Ich werde mich jedoch auf die Streaming-Version konzentrieren, da dies in der Praxis der Fall sein kann.
Einführung in GCP und Apache Beam
Die Google Cloud Platform bietet eine Reihe wirklich nützlicher Tools für die Verarbeitung von Big Data. Hier sind einige der Tools, die ich verwenden werde:
- Pub / Sub ist ein Messaging-Dienst, der die Publisher-Subscriber-Vorlage verwendet, mit der wir Daten in Echtzeit empfangen können.
- DataFlow ist ein Dienst, der die Erstellung von Datenpipelines vereinfacht und automatisch Aufgaben wie die Skalierung der Infrastruktur löst. Das bedeutet, dass wir uns nur auf das Schreiben von Code für unsere Pipeline konzentrieren können.
- BigQuery ist ein Cloud-basiertes Data Warehouse. Wenn Sie mit anderen SQL-Datenbanken vertraut sind, müssen Sie sich nicht lange mit BigQuery beschäftigen.
- Und schließlich werden wir Apache Beam verwenden, nämlich uns auf die Python-Version konzentrieren, um unsere Pipeline zu erstellen. Mit diesem Tool können wir eine Pipeline für das Streaming oder die Stapelverarbeitung erstellen, die in GCP integriert ist. Es ist besonders nützlich für die Parallelverarbeitung und eignet sich für Aufgaben wie Extrahieren, Transformieren und Laden (ETL). Wenn wir also Daten mit Transformationen oder Berechnungen von einem Ort an einen anderen verschieben müssen, ist Beam eine gute Wahl.
In GCP steht eine große Anzahl von Tools zur Verfügung, so dass es schwierig sein kann, alle Tools einschließlich ihres Zwecks abzudecken. Dennoch finden Sie
hier eine kurze Zusammenfassung als Referenz.
Visualisierung unseres Förderers
Lassen Sie uns die Komponenten unserer Pipeline in
Abbildung 1 visualisieren. Auf hoher Ebene möchten wir Benutzerdaten in Echtzeit erfassen, verarbeiten und an BigQuery übertragen. Protokolle werden erstellt, wenn Benutzer mit dem Produkt interagieren, indem sie Anforderungen an den Server senden, die dann protokolliert werden. Diese Daten können besonders nützlich sein, um zu verstehen, wie Benutzer mit unserem Produkt interagieren und ob sie ordnungsgemäß funktionieren. Im Allgemeinen enthält der Förderer die folgenden Schritte:
- Die Protokolldaten unserer Benutzer werden im Bereich Pub / Sub veröffentlicht.
- Wir werden eine Verbindung zu Pub / Sub herstellen und die Daten mit Python und Beam in das entsprechende Format konvertieren (Schritte 3 und 4 in Abbildung 1).
- Nach der Konvertierung der Daten stellt Beam eine Verbindung zu BigQuery her und fügt sie unserer Tabelle hinzu (Schritte 4 und 5 in Abbildung 1).
- Zur Analyse können wir mit verschiedenen Tools wie Tableau und Python eine Verbindung zu BigQuery herstellen.
Beam macht diesen Vorgang sehr einfach, unabhängig davon, ob wir eine Streaming-Datenquelle oder eine CSV-Datei haben, und wir möchten die Stapelverarbeitung durchführen. Später werden Sie sehen, dass der Code nur die minimalen Änderungen enthält, die zum Wechseln zwischen ihnen erforderlich sind. Dies ist einer der Vorteile der Verwendung von Beam.
Abbildung 1: Die HauptdatenpipelinePseudodaten mit Faker erstellen
Wie bereits erwähnt, habe ich mich aufgrund des eingeschränkten Zugriffs auf Daten entschlossen, Pseudodaten im gleichen Format wie die tatsächlichen zu erstellen. Dies war eine sehr nützliche Übung, da ich Code schreiben und die Pipeline testen konnte, während ich Daten erwartete. Ich schlage vor, einen Blick auf die Faker-
Dokumentation zu werfen, wenn Sie wissen möchten, was diese Bibliothek sonst noch zu bieten hat. Unsere Benutzerdaten ähneln im Allgemeinen dem folgenden Beispiel. Basierend auf diesem Format können wir zeilenweise Daten generieren, um Echtzeitdaten zu simulieren. Diese Protokolle geben uns Informationen wie Datum, Art der Anfrage, Antwort vom Server, IP-Adresse usw.
192.52.197.161 - - [30/Apr/2019:21:11:42] "PUT /tag/category/tag HTTP/1.1" [401] 155 "https://harris-lopez.com/categories/about/" "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_2) AppleWebKit/5312 (KHTML, like Gecko) Chrome/34.0.855.0 Safari/5312"
Basierend auf der obigen Zeile möchten wir unsere
LINE- Variable mit 7 Variablen in geschweiften Klammern erstellen. Wir werden sie etwas später auch als Variablennamen in unserem Tabellenschema verwenden.
LINE = """\
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\
"""
Wenn wir eine Stapelverarbeitung durchführen würden, wäre der Code sehr ähnlich, obwohl wir in einem bestimmten Zeitraum eine Reihe von Beispielen erstellen müssten. Um einen Fälscher zu verwenden, erstellen wir einfach ein Objekt und rufen die Methoden auf, die wir benötigen. Insbesondere war Faker nützlich, um IP-Adressen sowie Websites zu erstellen. Ich habe die folgenden Methoden angewendet:
fake.ipv4()
fake.uri_path()
fake.uri()
fake.user_agent()
from faker import Faker import time import random import os import numpy as np from datetime import datetime, timedelta LINE = """\ {remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\ """ def generate_log_line(): fake = Faker() now = datetime.now() remote_addr = fake.ipv4() time_local = now.strftime('%d/%b/%Y:%H:%M:%S') request_type = random.choice(["GET", "POST", "PUT"]) request_path = "/" + fake.uri_path() status = np.random.choice([200, 401, 404], p = [0.9, 0.05, 0.05]) body_bytes_sent = random.choice(range(5, 1000, 1)) http_referer = fake.uri() http_user_agent = fake.user_agent() log_line = LINE.format( remote_addr=remote_addr, time_local=time_local, request_type=request_type, request_path=request_path, status=status, body_bytes_sent=body_bytes_sent, http_referer=http_referer, http_user_agent=http_user_agent ) return log_line
Das Ende des ersten Teils.
In den kommenden Tagen werden wir die Fortsetzung des Artikels mit Ihnen teilen, aber jetzt warten wir traditionell auf Kommentare ;-).
Zweiter Teil