Wie man MySQL Binlog mit go kocht



Mein Name ist Artyom, ich arbeite in der Rambler Group im Projekt "Stream" als Go Lead Developer.
Wir haben viel Zeit damit verbracht, MySQL-Binlog zu zähmen. In diesem Artikel geht es darum, wie Sie schnell und mit einer minimalen Anzahl von Fallstricken den Mechanismus für die Arbeit mit Go-Binlogs implementieren können.

Warum brauchen wir das?


Unter der Haube des Streams befinden sich hoch geladene Module, bei denen jede Abfrage an die Datenbank den Benutzer vom Empfang des Ergebnisses abhält. Caching ist eine gute Lösung, aber wann muss der Cache geleert werden? Lassen Sie sich von den Daten selbst mitteilen, dass sie aktualisiert wurden.

In MySQL gibt es so etwas wie eine Master-Slave-Replikation. Unser Daemon kann sich als Slave ausgeben und Daten über binlog empfangen. Binlog muss im Zeilenformat konfiguriert werden. Es enthält alle Datenbankänderungsbefehle. Die Befehle unter der Transaktion werden erst nach dem Festschreiben ausgeführt. Bei Erreichen der maximal zulässigen Größe (standardmäßig 1 Gig) wird die folgende Datei erstellt. Jede neue Datei hat eine Seriennummer nach dem Namen.

Ein bisschen mehr Infos hier oder hier .

Der Artikel besteht aus zwei Teilen:

1. So starten Sie schnell die Verarbeitung der im Protokoll empfangenen Einträge.
2. So passen Sie an, was sich unter der Haube befindet.

Teil 1. Wir beginnen so schnell wie möglich.


Um mit binlog zu arbeiten, verwenden wir die Bibliothek github.com/siddontang/go-mysql
Stellen Sie eine Verbindung zum neuen Kanal her (um mit Kanälen arbeiten zu können, ist das ROW-Format für binlog erforderlich ).

func binLogListener() { c, err := getDefaultCanal() if err == nil { coords, err := c.GetMasterPos() if err == nil { c.SetEventHandler(&binlogHandler{}) c.RunFrom(coords) } } } func getDefaultCanal() (*canal.Canal, error) { cfg := canal.NewDefaultConfig() cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306) cfg.User = "root" cfg.Password = "root" cfg.Flavor = "mysql" cfg.Dump.ExecutionPath = "" return canal.NewCanal(cfg) } 

Erstellen Sie einen Wrapper über dem Binlog:

 type binlogHandler struct { canal.DummyEventHandler //     BinlogParser //      } func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {return nil} func (h *binlogHandler) String() string {return "binlogHandler"} 

Binlogparser

Wir werden die Logik der Arbeit mit der resultierenden binlog-Zeile erweitern, indem wir der OnRow () -Methode Logik hinzufügen.

 func (h *binlogHandler) OnRow(e *canal.RowsEvent) error { var n int //  var k int //  switch e.Action { case canal.DeleteAction: return nil //    case canal.UpdateAction: n = 1 k = 2 case canal.InsertAction: n = 0 k = 1 } for i := n; i < len(e.Rows); i += k { key := e.Table.Schema + "." + e.Table.Name switch key { case User{}.SchemaName() + "." + User{}.TableName(): /*    */ } } return nil } 

Die Essenz dieses Wrappers besteht darin, die empfangenen Daten zu analysieren. Die Daten kommen in zwei Datensätzen zu uns, um die Zeile zu aktualisieren (die erste Zeile enthält die Originaldaten, die zweite - aktualisiert). Hier betrachten wir auch die Möglichkeit von Multi-Inserts und Multi-Updates. In diesem Fall müssen wir jeden zweiten Datensatz für UPDATE erstellen. Dafür haben wir im obigen Beispiel n und k eingegeben.

Erstellen wir ein Modell für den Empfang von Daten aus binlog. Darin werden wir Daten aus den empfangenen Zeilen lesen. In den Anmerkungen geben wir die Namen der Spalten an:

 type User struct { Id int `gorm:"column:id"` Name string `gorm:"column:name"` Status string `gorm:"column:status"` Created time.Time `gorm:"column:created"` } func (User) TableName() string { return "User" } func (User) SchemaName() string { return "Test" } 

Tabellenstruktur in MYSQL:

 CREATE TABLE Test.User ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(40) NULL , status ENUM("active","deleted") DEFAULT "active", created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP ) ENGINE =InnoDB; 

Lassen Sie uns die Analyse selbst analysieren - fügen Sie den Ort der Vorbereitung für die Datenanalyse hinzu:

 user := User{} h.GetBinLogData(&user, e, i) 

Im Wesentlichen reicht dies aus - wir werden die Daten des neuen Datensatzes im Benutzermodell haben, aber aus Gründen der Übersichtlichkeit werden wir sie anzeigen:

  if e.Action == canal.UpdateAction { oldUser := User{} h.GetBinLogData(&oldUser, e, i-1) fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name, ) } else { fmt.Printf("User %d is created with name %s\n", user.Id, user.Name, ) } 

