Creación de una plantilla de flujo de datos para transmitir datos desde Pub / Sub a BigQuery basado en GCP utilizando Apache Beam SDK y Python

imagen


En este momento estoy comprometido con la tarea de transmitir (y convertir) datos. En algunos círculos
dicho proceso se conoce como ETL , es decir extracción, conversión y carga de información.


Todo el proceso incluye la participación de los siguientes servicios de Google Cloud Platform :


  • Pub / Sub - servicio para transmisión de datos en tiempo real
  • Flujo de datos : un servicio para convertir datos (puede
    trabajar tanto en tiempo real como en modo por lotes)
  • BigQuery : un servicio para almacenar datos en forma de tablas
    (admite SQL)

0. Estado actual

Por el momento, hay una versión funcional de transmisión en los servicios anteriores, sin embargo, en
Como plantilla, se usa uno de los estándares .


El problema es que esta plantilla proporciona transferencia de datos 1 a 1, es decir en
en la entrada de Pub / Sub tenemos una cadena de formato JSON, en la salida tenemos una tabla BigQuery con campos,
que corresponden a las teclas de los objetos en el nivel superior de la entrada JSON.


1. Declaración del problema

Cree una plantilla de flujo de datos que le permita obtener una tabla o tablas en la salida
según las condiciones dadas. Por ejemplo, queremos crear una tabla separada para cada
valores de una clave JSON de entrada específica. Es necesario tener en cuenta el hecho de que algunos
Los objetos JSON de entrada pueden contener JSON anidado como un valor, es decir es necesario
ser capaz de crear tablas BigQuery con campos de tipo RECORD para almacenar anidados
datos


2. Preparación para la decisión.

Para crear una plantilla de flujo de datos, use el SDK de Apache Beam , que, a su vez,
admite Java y Python como lenguaje de programación. Debo decir que
solo se admite la versión Python 2.7.x, lo que me sorprendió un poco. Por otra parte, apoyo
Java es algo más amplio, porque para Python, por ejemplo, algunas funciones no están disponibles y más
Una modesta lista de conectores incorporados. Por cierto, puedes escribir tus propios conectores.


Sin embargo, debido al hecho de que no estoy familiarizado con Java, utilicé Python.


Antes de comenzar a crear una plantilla, debe tener lo siguiente:


  1. ingrese el formato JSON y no debería cambiar a tiempo
  2. esquema o esquemas de tablas BigQuery en las que se transmitirán datos
  3. El número de tablas en las que se transmitirá el flujo de datos de salida

Tenga en cuenta que después de crear una plantilla e iniciar el trabajo de flujo de datos basado en ella, estos parámetros pueden ser
cambiar solo creando una nueva plantilla.


Digamos algunas palabras sobre estas restricciones. Todos provienen del hecho de que no hay posibilidad
crear una plantilla dinámica que pueda tomar cualquier cadena como entrada, analizarla
de acuerdo con la lógica interna y luego llenar tablas creadas dinámicamente con dinámicamente
creado por el circuito. Es muy probable que exista esta posibilidad, pero dentro de los datos
No logré implementar tal esquema. Por lo que yo entiendo todo
la tubería se construye antes de ejecutarla en tiempo de ejecución y, por lo tanto, no hay forma de cambiarla a
para volar Quizás alguien compartirá su decisión.


3. Decisión

Para una comprensión más completa del proceso, vale la pena traer un diagrama de la llamada tubería
de la documentación de Apache Beam.


imagen


En nuestro caso (usaremos la división en varias tablas):


  • input: los datos provienen de PubSub en el trabajo de flujo de datos
  • Transformación n. ° 1: los datos se convierten de una cadena a un diccionario de Python, obtenemos resultados
    PCollection # 1
  • Transformación n. ° 2: los datos se etiquetan, para una mayor separación de acuerdo con tablas separadas, en
    la salida es PCollection # 2 (en realidad una tupla PCollection)
  • Transformación # 3: los datos de PCollection # 2 se escriben en tablas usando esquemas
    mesas

En el proceso de escribir mi propia plantilla, estos ejemplos me inspiraron activamente.


