Création d'un modèle de flux de données pour diffuser des données de Pub / Sub vers BigQuery basé sur GCP à l'aide du SDK Apache Beam et de Python

image


En ce moment, je suis engagé dans la tâche de streaming (et de conversion) de données. Dans certains cercles
un tel processus est appelé ETL , c'est-à-dire extraction, conversion et chargement d'informations.


L'ensemble du processus comprend la participation des services Google Cloud Platform suivants:


  • Pub / Sub - service de streaming de données en temps réel
  • Dataflow - un service de conversion de données (peut
    fonctionne à la fois en temps réel et en mode batch)
  • BigQuery - un service de stockage de données sous forme de tableaux
    (prend en charge SQL)

0. État actuel

Pour le moment, il existe une version fonctionnelle de la diffusion en continu sur les services ci-dessus, mais
En tant que modèle, l'un des standards est utilisé .


Le problème est que ce modèle fournit un transfert de données 1 à 1, c'est-à-dire sur
à l'entrée de Pub / Sub, nous avons une chaîne au format JSON, à la sortie, nous avons une table BigQuery avec des champs,
qui correspondent aux clés des objets au niveau supérieur du JSON d'entrée.


1. Énoncé du problème

Créez un modèle de flux de données qui vous permet d'obtenir une ou plusieurs tables à la sortie
selon les conditions données. Par exemple, nous voulons créer une table distincte pour chaque
valeurs d'une clé JSON d'entrée spécifique. Il est nécessaire de tenir compte du fait que certains
Les objets JSON en entrée peuvent contenir du JSON imbriqué en tant que valeur, c'est-à-dire est nécessaire
pouvoir créer des tables BigQuery avec des champs de type RECORD pour le stockage imbriqué
les données.


2. Préparation de la décision

Pour créer un modèle Dataflow, utilisez le SDK Apache Beam , qui, à son tour,
prend en charge Java et Python en tant que langage de programmation. Je dois dire que
seule la version Python 2.7.x est prise en charge, ce qui m'a un peu surpris. De plus, le soutien
Java est un peu plus large, car pour Python, par exemple, certaines fonctionnalités ne sont pas disponibles et plus
Une liste modeste de connecteurs intégrés. Au fait, vous pouvez écrire vos propres connecteurs.


Cependant, étant donné que je ne suis pas familier avec Java, j'ai utilisé Python.


Avant de commencer à créer un modèle, vous devez disposer des éléments suivants:


  1. entrez le format JSON et il ne devrait pas changer dans le temps
  2. schéma ou schémas de tables BigQuery dans lesquelles les données seront diffusées
  3. le nombre de tables dans lesquelles le flux de données de sortie sera diffusé

Notez qu'après avoir créé un modèle et démarré Dataflow Job en fonction de celui-ci, ces paramètres peuvent être
changer uniquement en créant un nouveau modèle.


Disons quelques mots sur ces restrictions. Ils viennent tous du fait qu'il n'y a aucune possibilité
créer un modèle dynamique qui pourrait prendre n'importe quelle chaîne en entrée, l'analyser
selon la logique interne, puis remplir dynamiquement les tables créées dynamiquement
créé par le circuit. Il est très probable que cette possibilité existe, mais dans les données
Je n'ai pas réussi à mettre en place un tel schéma. Pour autant que je comprends l'ensemble
le pipeline est construit avant de l'exécuter en runtime et donc il n'y a aucun moyen de le changer en
voler. Peut-être que quelqu'un partagera sa décision.


3. Décision

Pour une compréhension plus complète du processus, il vaut la peine d'apporter un schéma du soi-disant pipeline
de la documentation Apache Beam.


image


Dans notre cas (nous utiliserons la division en plusieurs tableaux):


  • entrée - les données proviennent de PubSub dans Dataflow Job
  • Transformer # 1 - les données sont converties d'une chaîne en un dictionnaire Python, nous obtenons une sortie
    PCollection # 1
  • Transformer # 2 - les données sont étiquetées, pour une séparation supplémentaire selon des tableaux séparés, en
    la sortie est PCollection # 2 (en fait un tuple PCollection)
  • Transformation # 3 - les données de PCollection # 2 sont écrites dans des tableaux à l'aide de schémas
    tables

Dans le processus d'écriture de mon propre modèle, j'ai été activement inspiré par ces exemples.