Der Hauptpunkt, den wir anstrebten, war der Start unserer „Hallo Binlog-Welt“:

 func main() { go binLogListener() //      time.Sleep(2 * time.Minute) fmt.Print("Thx for watching, goodbuy") } 

Fügen Sie als Nächstes die Werte hinzu und aktualisieren Sie sie:

 INSERT INTO Test.User (`id`,`name`) VALUE (1,"Jack"); UPDATE Test.User SET name="Jonh" WHERE id=1; 

Wir werden sehen:

 User 1 is created with name Jack User 1 name changed from Jack to Jonh 

Der resultierende Code arbeitet mit binlog und analysiert neue Zeilen. Beim Empfang eines Datensatzes aus der benötigten Tabelle liest der Code die Daten in die Struktur und zeigt das Ergebnis an. Hinter den Kulissen befand sich der Datenparser (BinlogParser), der das Modell bevölkerte.

Teil 2. Wie Cobb sagte, brauchen wir ein niedrigeres Level


Betrachten Sie die interne Arbeit des Parsers, die auf Reflexion basiert.

Um das Modell mit Daten zu füllen, haben wir die Handler-Methode verwendet:

 h.GetBinLogData(&user, e, i) 

Es analysiert einfache Datentypen:
 bool int float64 string time.Time 

und kann komplexe Strukturen von json analysieren.

Wenn Ihnen unterstützte Typen nicht ausreichen oder Sie nur verstehen möchten, wie das Parsen von Binlog funktioniert, können Sie das Hinzufügen eigener Typen üben.

Überlegen Sie zunächst, wie Sie die Daten für das Modellfeld am Beispiel eines ID-Felds vom Typ int ausfüllen:

 type User struct { Id int `gorm:"column:id"` } 

Durch Reflexion erhalten wir den Typnamen. Die parseTagSetting-Methode konvertiert Anmerkungen in eine bequemere Struktur:

 element := User{} //     ,      v := reflect.ValueOf(element) s := reflect.Indirect(v) t := s.Type() num := t.NumField() parsedTag := parseTagSetting(t.Field(k).Tag) if columnName, ok = parsedTag["COLUMN"]; !ok || columnName == "COLUMN" { continue } for k := 0; k < num; k++ { name := s.Field(k).Type().Name() switch name { case "int": //     } } 

Nachdem Sie den int-Typ erhalten haben, können Sie seinen Wert über die Reflektionsmethode festlegen:

 func (v Value) SetInt(x int64) {//... 

Methode zum Parsen von Anmerkungen:

 func parseTagSetting(tags reflect.StructTag) map[string]string { setting := map[string]string{} for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} { tags := strings.Split(str, ";") for _, value := range tags { v := strings.Split(value, ":") k := strings.TrimSpace(strings.ToUpper(v[0])) if len(v) >= 2 { setting[k] = strings.Join(v[1:], ":") } else { setting[k] = k } } } return setting } 

Es akzeptiert int64 als Eingabe. Erstellen wir eine Methode, die die empfangenen Daten aus dem Binlog in int64 übersetzt:

 func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER { return 0 } switch e.Rows[n][columnId].(type) { case int8: return int64(e.Rows[n][columnId].(int8)) case int32: return int64(e.Rows[n][columnId].(int32)) case int64: return e.Rows[n][columnId].(int64) case int: return int64(e.Rows[n][columnId].(int)) case uint8: return int64(e.Rows[n][columnId].(uint8)) case uint16: return int64(e.Rows[n][columnId].(uint16)) case uint32: return int64(e.Rows[n][columnId].(uint32)) case uint64: return int64(e.Rows[n][columnId].(uint64)) case uint: return int64(e.Rows[n][columnId].(uint)) } return 0 } 

