
Im Moment beschäftige ich mich mit dem Streaming (und Konvertieren) von Daten. In einigen Kreisen
ein solcher Prozess ist als ETL bekannt , d.h. Extrahieren, Konvertieren und Laden von Informationen.
Der gesamte Prozess umfasst die Teilnahme der folgenden Google Cloud Platform- Dienste:
- Pub / Sub- Service für Echtzeit-Daten-Streaming
- Datenfluss - ein Dienst zum Konvertieren von Daten (can
sowohl in Echtzeit als auch im Batch-Modus arbeiten) - BigQuery - ein Dienst zum Speichern von Daten in Form von Tabellen
(unterstützt SQL)
0. Aktueller Status
Im Moment gibt es eine funktionierende Version des Streamings für die oben genannten Dienste, jedoch in
Als Vorlage wird eine der Standardvorlagen verwendet .
Das Problem besteht darin, dass diese Vorlage eine 1: 1-Datenübertragung bereitstellt, d. H. auf
Am Eingang zu Pub / Sub haben wir eine Zeichenfolge im JSON-Format. Am Ausgang haben wir eine BigQuery-Tabelle mit Feldern.
die den Schlüsseln der Objekte auf der obersten Ebene des Eingabe-JSON entsprechen.
1. Erklärung des Problems
Erstellen Sie eine Datenflussvorlage, mit der Sie eine oder mehrere Tabellen an der Ausgabe abrufen können
gemäß den gegebenen Bedingungen. Zum Beispiel möchten wir für jede eine eigene Tabelle erstellen
Werte eines bestimmten Eingabe-JSON-Schlüssels. Es ist notwendig, die Tatsache zu berücksichtigen, dass einige
Eingabe-JSON-Objekte können verschachtelten JSON als Wert enthalten, d. H. ist notwendig
Sie können BigQuery-Tabellen mit Feldern vom Typ RECORD
zum Speichern verschachtelter Tabellen erstellen
Daten.
2. Vorbereitung auf die Entscheidung
Verwenden Sie zum Erstellen einer Datenflussvorlage das Apache Beam SDK , das wiederum
unterstützt Java und Python als Programmiersprache. Das muss ich sagen
Es wird nur die Version Python 2.7.x unterstützt, was mich etwas überrascht hat. Darüber hinaus Unterstützung
Java ist etwas breiter, weil Für Python sind beispielsweise einige Funktionen nicht verfügbar und mehr
Eine bescheidene Liste integrierter Anschlüsse . Übrigens können Sie Ihre eigenen Konnektoren schreiben .
Aufgrund der Tatsache, dass ich mit Java nicht vertraut bin, habe ich Python verwendet.
Bevor Sie mit dem Erstellen einer Vorlage beginnen, müssen Sie über Folgendes verfügen:
- Geben Sie das JSON-Format ein und es sollte sich nicht rechtzeitig ändern
- Schema oder Schemata von BigQuery-Tabellen, in die Daten gestreamt werden
- Die Anzahl der Tabellen, in die der Ausgabedatenstrom gestreamt wird
Beachten Sie, dass diese Parameter nach dem Erstellen einer Vorlage und dem Starten des darauf basierenden Datenflussjobs verwendet werden können
Ändern Sie dies nur, indem Sie eine neue Vorlage erstellen.
Lassen Sie uns ein paar Worte zu diesen Einschränkungen sagen. Sie alle kommen von der Tatsache, dass es keine Möglichkeit gibt
Erstellen Sie eine dynamische Vorlage, die eine beliebige Zeichenfolge als Eingabe verwenden kann, und analysieren Sie sie
nach interner Logik und füllen Sie dann dynamisch erstellte Tabellen dynamisch mit
von der Schaltung erstellt. Es ist sehr wahrscheinlich, dass diese Möglichkeit besteht, jedoch innerhalb der Daten
Es ist mir nicht gelungen, ein solches System umzusetzen. Soweit ich das Ganze verstehe
Die Pipeline wird erstellt, bevor sie zur Laufzeit ausgeführt wird. Daher gibt es keine Möglichkeit, sie zu ändern
fliegen. Vielleicht teilt jemand seine Entscheidung.
3. Entscheidung
Für ein vollständigeres Verständnis des Prozesses lohnt es sich, ein Diagramm der sogenannten Pipeline zu erstellen
aus der Apache Beam-Dokumentation.

