Membuat templat Dataflow untuk mengalirkan data dari Pub / Sub ke BigQuery berdasarkan GCP menggunakan Apache Beam SDK dan Python

gambar


Saat ini, saya terlibat dalam tugas streaming (dan mengkonversi) data. Di beberapa kalangan
proses seperti itu dikenal sebagai ETL , mis. ekstraksi, konversi, dan pemuatan informasi.


Seluruh proses mencakup partisipasi layanan Google Cloud Platform berikut:


  • Pub / Sub -layanan untuk streaming data realtime
  • Dataflow - layanan untuk mengonversi data (bisa
    bekerja baik dalam mode waktu nyata maupun dalam mode batch)
  • BigQuery - layanan untuk menyimpan data dalam bentuk tabel
    (mendukung SQL)

0. Status saat ini

Saat ini, ada versi streaming yang berfungsi pada layanan di atas, namun pada
Sebagai templat, salah satu yang standar digunakan .


Masalahnya adalah bahwa templat ini menyediakan transfer data 1 banding 1, mis. pada
di pintu masuk ke Pub / Sub kami memiliki string format JSON, pada output kami memiliki tabel BigQuery dengan bidang,
yang sesuai dengan kunci objek di tingkat atas input JSON.


1. Pernyataan masalah

Buat templat Dataflow yang memungkinkan Anda untuk mendapatkan tabel atau tabel di output
sesuai dengan kondisi yang diberikan. Misalnya, kami ingin membuat tabel terpisah untuk masing-masing
nilai kunci JSON input tertentu. Perlu untuk mempertimbangkan fakta bahwa beberapa
Objek input JSON dapat berisi JSON bersarang sebagai nilai, mis. diperlukan
dapat membuat tabel BigQuery dengan bidang tipe RECORD untuk menyimpan bersarang
data.


2. Persiapan untuk keputusan

Untuk membuat templat Dataflow, gunakan Apache Beam SDK , yang, pada gilirannya,
mendukung Java dan Python sebagai bahasa pemrograman. Saya harus mengatakan itu
hanya versi Python 2.7.x yang didukung, yang sedikit mengejutkan saya. Apalagi dukungan
Jawa agak lebih luas, karena untuk Python, misalnya, beberapa fungsi tidak tersedia dan banyak lagi
Daftar konektor bawaan yang sederhana . Omong-omong, Anda dapat menulis konektor Anda sendiri.


Namun, karena saya tidak terbiasa dengan Java, saya menggunakan Python.


Sebelum Anda mulai membuat templat, Anda harus memiliki yang berikut ini:


  1. masukan format JSON dan seharusnya tidak berubah dalam waktu
  2. skema atau skema tabel BigQuery di mana data akan dialirkan
  3. jumlah tabel di mana aliran data output akan di-stream

Perhatikan bahwa setelah membuat template dan memulai Pekerjaan Dataflow berdasarkan itu, parameter ini bisa
ubah hanya dengan membuat templat baru.


Katakan saja beberapa kata tentang batasan ini. Mereka semua berasal dari kenyataan bahwa tidak ada kemungkinan
buat template dinamis yang bisa mengambil string apa pun sebagai input, parsing
menurut logika internal dan kemudian mengisi tabel yang dibuat secara dinamis dengan dinamis
dibuat oleh sirkuit. Sangat mungkin bahwa kemungkinan ini ada, tetapi di dalam data
Saya tidak berhasil menerapkan skema seperti itu. Sejauh yang saya mengerti keseluruhan
pipeline dibangun sebelum menjalankannya dalam runtime dan oleh karena itu tidak ada cara untuk mengubahnya
terbang. Mungkin seseorang akan membagikan keputusan mereka.


3. Keputusan

Untuk pemahaman yang lebih lengkap tentang proses, ada baiknya membawa diagram yang disebut pipa
dari dokumentasi Apache Beam.


gambar


