Cree una tubería para el procesamiento de datos de transmisión. Parte 1

Hola a todos Amigos, estamos compartiendo con ustedes una traducción de un artículo preparado especialmente para estudiantes del curso de Ingeniero de Datos . Vamos!



Apache Beam y DataFlow para canalizaciones en tiempo real


La publicación de hoy se basa en una tarea en la que trabajé recientemente en el trabajo. Estaba realmente feliz de implementarlo y describir el trabajo realizado en el formato de una publicación de blog, porque me dio la oportunidad de trabajar en ingeniería de datos, así como de hacer algo que sería muy útil para mi equipo. No hace mucho tiempo, descubrí que nuestros sistemas almacenan una cantidad bastante grande de registros de usuarios relacionados con uno de nuestros productos para trabajar con datos. Resultó que nadie había usado estos datos, por lo que inmediatamente me interesé en lo que podríamos descubrir si comenzáramos a analizarlos regularmente. Sin embargo, hubo varios problemas en el camino. El primer problema fue que los datos se almacenaron en muchos archivos de texto diferentes que no estaban disponibles para el análisis instantáneo. El segundo problema era que estaban almacenados en un sistema cerrado, por lo que no podía usar ninguna de mis herramientas de análisis de datos favoritas.

Tenía que decidir cómo hacer que el acceso sea más fácil para nosotros y agregar al menos algo de valor al incorporar esta fuente de datos en algunas de nuestras soluciones de interacción con el usuario. Después de pensar por un tiempo, decidí construir una tubería para transferir estos datos a la base de datos en la nube para que yo y el equipo pudiéramos acceder a ellos y comenzar a generar conclusiones. Después de completar mi especialización en Ingeniería de Datos en Coursera hace algún tiempo, estaba ansioso por usar algunas de las herramientas del curso en el proyecto.

Así que poner datos en una base de datos en la nube parecía una forma inteligente de resolver mi primer problema, pero ¿qué podría hacer con el problema número 2? Afortunadamente, había una manera de transferir estos datos a un entorno donde podía acceder a herramientas como Python y Google Cloud Platform (GCP). Sin embargo, fue un proceso largo, por lo que necesitaba hacer algo que me permitiera continuar el desarrollo mientras esperaba el final de la transferencia de datos. La solución que se me ocurrió fue crear datos falsos utilizando la biblioteca Faker en Python. Nunca antes había usado esta biblioteca, pero rápidamente me di cuenta de lo útil que era. El uso de este enfoque me permitió comenzar a escribir código y probar la tubería sin datos reales.

Con base en lo anterior, en esta publicación le contaré cómo construí la tubería descrita anteriormente utilizando algunas de las tecnologías disponibles en GCP. En particular, usaré Apache Beam (versión para Python), Dataflow, Pub / Sub y Big Query para recopilar registros de usuarios, convertir datos y transferirlos a una base de datos para su posterior análisis. En mi caso, solo necesitaba la funcionalidad por lotes de Beam, ya que mis datos no llegaron en tiempo real, por lo que no se requirió Pub / Sub. Sin embargo, me centraré en la versión de transmisión, ya que esto es lo que puede encontrar en la práctica.

Introducción a GCP y Apache Beam


Google Cloud Platform proporciona un conjunto de herramientas realmente útiles para procesar big data. Estas son algunas de las herramientas que usaré:

  • Pub / Sub es un servicio de mensajería que utiliza la plantilla Publisher-Subscriber que nos permite recibir datos en tiempo real.
  • DataFlow es un servicio que simplifica la creación de canalizaciones de datos y resuelve automáticamente tareas como escalar la infraestructura, lo que significa que solo podemos centrarnos en escribir código para nuestra canalización.
  • BigQuery es un almacén de datos basado en la nube. Si está familiarizado con otras bases de datos SQL, no tendrá que lidiar con BigQuery por mucho tiempo.
  • Y finalmente, usaremos Apache Beam, es decir, nos centraremos en la versión de Python para crear nuestra tubería. Esta herramienta nos permitirá crear una tubería para la transmisión o el procesamiento por lotes que se integra con GCP. Es especialmente útil para el procesamiento en paralelo y es adecuado para tareas como extracción, transformación y carga (ETL), por lo que si necesitamos mover datos de un lugar a otro con transformaciones o cálculos, Beam es una buena opción.


