Erstellen einer Datenflussvorlage zum Streamen von Daten von Pub / Sub zu BigQuery basierend auf GCP mit dem Apache Beam SDK und Python

Bild


Im Moment beschäftige ich mich mit dem Streaming (und Konvertieren) von Daten. In einigen Kreisen
ein solcher Prozess ist als ETL bekannt , d.h. Extrahieren, Konvertieren und Laden von Informationen.


Der gesamte Prozess umfasst die Teilnahme der folgenden Google Cloud Platform- Dienste:


  • Pub / Sub- Service für Echtzeit-Daten-Streaming
  • Datenfluss - ein Dienst zum Konvertieren von Daten (can
    sowohl in Echtzeit als auch im Batch-Modus arbeiten)
  • BigQuery - ein Dienst zum Speichern von Daten in Form von Tabellen
    (unterstützt SQL)

0. Aktueller Status

Im Moment gibt es eine funktionierende Version des Streamings für die oben genannten Dienste, jedoch in
Als Vorlage wird eine der Standardvorlagen verwendet .


Das Problem besteht darin, dass diese Vorlage eine 1: 1-Datenübertragung bereitstellt, d. H. auf
Am Eingang zu Pub / Sub haben wir eine Zeichenfolge im JSON-Format. Am Ausgang haben wir eine BigQuery-Tabelle mit Feldern.
die den Schlüsseln der Objekte auf der obersten Ebene des Eingabe-JSON entsprechen.


1. Erklärung des Problems

Erstellen Sie eine Datenflussvorlage, mit der Sie eine oder mehrere Tabellen an der Ausgabe abrufen können
gemäß den gegebenen Bedingungen. Zum Beispiel möchten wir für jede eine eigene Tabelle erstellen
Werte eines bestimmten Eingabe-JSON-Schlüssels. Es ist notwendig, die Tatsache zu berücksichtigen, dass einige
Eingabe-JSON-Objekte können verschachtelten JSON als Wert enthalten, d. H. ist notwendig
Sie können BigQuery-Tabellen mit Feldern vom Typ RECORD zum Speichern verschachtelter Tabellen erstellen
Daten.


2. Vorbereitung auf die Entscheidung

Verwenden Sie zum Erstellen einer Datenflussvorlage das Apache Beam SDK , das wiederum
unterstützt Java und Python als Programmiersprache. Das muss ich sagen
Es wird nur die Version Python 2.7.x unterstützt, was mich etwas überrascht hat. Darüber hinaus Unterstützung
Java ist etwas breiter, weil Für Python sind beispielsweise einige Funktionen nicht verfügbar und mehr
Eine bescheidene Liste integrierter Anschlüsse . Übrigens können Sie Ihre eigenen Konnektoren schreiben .


Aufgrund der Tatsache, dass ich mit Java nicht vertraut bin, habe ich Python verwendet.


Bevor Sie mit dem Erstellen einer Vorlage beginnen, müssen Sie über Folgendes verfügen:


  1. Geben Sie das JSON-Format ein und es sollte sich nicht rechtzeitig ändern
  2. Schema oder Schemata von BigQuery-Tabellen, in die Daten gestreamt werden
  3. Die Anzahl der Tabellen, in die der Ausgabedatenstrom gestreamt wird

Beachten Sie, dass diese Parameter nach dem Erstellen einer Vorlage und dem Starten des darauf basierenden Datenflussjobs verwendet werden können
Ändern Sie dies nur, indem Sie eine neue Vorlage erstellen.


Lassen Sie uns ein paar Worte zu diesen Einschränkungen sagen. Sie alle kommen von der Tatsache, dass es keine Möglichkeit gibt
Erstellen Sie eine dynamische Vorlage, die eine beliebige Zeichenfolge als Eingabe verwenden kann, und analysieren Sie sie
nach interner Logik und füllen Sie dann dynamisch erstellte Tabellen dynamisch mit
von der Schaltung erstellt. Es ist sehr wahrscheinlich, dass diese Möglichkeit besteht, jedoch innerhalb der Daten
Es ist mir nicht gelungen, ein solches System umzusetzen. Soweit ich das Ganze verstehe
Die Pipeline wird erstellt, bevor sie zur Laufzeit ausgeführt wird. Daher gibt es keine Möglichkeit, sie zu ändern
fliegen. Vielleicht teilt jemand seine Entscheidung.


