Criando um modelo de fluxo de dados para transmitir dados do Pub / Sub para o BigQuery com base no GCP usando o Apache Beam SDK e Python

imagem


No momento, estou envolvido na tarefa de transmitir (e converter) dados. Em alguns círculos
esse processo é conhecido como ETL , ou seja, extração, conversão e carregamento de informações.


Todo o processo inclui a participação dos seguintes serviços do Google Cloud Platform :


  • Pub / Sub - serviço para streaming de dados em tempo real
  • Dataflow - um serviço de conversão de dados (pode
    trabalhar tanto em tempo real quanto em lote)
  • BigQuery - um serviço para armazenar dados na forma de tabelas
    (suporta SQL)

0. Status atual

No momento, há uma versão funcional do streaming nos serviços acima, no entanto,
Como modelo, um dos padrões é usado .


O problema é que esse modelo fornece transferência de 1 para 1, ou seja, em
na entrada de Pub / Sub, temos uma string no formato JSON; na saída, temos uma tabela BigQuery com campos,
que correspondem às chaves dos objetos no nível superior do JSON de entrada.


1. Declaração do problema

Crie um modelo de fluxo de dados que permita obter uma tabela ou tabelas na saída
de acordo com as condições dadas. Por exemplo, queremos criar uma tabela separada para cada
valores de uma chave JSON de entrada específica. É necessário levar em conta o fato de que algumas
Os objetos JSON de entrada podem conter JSON aninhado como um valor, ou seja, é necessário
ser capaz de criar tabelas do BigQuery com campos do tipo RECORD para armazenar aninhados
dados.


2. Preparação para a decisão

Para criar um modelo de fluxo de dados, use o Apache Beam SDK , que, por sua vez,
suporta Java e Python como uma linguagem de programação. Devo dizer que
apenas a versão Python 2.7.x é suportada, o que me surpreendeu um pouco. Além disso, o apoio
Java é um pouco mais amplo, porque para Python, por exemplo, algumas funcionalidades não estão disponíveis e mais
Uma lista modesta de conectores internos . A propósito, você pode escrever seus próprios conectores.


No entanto, devido ao fato de eu não estar familiarizado com Java, usei o Python.


Antes de começar a criar um modelo, você deve ter o seguinte:


  1. formato JSON de entrada e não deve mudar com o tempo
  2. esquema ou esquemas de tabelas do BigQuery para as quais os dados serão transmitidos
  3. o número de tabelas nas quais o fluxo de dados de saída será transmitido

Observe que, após criar um modelo e iniciar o Trabalho de fluxo de dados com base nele, esses parâmetros podem ser
alterar apenas criando um novo modelo.


Digamos algumas palavras sobre essas restrições. Todos eles vêm do fato de que não há possibilidade
crie um modelo dinâmico que possa receber qualquer string como entrada, analise-o
de acordo com a lógica interna e, em seguida, preencha tabelas criadas dinamicamente com
criado pelo circuito. É muito provável que essa possibilidade exista, mas dentro dos dados
Não consegui implementar esse esquema. Tanto quanto eu entendo o todo
o pipeline é construído antes de executá-lo em tempo de execução e, portanto, não há como alterá-lo para
voar. Talvez alguém compartilhe sua decisão.


3. Decisão

Para uma compreensão mais completa do processo, vale a pena trazer um diagrama do chamado pipeline
da documentação do Apache Beam.


imagem


No nosso caso (usaremos a divisão em várias tabelas):


  • input - os dados vêm do PubSub no Dataflow Job
  • Transform # 1 - os dados são convertidos de uma string para um dicionário Python, obtemos saída
    PCollection # 1
  • Transformação nº 2 - os dados são marcados, para posterior separação de acordo com tabelas separadas, em
    a saída é PCollection # 2 (na verdade, uma tupla de PCollection)
  • Transformação nº 3 - os dados do PCollection nº 2 são gravados em tabelas usando esquemas
    mesas

No processo de escrever meu próprio modelo, fui ativamente inspirado por esses exemplos.


