
Saat ini, saya terlibat dalam tugas streaming (dan mengkonversi) data. Di beberapa kalangan
proses seperti itu dikenal sebagai ETL , mis. ekstraksi, konversi, dan pemuatan informasi.
Seluruh proses mencakup partisipasi layanan Google Cloud Platform berikut:
- Pub / Sub -layanan untuk streaming data realtime
- Dataflow - layanan untuk mengonversi data (bisa
bekerja baik dalam mode waktu nyata maupun dalam mode batch) - BigQuery - layanan untuk menyimpan data dalam bentuk tabel
(mendukung SQL)
0. Status saat ini
Saat ini, ada versi streaming yang berfungsi pada layanan di atas, namun pada
Sebagai templat, salah satu yang standar digunakan .
Masalahnya adalah bahwa templat ini menyediakan transfer data 1 banding 1, mis. pada
di pintu masuk ke Pub / Sub kami memiliki string format JSON, pada output kami memiliki tabel BigQuery dengan bidang,
yang sesuai dengan kunci objek di tingkat atas input JSON.
1. Pernyataan masalah
Buat templat Dataflow yang memungkinkan Anda untuk mendapatkan tabel atau tabel di output
sesuai dengan kondisi yang diberikan. Misalnya, kami ingin membuat tabel terpisah untuk masing-masing
nilai kunci JSON input tertentu. Perlu untuk mempertimbangkan fakta bahwa beberapa
Objek input JSON dapat berisi JSON bersarang sebagai nilai, mis. diperlukan
dapat membuat tabel BigQuery dengan bidang tipe RECORD
untuk menyimpan bersarang
data.
2. Persiapan untuk keputusan
Untuk membuat templat Dataflow, gunakan Apache Beam SDK , yang, pada gilirannya,
mendukung Java dan Python sebagai bahasa pemrograman. Saya harus mengatakan itu
hanya versi Python 2.7.x yang didukung, yang sedikit mengejutkan saya. Apalagi dukungan
Jawa agak lebih luas, karena untuk Python, misalnya, beberapa fungsi tidak tersedia dan banyak lagi
Daftar konektor bawaan yang sederhana . Omong-omong, Anda dapat menulis konektor Anda sendiri.
Namun, karena saya tidak terbiasa dengan Java, saya menggunakan Python.
Sebelum Anda mulai membuat templat, Anda harus memiliki yang berikut ini:
- masukan format JSON dan seharusnya tidak berubah dalam waktu
- skema atau skema tabel BigQuery di mana data akan dialirkan
- jumlah tabel di mana aliran data output akan di-stream
Perhatikan bahwa setelah membuat template dan memulai Pekerjaan Dataflow berdasarkan itu, parameter ini bisa
ubah hanya dengan membuat templat baru.
Katakan saja beberapa kata tentang batasan ini. Mereka semua berasal dari kenyataan bahwa tidak ada kemungkinan
buat template dinamis yang bisa mengambil string apa pun sebagai input, parsing
menurut logika internal dan kemudian mengisi tabel yang dibuat secara dinamis dengan dinamis
dibuat oleh sirkuit. Sangat mungkin bahwa kemungkinan ini ada, tetapi di dalam data
Saya tidak berhasil menerapkan skema seperti itu. Sejauh yang saya mengerti keseluruhan
pipeline dibangun sebelum menjalankannya dalam runtime dan oleh karena itu tidak ada cara untuk mengubahnya
terbang. Mungkin seseorang akan membagikan keputusan mereka.
3. Keputusan
Untuk pemahaman yang lebih lengkap tentang proses, ada baiknya membawa diagram yang disebut pipa
dari dokumentasi Apache Beam.