3. Entscheidung

Für ein vollständigeres Verständnis des Prozesses lohnt es sich, ein Diagramm der sogenannten Pipeline zu erstellen
aus der Apache Beam-Dokumentation.


Bild


In unserem Fall (wir werden die Unterteilung in mehrere Tabellen verwenden):


  • Eingabedaten stammen von PubSub im Datenflussjob
  • Transformation Nr. 1 - Daten werden von einer Zeichenfolge in ein Python-Wörterbuch konvertiert. Wir erhalten eine Ausgabe
    PCollection # 1
  • Transformation Nr. 2 - Die Daten werden zur weiteren Trennung gemäß separaten Tabellen in markiert
    Die Ausgabe ist PCollection # 2 (eigentlich ein PCollection-Tupel).
  • Transformation Nr. 3 - Daten aus PCollection Nr. 2 werden mithilfe von Schemata in Tabellen geschrieben
    Tabellen

Beim Schreiben meiner eigenen Vorlage habe ich mich aktiv von diesen Beispielen inspirieren lassen.


Vorlagencode mit Kommentaren (linke Kommentare auf die gleiche Weise wie bei früheren Autoren):
 # coding=utf-8 from __future__ import absolute_import import logging import json import os import apache_beam as beam from apache_beam.pvalue import TaggedOutput from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.io.gcp.bigquery import parse_table_schema_from_json #  GCP  gcp_project = '' #  Pub/Sub  topic_name = '' # Pub/Sub    'projects/_GCP_/topics/_' input_topic = 'projects/%s/topics/%s' % (gcp_project, topic_name) #  BigQuery  bq_dataset = 'segment_eu_test' #       schema_dir = './' class TransformToBigQuery(beam.DoFn): #          ,   # BigQuery IO     python dict def process(self, element, *args, **kwargs): body = json.loads(element) #       ,      # python dict       ,     #   yield body class TagDataWithReqType(beam.DoFn): #      , ..      #     ,       #  with_outputs + default def process(self, element, *args, **kwargs): req_type = element.get('_') types = ( 'type1', 'type2', 'type3', ) if req_type in types: yield TaggedOutput(req_type, element) else: yield element def run(): #       _.json   schema_dir,  #         ()  schema_dct = {} for schema_file in os.listdir(schema_dir): filename_list = schema_file.split('.') if filename_list[-1] == 'json': with open('%s/%s' % (schema_dir, schema_file)) as f: schema_json = f.read() schema_dct[filename_list[0]] = json.dumps({'fields': json.loads(schema_json)}) # We use the save_main_session option because one or more DoFn's in this # workflow rely on global context (eg, a module imported at module level). pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True # Read from PubSub into a PCollection. input_stream = p | beam.io.ReadFromPubSub(input_topic) # Transform stream to BigQuery IO format stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery()) # Tag stream by schema name tagged_stream = \ stream_bq \ | 'tag data by type' >> beam.ParDo(TagDataWithReqType()). with_outputs(*schema_dct.keys(), main='default') # Stream unidentified data to default table tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery( '%s:%s.default' % ( gcp_project, bq_dataset, ), schema=parse_table_schema_from_json(schema_dct.get('default')), ) # Stream data to BigQuery tables by number of schema names for name, schema in schema_dct.iteritems(): tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery( '%s:%s.%s' % ( gcp_project, bq_dataset, name), schema=parse_table_schema_from_json(schema), ) result = p.run() result.wait_until_finish() if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) logger = logging.getLogger(__name__) run() 

Jetzt werden wir den Code durchgehen und Erklärungen geben, aber zuerst ist es erwähnenswert, dass der Hauptcode
Die Schwierigkeit beim Schreiben dieser Vorlage besteht darin, in Bezug auf den "Datenstrom" zu denken, und
keine bestimmte Nachricht. Es ist auch wichtig zu verstehen, dass Pub / Sub mit Nachrichten und arbeitet
Von ihnen erhalten wir Informationen zum Markieren des Streams.


 pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True 