Hay una gran cantidad de herramientas disponibles en GCP, por lo que puede ser difícil abarcarlas todas, incluido su propósito, pero, sin embargo, aquí hay un breve resumen de referencia.

Visualización de nuestro transportador.


Visualicemos los componentes de nuestra tubería en la Figura 1 . A un alto nivel, queremos recopilar datos de usuarios en tiempo real, procesarlos y transferirlos a BigQuery. Los registros se crean cuando los usuarios interactúan con el producto enviando solicitudes al servidor, que luego se registran. Estos datos pueden ser especialmente útiles para comprender cómo interactúan los usuarios con nuestro producto y si funcionan correctamente. En general, el transportador contendrá los siguientes pasos:

  1. Los datos de registro de nuestros usuarios se publican en la sección Pub / Sub.
  2. Nos conectaremos a Pub / Sub y convertiremos los datos al formato apropiado usando Python y Beam (pasos 3 y 4 en la Figura 1).
  3. Después de convertir los datos, Beam se conectará a BigQuery y los agregará a nuestra tabla (pasos 4 y 5 en la Figura 1).
  4. Para el análisis, podemos conectarnos a BigQuery usando varias herramientas como Tableau y Python.

Beam hace que este proceso sea muy simple, independientemente de si tenemos una fuente de transmisión de datos o un archivo CSV, y si queremos hacer un procesamiento por lotes. Más adelante verá que el código contiene solo los cambios mínimos necesarios para cambiar entre ellos. Este es uno de los beneficios de usar Beam.


Figura 1: La tubería de datos principal

Crear pseudodatos utilizando Faker


Como mencioné anteriormente, debido al acceso limitado a los datos, decidí crear pseudodatos en el mismo formato que los reales. Este fue un ejercicio realmente útil, ya que podía escribir código y probar la tubería mientras esperaba datos. Sugiero que eche un vistazo a la documentación de Faker si desea saber qué más tiene para ofrecer esta biblioteca. Nuestros datos de usuario generalmente serán similares al ejemplo a continuación. En base a este formato, podemos generar datos línea por línea para simular datos en tiempo real. Estos registros nos brindan información como la fecha, el tipo de solicitud, la respuesta del servidor, la dirección IP, etc.

192.52.197.161 - - [30/Apr/2019:21:11:42] "PUT /tag/category/tag HTTP/1.1" [401] 155 "https://harris-lopez.com/categories/about/" "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_2) AppleWebKit/5312 (KHTML, like Gecko) Chrome/34.0.855.0 Safari/5312"

Con base en la línea anterior, queremos crear nuestra variable LINE usando 7 variables entre llaves a continuación. También los usaremos como nombres de variables en nuestro esquema de tabla un poco más tarde.

LINE = """\
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\
"""


Si realizáramos el procesamiento por lotes, el código sería muy similar, aunque necesitaríamos crear un conjunto de muestras en un cierto rango de tiempo. Para usar un falsificador, simplemente creamos un objeto y llamamos a los métodos que necesitamos. En particular, Faker fue útil para crear direcciones IP y sitios web. Usé los siguientes métodos:

fake.ipv4()
fake.uri_path()
fake.uri()
fake.user_agent()


 from faker import Faker import time import random import os import numpy as np from datetime import datetime, timedelta LINE = """\ {remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\ """ def generate_log_line(): fake = Faker() now = datetime.now() remote_addr = fake.ipv4() time_local = now.strftime('%d/%b/%Y:%H:%M:%S') request_type = random.choice(["GET", "POST", "PUT"]) request_path = "/" + fake.uri_path() status = np.random.choice([200, 401, 404], p = [0.9, 0.05, 0.05]) body_bytes_sent = random.choice(range(5, 1000, 1)) http_referer = fake.uri() http_user_agent = fake.user_agent() log_line = LINE.format( remote_addr=remote_addr, time_local=time_local, request_type=request_type, request_path=request_path, status=status, body_bytes_sent=body_bytes_sent, http_referer=http_referer, http_user_agent=http_user_agent ) return log_line 


El final de la primera parte.

En los próximos días, compartiremos con ustedes la continuación del artículo, pero ahora estamos tradicionalmente esperando comentarios ;-).

Segunda parte

Source: https://habr.com/ru/post/454186/


All Articles