Crie um pipeline para processamento de dados de streaming. Parte 1

Olá pessoal. Amigos, estamos compartilhando com você uma tradução de um artigo preparado especialmente para os alunos do curso Data Engineer . Vamos lá!



Apache Beam e DataFlow para pipelines em tempo real


A publicação de hoje é baseada em uma tarefa na qual trabalhei recentemente. Fiquei muito feliz em traduzi-lo e descrever o trabalho realizado no formato de uma postagem de blog, pois me deu a oportunidade de trabalhar em engenharia de dados e de fazer algo que seria muito útil para minha equipe. Há pouco tempo, descobri que nossos sistemas armazenam uma quantidade bastante grande de logs de usuários relacionados a um de nossos produtos para trabalhar com dados. Aconteceu que ninguém tinha usado esses dados, então fiquei imediatamente interessado no que poderíamos descobrir se começássemos a analisá-los regularmente. No entanto, houve vários problemas ao longo do caminho. O primeiro problema foi que os dados foram armazenados em muitos arquivos de texto diferentes que não estavam disponíveis para análise instantânea. O segundo problema era que eles estavam armazenados em um sistema fechado, portanto não pude usar nenhuma das minhas ferramentas favoritas de análise de dados.

Eu tive que decidir como facilitar o acesso para nós e agregar pelo menos algum valor incorporando essa fonte de dados em algumas de nossas soluções de interação com o usuário. Depois de pensar um pouco, decidi construir um pipeline para transferir esses dados para o banco de dados em nuvem, para que eu e a equipe pudessem acessá-los e começar a gerar conclusões. Depois de concluir minha especialização em Engenharia de Dados na Coursera, há algum tempo, eu estava ansioso para usar algumas das ferramentas do curso no projeto.

Portanto, colocar dados em um banco de dados na nuvem parecia uma maneira inteligente de resolver meu primeiro problema, mas o que eu poderia fazer com o problema número 2? Felizmente, havia uma maneira de transferir esses dados para um ambiente onde eu podia acessar ferramentas como Python e o Google Cloud Platform (GCP). No entanto, como foi um processo longo, eu precisava fazer algo que me permitisse continuar o desenvolvimento enquanto aguardava o término da transferência de dados. A solução que eu encontrei foi criar dados falsos usando a biblioteca Faker em Python. Eu nunca tinha usado essa biblioteca antes, mas rapidamente percebi o quão útil era. O uso dessa abordagem me permitiu começar a escrever código e testar o pipeline sem dados reais.

Com base no exposto, neste post, mostrarei como construí o pipeline descrito acima, usando algumas das tecnologias disponíveis no GCP. Em particular, usarei o Apache Beam (versão para Python), Dataflow, Pub / Sub e Big Query para coletar logs do usuário, converter dados e transferi-los para um banco de dados para análise posterior. No meu caso, eu só precisava da funcionalidade de lote do Beam, já que meus dados não chegaram em tempo real, portanto, Pub / Sub não era necessário. No entanto, vou me concentrar na versão de streaming, pois é isso que você pode encontrar na prática.

Introdução ao GCP e Apache Beam


O Google Cloud Platform fornece um conjunto de ferramentas realmente úteis para o processamento de big data. Aqui estão algumas das ferramentas que vou usar:

  • Pub / Sub é um serviço de mensagens que usa o modelo Publisher-Subscriber que nos permite receber dados em tempo real.
  • O DataFlow é um serviço que simplifica a criação de pipelines de dados e resolve automaticamente tarefas como dimensionar a infraestrutura, o que significa que só podemos focar na escrita de código para o nosso pipeline.
  • O BigQuery é um data warehouse baseado em nuvem. Se você estiver familiarizado com outros bancos de dados SQL, não precisará lidar com o BigQuery por muito tempo.
  • E, finalmente, usaremos o Apache Beam, a saber, o foco na versão Python para criar nosso pipeline. Essa ferramenta nos permitirá criar um pipeline para streaming ou processamento em lote que se integra ao GCP. É especialmente útil para processamento paralelo e é adequado para tarefas como extração, transformação e carregamento (ETL); portanto, se precisarmos mover dados de um local para outro com transformações ou cálculos, o Beam é uma boa opção.