Weil Der Apache Beam Pub / Sub IO-Anschluss wird nur im erforderlichen Streaming-Modus verwendet
Fügen Sie PipelineOptions () hinzu (obwohl die Optionen tatsächlich nicht verwendet werden). Andernfalls erstellen Sie eine Vorlage
fällt mit der Ausnahme. Es muss über die Optionen zum Starten der Vorlage gesagt werden. Sie können sein
statische und sogenannte "Laufzeit". Hier ist ein Link zur Dokumentation zu diesem Thema. Mit den Optionen können Sie eine Vorlage erstellen, ohne zuvor Parameter anzugeben, diese jedoch beim Starten des Datenflussjobs über die Vorlage übergeben. Ich konnte sie jedoch immer noch nicht implementieren, wahrscheinlich weil dieser Connector RuntimeValueProvider nicht unterstützt.


 # Read from PubSub into a PCollection. input_stream = p | beam.io.ReadFromPubSub(input_topic) 

Alles ist aus dem Kommentar klar, wir lesen den Thread aus dem Thema. Es lohnt sich hinzuzufügen, dass Sie den Stream nehmen können
sowohl aus dem Thema als auch aus dem Abonnement (Abonnement). Wenn das Thema als Eingabe angegeben ist, dann
Ein temporäres Abonnement für dieses Thema wird automatisch erstellt. Die Syntax ist auch hübsch
klar, Eingabedatenstrom beam.io.ReadFromPubSub(input_topic) an unsere gesendet
Pipeline p .


 # Transform stream to BigQuery IO format stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery()) 

Hier findet die Transformation Nr. 1 statt und unsere Eingabe wird von einer Python-Zeichenfolge in konvertiert
Python-Diktat, und in der Ausgabe erhalten wir PCollection # 1. >> erscheint in der Syntax. Ein
Tatsächlich ist der Text in Anführungszeichen der Name des Streams (muss eindeutig sein) sowie ein Kommentar.
Dies wird dem Block im Diagramm in der GCP Dataflow-Weboberfläche hinzugefügt. Lassen Sie uns genauer betrachten
überschriebene Klasse TransformToBigQuery .


 class TransformToBigQuery(beam.DoFn): #          ,   # BigQuery IO     python dict def process(self, element, *args, **kwargs): body = json.loads(element) #       ,      # python dict       ,     #  ,      python dict yield body 

Die Elementvariable enthält eine Nachricht aus dem PubSub-Abonnement. Wie von gesehen
Code, in unserem Fall sollte es gültiger JSON sein. Im Klassenzimmer muss sein
Die process wird neu definiert, in der die erforderlichen Transformationen durchgeführt werden sollen
Eingangsleitung, um einen Ausgang zu erhalten, der der Schaltung entspricht
die Tabelle, in die diese Daten geladen werden. Weil Unser Fluss in diesem Fall ist
kontinuierlich, unbounded in Bezug auf Apache Beam, müssen Sie es mit zurückgeben
yield , nicht return , wie beim endgültigen Datenstrom. Im Falle eines endgültigen Ablaufs können Sie
(und notwendig) zusätzlich windowing und triggers konfigurieren


 # Tag stream by schema name tagged_stream = \ stream_bq \ | 'tag data by type' >> beam.ParDo(TagDataWithReqType()).with_outputs(*schema_dct.keys(), main='default') 

Dieser Code leitet PCollection # 1 zu Transform # 2, wo das Tagging stattfinden wird
(Trennung) des Datenstroms. In der Variablen schema_dct in diesem Fall einem Wörterbuch, in dem der Schlüssel der Name der schema_dct ohne die Erweiterung ist, ist dies das Tag und der Wert der gültige JSON des Schemas
BigQuery-Tabellen für dieses Tag. Es ist zu beachten, dass das Schema genau an übertragen werden sollte
view {'fields': } wobei das Schema der BigQuery-Tabelle in JSON-Form ist (Sie können
Export von der Weboberfläche).


main='default' ist der Name des Thread-Tags, zu dem sie gehen
Alle Nachrichten, für die keine Tagging-Bedingungen gelten. Betrachten Sie die Klasse
TagDataWithReqType .


 class TagDataWithReqType(beam.DoFn): #      , ..      #     ,       #  with_outputs + default def process(self, element, *args, **kwargs): req_type = element.get('_') types = ( 'type1', 'type2', 'type3', ) if req_type in types: yield TaggedOutput(req_type, element) else: yield element 

Wie Sie sehen, wird auch hier die process überschrieben. Die Variable types enthält Namen
Tags und sie müssen die Nummer und den Namen mit der Nummer und den Namen der Wörterbuchschlüssel übereinstimmen
schema_dct . Obwohl die process Argumente akzeptieren kann, habe ich nie
Ich konnte sie bestehen. Ich habe den Grund noch nicht herausgefunden.


