
No momento, estou envolvido na tarefa de transmitir (e converter) dados. Em alguns círculos
esse processo é conhecido como ETL , ou seja, extração, conversão e carregamento de informações.
Todo o processo inclui a participação dos seguintes serviços do Google Cloud Platform :
- Pub / Sub - serviço para streaming de dados em tempo real
- Dataflow - um serviço de conversão de dados (pode
trabalhar tanto em tempo real quanto em lote) - BigQuery - um serviço para armazenar dados na forma de tabelas
(suporta SQL)
0. Status atual
No momento, há uma versão funcional do streaming nos serviços acima, no entanto,
Como modelo, um dos padrões é usado .
O problema é que esse modelo fornece transferência de 1 para 1, ou seja, em
na entrada de Pub / Sub, temos uma string no formato JSON; na saída, temos uma tabela BigQuery com campos,
que correspondem às chaves dos objetos no nível superior do JSON de entrada.
1. Declaração do problema
Crie um modelo de fluxo de dados que permita obter uma tabela ou tabelas na saída
de acordo com as condições dadas. Por exemplo, queremos criar uma tabela separada para cada
valores de uma chave JSON de entrada específica. É necessário levar em conta o fato de que algumas
Os objetos JSON de entrada podem conter JSON aninhado como um valor, ou seja, é necessário
ser capaz de criar tabelas do BigQuery com campos do tipo RECORD
para armazenar aninhados
dados.
2. Preparação para a decisão
Para criar um modelo de fluxo de dados, use o Apache Beam SDK , que, por sua vez,
suporta Java e Python como uma linguagem de programação. Devo dizer que
apenas a versão Python 2.7.x é suportada, o que me surpreendeu um pouco. Além disso, o apoio
Java é um pouco mais amplo, porque para Python, por exemplo, algumas funcionalidades não estão disponíveis e mais
Uma lista modesta de conectores internos . A propósito, você pode escrever seus próprios conectores.
No entanto, devido ao fato de eu não estar familiarizado com Java, usei o Python.
Antes de começar a criar um modelo, você deve ter o seguinte:
- formato JSON de entrada e não deve mudar com o tempo
- esquema ou esquemas de tabelas do BigQuery para as quais os dados serão transmitidos
- o número de tabelas nas quais o fluxo de dados de saída será transmitido
Observe que, após criar um modelo e iniciar o Trabalho de fluxo de dados com base nele, esses parâmetros podem ser
alterar apenas criando um novo modelo.
Digamos algumas palavras sobre essas restrições. Todos eles vêm do fato de que não há possibilidade
crie um modelo dinâmico que possa receber qualquer string como entrada, analise-o
de acordo com a lógica interna e, em seguida, preencha tabelas criadas dinamicamente com
criado pelo circuito. É muito provável que essa possibilidade exista, mas dentro dos dados
Não consegui implementar esse esquema. Tanto quanto eu entendo o todo
o pipeline é construído antes de executá-lo em tempo de execução e, portanto, não há como alterá-lo para
voar. Talvez alguém compartilhe sua decisão.
3. Decisão
Para uma compreensão mais completa do processo, vale a pena trazer um diagrama do chamado pipeline
da documentação do Apache Beam.