Dalam kasus kami (kami akan menggunakan divisi menjadi beberapa tabel):


  • input - data berasal dari PubSub di Dataflow Job
  • Transform # 1 - data dikonversi dari string ke kamus Python, kita mendapatkan output
    PCollection # 1
  • Transform # 2 - data diberi tag, untuk pemisahan lebih lanjut menurut tabel terpisah, menjadi
    outputnya adalah PCollection # 2 (sebenarnya tuple PCollection)
  • Transform # 3 - data dari PCollection # 2 ditulis ke tabel menggunakan skema
    meja

Dalam proses menulis templat saya sendiri, saya secara aktif terinspirasi oleh contoh - contoh ini .


Kode templat dengan komentar (meninggalkan komentar dengan cara yang sama dari penulis sebelumnya):
 # coding=utf-8 from __future__ import absolute_import import logging import json import os import apache_beam as beam from apache_beam.pvalue import TaggedOutput from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.io.gcp.bigquery import parse_table_schema_from_json #  GCP  gcp_project = '' #  Pub/Sub  topic_name = '' # Pub/Sub    'projects/_GCP_/topics/_' input_topic = 'projects/%s/topics/%s' % (gcp_project, topic_name) #  BigQuery  bq_dataset = 'segment_eu_test' #       schema_dir = './' class TransformToBigQuery(beam.DoFn): #          ,   # BigQuery IO     python dict def process(self, element, *args, **kwargs): body = json.loads(element) #       ,      # python dict       ,     #   yield body class TagDataWithReqType(beam.DoFn): #      , ..      #     ,       #  with_outputs + default def process(self, element, *args, **kwargs): req_type = element.get('_') types = ( 'type1', 'type2', 'type3', ) if req_type in types: yield TaggedOutput(req_type, element) else: yield element def run(): #       _.json   schema_dir,  #         ()  schema_dct = {} for schema_file in os.listdir(schema_dir): filename_list = schema_file.split('.') if filename_list[-1] == 'json': with open('%s/%s' % (schema_dir, schema_file)) as f: schema_json = f.read() schema_dct[filename_list[0]] = json.dumps({'fields': json.loads(schema_json)}) # We use the save_main_session option because one or more DoFn's in this # workflow rely on global context (eg, a module imported at module level). pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True # Read from PubSub into a PCollection. input_stream = p | beam.io.ReadFromPubSub(input_topic) # Transform stream to BigQuery IO format stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery()) # Tag stream by schema name tagged_stream = \ stream_bq \ | 'tag data by type' >> beam.ParDo(TagDataWithReqType()). with_outputs(*schema_dct.keys(), main='default') # Stream unidentified data to default table tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery( '%s:%s.default' % ( gcp_project, bq_dataset, ), schema=parse_table_schema_from_json(schema_dct.get('default')), ) # Stream data to BigQuery tables by number of schema names for name, schema in schema_dct.iteritems(): tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery( '%s:%s.%s' % ( gcp_project, bq_dataset, name), schema=parse_table_schema_from_json(schema), ) result = p.run() result.wait_until_finish() if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) logger = logging.getLogger(__name__) run() 

Sekarang kita akan membaca kode dan memberikan penjelasan, tetapi pertama-tama layak untuk mengatakan bahwa utama
kesulitan dalam menulis template ini adalah untuk berpikir dalam hal "aliran data", dan
bukan pesan tertentu. Juga penting untuk memahami bahwa Pub / Sub beroperasi dengan pesan dan
dari merekalah kami akan menerima informasi untuk menandai aliran.


 pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True 

Karena Konektor Apache Beam Pub / Sub IO hanya digunakan dalam mode streaming yang diperlukan
tambahkan PipelineOptions () (walaupun sebenarnya opsi tidak digunakan); jika tidak, buat templat
jatuh dengan pengecualian. Harus dikatakan tentang opsi untuk meluncurkan template. Mereka bisa
statis dan disebut "runtime". Berikut ini tautan ke dokumentasi tentang topik ini. Opsi memungkinkan Anda untuk membuat templat tanpa menentukan parameter di muka, tetapi meneruskannya saat Anda memulai Pekerjaan Dataflow dari templat, tapi saya masih tidak bisa mengimplementasikannya, mungkin karena fakta bahwa konektor ini tidak mendukung RuntimeValueProvider .


 # Read from PubSub into a PCollection. input_stream = p | beam.io.ReadFromPubSub(input_topic) 