Bis auf die Methode getBinlogIdByName () sieht alles logisch aus.

Dieser einfache Helfer wird benötigt, um mit Spaltennamen anstelle ihrer Seriennummern zu arbeiten. So können Sie:

  • Nehmen Sie Spaltennamen aus Gorm-Anmerkungen.
  • Sie müssen keine Änderungen vornehmen, wenn Sie Spalten am Anfang oder in der Mitte hinzufügen.
  • Es ist banal bequemer, mit dem Namensfeld zu arbeiten als mit Spalte Nummer 3.

Als Ergebnis fügen wir den Handler selbst hinzu:

 s.Field(k).SetInt(m.intHelper(e, n, columnName)) 

Schauen wir uns zwei weitere Beispiele an.
ENUM: Hier kommen die Werte als Index - das heißt, der Status "aktiv" wird als 1 angegeben. In den meisten Fällen benötigen wir eine Zeichenfolgendarstellung von enum. Es kann der Feldbeschreibung entnommen werden. Beim Parsen von Enum-Werten beginnt es bei 1, aber das Array möglicher Werte selbst beginnt bei 0.

Ein Enum-Handler könnte folgendermaßen aussehen:

 func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type == schema.TYPE_ENUM { values := e.Table.Columns[columnId].EnumValues //  if len(values) == 0 || e.Rows[n][columnId] == nil {{ return "" } return values[e.Rows[n][columnId].(int64)-1] //     0    } 

Ich möchte JSON speichern

Gute Idee warum nicht. JSON in Bezug auf MySQL ist eine Zeichenfolge. Es muss irgendwie angegeben werden, dass diese Daten serialisiert sind - dazu fügen wir die nichtkanonische Annotation "fromJson" zu gorm hinzu.

Stellen Sie sich vor, eine solche Struktur sollte in Betracht gezogen werden:

 type JsonData struct { Int int `gorm:"column:int"` StructData TestData `gorm:"column:struct_data;fromJson"` MapData map[string]string `gorm:"column:map_data;fromJson"` SliceData []int `gorm:"column:slice_data;fromJson"` } type TestData struct { Test string `json:"test"` Int int `json:"int"` } 

Sie können viele Bedingungen schreiben und wahrscheinlich erfolgreich sein. Aber jeder neue Datentyp wird alle Bemühungen zunichte machen. Obwohl der Versuch, Antworten auf den Stapelüberlauf zu finden - "wie man einen unbekannten Strukturtyp bringt und deserialisiert" - mit dem Satz beginnt: "Es ist nicht klar, warum Sie dies benötigen, aber versuchen Sie ...".

Nachdem wir den gewünschten Typ in die Schnittstelle umgewandelt haben, können wir Folgendes tun:

 if _, ok := parsedTag["FROMJSON"]; ok { newObject := reflect.New(s.Field(k).Type()).Interface() json := m.stringHelper(e, n, columnName) jsoniter.Unmarshal([]byte(json), &newObject) s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type())) } 

Wenn Sie Fragen zu Datentypen haben, können Sie sich die Tests ansehen oder sie in den Kommentaren stellen.

Was ist am Ende passiert?

Source: https://habr.com/ru/post/de413329/


All Articles