Código de plantilla con comentarios (comentarios de la misma manera de autores anteriores):
 # coding=utf-8 from __future__ import absolute_import import logging import json import os import apache_beam as beam from apache_beam.pvalue import TaggedOutput from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.io.gcp.bigquery import parse_table_schema_from_json #  GCP  gcp_project = '' #  Pub/Sub  topic_name = '' # Pub/Sub    'projects/_GCP_/topics/_' input_topic = 'projects/%s/topics/%s' % (gcp_project, topic_name) #  BigQuery  bq_dataset = 'segment_eu_test' #       schema_dir = './' class TransformToBigQuery(beam.DoFn): #          ,   # BigQuery IO     python dict def process(self, element, *args, **kwargs): body = json.loads(element) #       ,      # python dict       ,     #   yield body class TagDataWithReqType(beam.DoFn): #      , ..      #     ,       #  with_outputs + default def process(self, element, *args, **kwargs): req_type = element.get('_') types = ( 'type1', 'type2', 'type3', ) if req_type in types: yield TaggedOutput(req_type, element) else: yield element def run(): #       _.json   schema_dir,  #         ()  schema_dct = {} for schema_file in os.listdir(schema_dir): filename_list = schema_file.split('.') if filename_list[-1] == 'json': with open('%s/%s' % (schema_dir, schema_file)) as f: schema_json = f.read() schema_dct[filename_list[0]] = json.dumps({'fields': json.loads(schema_json)}) # We use the save_main_session option because one or more DoFn's in this # workflow rely on global context (eg, a module imported at module level). pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True # Read from PubSub into a PCollection. input_stream = p | beam.io.ReadFromPubSub(input_topic) # Transform stream to BigQuery IO format stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery()) # Tag stream by schema name tagged_stream = \ stream_bq \ | 'tag data by type' >> beam.ParDo(TagDataWithReqType()). with_outputs(*schema_dct.keys(), main='default') # Stream unidentified data to default table tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery( '%s:%s.default' % ( gcp_project, bq_dataset, ), schema=parse_table_schema_from_json(schema_dct.get('default')), ) # Stream data to BigQuery tables by number of schema names for name, schema in schema_dct.iteritems(): tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery( '%s:%s.%s' % ( gcp_project, bq_dataset, name), schema=parse_table_schema_from_json(schema), ) result = p.run() result.wait_until_finish() if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) logger = logging.getLogger(__name__) run() 

Ahora revisaremos el código y daremos algunas explicaciones, pero primero vale la pena decir que
La dificultad para escribir esta plantilla es pensar en términos del "flujo de datos", y
No es un mensaje específico. También es necesario comprender que Pub / Sub opera con mensajes y
de ellos recibiremos información para etiquetar la transmisión.


 pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True 

Porque El conector Apache Beam Pub / Sub IO se usa solo en modo de transmisión necesario
agregue PipelineOptions () (aunque de hecho las opciones no se usan); de lo contrario, cree una plantilla
cae con la excepción. Hay que decir acerca de las opciones para iniciar la plantilla. Pueden ser
estático y llamado "tiempo de ejecución". Aquí hay un enlace a la documentación sobre este tema. Las opciones le permiten crear una plantilla sin especificar parámetros de antemano, pero pasándolos cuando inicia el Trabajo de flujo de datos desde la plantilla, pero aún no pude implementarlo, probablemente debido a que este conector no es compatible con RuntimeValueProvider .


 # Read from PubSub into a PCollection. input_stream = p | beam.io.ReadFromPubSub(input_topic) 

Todo está claro en el comentario, leemos el hilo del tema. Vale la pena agregar que puedes tomar la transmisión
tanto del tema como de la suscripción (suscripción). Si el tema se especifica como una entrada, entonces
se creará automáticamente una suscripción temporal a este tema. La sintaxis también es bonita
claro, el flujo de datos de entrada beam.io.ReadFromPubSub(input_topic) envía a nuestro
tubería p .


 # Transform stream to BigQuery IO format stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery()) 

Aquí es donde sucede Transform # 1 y nuestra entrada se convierte de una cadena de Python a
python dict, y en la salida obtenemos PCollection # 1. >> aparece en la sintaxis. En
de hecho, el texto entre comillas es el nombre de la secuencia (debe ser único), así como un comentario,
que se agregará al bloque en el gráfico en la interfaz web de GCP Dataflow. Consideremos con más detalle
clase reemplazada TransformToBigQuery .


 class TransformToBigQuery(beam.DoFn): #          ,   # BigQuery IO     python dict def process(self, element, *args, **kwargs): body = json.loads(element) #       ,      # python dict       ,     #  ,      python dict yield body 

La variable del element contendrá un mensaje de la suscripción de PubSub. Como se ve desde
código, en nuestro caso debería ser JSON válido. En el aula debe ser
Se redefine el método de process , en el que se deben realizar las transformaciones necesarias.
línea de entrada para obtener salida que coincida con el circuito
la tabla en la que se cargarán estos datos. Porque nuestro flujo en este caso es
continuo, unbounded en términos de Apache Beam, debe devolverlo usando
yield , no return , como para el flujo de datos final. En el caso de un flujo final,
(y necesario) configurar adicionalmente windowing y triggers


 # Tag stream by schema name tagged_stream = \ stream_bq \ | 'tag data by type' >> beam.ParDo(TagDataWithReqType()).with_outputs(*schema_dct.keys(), main='default') 