Semuanya jelas dari komentar, kami membaca utas dari topik. Perlu ditambahkan bahwa Anda dapat mengikuti arus
baik dari topik dan dari berlangganan (berlangganan). Jika topik ditentukan sebagai input, maka
langganan sementara untuk topik ini akan dibuat secara otomatis. Sintaksnya juga cantik
jelas, input stream data beam.io.ReadFromPubSub(input_topic) dikirim ke kami
pipa p .


 # Transform stream to BigQuery IO format stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery()) 

Di sinilah Transform # 1 terjadi dan input kami dikonversi dari string python ke
python dict, dan dalam output kita mendapatkan PCollection # 1. >> muncul di sintaks. Aktif
sebenarnya, teks dalam tanda kutip adalah nama aliran (harus unik), serta komentar,
yang akan ditambahkan ke blok pada grafik di antarmuka web GCP Dataflow. Mari kita pertimbangkan lebih detail
kelas utama TransformToBigQuery .


 class TransformToBigQuery(beam.DoFn): #          ,   # BigQuery IO     python dict def process(self, element, *args, **kwargs): body = json.loads(element) #       ,      # python dict       ,     #  ,      python dict yield body 

Variabel element akan berisi satu pesan dari langganan PubSub. Seperti yang terlihat dari
kode, dalam kasus kami harus JSON yang valid. Harus di dalam kelas
metode process didefinisikan ulang, di mana transformasi yang diperlukan harus dilakukan
jalur input untuk mendapatkan output yang sesuai dengan rangkaian
tabel tempat data ini akan dimuat. Karena aliran kami dalam hal ini adalah
terus menerus, tidak unbounded dalam hal Apache Beam, Anda harus mengembalikannya menggunakan
yield , bukan return , seperti untuk aliran data akhir. Dalam hal aliran akhir, Anda bisa
(dan perlu) tambahan mengkonfigurasi windowing dan triggers


 # Tag stream by schema name tagged_stream = \ stream_bq \ | 'tag data by type' >> beam.ParDo(TagDataWithReqType()).with_outputs(*schema_dct.keys(), main='default') 

Kode ini mengarahkan PCollection # 1 ke Transform # 2 di mana penandaan akan dilakukan
(pemisahan) dari aliran data. Dalam schema_dct variabel schema_dct dalam hal ini, kamus, di mana kuncinya adalah nama file skema tanpa ekstensi, ini akan menjadi tag, dan nilainya JSON valid dari skema
Tabel BigQuery untuk tag ini. Perlu dicatat bahwa skema tersebut harus ditransmisikan secara tepat kepada
lihat {'fields': } mana adalah skema dari tabel BigQuery dalam bentuk JSON (Anda bisa
ekspor dari antarmuka web).


main='default' adalah nama dari tag thread yang akan mereka tuju
Semua pesan yang tidak tunduk pada kondisi pemberian tag. Pertimbangkan kelasnya
TagDataWithReqType .


 class TagDataWithReqType(beam.DoFn): #      , ..      #     ,       #  with_outputs + default def process(self, element, *args, **kwargs): req_type = element.get('_') types = ( 'type1', 'type2', 'type3', ) if req_type in types: yield TaggedOutput(req_type, element) else: yield element 

Seperti yang Anda lihat, kelas process juga diganti di sini. Variabel types berisi nama
tag dan mereka harus mencocokkan nomor dan nama dengan nomor dan nama kunci kamus
schema_dct . Meskipun metode process memiliki kemampuan untuk menerima argumen, saya tidak pernah
Saya bisa melewati mereka. Saya belum menemukan alasannya.