Dalam kasus kami (kami akan menggunakan divisi menjadi beberapa tabel):
- input - data berasal dari PubSub di Dataflow Job
- Transform # 1 - data dikonversi dari string ke kamus Python, kita mendapatkan output
PCollection # 1 - Transform # 2 - data diberi tag, untuk pemisahan lebih lanjut menurut tabel terpisah, menjadi
outputnya adalah PCollection # 2 (sebenarnya tuple PCollection) - Transform # 3 - data dari PCollection # 2 ditulis ke tabel menggunakan skema
meja
Dalam proses menulis templat saya sendiri, saya secara aktif terinspirasi oleh contoh - contoh ini .
Kode templat dengan komentar (meninggalkan komentar dengan cara yang sama dari penulis sebelumnya): Sekarang kita akan membaca kode dan memberikan penjelasan, tetapi pertama-tama layak untuk mengatakan bahwa utama
kesulitan dalam menulis template ini adalah untuk berpikir dalam hal "aliran data", dan
bukan pesan tertentu. Juga penting untuk memahami bahwa Pub / Sub beroperasi dengan pesan dan
dari merekalah kami akan menerima informasi untuk menandai aliran.
pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True
Karena Konektor Apache Beam Pub / Sub IO hanya digunakan dalam mode streaming yang diperlukan
tambahkan PipelineOptions () (walaupun sebenarnya opsi tidak digunakan); jika tidak, buat templat
jatuh dengan pengecualian. Harus dikatakan tentang opsi untuk meluncurkan template. Mereka bisa
statis dan disebut "runtime". Berikut ini tautan ke dokumentasi tentang topik ini. Opsi memungkinkan Anda untuk membuat templat tanpa menentukan parameter di muka, tetapi meneruskannya saat Anda memulai Pekerjaan Dataflow dari templat, tapi saya masih tidak bisa mengimplementasikannya, mungkin karena fakta bahwa konektor ini tidak mendukung RuntimeValueProvider
.
Semuanya jelas dari komentar, kami membaca utas dari topik. Perlu ditambahkan bahwa Anda dapat mengikuti arus
baik dari topik dan dari berlangganan (berlangganan). Jika topik ditentukan sebagai input, maka
langganan sementara untuk topik ini akan dibuat secara otomatis. Sintaksnya juga cantik
jelas, input stream data beam.io.ReadFromPubSub(input_topic)
dikirim ke kami
pipa p
.
Di sinilah Transform # 1 terjadi dan input kami dikonversi dari string python ke
python dict, dan dalam output kita mendapatkan PCollection # 1. >>
muncul di sintaks. Aktif
sebenarnya, teks dalam tanda kutip adalah nama aliran (harus unik), serta komentar,
yang akan ditambahkan ke blok pada grafik di antarmuka web GCP Dataflow. Mari kita pertimbangkan lebih detail
kelas utama TransformToBigQuery
.
class TransformToBigQuery(beam.DoFn):
Variabel element
akan berisi satu pesan dari langganan PubSub. Seperti yang terlihat dari
kode, dalam kasus kami harus JSON yang valid. Harus di dalam kelas
metode process
didefinisikan ulang, di mana transformasi yang diperlukan harus dilakukan
jalur input untuk mendapatkan output yang sesuai dengan rangkaian
tabel tempat data ini akan dimuat. Karena aliran kami dalam hal ini adalah
terus menerus, tidak unbounded
dalam hal Apache Beam, Anda harus mengembalikannya menggunakan
yield
, bukan return
, seperti untuk aliran data akhir. Dalam hal aliran akhir, Anda bisa
(dan perlu) tambahan mengkonfigurasi windowing
dan triggers
Kode ini mengarahkan PCollection # 1 ke Transform # 2 di mana penandaan akan dilakukan
(pemisahan) dari aliran data. Dalam schema_dct
variabel schema_dct
dalam hal ini, kamus, di mana kuncinya adalah nama file skema tanpa ekstensi, ini akan menjadi tag, dan nilainya JSON valid dari skema
Tabel BigQuery untuk tag ini. Perlu dicatat bahwa skema tersebut harus ditransmisikan secara tepat kepada
lihat {'fields': }
mana
adalah skema dari tabel BigQuery dalam bentuk JSON (Anda bisa
ekspor dari antarmuka web).
main='default'
adalah nama dari tag thread yang akan mereka tuju
Semua pesan yang tidak tunduk pada kondisi pemberian tag. Pertimbangkan kelasnya
TagDataWithReqType
.
class TagDataWithReqType(beam.DoFn):
Seperti yang Anda lihat, kelas process
juga diganti di sini. Variabel types
berisi nama
tag dan mereka harus mencocokkan nomor dan nama dengan nomor dan nama kunci kamus
schema_dct
. Meskipun metode process
memiliki kemampuan untuk menerima argumen, saya tidak pernah
Saya bisa melewati mereka. Saya belum menemukan alasannya.
Pada output, kita mendapatkan tupel utas dalam jumlah tag, yaitu jumlah kita
tag yang telah ditentukan + utas default yang gagal untuk menandai.
Transform # ... (pada kenyataannya, ini bukan pada diagram, ini adalah "cabang") - kami menulis aliran default
ke tabel default.
tagged_stream.default
- aliran dengan tag default
diambil, sintaks alternatif adalah tagged_stream['default']
schema=parse_table_schema_from_json(schema_dct.get('default'))
- di sini skema didefinisikan
meja. Harap perhatikan bahwa file default.json
dengan skema tabel BigQuery yang valid
harus ada di schema_dir = './'
.
Aliran akan pergi ke tabel yang disebut default
.
Jika tabel dengan nama ini (dalam dataset yang diberikan proyek ini) tidak ada, maka itu
akan secara otomatis dibuat dari skema berkat pengaturan default
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED
Transform # 3, semuanya harus jelas bagi mereka yang membaca artikel dari awal dan sendiri
sintaksis python Kami memisahkan stream tuple dengan loop dan menulis setiap stream dengan tabelnya sendiri
rencananya. Harus diingat bahwa nama aliran harus unik - '%s:%s.%s' % (gcp_project, bq_dataset, name)
.
Sekarang harus jelas bagaimana ini bekerja dan Anda dapat membuat templat. Untuk ini Anda perlu
jalankan di konsol (jangan lupa untuk mengaktifkan venv jika tersedia) atau dari IDE:
python _.py / --runner DataflowRunner / --project dreamdata-test / --staging_location gs://STORAGE_NAME/STAGING_DIR / --temp_location gs://STORAGE_NAME/TEMP_DIR / --template_location gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME
Pada saat yang sama, akses ke Akun Google harus diatur, misalnya, melalui ekspor
GOOGLE_APPLICATION_CREDENTIALS
lingkungan GOOGLE_APPLICATION_CREDENTIALS
atau cara lain.
Beberapa kata tentang - --runner
. Dalam hal ini, DataflowRunner
mengatakan bahwa kode ini
akan berjalan sebagai templat untuk Pekerjaan Dataflow. Masih mungkin untuk menentukan
DirectRunner
, ini akan digunakan secara default jika tidak ada opsi - --runner
dan kode
akan bekerja sebagai Pekerjaan Dataflow, tetapi secara lokal, yang sangat nyaman untuk debugging.
Jika tidak ada kesalahan terjadi, maka gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME
akan menjadi
template yang dibuat. Layak dikatakan bahwa dalam gs://STORAGE_NAME/STAGING_DIR
juga akan ditulis
file layanan yang diperlukan untuk keberhasilan operasi Datafow Job yang dibuat berdasarkan
templat dan Anda tidak perlu menghapusnya.
Selanjutnya, Anda perlu membuat Pekerjaan Dataflow menggunakan templat ini, secara manual atau oleh siapa saja
dengan cara lain (CI misalnya).
4. Kesimpulan
Jadi, kami berhasil melakukan streaming aliran dari PubSub ke BigQuery menggunakan
diperlukan transformasi data untuk tujuan penyimpanan lebih lanjut, transformasi dan
penggunaan data.
Tautan utama
Dalam artikel ini, ketidakakuratan dan bahkan kesalahan mungkin terjadi, saya akan berterima kasih atas konstruktifnya
kritik. Pada akhirnya, saya ingin menambahkan bahwa pada kenyataannya, tidak semua digunakan di sini
fitur dari Apache Beam SDK, tetapi bukan itu tujuannya.