Créez un pipeline pour le traitement en continu des données. Partie 1

Bonjour à tous. Amis, nous partageons avec vous la traduction d'un article préparé spécialement pour les étudiants du cours Data Engineer . C'est parti!



Apache Beam et DataFlow pour les pipelines en temps réel


Le post d'aujourd'hui est basé sur une tâche sur laquelle j'ai récemment travaillé au travail. J'étais vraiment heureux de le mettre en œuvre et de décrire le travail effectué sous la forme d'un article de blog, car cela m'a donné l'opportunité de travailler sur l'ingénierie des données, ainsi que de faire quelque chose qui serait très utile pour mon équipe. Il n'y a pas si longtemps, j'ai découvert que nos systèmes stockaient une quantité assez importante de journaux d'utilisateurs liés à l'un de nos produits pour travailler avec des données. Il s'est avéré que personne n'avait utilisé ces données, alors je me suis immédiatement intéressé à ce que nous pourrions savoir si nous commencions à les analyser régulièrement. Cependant, il y a eu plusieurs problèmes en cours de route. Le premier problème était que les données étaient stockées dans de nombreux fichiers texte différents qui n'étaient pas disponibles pour une analyse instantanée. Le deuxième problème était qu'ils étaient stockés dans un système fermé, donc je ne pouvais utiliser aucun de mes outils d'analyse de données préférés.

J'ai dû décider comment nous faciliter l'accès et ajouter au moins une certaine valeur en incorporant cette source de données dans certaines de nos solutions d'interaction avec les utilisateurs. Après avoir réfléchi un moment, j'ai décidé de construire un pipeline pour transférer ces données vers la base de données cloud afin que moi et l'équipe puissions y accéder et commencer à générer des conclusions. Après avoir terminé ma spécialisation en génie des données à Coursera il y a quelque temps, j'étais impatient d'utiliser certains des outils de cours du projet.

Donc, mettre des données dans une base de données cloud semblait être un moyen intelligent de résoudre mon premier problème, mais que pouvais-je faire avec le problème numéro 2? Heureusement, il y avait un moyen de transférer ces données vers un environnement où je pouvais accéder à des outils comme Python et Google Cloud Platform (GCP). Cependant, c'était un long processus, donc je devais faire quelque chose qui me permettrait de continuer le développement pendant que j'attendais la fin du transfert de données. La solution que j'ai trouvée était de créer de fausses données en utilisant la bibliothèque Faker en Python. Je n'avais jamais utilisé cette bibliothèque auparavant, mais je me suis vite rendu compte de son utilité. Cette approche m'a permis de commencer à écrire du code et à tester le pipeline sans données réelles.

Sur la base de ce qui précède, dans ce post, je vais vous expliquer comment j'ai construit le pipeline décrit ci-dessus en utilisant certaines des technologies disponibles dans GCP. En particulier, j'utiliserai Apache Beam (version pour Python), Dataflow, Pub / Sub et Big Query pour collecter les journaux utilisateur, convertir les données et les transférer dans une base de données pour une analyse plus approfondie. Dans mon cas, je n'avais besoin que de la fonctionnalité batch de Beam, car mes données n'arrivaient pas en temps réel, donc Pub / Sub n'était pas requis. Cependant, je me concentrerai sur la version en streaming, car c'est ce que vous pouvez rencontrer dans la pratique.

Introduction à GCP et Apache Beam


La plateforme Google Cloud fournit un ensemble d'outils vraiment utiles pour le traitement des mégadonnées. Voici quelques-uns des outils que j'utiliserai:

  • Pub / Sub est un service de messagerie utilisant le modèle Publisher-Subscriber qui nous permet de recevoir des données en temps réel.
  • DataFlow est un service qui simplifie la création de pipelines de données et résout automatiquement des tâches telles que la mise à l'échelle de l'infrastructure, ce qui signifie que nous ne pouvons nous concentrer que sur l'écriture de code pour notre pipeline.
  • BigQuery est un entrepôt de données basé sur le cloud. Si vous connaissez d'autres bases de données SQL, vous n'aurez pas à traiter avec BigQuery pendant longtemps.
  • Et enfin, nous utiliserons Apache Beam, à savoir nous concentrer sur la version Python pour créer notre pipeline. Cet outil nous permettra de créer un pipeline de streaming ou de traitement par lots qui s'intègre à GCP. Il est particulièrement utile pour le traitement parallèle et convient à des tâches telles que l'extraction, la transformation et le chargement (ETL), donc si nous devons déplacer des données d'un endroit à un autre avec des transformations ou des calculs, Beam est un bon choix.