Code de modèle avec commentaires (à gauche des commentaires de la même manière que les auteurs précédents):
 # coding=utf-8 from __future__ import absolute_import import logging import json import os import apache_beam as beam from apache_beam.pvalue import TaggedOutput from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.io.gcp.bigquery import parse_table_schema_from_json #  GCP  gcp_project = '' #  Pub/Sub  topic_name = '' # Pub/Sub    'projects/_GCP_/topics/_' input_topic = 'projects/%s/topics/%s' % (gcp_project, topic_name) #  BigQuery  bq_dataset = 'segment_eu_test' #       schema_dir = './' class TransformToBigQuery(beam.DoFn): #          ,   # BigQuery IO     python dict def process(self, element, *args, **kwargs): body = json.loads(element) #       ,      # python dict       ,     #   yield body class TagDataWithReqType(beam.DoFn): #      , ..      #     ,       #  with_outputs + default def process(self, element, *args, **kwargs): req_type = element.get('_') types = ( 'type1', 'type2', 'type3', ) if req_type in types: yield TaggedOutput(req_type, element) else: yield element def run(): #       _.json   schema_dir,  #         ()  schema_dct = {} for schema_file in os.listdir(schema_dir): filename_list = schema_file.split('.') if filename_list[-1] == 'json': with open('%s/%s' % (schema_dir, schema_file)) as f: schema_json = f.read() schema_dct[filename_list[0]] = json.dumps({'fields': json.loads(schema_json)}) # We use the save_main_session option because one or more DoFn's in this # workflow rely on global context (eg, a module imported at module level). pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True # Read from PubSub into a PCollection. input_stream = p | beam.io.ReadFromPubSub(input_topic) # Transform stream to BigQuery IO format stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery()) # Tag stream by schema name tagged_stream = \ stream_bq \ | 'tag data by type' >> beam.ParDo(TagDataWithReqType()). with_outputs(*schema_dct.keys(), main='default') # Stream unidentified data to default table tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery( '%s:%s.default' % ( gcp_project, bq_dataset, ), schema=parse_table_schema_from_json(schema_dct.get('default')), ) # Stream data to BigQuery tables by number of schema names for name, schema in schema_dct.iteritems(): tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery( '%s:%s.%s' % ( gcp_project, bq_dataset, name), schema=parse_table_schema_from_json(schema), ) result = p.run() result.wait_until_finish() if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) logger = logging.getLogger(__name__) run() 

Maintenant, nous allons parcourir le code et donner des explications, mais d'abord, il vaut la peine de dire que le principal
la difficulté d'écrire ce modèle est de penser en termes de "flux de données", et
pas un message spécifique. Il est également nécessaire de comprendre que Pub / Sub fonctionne avec des messages et
c'est d'eux qu'ils recevront des informations pour baliser le flux.


 pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True 