Código do modelo com comentários (comentários à esquerda da mesma maneira dos autores anteriores):
 # coding=utf-8 from __future__ import absolute_import import logging import json import os import apache_beam as beam from apache_beam.pvalue import TaggedOutput from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.io.gcp.bigquery import parse_table_schema_from_json #  GCP  gcp_project = '' #  Pub/Sub  topic_name = '' # Pub/Sub    'projects/_GCP_/topics/_' input_topic = 'projects/%s/topics/%s' % (gcp_project, topic_name) #  BigQuery  bq_dataset = 'segment_eu_test' #       schema_dir = './' class TransformToBigQuery(beam.DoFn): #          ,   # BigQuery IO     python dict def process(self, element, *args, **kwargs): body = json.loads(element) #       ,      # python dict       ,     #   yield body class TagDataWithReqType(beam.DoFn): #      , ..      #     ,       #  with_outputs + default def process(self, element, *args, **kwargs): req_type = element.get('_') types = ( 'type1', 'type2', 'type3', ) if req_type in types: yield TaggedOutput(req_type, element) else: yield element def run(): #       _.json   schema_dir,  #         ()  schema_dct = {} for schema_file in os.listdir(schema_dir): filename_list = schema_file.split('.') if filename_list[-1] == 'json': with open('%s/%s' % (schema_dir, schema_file)) as f: schema_json = f.read() schema_dct[filename_list[0]] = json.dumps({'fields': json.loads(schema_json)}) # We use the save_main_session option because one or more DoFn's in this # workflow rely on global context (eg, a module imported at module level). pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True # Read from PubSub into a PCollection. input_stream = p | beam.io.ReadFromPubSub(input_topic) # Transform stream to BigQuery IO format stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery()) # Tag stream by schema name tagged_stream = \ stream_bq \ | 'tag data by type' >> beam.ParDo(TagDataWithReqType()). with_outputs(*schema_dct.keys(), main='default') # Stream unidentified data to default table tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery( '%s:%s.default' % ( gcp_project, bq_dataset, ), schema=parse_table_schema_from_json(schema_dct.get('default')), ) # Stream data to BigQuery tables by number of schema names for name, schema in schema_dct.iteritems(): tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery( '%s:%s.%s' % ( gcp_project, bq_dataset, name), schema=parse_table_schema_from_json(schema), ) result = p.run() result.wait_until_finish() if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) logger = logging.getLogger(__name__) run() 

Agora vamos analisar o código e dar explicações, mas primeiro vale a pena dizer que os principais
a dificuldade em escrever este modelo é pensar em termos do "fluxo de dados" e
não é uma mensagem específica. Também é necessário entender que o Pub / Sub opera com mensagens e
é com eles que receberemos informações para marcar o fluxo.


 pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True 

Porque O conector Apache Beam Pub / Sub IO é usado apenas no modo de streaming necessário
adicione PipelineOptions () (embora de fato as opções não sejam usadas); caso contrário, crie um modelo
cai com a exceção. Deve-se dizer sobre as opções para iniciar o modelo. Eles podem ser
estático e chamado "tempo de execução". Aqui está um link para a documentação sobre este tópico. As opções permitem criar um modelo sem especificar parâmetros antecipadamente, mas transmiti-los ao iniciar o Trabalho de fluxo de dados a partir do modelo, mas ainda não consegui implementá-lo, provavelmente devido ao fato de este conector não suportar RuntimeValueProvider .


 # Read from PubSub into a PCollection. input_stream = p | beam.io.ReadFromPubSub(input_topic) 

Tudo está claro a partir do comentário, lemos a discussão do tópico. Vale acrescentar que você pode aproveitar o fluxo
do tópico e da assinatura (assinatura). Se o tópico for especificado como uma entrada, então
uma assinatura temporária para este tópico será criada automaticamente. A sintaxe também é bonita
claro, o fluxo de dados de entrada beam.io.ReadFromPubSub(input_topic) enviado ao nosso
gasoduto p .


 # Transform stream to BigQuery IO format stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery()) 

É aqui que a transformação nº 1 acontece e nossa entrada é convertida de uma string python para
python dict, e na saída temos o PCollection # 1. >> aparece na sintaxe. Ativado
de fato, o texto entre aspas é o nome do fluxo (deve ser exclusivo), além de um comentário,
que será adicionado ao bloco no gráfico na interface da web do GCP Dataflow. Vamos considerar em mais detalhes
classe substituída TransformToBigQuery .


 class TransformToBigQuery(beam.DoFn): #          ,   # BigQuery IO     python dict def process(self, element, *args, **kwargs): body = json.loads(element) #       ,      # python dict       ,     #  ,      python dict yield body 

A variável do element conterá uma mensagem da assinatura do PubSub. Como visto de
código, no nosso caso, deve ser JSON válido. Na sala de aula deve ser
o método do process é redefinido, no qual as transformações necessárias devem ser feitas
linha de entrada para obter uma saída que corresponda ao circuito
a tabela na qual esses dados serão carregados. Porque nosso fluxo neste caso é
contínuo, unbounded em termos de Apache Beam, você deve devolvê-lo usando
yield , não return , como no fluxo de dados final. No caso de um fluxo final, você pode
(e necessário) configurar adicionalmente windowing e triggers


 # Tag stream by schema name tagged_stream = \ stream_bq \ | 'tag data by type' >> beam.ParDo(TagDataWithReqType()).with_outputs(*schema_dct.keys(), main='default') 

