Halo semuanya. Teman, kami berbagi dengan Anda terjemahan dari artikel yang disiapkan khusus untuk siswa kursus
Insinyur Data . Ayo pergi!

Apache Beam dan DataFlow untuk jaringan pipa real-time
Posting hari ini didasarkan pada tugas yang baru-baru ini saya kerjakan di tempat kerja. Saya sangat senang mengimplementasikannya dan menggambarkan pekerjaan yang dilakukan dalam format posting blog, karena memberi saya kesempatan untuk bekerja di bidang teknik data, serta melakukan sesuatu yang akan sangat berguna bagi tim saya. Belum lama ini, saya menemukan bahwa sistem kami menyimpan sejumlah besar log pengguna yang terkait dengan salah satu produk kami untuk bekerja dengan data. Ternyata tidak ada yang menggunakan data ini, jadi saya langsung tertarik pada apa yang bisa kami ketahui jika kami mulai menganalisisnya secara teratur. Namun, ada beberapa masalah di sepanjang jalan. Masalah pertama adalah bahwa data disimpan dalam banyak file teks berbeda yang tidak tersedia untuk analisis instan. Masalah kedua adalah bahwa mereka disimpan dalam sistem tertutup, jadi saya tidak bisa menggunakan alat analisis data favorit saya.
Saya harus memutuskan bagaimana membuat akses lebih mudah bagi kami dan menambahkan setidaknya beberapa nilai dengan menanamkan sumber data ini di beberapa solusi interaksi pengguna kami. Setelah berpikir sebentar, saya memutuskan untuk membangun saluran pipa untuk mentransfer data ini ke basis data cloud sehingga saya dan tim dapat mengaksesnya dan mulai membuat kesimpulan apa pun. Setelah saya menyelesaikan spesialisasi saya di bidang Rekayasa Data di Coursera beberapa waktu lalu, saya sangat ingin menggunakan beberapa alat kursus dalam proyek ini.
Jadi menempatkan data dalam database cloud sepertinya cara cerdas untuk menyelesaikan masalah pertama saya, tetapi apa yang bisa saya lakukan dengan masalah nomor 2? Untungnya, ada cara untuk mentransfer data ini ke lingkungan di mana saya dapat mengakses alat-alat seperti Python dan Google Cloud Platform (GCP). Namun, itu adalah proses yang panjang, jadi saya perlu melakukan sesuatu yang akan memungkinkan saya untuk melanjutkan pengembangan sementara saya sedang menunggu akhir transfer data. Solusi yang saya buat adalah membuat data palsu menggunakan pustaka
Faker dengan Python. Saya belum pernah menggunakan perpustakaan ini sebelumnya, tetapi segera menyadari betapa bergunanya itu. Menggunakan pendekatan ini memungkinkan saya untuk mulai menulis kode dan menguji pipa tanpa data aktual.
Berdasarkan hal tersebut di atas, dalam posting ini saya akan memberi tahu Anda bagaimana saya membangun pipa yang dijelaskan di atas menggunakan beberapa teknologi yang tersedia di GCP. Secara khusus, saya akan menggunakan
Apache Beam (versi untuk Python), Dataflow, Pub / Sub dan Big Query untuk mengumpulkan log pengguna, mengonversi data dan mentransfernya ke database untuk analisis lebih lanjut. Dalam kasus saya, saya hanya memerlukan fungsionalitas batch Beam, karena data saya tidak tiba secara real time, jadi Pub / Sub tidak diperlukan. Namun, saya akan fokus pada versi streaming, karena inilah yang mungkin Anda temui dalam praktik.
Pengantar GCP dan Apache Beam
Google Cloud Platform menyediakan seperangkat alat yang sangat berguna untuk memproses data besar. Berikut adalah beberapa alat yang akan saya gunakan:
- Pub / Sub adalah layanan pesan menggunakan template Publisher-Subscriber yang memungkinkan kita menerima data secara real time.
- DataFlow adalah layanan yang menyederhanakan pembuatan jalur pipa data dan secara otomatis menyelesaikan tugas-tugas seperti penskalaan infrastruktur, yang berarti bahwa kita hanya dapat fokus pada penulisan kode untuk saluran pipa kami.
- BigQuery adalah gudang data berbasis cloud. Jika Anda terbiasa dengan database SQL lainnya, Anda tidak akan harus berurusan dengan BigQuery lama.
- Dan akhirnya, kita akan menggunakan Apache Beam, yaitu, fokus pada versi Python untuk membuat pipeline kita. Alat ini akan memungkinkan kami untuk membuat saluran pipa untuk streaming atau pemrosesan batch yang terintegrasi dengan GCP. Sangat berguna untuk pemrosesan paralel dan cocok untuk tugas-tugas seperti ekstraksi, transformasi, dan pemuatan (ETL), jadi jika kita perlu memindahkan data dari satu tempat ke tempat lain dengan transformasi atau perhitungan, Beam adalah pilihan yang baik.
Sejumlah besar alat tersedia di GCP, sehingga bisa jadi sulit untuk mencakup semuanya, termasuk tujuannya, namun demikian,
berikut ini ringkasan singkat untuk referensi.
Visualisasi conveyor kami
Mari kita visualisasikan komponen dari pipeline kita pada
Gambar 1 . Pada tingkat tinggi, kami ingin mengumpulkan data pengguna secara real time, memprosesnya dan mentransfernya ke BigQuery. Log dibuat ketika pengguna berinteraksi dengan produk dengan mengirimkan permintaan ke server, yang kemudian dicatat. Data ini dapat sangat berguna untuk memahami bagaimana pengguna berinteraksi dengan produk kami dan apakah mereka berfungsi dengan benar. Secara umum, konveyor akan berisi langkah-langkah berikut:
- Data log pengguna kami dipublikasikan di bagian Pub / Sub.
- Kami akan terhubung ke Pub / Sub dan mengonversi data ke format yang sesuai menggunakan Python dan Beam (langkah 3 dan 4 pada Gambar 1).
- Setelah mengonversi data, Beam kemudian akan terhubung ke BigQuery dan menambahkannya ke tabel kami (langkah 4 dan 5 pada Gambar 1).
- Untuk analisis, kita dapat terhubung ke BigQuery menggunakan berbagai alat seperti Tableau dan Python.
Beam membuat proses ini sangat sederhana, terlepas dari apakah kita memiliki sumber data streaming atau file CSV, dan kami ingin melakukan pemrosesan batch. Kemudian Anda akan melihat bahwa kode hanya berisi perubahan minimum yang diperlukan untuk beralih di antara mereka. Ini adalah salah satu manfaat menggunakan Beam.
Gambar 1: Pipa data utamaMembuat Data Pseudo Menggunakan Faker
Seperti yang saya sebutkan sebelumnya, karena keterbatasan akses ke data, saya memutuskan untuk membuat pseudo-data dalam format yang sama dengan yang sebenarnya. Ini adalah latihan yang sangat berguna, karena saya bisa menulis kode dan menguji pipa sementara saya mengharapkan data. Saya sarankan untuk melihat
dokumentasi Faker jika Anda ingin tahu apa lagi yang ditawarkan perpustakaan ini. Data pengguna kami umumnya akan mirip dengan contoh di bawah ini. Berdasarkan format ini, kita dapat menghasilkan data baris demi baris untuk mensimulasikan data waktu nyata. Log ini memberi kami informasi seperti tanggal, jenis permintaan, respons dari server, alamat IP, dll.
192.52.197.161 - - [30/Apr/2019:21:11:42] "PUT /tag/category/tag HTTP/1.1" [401] 155 "https://harris-lopez.com/categories/about/" "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_2) AppleWebKit/5312 (KHTML, like Gecko) Chrome/34.0.855.0 Safari/5312"
Berdasarkan baris di atas, kami ingin membuat variabel
LINE kami menggunakan 7 variabel dalam kurung di bawah ini. Kami juga akan menggunakannya sebagai nama variabel dalam skema tabel kami beberapa saat kemudian.
LINE = """\
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\
"""
Jika kita melakukan pemrosesan batch, kodenya akan sangat mirip, meskipun kita perlu membuat satu set sampel dalam rentang waktu tertentu. Untuk menggunakan pemalsu, kita cukup membuat objek dan memanggil metode yang kita butuhkan. Secara khusus, Faker berguna untuk membuat alamat IP dan juga situs web. Saya menggunakan metode berikut:
fake.ipv4()
fake.uri_path()
fake.uri()
fake.user_agent()
from faker import Faker import time import random import os import numpy as np from datetime import datetime, timedelta LINE = """\ {remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\ """ def generate_log_line(): fake = Faker() now = datetime.now() remote_addr = fake.ipv4() time_local = now.strftime('%d/%b/%Y:%H:%M:%S') request_type = random.choice(["GET", "POST", "PUT"]) request_path = "/" + fake.uri_path() status = np.random.choice([200, 401, 404], p = [0.9, 0.05, 0.05]) body_bytes_sent = random.choice(range(5, 1000, 1)) http_referer = fake.uri() http_user_agent = fake.user_agent() log_line = LINE.format( remote_addr=remote_addr, time_local=time_local, request_type=request_type, request_path=request_path, status=status, body_bytes_sent=body_bytes_sent, http_referer=http_referer, http_user_agent=http_user_agent ) return log_line
Akhir dari bagian pertama.
Dalam beberapa hari mendatang, kami akan berbagi dengan Anda kelanjutan artikel, tetapi sekarang kami secara tradisional menunggu komentar ;-).
Bagian kedua