Parce que Le connecteur Apache Beam Pub / Sub IO est utilisé uniquement en mode streaming nécessaire
ajoutez PipelineOptions () (bien qu'en fait les options ne soient pas utilisées); sinon, créez un modèle
tombe à l'exception. Il faut dire sur les options de lancement du modèle. Ils peuvent être
statique et soi-disant "runtime". Voici un lien vers la documentation sur ce sujet. Les options vous permettent de créer un modèle sans spécifier de paramètres à l'avance, mais en les passant lorsque vous démarrez le travail de flux de données à partir du modèle, mais je n'ai toujours pas pu l'implémenter, probablement parce que ce connecteur ne prend pas en charge RuntimeValueProvider .


 # Read from PubSub into a PCollection. input_stream = p | beam.io.ReadFromPubSub(input_topic) 

Tout est clair d'après le commentaire, nous lisons le fil du sujet. Il convient d'ajouter que vous pouvez prendre le flux
à la fois du sujet et de l'abonnement (abonnement). Si le sujet est spécifié en entrée, alors
un abonnement temporaire à ce sujet sera automatiquement créé. La syntaxe est également jolie
clair, le flux de données d'entrée beam.io.ReadFromPubSub(input_topic) envoyé à notre
pipeline p .


 # Transform stream to BigQuery IO format stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery()) 

C'est là que la transformation # 1 se produit et notre entrée est convertie à partir d'une chaîne python en
python dict, et dans la sortie, nous obtenons PCollection # 1. >> apparaît dans la syntaxe. Sur
en fait, le texte entre guillemets est le nom du flux (doit être unique), ainsi qu'un commentaire,
qui sera ajouté au bloc sur le graphique dans l'interface Web GCP Dataflow. Examinons plus en détail
classe substituée TransformToBigQuery .


 class TransformToBigQuery(beam.DoFn): #          ,   # BigQuery IO     python dict def process(self, element, *args, **kwargs): body = json.loads(element) #       ,      # python dict       ,     #  ,      python dict yield body 

La variable d' element contiendra un message de l'abonnement PubSub. Vu de
code, dans notre cas, il doit être JSON valide. En classe doit être
la méthode du process est redéfinie, dans laquelle les transformations nécessaires doivent être effectuées
ligne d'entrée pour obtenir une sortie qui correspond au circuit
la table dans laquelle ces données seront chargées. Parce que notre flux dans ce cas est
continu, unbounded en termes d'Apache Beam, vous devez le renvoyer en utilisant
yield , pas return , comme pour le flux de données final. Dans le cas d'un flux final, vous pouvez
(et nécessaire) configurer en plus le windowing et les triggers


 # Tag stream by schema name tagged_stream = \ stream_bq \ | 'tag data by type' >> beam.ParDo(TagDataWithReqType()).with_outputs(*schema_dct.keys(), main='default') 

Ce code dirige PCollection # 1 vers la transformation # 2 où le balisage aura lieu
(séparation) du flux de données. Dans la variable schema_dct dans ce cas, un dictionnaire, où la clé est le nom du fichier de schéma sans l'extension, ce sera la balise et la valeur est le JSON valide du schéma
Tables BigQuery pour cette balise. Il convient de noter que le schéma doit être transmis exactement aux
afficher {'fields': }est le schéma de la table BigQuery sous forme JSON (vous pouvez
exporter depuis l'interface Web).


main='default' est le nom de la balise de thread vers laquelle ils iront
Tous les messages qui ne sont pas soumis à des conditions de balisage. Considérez la classe
TagDataWithReqType .


 class TagDataWithReqType(beam.DoFn): #      , ..      #     ,       #  with_outputs + default def process(self, element, *args, **kwargs): req_type = element.get('_') types = ( 'type1', 'type2', 'type3', ) if req_type in types: yield TaggedOutput(req_type, element) else: yield element 

Comme vous pouvez le voir, la classe de process est également remplacée ici. La variable types contient des noms
balises et ils doivent faire correspondre le nombre et le nom avec le nombre et les noms des clés de dictionnaire
schema_dct . Bien que la méthode de process ait la capacité d'accepter des arguments, je n'ai jamais
J'ai pu les dépasser. Je n'ai pas encore compris la raison.


En sortie, on obtient un tuple de threads dans le nombre de tags, à savoir le nombre des nôtres
balises prédéfinies + thread par défaut qui n'a pas pu être balisé.


 # Stream unidentified data to default table tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery( '%s:%s.default' % ( gcp_project, bq_dataset, ), schema=parse_table_schema_from_json(schema_dct.get('default')), ) 

Transformer # ... (en fait, ce n'est pas sur le diagramme, c'est une "branche") - nous écrivons le flux par défaut
à la table par défaut.


tagged_stream.default - un flux avec la balise default est pris, une syntaxe alternative est tagged_stream['default']


schema=parse_table_schema_from_json(schema_dct.get('default')) - ici le schéma est défini
tables. Veuillez noter que le fichier default.json avec le schéma de table BigQuery valide
doit être dans le schema_dir = './' actuel.


Le flux ira à une table appelée default .


Si une table portant ce nom (dans l'ensemble de données donné de ce projet) n'existe pas, alors elle
sera créé automatiquement à partir du schéma grâce au paramètre par défaut
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED


 # Stream data to BigQuery tables by number of schema names for name, schema in schema_dct.iteritems(): tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery( '%s:%s.%s' % ( gcp_project, bq_dataset, name), schema=parse_table_schema_from_json(schema), ) 

Transformez # 3, tout devrait être clair pour ceux qui lisent l'article depuis le début et qui possèdent
syntaxe python. Nous séparons le tuple de flux avec une boucle et écrivons chaque flux dans sa propre table avec
son plan. Il convient de rappeler que le nom du flux doit être unique - '%s:%s.%s' % (gcp_project, bq_dataset, name) .


Maintenant, il devrait être clair comment cela fonctionne et vous pouvez créer un modèle. Pour cela, vous avez besoin
exécutez dans la console (n'oubliez pas d'activer venv si disponible) ou depuis l'IDE:


 python _.py / --runner DataflowRunner / --project dreamdata-test / --staging_location gs://STORAGE_NAME/STAGING_DIR / --temp_location gs://STORAGE_NAME/TEMP_DIR / --template_location gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME 

Dans le même temps, l'accès au compte Google doit être organisé, par exemple via l'exportation
la GOOGLE_APPLICATION_CREDENTIALS environnement GOOGLE_APPLICATION_CREDENTIALS ou d'une autre manière .


Quelques mots sur --runner . Dans ce cas, DataflowRunner dit que ce code
s'exécutera comme modèle pour le Job Dataflow. Il est toujours possible de spécifier
DirectRunner , il sera utilisé par défaut s'il n'y a pas d'option --runner et code
fonctionnera comme un Job Dataflow, mais localement, ce qui est très pratique pour le débogage.


Si aucune erreur ne s'est produite, alors gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME sera
modèle créé. Il est à noter que dans gs://STORAGE_NAME/STAGING_DIR sera également écrit
les fichiers de service nécessaires au bon fonctionnement du Job Datafow créé sur la base de
modèle et vous n'avez pas besoin de les supprimer.


Ensuite, vous devez créer un Job Dataflow à l'aide de ce modèle, manuellement ou par n'importe quel
d'une autre manière (CI par exemple).


4. Conclusions

Ainsi, nous avons réussi à diffuser le flux de PubSub vers BigQuery en utilisant
les transformations de données nécessaires à des fins de stockage, de transformation et de
l'utilisation des données.


Liens principaux



Dans cet article, des inexactitudes et même des erreurs sont possibles, je serai reconnaissant pour la
la critique. Au final, je veux ajouter qu'en fait, tous ne sont pas utilisés ici
fonctionnalités du SDK Apache Beam, mais ce n'était pas l'objectif.

Source: https://habr.com/ru/post/fr441892/


All Articles