Bei der Ausgabe erhalten wir ein Tupel von Threads in der Anzahl der Tags, nämlich der Anzahl unserer
vordefinierte Tags + Standard-Thread, der nicht markiert werden konnte.


 # Stream unidentified data to default table tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery( '%s:%s.default' % ( gcp_project, bq_dataset, ), schema=parse_table_schema_from_json(schema_dct.get('default')), ) 

Transform # ... (tatsächlich ist es nicht im Diagramm, dies ist ein "Zweig") - wir schreiben den Standard-Stream
zur Standardtabelle.


tagged_stream.default - Ein Stream mit dem default Tag wird verwendet. Die alternative Syntax lautet tagged_stream['default']


schema=parse_table_schema_from_json(schema_dct.get('default')) - hier wird das Schema definiert
Tabellen. Beachten Sie, dass die Datei default.json das gültige BigQuery-Tabellenschema enthält
muss sich im aktuellen schema_dir = './' .


Der Stream wird in eine Tabelle namens default .


Wenn eine Tabelle mit diesem Namen (im angegebenen Datensatz dieses Projekts) nicht vorhanden ist, ist sie vorhanden
wird dank der Standardeinstellung automatisch aus dem Schema erstellt
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED


 # Stream data to BigQuery tables by number of schema names for name, schema in schema_dct.iteritems(): tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery( '%s:%s.%s' % ( gcp_project, bq_dataset, name), schema=parse_table_schema_from_json(schema), ) 

Transform # 3, alles sollte für diejenigen klar sein, die den Artikel von Anfang an lesen und besitzen
Python-Syntax. Wir trennen das Stream-Tupel durch eine Schleife und schreiben jeden Stream in eine eigene Tabelle mit
sein Schema. Es sei daran erinnert, dass der '%s:%s.%s' % (gcp_project, bq_dataset, name) eindeutig sein muss - '%s:%s.%s' % (gcp_project, bq_dataset, name) .


Jetzt sollte klar sein, wie das funktioniert und Sie können eine Vorlage erstellen. Dafür brauchst du
Führen Sie es in der Konsole aus (vergessen Sie nicht, venv zu aktivieren, falls verfügbar) oder in der IDE:


 python _.py / --runner DataflowRunner / --project dreamdata-test / --staging_location gs://STORAGE_NAME/STAGING_DIR / --temp_location gs://STORAGE_NAME/TEMP_DIR / --template_location gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME 

Gleichzeitig sollte der Zugriff auf das Google-Konto beispielsweise durch Export organisiert werden
die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS oder eine andere Methode .


Ein paar Worte zu --runner . In diesem Fall sagt DataflowRunner , dass dieser Code
wird als Vorlage für den Datenflussjob ausgeführt. Es ist weiterhin möglich anzugeben
DirectRunner wird standardmäßig verwendet, wenn keine Option vorhanden ist - --runner und Code
funktioniert als Datenflussjob, jedoch lokal, was für das Debuggen sehr praktisch ist.


Wenn keine Fehler aufgetreten sind, gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME
Vorlage erstellt. Es ist erwähnenswert, dass in gs://STORAGE_NAME/STAGING_DIR auch geschrieben wird
Servicedateien, die für den erfolgreichen Betrieb des auf Basis von erstellten Datafow-Jobs erforderlich sind
Vorlage und Sie müssen sie nicht löschen.


Als Nächstes müssen Sie einen Datenflussjob mithilfe dieser Vorlage manuell oder nach Belieben erstellen
auf andere Weise (z. B. CI).


4. Schlussfolgerungen

So ist es uns gelungen, den Stream von PubSub nach BigQuery mit zu streamen
notwendige Datentransformationen zum Zwecke der weiteren Speicherung, Transformation und
Datennutzung.


Hauptlinks



In diesem Artikel sind Ungenauigkeiten und sogar Fehler möglich, ich werde für das Konstruktive dankbar sein
Kritik. Am Ende möchte ich hinzufügen, dass hier tatsächlich nicht alle verwendet werden
Funktionen des Apache Beam SDK, aber das war nicht das Ziel.

Source: https://habr.com/ru/post/de441892/


All Articles