Pada output, kita mendapatkan tupel utas dalam jumlah tag, yaitu jumlah kita
tag yang telah ditentukan + utas default yang gagal untuk menandai.


 # Stream unidentified data to default table tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery( '%s:%s.default' % ( gcp_project, bq_dataset, ), schema=parse_table_schema_from_json(schema_dct.get('default')), ) 

Transform # ... (pada kenyataannya, ini bukan pada diagram, ini adalah "cabang") - kami menulis aliran default
ke tabel default.


tagged_stream.default - aliran dengan tag default diambil, sintaks alternatif adalah tagged_stream['default']


schema=parse_table_schema_from_json(schema_dct.get('default')) - di sini skema didefinisikan
meja. Harap perhatikan bahwa file default.json dengan skema tabel BigQuery yang valid
harus ada di schema_dir = './' .


Aliran akan pergi ke tabel yang disebut default .


Jika tabel dengan nama ini (dalam dataset yang diberikan proyek ini) tidak ada, maka itu
akan secara otomatis dibuat dari skema berkat pengaturan default
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED


 # Stream data to BigQuery tables by number of schema names for name, schema in schema_dct.iteritems(): tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery( '%s:%s.%s' % ( gcp_project, bq_dataset, name), schema=parse_table_schema_from_json(schema), ) 

Transform # 3, semuanya harus jelas bagi mereka yang membaca artikel dari awal dan sendiri
sintaksis python Kami memisahkan stream tuple dengan loop dan menulis setiap stream dengan tabelnya sendiri
rencananya. Harus diingat bahwa nama aliran harus unik - '%s:%s.%s' % (gcp_project, bq_dataset, name) .


Sekarang harus jelas bagaimana ini bekerja dan Anda dapat membuat templat. Untuk ini Anda perlu
jalankan di konsol (jangan lupa untuk mengaktifkan venv jika tersedia) atau dari IDE:


 python _.py / --runner DataflowRunner / --project dreamdata-test / --staging_location gs://STORAGE_NAME/STAGING_DIR / --temp_location gs://STORAGE_NAME/TEMP_DIR / --template_location gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME 

Pada saat yang sama, akses ke Akun Google harus diatur, misalnya, melalui ekspor
GOOGLE_APPLICATION_CREDENTIALS lingkungan GOOGLE_APPLICATION_CREDENTIALS atau cara lain.


Beberapa kata tentang - --runner . Dalam hal ini, DataflowRunner mengatakan bahwa kode ini
akan berjalan sebagai templat untuk Pekerjaan Dataflow. Masih mungkin untuk menentukan
DirectRunner , ini akan digunakan secara default jika tidak ada opsi - --runner dan kode
akan bekerja sebagai Pekerjaan Dataflow, tetapi secara lokal, yang sangat nyaman untuk debugging.


Jika tidak ada kesalahan terjadi, maka gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME akan menjadi
template yang dibuat. Layak dikatakan bahwa dalam gs://STORAGE_NAME/STAGING_DIR juga akan ditulis
file layanan yang diperlukan untuk keberhasilan operasi Datafow Job yang dibuat berdasarkan
templat dan Anda tidak perlu menghapusnya.


Selanjutnya, Anda perlu membuat Pekerjaan Dataflow menggunakan templat ini, secara manual atau oleh siapa saja
dengan cara lain (CI misalnya).


4. Kesimpulan

Jadi, kami berhasil melakukan streaming aliran dari PubSub ke BigQuery menggunakan
diperlukan transformasi data untuk tujuan penyimpanan lebih lanjut, transformasi dan
penggunaan data.


Tautan utama



Dalam artikel ini, ketidakakuratan dan bahkan kesalahan mungkin terjadi, saya akan berterima kasih atas konstruktifnya
kritik. Pada akhirnya, saya ingin menambahkan bahwa pada kenyataannya, tidak semua digunakan di sini
fitur dari Apache Beam SDK, tetapi bukan itu tujuannya.

Source: https://habr.com/ru/post/id441892/


All Articles