Um grande número de ferramentas está disponível no GCP; portanto, pode ser difícil abranger todas elas, incluindo sua finalidade, mas, no entanto, aqui está um breve resumo para referência.

Visualização do nosso transportador


Vamos visualizar os componentes do nosso pipeline na Figura 1 . Em um nível alto, queremos coletar dados do usuário em tempo real, processá-los e transferi-los para o BigQuery. Os logs são criados quando os usuários interagem com o produto enviando solicitações ao servidor, que são registradas. Esses dados podem ser especialmente úteis para entender como os usuários interagem com nosso produto e se eles funcionam corretamente. Em geral, o transportador conterá as seguintes etapas:

  1. Os dados de log de nossos usuários são publicados na seção Pub / Sub.
  2. Vamos nos conectar ao Pub / Sub e converter os dados para o formato apropriado usando Python e Beam (etapas 3 e 4 na Figura 1).
  3. Após a conversão dos dados, o Beam se conectará ao BigQuery e os adicionará à nossa tabela (etapas 4 e 5 na Figura 1).
  4. Para análise, podemos conectar-se ao BigQuery usando várias ferramentas, como Tableau e Python.

O Beam torna esse processo muito simples, independentemente de termos uma fonte de dados de streaming ou um arquivo CSV e queremos fazer o processamento em lote. Mais tarde, você verá que o código contém apenas as alterações mínimas necessárias para alternar entre eles. Esse é um dos benefícios do uso do Beam.


Figura 1: O principal pipeline de dados

Criando pseudo dados usando o Faker


Como mencionei anteriormente, devido ao acesso limitado aos dados, decidi criar pseudo-dados no mesmo formato que os reais. Este foi um exercício realmente útil, pois eu poderia escrever código e testar o pipeline enquanto esperava dados. Sugiro dar uma olhada na documentação do Faker , se você quiser saber o que mais esta biblioteca tem a oferecer. Nossos dados de usuário geralmente serão semelhantes ao exemplo abaixo. Com base nesse formato, podemos gerar dados linha por linha para simular dados em tempo real. Esses logs nos fornecem informações como data, tipo de solicitação, resposta do servidor, endereço IP etc.

192.52.197.161 - - [30/Apr/2019:21:11:42] "PUT /tag/category/tag HTTP/1.1" [401] 155 "https://harris-lopez.com/categories/about/" "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_2) AppleWebKit/5312 (KHTML, like Gecko) Chrome/34.0.855.0 Safari/5312"

Com base na linha acima, queremos criar nossa variável LINE usando 7 variáveis ​​entre chaves abaixo. Também os usaremos como nomes de variáveis ​​em nosso esquema de tabela um pouco mais tarde.

LINE = """\
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\
"""


Se realizássemos o processamento em lote, o código seria muito semelhante, embora precisássemos criar um conjunto de amostras em um determinado intervalo de tempo. Para usar um falsificador, simplesmente criamos um objeto e chamamos os métodos que precisamos. Em particular, o Faker foi útil para criar endereços IP e também sites. Eu usei os seguintes métodos:

fake.ipv4()
fake.uri_path()
fake.uri()
fake.user_agent()


 from faker import Faker import time import random import os import numpy as np from datetime import datetime, timedelta LINE = """\ {remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\ """ def generate_log_line(): fake = Faker() now = datetime.now() remote_addr = fake.ipv4() time_local = now.strftime('%d/%b/%Y:%H:%M:%S') request_type = random.choice(["GET", "POST", "PUT"]) request_path = "/" + fake.uri_path() status = np.random.choice([200, 401, 404], p = [0.9, 0.05, 0.05]) body_bytes_sent = random.choice(range(5, 1000, 1)) http_referer = fake.uri() http_user_agent = fake.user_agent() log_line = LINE.format( remote_addr=remote_addr, time_local=time_local, request_type=request_type, request_path=request_path, status=status, body_bytes_sent=body_bytes_sent, http_referer=http_referer, http_user_agent=http_user_agent ) return log_line 


O fim da primeira parte.

Nos próximos dias, compartilharemos com você a continuação do artigo, mas agora estamos tradicionalmente aguardando comentários ;-).

Segunda parte

Source: https://habr.com/ru/post/pt454186/


All Articles