No nosso caso (usaremos a divisão em várias tabelas):
- input - os dados vêm do PubSub no Dataflow Job
- Transform # 1 - os dados são convertidos de uma string para um dicionário Python, obtemos saída
PCollection # 1 - Transformação nº 2 - os dados são marcados, para posterior separação de acordo com tabelas separadas, em
a saída é PCollection # 2 (na verdade, uma tupla de PCollection) - Transformação nº 3 - os dados do PCollection nº 2 são gravados em tabelas usando esquemas
mesas
No processo de escrever meu próprio modelo, fui ativamente inspirado por esses exemplos.
Código do modelo com comentários (comentários à esquerda da mesma maneira dos autores anteriores): Agora vamos analisar o código e dar explicações, mas primeiro vale a pena dizer que os principais
a dificuldade em escrever este modelo é pensar em termos do "fluxo de dados" e
não é uma mensagem específica. Também é necessário entender que o Pub / Sub opera com mensagens e
é com eles que receberemos informações para marcar o fluxo.
pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True
Porque O conector Apache Beam Pub / Sub IO é usado apenas no modo de streaming necessário
adicione PipelineOptions () (embora de fato as opções não sejam usadas); caso contrário, crie um modelo
cai com a exceção. Deve-se dizer sobre as opções para iniciar o modelo. Eles podem ser
estático e chamado "tempo de execução". Aqui está um link para a documentação sobre este tópico. As opções permitem criar um modelo sem especificar parâmetros antecipadamente, mas transmiti-los ao iniciar o Trabalho de fluxo de dados a partir do modelo, mas ainda não consegui implementá-lo, provavelmente devido ao fato de este conector não suportar RuntimeValueProvider
.
Tudo está claro a partir do comentário, lemos a discussão do tópico. Vale acrescentar que você pode aproveitar o fluxo
do tópico e da assinatura (assinatura). Se o tópico for especificado como uma entrada, então
uma assinatura temporária para este tópico será criada automaticamente. A sintaxe também é bonita
claro, o fluxo de dados de entrada beam.io.ReadFromPubSub(input_topic)
enviado ao nosso
gasoduto p
.
É aqui que a transformação nº 1 acontece e nossa entrada é convertida de uma string python para
python dict, e na saída temos o PCollection # 1. >>
aparece na sintaxe. Ativado
de fato, o texto entre aspas é o nome do fluxo (deve ser exclusivo), além de um comentário,
que será adicionado ao bloco no gráfico na interface da web do GCP Dataflow. Vamos considerar em mais detalhes
classe substituída TransformToBigQuery
.
class TransformToBigQuery(beam.DoFn):
A variável do element
conterá uma mensagem da assinatura do PubSub. Como visto de
código, no nosso caso, deve ser JSON válido. Na sala de aula deve ser
o método do process
é redefinido, no qual as transformações necessárias devem ser feitas
linha de entrada para obter uma saída que corresponda ao circuito
a tabela na qual esses dados serão carregados. Porque nosso fluxo neste caso é
contínuo, unbounded
em termos de Apache Beam, você deve devolvê-lo usando
yield
, não return
, como no fluxo de dados final. No caso de um fluxo final, você pode
(e necessário) configurar adicionalmente windowing
e triggers
Esse código direciona a PCollection # 1 à Transform # 2, onde a marcação ocorrerá
(separação) do fluxo de dados. Na variável schema_dct
neste caso, um dicionário, em que a chave é o nome do arquivo do esquema sem a extensão, essa será a tag e o valor é o JSON válido do esquema
Tabelas do BigQuery para esta tag. Note-se que o esquema deve ser transmitido exatamente para
visualize {'fields': }
que
é o esquema da tabela do BigQuery no formato JSON (você pode
exportar da interface da web).
main='default'
é o nome da tag do segmento para a qual eles irão
Todas as mensagens que não estão sujeitas às condições de marcação. Considere a classe
TagDataWithReqType
.
class TagDataWithReqType(beam.DoFn):
Como você pode ver, a classe de process
também é redefinida aqui. A variável types
contém nomes
tags e devem corresponder ao número e nome com o número e os nomes das chaves do dicionário
schema_dct
. Embora o método do process
possa aceitar argumentos, eu nunca
Eu fui capaz de passar por eles. Ainda não descobri o motivo.
Na saída, obtemos uma tupla de threads no número de tags, ou seja, o número de nossos
tags predefinidas + segmento padrão que não foram identificados.
Transformação # ... (na verdade, não está no diagrama, é um "ramo") - escrevemos o fluxo padrão
para a tabela padrão.
tagged_stream.default
- um fluxo com a tag default
é obtido, uma sintaxe alternativa é tagged_stream['default']
schema=parse_table_schema_from_json(schema_dct.get('default'))
- aqui o esquema é definido
mesas. Observe que o arquivo default.json
com o esquema de tabela válido do BigQuery
deve estar no schema_dir = './'
atual.
O fluxo irá para uma tabela chamada default
.
Se uma tabela com este nome (no conjunto de dados fornecido deste projeto) não existir, ela será
será criado automaticamente a partir do esquema, graças à configuração padrão
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED
Na transformação nº 3, tudo deve ficar claro para quem lê o artigo desde o início e possui
sintaxe python. Separamos a tupla de fluxo com um loop e escrevemos cada fluxo em sua própria tabela com
o esquema dele. Deve-se lembrar que o nome do fluxo deve ser exclusivo - '%s:%s.%s' % (gcp_project, bq_dataset, name)
.
Agora deve ficar claro como isso funciona e você pode criar um modelo. Para isso você precisa
execute no console (não esqueça de ativar o venv, se disponível) ou no IDE:
python _.py / --runner DataflowRunner / --project dreamdata-test / --staging_location gs://STORAGE_NAME/STAGING_DIR / --temp_location gs://STORAGE_NAME/TEMP_DIR / --template_location gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME
Ao mesmo tempo, o acesso à Conta do Google deve ser organizado, por exemplo, através da exportação
a GOOGLE_APPLICATION_CREDENTIALS
ambiente GOOGLE_APPLICATION_CREDENTIALS
ou de outra maneira .
Algumas palavras sobre --runner
. Nesse caso, o DataflowRunner
diz que esse código
será executado como um modelo para o trabalho de fluxo de dados. Ainda é possível especificar
DirectRunner
, será usado por padrão se não houver opção --runner
e código
funcionará como um trabalho de fluxo de dados, mas localmente, o que é muito conveniente para depuração.
Se nenhum erro ocorreu, gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME
será
modelo criado. Vale dizer que em gs://STORAGE_NAME/STAGING_DIR
também será gravado
arquivos de serviço necessários para a operação bem-sucedida do Datafow Job criado com base em
modelo e você não precisa excluí-los.
Em seguida, você precisa criar um trabalho de fluxo de dados usando este modelo, manualmente ou por qualquer
de outra maneira (IC por exemplo).
4. Conclusões
Assim, conseguimos transmitir o fluxo do PubSub para o BigQuery usando
transformações de dados necessárias para fins de armazenamento, transformação e
uso de dados.
Links principais
Neste artigo, imprecisões e até erros são possíveis, serei grato pelo construtivo
crítica. No final, quero acrescentar que, de fato, nem todos são usados aqui
recursos do SDK do Apache Beam, mas esse não era o objetivo.