In unserem Fall (wir werden die Unterteilung in mehrere Tabellen verwenden):
- Eingabedaten stammen von PubSub im Datenflussjob
- Transformation Nr. 1 - Daten werden von einer Zeichenfolge in ein Python-Wörterbuch konvertiert. Wir erhalten eine Ausgabe
PCollection # 1 - Transformation Nr. 2 - Die Daten werden zur weiteren Trennung gemäß separaten Tabellen in markiert
Die Ausgabe ist PCollection # 2 (eigentlich ein PCollection-Tupel). - Transformation Nr. 3 - Daten aus PCollection Nr. 2 werden mithilfe von Schemata in Tabellen geschrieben
Tabellen
Beim Schreiben meiner eigenen Vorlage habe ich mich aktiv von diesen Beispielen inspirieren lassen.
Vorlagencode mit Kommentaren (linke Kommentare auf die gleiche Weise wie bei früheren Autoren): Jetzt werden wir den Code durchgehen und Erklärungen geben, aber zuerst ist es erwähnenswert, dass der Hauptcode
Die Schwierigkeit beim Schreiben dieser Vorlage besteht darin, in Bezug auf den "Datenstrom" zu denken, und
keine bestimmte Nachricht. Es ist auch wichtig zu verstehen, dass Pub / Sub mit Nachrichten und arbeitet
Von ihnen erhalten wir Informationen zum Markieren des Streams.
pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True
Weil Der Apache Beam Pub / Sub IO-Anschluss wird nur im erforderlichen Streaming-Modus verwendet
Fügen Sie PipelineOptions () hinzu (obwohl die Optionen tatsächlich nicht verwendet werden). Andernfalls erstellen Sie eine Vorlage
fällt mit der Ausnahme. Es muss über die Optionen zum Starten der Vorlage gesagt werden. Sie können sein
statische und sogenannte "Laufzeit". Hier ist ein Link zur Dokumentation zu diesem Thema. Mit den Optionen können Sie eine Vorlage erstellen, ohne zuvor Parameter anzugeben, diese jedoch beim Starten des Datenflussjobs über die Vorlage übergeben. Ich konnte sie jedoch immer noch nicht implementieren, wahrscheinlich weil dieser Connector RuntimeValueProvider
nicht unterstützt.
Alles ist aus dem Kommentar klar, wir lesen den Thread aus dem Thema. Es lohnt sich hinzuzufügen, dass Sie den Stream nehmen können
sowohl aus dem Thema als auch aus dem Abonnement (Abonnement). Wenn das Thema als Eingabe angegeben ist, dann
Ein temporäres Abonnement für dieses Thema wird automatisch erstellt. Die Syntax ist auch hübsch
klar, Eingabedatenstrom beam.io.ReadFromPubSub(input_topic)
an unsere gesendet
Pipeline p
.
Hier findet die Transformation Nr. 1 statt und unsere Eingabe wird von einer Python-Zeichenfolge in konvertiert
Python-Diktat, und in der Ausgabe erhalten wir PCollection # 1. >>
erscheint in der Syntax. Ein
Tatsächlich ist der Text in Anführungszeichen der Name des Streams (muss eindeutig sein) sowie ein Kommentar.
Dies wird dem Block im Diagramm in der GCP Dataflow-Weboberfläche hinzugefügt. Lassen Sie uns genauer betrachten
überschriebene Klasse TransformToBigQuery
.
class TransformToBigQuery(beam.DoFn):
Die Elementvariable enthält eine Nachricht aus dem PubSub-Abonnement. Wie von gesehen
Code, in unserem Fall sollte es gültiger JSON sein. Im Klassenzimmer muss sein
Die process
wird neu definiert, in der die erforderlichen Transformationen durchgeführt werden sollen
Eingangsleitung, um einen Ausgang zu erhalten, der der Schaltung entspricht
die Tabelle, in die diese Daten geladen werden. Weil Unser Fluss in diesem Fall ist
kontinuierlich, unbounded
in Bezug auf Apache Beam, müssen Sie es mit zurückgeben
yield
, nicht return
, wie beim endgültigen Datenstrom. Im Falle eines endgültigen Ablaufs können Sie
(und notwendig) zusätzlich windowing
und triggers
konfigurieren
Dieser Code leitet PCollection # 1 zu Transform # 2, wo das Tagging stattfinden wird
(Trennung) des Datenstroms. In der Variablen schema_dct
in diesem Fall einem Wörterbuch, in dem der Schlüssel der Name der schema_dct
ohne die Erweiterung ist, ist dies das Tag und der Wert der gültige JSON des Schemas
BigQuery-Tabellen für dieses Tag. Es ist zu beachten, dass das Schema genau an übertragen werden sollte
view {'fields': }
wobei
das Schema der BigQuery-Tabelle in JSON-Form ist (Sie können
Export von der Weboberfläche).
main='default'
ist der Name des Thread-Tags, zu dem sie gehen
Alle Nachrichten, für die keine Tagging-Bedingungen gelten. Betrachten Sie die Klasse
TagDataWithReqType
.
class TagDataWithReqType(beam.DoFn):
Wie Sie sehen, wird auch hier die process
überschrieben. Die Variable types
enthält Namen
Tags und sie müssen die Nummer und den Namen mit der Nummer und den Namen der Wörterbuchschlüssel übereinstimmen
schema_dct
. Obwohl die process
Argumente akzeptieren kann, habe ich nie
Ich konnte sie bestehen. Ich habe den Grund noch nicht herausgefunden.
Bei der Ausgabe erhalten wir ein Tupel von Threads in der Anzahl der Tags, nämlich der Anzahl unserer
vordefinierte Tags + Standard-Thread, der nicht markiert werden konnte.
Transform # ... (tatsächlich ist es nicht im Diagramm, dies ist ein "Zweig") - wir schreiben den Standard-Stream
zur Standardtabelle.
tagged_stream.default
- Ein Stream mit dem default
Tag wird verwendet. Die alternative Syntax lautet tagged_stream['default']
schema=parse_table_schema_from_json(schema_dct.get('default'))
- hier wird das Schema definiert
Tabellen. Beachten Sie, dass die Datei default.json das gültige BigQuery-Tabellenschema enthält
muss sich im aktuellen schema_dir = './'
.
Der Stream wird in eine Tabelle namens default
.
Wenn eine Tabelle mit diesem Namen (im angegebenen Datensatz dieses Projekts) nicht vorhanden ist, ist sie vorhanden
wird dank der Standardeinstellung automatisch aus dem Schema erstellt
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED
Transform # 3, alles sollte für diejenigen klar sein, die den Artikel von Anfang an lesen und besitzen
Python-Syntax. Wir trennen das Stream-Tupel durch eine Schleife und schreiben jeden Stream in eine eigene Tabelle mit
sein Schema. Es sei daran erinnert, dass der '%s:%s.%s' % (gcp_project, bq_dataset, name)
eindeutig sein muss - '%s:%s.%s' % (gcp_project, bq_dataset, name)
.
Jetzt sollte klar sein, wie das funktioniert und Sie können eine Vorlage erstellen. Dafür brauchst du
Führen Sie es in der Konsole aus (vergessen Sie nicht, venv zu aktivieren, falls verfügbar) oder in der IDE:
python _.py / --runner DataflowRunner / --project dreamdata-test / --staging_location gs://STORAGE_NAME/STAGING_DIR / --temp_location gs://STORAGE_NAME/TEMP_DIR / --template_location gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME
Gleichzeitig sollte der Zugriff auf das Google-Konto beispielsweise durch Export organisiert werden
die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS
oder eine andere Methode .
Ein paar Worte zu --runner
. In diesem Fall sagt DataflowRunner
, dass dieser Code
wird als Vorlage für den Datenflussjob ausgeführt. Es ist weiterhin möglich anzugeben
DirectRunner
wird standardmäßig verwendet, wenn keine Option vorhanden ist - --runner
und Code
funktioniert als Datenflussjob, jedoch lokal, was für das Debuggen sehr praktisch ist.
Wenn keine Fehler aufgetreten sind, gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME
Vorlage erstellt. Es ist erwähnenswert, dass in gs://STORAGE_NAME/STAGING_DIR
auch geschrieben wird
Servicedateien, die für den erfolgreichen Betrieb des auf Basis von erstellten Datafow-Jobs erforderlich sind
Vorlage und Sie müssen sie nicht löschen.
Als Nächstes müssen Sie einen Datenflussjob mithilfe dieser Vorlage manuell oder nach Belieben erstellen
auf andere Weise (z. B. CI).
4. Schlussfolgerungen
So ist es uns gelungen, den Stream von PubSub nach BigQuery mit zu streamen
notwendige Datentransformationen zum Zwecke der weiteren Speicherung, Transformation und
Datennutzung.
Hauptlinks
In diesem Artikel sind Ungenauigkeiten und sogar Fehler möglich, ich werde für das Konstruktive dankbar sein
Kritik. Am Ende möchte ich hinzufügen, dass hier tatsächlich nicht alle verwendet werden
Funktionen des Apache Beam SDK, aber das war nicht das Ziel.