Esse código direciona a PCollection # 1 à Transform # 2, onde a marcação ocorrerá
(separação) do fluxo de dados. Na variável schema_dct neste caso, um dicionário, em que a chave é o nome do arquivo do esquema sem a extensão, essa será a tag e o valor é o JSON válido do esquema
Tabelas do BigQuery para esta tag. Note-se que o esquema deve ser transmitido exatamente para
visualize {'fields': } que é o esquema da tabela do BigQuery no formato JSON (você pode
exportar da interface da web).


main='default' é o nome da tag do segmento para a qual eles irão
Todas as mensagens que não estão sujeitas às condições de marcação. Considere a classe
TagDataWithReqType .


 class TagDataWithReqType(beam.DoFn): #      , ..      #     ,       #  with_outputs + default def process(self, element, *args, **kwargs): req_type = element.get('_') types = ( 'type1', 'type2', 'type3', ) if req_type in types: yield TaggedOutput(req_type, element) else: yield element 

Como você pode ver, a classe de process também é redefinida aqui. A variável types contém nomes
tags e devem corresponder ao número e nome com o número e os nomes das chaves do dicionário
schema_dct . Embora o método do process possa aceitar argumentos, eu nunca
Eu fui capaz de passar por eles. Ainda não descobri o motivo.


Na saída, obtemos uma tupla de threads no número de tags, ou seja, o número de nossos
tags predefinidas + segmento padrão que não foram identificados.


 # Stream unidentified data to default table tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery( '%s:%s.default' % ( gcp_project, bq_dataset, ), schema=parse_table_schema_from_json(schema_dct.get('default')), ) 

Transformação # ... (na verdade, não está no diagrama, é um "ramo") - escrevemos o fluxo padrão
para a tabela padrão.


tagged_stream.default - um fluxo com a tag default é obtido, uma sintaxe alternativa é tagged_stream['default']


schema=parse_table_schema_from_json(schema_dct.get('default')) - aqui o esquema é definido
mesas. Observe que o arquivo default.json com o esquema de tabela válido do BigQuery
deve estar no schema_dir = './' atual.


O fluxo irá para uma tabela chamada default .


Se uma tabela com este nome (no conjunto de dados fornecido deste projeto) não existir, ela será
será criado automaticamente a partir do esquema, graças à configuração padrão
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED


 # Stream data to BigQuery tables by number of schema names for name, schema in schema_dct.iteritems(): tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery( '%s:%s.%s' % ( gcp_project, bq_dataset, name), schema=parse_table_schema_from_json(schema), ) 

Na transformação nº 3, tudo deve ficar claro para quem lê o artigo desde o início e possui
sintaxe python. Separamos a tupla de fluxo com um loop e escrevemos cada fluxo em sua própria tabela com
o esquema dele. Deve-se lembrar que o nome do fluxo deve ser exclusivo - '%s:%s.%s' % (gcp_project, bq_dataset, name) .


Agora deve ficar claro como isso funciona e você pode criar um modelo. Para isso você precisa
execute no console (não esqueça de ativar o venv, se disponível) ou no IDE:


 python _.py / --runner DataflowRunner / --project dreamdata-test / --staging_location gs://STORAGE_NAME/STAGING_DIR / --temp_location gs://STORAGE_NAME/TEMP_DIR / --template_location gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME 

Ao mesmo tempo, o acesso à Conta do Google deve ser organizado, por exemplo, através da exportação
a GOOGLE_APPLICATION_CREDENTIALS ambiente GOOGLE_APPLICATION_CREDENTIALS ou de outra maneira .


Algumas palavras sobre --runner . Nesse caso, o DataflowRunner diz que esse código
será executado como um modelo para o trabalho de fluxo de dados. Ainda é possível especificar
DirectRunner , será usado por padrão se não houver opção --runner e código
funcionará como um trabalho de fluxo de dados, mas localmente, o que é muito conveniente para depuração.


Se nenhum erro ocorreu, gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME será
modelo criado. Vale dizer que em gs://STORAGE_NAME/STAGING_DIR também será gravado
arquivos de serviço necessários para a operação bem-sucedida do Datafow Job criado com base em
modelo e você não precisa excluí-los.


Em seguida, você precisa criar um trabalho de fluxo de dados usando este modelo, manualmente ou por qualquer
de outra maneira (IC por exemplo).


4. Conclusões

Assim, conseguimos transmitir o fluxo do PubSub para o BigQuery usando
transformações de dados necessárias para fins de armazenamento, transformação e
uso de dados.


Links principais



Neste artigo, imprecisões e até erros são possíveis, serei grato pelo construtivo
crítica. No final, quero acrescentar que, de fato, nem todos são usados ​​aqui
recursos do SDK do Apache Beam, mas esse não era o objetivo.

Source: https://habr.com/ru/post/pt441892/


All Articles