Este código dirige PCollection # 1 a Transform # 2 donde se realizará el etiquetado
(separación) del flujo de datos. En la variable schema_dct en este caso, un diccionario, donde la clave es el nombre del archivo de esquema sin la extensión, esta será la etiqueta y el valor es el JSON válido del esquema
Tablas de BigQuery para esta etiqueta. Cabe señalar que el esquema debe transmitirse exactamente a
ver {'fields': } donde es el esquema de la tabla BigQuery en forma JSON (puede
exportar desde la interfaz web).


main='default' es el nombre de la etiqueta de hilo a la que irán
Todos los mensajes que no están sujetos a condiciones de etiquetado. Considera la clase
TagDataWithReqType .


 class TagDataWithReqType(beam.DoFn): #      , ..      #     ,       #  with_outputs + default def process(self, element, *args, **kwargs): req_type = element.get('_') types = ( 'type1', 'type2', 'type3', ) if req_type in types: yield TaggedOutput(req_type, element) else: yield element 

Como puede ver, la clase de process también se reemplaza aquí. La variable de types contiene nombres
etiquetas y deben coincidir el número y el nombre con el número y los nombres de las teclas del diccionario
schema_dct . Aunque el método de process tiene la capacidad de aceptar argumentos, nunca
Pude pasarlos. No he descubierto la razón.


En la salida, obtenemos una tupla de hilos en el número de etiquetas, es decir, el número de nuestras
etiquetas predefinidas + hilo predeterminado que no se pudo etiquetar.


 # Stream unidentified data to default table tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery( '%s:%s.default' % ( gcp_project, bq_dataset, ), schema=parse_table_schema_from_json(schema_dct.get('default')), ) 

Transformar # ... (de hecho, no está en el diagrama, esta es una "rama"): escribimos la secuencia predeterminada
a la tabla predeterminada.


tagged_stream.default : se tagged_stream.default una secuencia con la etiqueta default , una sintaxis alternativa es tagged_stream['default']


schema=parse_table_schema_from_json(schema_dct.get('default')) - aquí se define el esquema
mesas. Tenga en cuenta que el archivo default.json con el esquema de tabla BigQuery válido
debe estar en el schema_dir = './' actual de schema_dir = './' .


La transmisión irá a una tabla llamada default .


Si no existe una tabla con este nombre (en el conjunto de datos dado de este proyecto), entonces
se creará automáticamente a partir del esquema gracias a la configuración predeterminada
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED


 # Stream data to BigQuery tables by number of schema names for name, schema in schema_dct.iteritems(): tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery( '%s:%s.%s' % ( gcp_project, bq_dataset, name), schema=parse_table_schema_from_json(schema), ) 

Transformación # 3, todo debe estar claro para aquellos que leen el artículo desde el principio y son dueños
sintaxis de python Separamos la tupla de la secuencia con un bucle y escribimos cada secuencia en su propia tabla con
su esquema Debe recordarse que el nombre de la secuencia debe ser único: '%s:%s.%s' % (gcp_project, bq_dataset, name) .


Ahora debe quedar claro cómo funciona esto y puede crear una plantilla. Para esto necesitas
ejecutar en la consola (no olvide activar venv si está disponible) o desde el IDE:


 python _.py / --runner DataflowRunner / --project dreamdata-test / --staging_location gs://STORAGE_NAME/STAGING_DIR / --temp_location gs://STORAGE_NAME/TEMP_DIR / --template_location gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME 

En este caso, el acceso a la cuenta de Google debe organizarse, por ejemplo, a través de la exportación.
la GOOGLE_APPLICATION_CREDENTIALS entorno GOOGLE_APPLICATION_CREDENTIALS u otra forma .


Algunas palabras sobre --runner . En este caso, DataflowRunner dice que este código
se ejecutará como plantilla para el trabajo de flujo de datos. Todavía es posible especificar
DirectRunner , se usará por defecto si no hay una opción: --runner y código
funcionará como un trabajo de flujo de datos, pero localmente, lo cual es muy conveniente para la depuración.


Si no se han producido errores, entonces gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME será
plantilla creada Vale la pena decir que en gs://STORAGE_NAME/STAGING_DIR también se escribirá
archivos de servicio que son necesarios para la operación exitosa del trabajo Datafow creado sobre la base de
plantilla y no es necesario eliminarlos.


A continuación, debe crear un trabajo de flujo de datos utilizando esta plantilla, manualmente o mediante cualquier
de otra manera (CI por ejemplo).


4. Conclusiones

Por lo tanto, logramos transmitir la secuencia de PubSub a BigQuery usando
transformaciones de datos necesarias con el fin de un mayor almacenamiento, transformación y
uso de datos.


Enlaces principales



En este artículo, son posibles imprecisiones e incluso errores, agradeceré la constructiva
critica Al final, quiero agregar que, de hecho, no todos se usan aquí
características del SDK de Apache Beam, pero ese no era el objetivo.

Source: https://habr.com/ru/post/441892/


All Articles