
Nama saya Artyom, saya bekerja di Rambler Group dalam proyek "Stream" di posisi Go lead developer.
Kami menghabiskan banyak waktu menjinakkan bin mysql. Artikel ini adalah kisah tentang cara cepat dan dengan jumlah jebakan minimum menerapkan mekanisme untuk bekerja dengan Go binlogs.
Mengapa kita membutuhkan ini?
Di bawah kap Stream ada modul yang sangat dimuat, di mana setiap permintaan ke database menjauhkan pengguna dari menerima hasilnya. Caching adalah solusi yang baik, tetapi kapan harus membersihkan cache? Biarkan data itu sendiri memberi tahu kami bahwa mereka telah diperbarui.
Dalam mysql ada replikasi master-slave. Daemon kami dapat berpura-pura menjadi budak dan menerima data melalui binlog. Binlog harus dikonfigurasi dalam format baris. Ini berisi semua perintah perubahan basis data, perintah di bawah transaksi dieksekusi hanya setelah komit. Setelah mencapai ukuran maksimum yang diizinkan (1 pertunjukan secara default), file berikut dibuat. Setiap file baru memiliki nomor seri setelah namanya.
Sedikit info lebih lanjut di
sini atau di
sini .
Artikel ini memiliki dua bagian:
1. Cara cepat mulai memproses entri yang diterima dalam log.
2. Cara menyesuaikan dan memperluas apa yang ada di bawah tenda.
Bagian 1. Kami mulai sesegera mungkin.
Untuk bekerja dengan binlog, kami akan menggunakan pustaka
github.com/siddontang/go-mysqlSambungkan ke saluran baru (untuk bekerja dengan saluran,
diperlukan format ROW untuk binlog ).
func binLogListener() { c, err := getDefaultCanal() if err == nil { coords, err := c.GetMasterPos() if err == nil { c.SetEventHandler(&binlogHandler{}) c.RunFrom(coords) } } } func getDefaultCanal() (*canal.Canal, error) { cfg := canal.NewDefaultConfig() cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306) cfg.User = "root" cfg.Password = "root" cfg.Flavor = "mysql" cfg.Dump.ExecutionPath = "" return canal.NewCanal(cfg) }
Buat pembungkus di atas binlog:
type binlogHandler struct { canal.DummyEventHandler // BinlogParser // } func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {return nil} func (h *binlogHandler) String() string {return "binlogHandler"}
BinlogparserKami akan memperluas logika bekerja dengan baris binlog yang dihasilkan dengan menambahkan logika ke metode OnRow ().
func (h *binlogHandler) OnRow(e *canal.RowsEvent) error { var n int // var k int // switch e.Action { case canal.DeleteAction: return nil // case canal.UpdateAction: n = 1 k = 2 case canal.InsertAction: n = 0 k = 1 } for i := n; i < len(e.Rows); i += k { key := e.Table.Schema + "." + e.Table.Name switch key { case User{}.SchemaName() + "." + User{}.TableName(): } } return nil }
Inti dari pembungkus ini adalah mem-parsing data yang diterima. Data datang kepada kami dalam dua catatan untuk memperbarui baris (baris pertama akan berisi data asli, yang kedua diperbarui). Di sini kami juga mempertimbangkan kemungkinan multi-sisipan dan multi-pembaruan. Dalam hal ini, kita harus mengambil setiap detik catatan untuk UPDATE. Untuk ini, dalam contoh di atas, kami memasukkan n dan k.
Mari kita membuat model untuk menerima data dari binlog. Di dalamnya kita akan membaca data dari baris yang diterima. Dalam anotasi, kami menunjukkan nama kolom:
type User struct { Id int `gorm:"column:id"` Name string `gorm:"column:name"` Status string `gorm:"column:status"` Created time.Time `gorm:"column:created"` } func (User) TableName() string { return "User" } func (User) SchemaName() string { return "Test" }
Struktur tabel dalam MYSQL:
CREATE TABLE Test.User ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(40) NULL , status ENUM("active","deleted") DEFAULT "active", created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP ) ENGINE =InnoDB;
Mari kita analisis parsing itu sendiri - tambahkan ke tempat persiapan untuk parsing data:
user := User{} h.GetBinLogData(&user, e, i)
Intinya, ini sudah cukup - kita akan memiliki data catatan baru dalam model pengguna, tetapi untuk kejelasan, kita akan menampilkannya:
if e.Action == canal.UpdateAction { oldUser := User{} h.GetBinLogData(&oldUser, e, i-1) fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name, ) } else { fmt.Printf("User %d is created with name %s\n", user.Id, user.Name, ) }
Poin utama yang kami perjuangkan adalah untuk meluncurkan "Hello binlog world" kami:
func main() { go binLogListener()
Selanjutnya, tambahkan dan perbarui nilainya:
INSERT INTO Test.User (`id`,`name`) VALUE (1,"Jack"); UPDATE Test.User SET name="Jonh" WHERE id=1;
Kita akan melihat:
User 1 is created with name Jack User 1 name changed from Jack to Jonh
Kode yang dihasilkan bekerja dengan binlog dan mem-parsing baris baru. Saat menerima catatan dari tabel yang kita butuhkan, kode membaca data ke dalam struktur dan menampilkan hasilnya. Di belakang layar adalah parser data (BinlogParser), yang mengisi model.
Bagian 2. Seperti yang dikatakan Cobb, kita perlu level yang lebih rendah
Pertimbangkan pekerjaan internal pengurai, yang didasarkan pada refleksi.
Untuk mengisi model dengan data, kami menggunakan metode handler:
h.GetBinLogData(&user, e, i)
Ini mem-parsing tipe data sederhana:
bool int float64 string time.Time
dan dapat mem-parsing struktur kompleks dari json.
Jika tipe yang didukung tidak cukup untuk Anda, atau Anda hanya ingin memahami cara kerja parsing binlog, maka Anda bisa berlatih menambahkan tipe Anda sendiri.
Pertama, pertimbangkan cara mengisi data untuk bidang model menggunakan contoh bidang Id dari tipe int:
type User struct { Id int `gorm:"column:id"` }
Melalui refleksi kita mendapatkan nama tipe. Metode parseTagSetting mengubah anotasi menjadi struktur yang lebih nyaman:
element := User{} // , v := reflect.ValueOf(element) s := reflect.Indirect(v) t := s.Type() num := t.NumField() parsedTag := parseTagSetting(t.Field(k).Tag) if columnName, ok = parsedTag["COLUMN"]; !ok || columnName == "COLUMN" { continue } for k := 0; k < num; k++ { name := s.Field(k).Type().Name() switch name { case "int": // } }
Setelah menerima tipe int, Anda dapat menetapkan nilainya melalui metode refleksi:
func (v Value) SetInt(x int64) {
Metode untuk menguraikan anotasi:
func parseTagSetting(tags reflect.StructTag) map[string]string { setting := map[string]string{} for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} { tags := strings.Split(str, ";") for _, value := range tags { v := strings.Split(value, ":") k := strings.TrimSpace(strings.ToUpper(v[0])) if len(v) >= 2 { setting[k] = strings.Join(v[1:], ":") } else { setting[k] = k } } } return setting }
Ia menerima int64 sebagai input. Mari kita buat metode yang menerjemahkan data yang diterima dari binlog ke int64:
func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER { return 0 } switch e.Rows[n][columnId].(type) { case int8: return int64(e.Rows[n][columnId].(int8)) case int32: return int64(e.Rows[n][columnId].(int32)) case int64: return e.Rows[n][columnId].(int64) case int: return int64(e.Rows[n][columnId].(int)) case uint8: return int64(e.Rows[n][columnId].(uint8)) case uint16: return int64(e.Rows[n][columnId].(uint16)) case uint32: return int64(e.Rows[n][columnId].(uint32)) case uint64: return int64(e.Rows[n][columnId].(uint64)) case uint: return int64(e.Rows[n][columnId].(uint)) } return 0 }
Semuanya terlihat logis kecuali untuk metode getBinlogIdByName ().
Helper sepele ini diperlukan untuk bekerja dengan nama kolom dan bukan nomor seri mereka, yang memungkinkan Anda untuk:
- mengambil nama kolom dari penjelasan gorm;
- tidak perlu mengedit ketika menambahkan kolom ke awal atau tengah;
- Dangkal lebih nyaman untuk bekerja dengan bidang nama daripada dengan nomor kolom 3.
Sebagai hasilnya, kami menambahkan handler itu sendiri:
s.Field(k).SetInt(m.intHelper(e, n, columnName))
Mari kita lihat dua contoh lagi.ENUM: di sini nilai datang sebagai indeks - yaitu, status "aktif" akan datang sebagai 1. Dalam kebanyakan kasus, kita memerlukan representasi string enum. Itu bisa diperoleh dari deskripsi lapangan. Saat parsing nilai enum, ia mulai dari 1, tetapi array dari nilai yang mungkin dimulai dari 0.
Seorang pawang Enum mungkin terlihat seperti ini:
func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type == schema.TYPE_ENUM { values := e.Table.Columns[columnId].EnumValues // if len(values) == 0 || e.Rows[n][columnId] == nil {{ return "" } return values[e.Rows[n][columnId].(int64)-1] // 0 }
Saya ingin menyimpan JSONIde bagus kenapa tidak. JSON dalam hal mysql adalah string. Penting untuk menunjukkan bahwa data ini adalah serial - untuk ini kami akan menambahkan penjelasan noncanonical "fromJson" ke gorm.
Bayangkan bahwa struktur seperti itu harus dipertimbangkan:
type JsonData struct { Int int `gorm:"column:int"` StructData TestData `gorm:"column:struct_data;fromJson"` MapData map[string]string `gorm:"column:map_data;fromJson"` SliceData []int `gorm:"column:slice_data;fromJson"` } type TestData struct { Test string `json:"test"` Int int `json:"int"` }
Anda dapat menulis banyak syarat dan mungkin berhasil. Tetapi setiap tipe data baru akan membunuh semua upaya. Meskipun upaya untuk menemukan jawaban untuk stackoverflow - "cara membawa dan menghapus jenis struktur yang tidak dikenal" dimulai dengan frasa: "Tidak jelas mengapa Anda memerlukan ini, tetapi coba ...".
Setelah memasukkan tipe yang diinginkan ke antarmuka, kita dapat melakukan ini:
if _, ok := parsedTag["FROMJSON"]; ok { newObject := reflect.New(s.Field(k).Type()).Interface() json := m.stringHelper(e, n, columnName) jsoniter.Unmarshal([]byte(json), &newObject) s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type())) }
Jika Anda memiliki pertanyaan tentang tipe data, Anda dapat melihat
tes atau bertanya di komentar.
Apa yang terjadi pada akhirnya .