Un grand nombre d'outils sont disponibles dans GCP, il peut donc être difficile de les couvrir tous, y compris leur objectif, mais néanmoins, voici un bref résumé pour référence.

Visualisation de notre convoyeur


Visualisons les composants de notre pipeline dans la figure 1 . À un niveau élevé, nous voulons collecter les données des utilisateurs en temps réel, les traiter et les transférer vers BigQuery. Les journaux sont créés lorsque les utilisateurs interagissent avec le produit en envoyant des demandes au serveur, qui sont ensuite enregistrées. Ces données peuvent être particulièrement utiles pour comprendre comment les utilisateurs interagissent avec notre produit et s'ils fonctionnent correctement. En général, le convoyeur contiendra les étapes suivantes:

  1. Les données du journal de nos utilisateurs sont publiées dans la section Pub / Sub.
  2. Nous allons nous connecter à Pub / Sub et convertir les données au format approprié en utilisant Python et Beam (étapes 3 et 4 de la figure 1).
  3. Après avoir converti les données, Beam se connectera ensuite à BigQuery et les ajoutera à notre table (étapes 4 et 5 de la figure 1).
  4. Pour l'analyse, nous pouvons nous connecter à BigQuery à l'aide de divers outils tels que Tableau et Python.

Beam rend ce processus très simple, que nous ayons une source de données en streaming ou un fichier CSV, et que nous souhaitons effectuer un traitement par lots. Plus tard, vous verrez que le code ne contient que les modifications minimales nécessaires pour basculer entre elles. C'est l'un des avantages de l'utilisation de Beam.


Figure 1: Le principal pipeline de données

Création de pseudo-données à l'aide de Faker


Comme je l'ai mentionné plus tôt, en raison d'un accès limité aux données, j'ai décidé de créer des pseudo-données dans le même format que les vraies. C'était un exercice vraiment utile, car je pouvais écrire du code et tester le pipeline pendant que j'attendais des données. Je suggère de jeter un œil à la documentation de Faker si vous voulez savoir ce que cette bibliothèque a à offrir. Nos données utilisateur seront généralement similaires à l'exemple ci-dessous. Sur la base de ce format, nous pouvons générer des données ligne par ligne pour simuler des données en temps réel. Ces journaux nous fournissent des informations telles que la date, le type de demande, la réponse du serveur, l'adresse IP, etc.

192.52.197.161 - - [30/Apr/2019:21:11:42] "PUT /tag/category/tag HTTP/1.1" [401] 155 "https://harris-lopez.com/categories/about/" "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_2) AppleWebKit/5312 (KHTML, like Gecko) Chrome/34.0.855.0 Safari/5312"

Sur la base de la ligne ci-dessus, nous voulons créer notre variable LINE en utilisant 7 variables entre accolades ci-dessous. Nous les utiliserons également comme noms de variables dans notre schéma de table un peu plus tard.

LINE = """\
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\
"""


Si nous devions effectuer un traitement par lots, le code serait très similaire, bien que nous devions créer un ensemble d'échantillons dans une certaine plage de temps. Pour utiliser un faker, nous créons simplement un objet et appelons les méthodes dont nous avons besoin. En particulier, Faker était utile pour créer des adresses IP ainsi que des sites Web. J'ai utilisé les méthodes suivantes:

fake.ipv4()
fake.uri_path()
fake.uri()
fake.user_agent()


 from faker import Faker import time import random import os import numpy as np from datetime import datetime, timedelta LINE = """\ {remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\ """ def generate_log_line(): fake = Faker() now = datetime.now() remote_addr = fake.ipv4() time_local = now.strftime('%d/%b/%Y:%H:%M:%S') request_type = random.choice(["GET", "POST", "PUT"]) request_path = "/" + fake.uri_path() status = np.random.choice([200, 401, 404], p = [0.9, 0.05, 0.05]) body_bytes_sent = random.choice(range(5, 1000, 1)) http_referer = fake.uri() http_user_agent = fake.user_agent() log_line = LINE.format( remote_addr=remote_addr, time_local=time_local, request_type=request_type, request_path=request_path, status=status, body_bytes_sent=body_bytes_sent, http_referer=http_referer, http_user_agent=http_user_agent ) return log_line 


La fin de la première partie.

Dans les prochains jours, nous partagerons avec vous la suite de l'article, mais maintenant nous attendons traditionnellement des commentaires ;-).

Deuxième partie

Source: https://habr.com/ru/post/fr454186/


All Articles