Comment faire cuire binlog mysql avec go



Je m'appelle Artyom, je travaille au sein du groupe Rambler dans le projet "Stream" au poste de Go lead developer.
Nous avons passé beaucoup de temps à apprivoiser le binlog mysql. Cet article est une histoire sur la façon d'implémenter rapidement et avec un nombre minimum d'embûches le mécanisme de travail avec les binlogs Go.

Pourquoi en avons-nous besoin?


Sous le capot du Stream, il y a des modules très chargés, où chaque requête vers la base de données éloigne l'utilisateur du résultat. La mise en cache est une bonne solution, mais quand vider le cache? Laissez les données elles-mêmes nous dire qu'elles ont été mises à jour.

Dans mysql, il existe une réplication maître-esclave. Notre démon peut prétendre être un esclave et recevoir des données via binlog. Binlog doit être configuré au format de ligne. Il contient toutes les commandes de changement de base de données, les commandes sous la transaction ne sont exécutées qu'après validation. Une fois la taille maximale autorisée atteinte (1 gig par défaut), le fichier suivant est créé. Chaque nouveau fichier a un numéro de série après le nom.

Un peu plus d'infos ici ou ici .

L'article comprend deux parties:

1. Comment démarrer rapidement le traitement des entrées reçues dans le journal.
2. Comment personnaliser et étendre ce qui se trouve sous le capot.

Partie 1. Nous commençons dès que possible.


Pour travailler avec binlog, nous utiliserons la bibliothèque github.com/siddontang/go-mysql
Connectez-vous au nouveau canal (pour travailler avec les canaux, le format ROW pour binlog est requis ).

func binLogListener() { c, err := getDefaultCanal() if err == nil { coords, err := c.GetMasterPos() if err == nil { c.SetEventHandler(&binlogHandler{}) c.RunFrom(coords) } } } func getDefaultCanal() (*canal.Canal, error) { cfg := canal.NewDefaultConfig() cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306) cfg.User = "root" cfg.Password = "root" cfg.Flavor = "mysql" cfg.Dump.ExecutionPath = "" return canal.NewCanal(cfg) } 

Créez un wrapper sur le binlog:

 type binlogHandler struct { canal.DummyEventHandler //     BinlogParser //      } func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {return nil} func (h *binlogHandler) String() string {return "binlogHandler"} 

Binlogparser

Nous allons étendre la logique de travail avec la ligne binlog résultante en ajoutant de la logique à la méthode OnRow ().

 func (h *binlogHandler) OnRow(e *canal.RowsEvent) error { var n int //  var k int //  switch e.Action { case canal.DeleteAction: return nil //    case canal.UpdateAction: n = 1 k = 2 case canal.InsertAction: n = 0 k = 1 } for i := n; i < len(e.Rows); i += k { key := e.Table.Schema + "." + e.Table.Name switch key { case User{}.SchemaName() + "." + User{}.TableName(): /*    */ } } return nil } 

L'essence de ce wrapper est d'analyser les données reçues. Les données nous parviennent dans deux enregistrements pour mettre à jour la ligne (la première ligne contiendra les données d'origine, la seconde - mise à jour). Ici, nous considérons également la possibilité de multi-insertions et multi-mises à jour. Dans ce cas, nous devrons prendre un enregistrement sur deux pour UPDATE. Pour cela, dans l'exemple ci-dessus, nous avons entré n et k.

Créons un modèle pour recevoir des données de binlog. Nous y lirons les données des lignes reçues. Dans les annotations, nous indiquons les noms des colonnes:

 type User struct { Id int `gorm:"column:id"` Name string `gorm:"column:name"` Status string `gorm:"column:status"` Created time.Time `gorm:"column:created"` } func (User) TableName() string { return "User" } func (User) SchemaName() string { return "Test" } 

Structure de la table dans MYSQL:

 CREATE TABLE Test.User ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(40) NULL , status ENUM("active","deleted") DEFAULT "active", created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP ) ENGINE =InnoDB; 

Analysons l'analyse elle-même - ajoutons au lieu de préparation pour l'analyse des données:

 user := User{} h.GetBinLogData(&user, e, i) 

En substance, cela suffit - nous aurons les données du nouvel enregistrement dans le modèle utilisateur, mais pour plus de clarté, nous les afficherons:

  if e.Action == canal.UpdateAction { oldUser := User{} h.GetBinLogData(&oldUser, e, i-1) fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name, ) } else { fmt.Printf("User %d is created with name %s\n", user.Id, user.Name, ) } 

Le principal objectif que nous visions est de lancer notre «Hello binlog world»:

 func main() { go binLogListener() //      time.Sleep(2 * time.Minute) fmt.Print("Thx for watching, goodbuy") } 

Ensuite, ajoutez et mettez à jour les valeurs:

 INSERT INTO Test.User (`id`,`name`) VALUE (1,"Jack"); UPDATE Test.User SET name="Jonh" WHERE id=1; 

Nous verrons:

 User 1 is created with name Jack User 1 name changed from Jack to Jonh 

Le code résultant fonctionne avec binlog et analyse les nouvelles lignes. Lors de la réception d'un enregistrement de la table dont nous avons besoin, le code lit les données dans la structure et affiche le résultat. Dans les coulisses se trouvait l'analyseur de données (BinlogParser), qui remplissait le modèle.

Partie 2. Comme l'a dit Cobb, nous avons besoin d'un niveau inférieur


Considérez le travail interne de l'analyseur, qui est basé sur la réflexion.

Pour remplir le modèle avec des données, nous avons utilisé la méthode du gestionnaire:

 h.GetBinLogData(&user, e, i) 

Il analyse les types de données simples:
 bool int float64 string time.Time 

et peut analyser des structures complexes de json.

Si les types pris en charge ne vous suffisent pas, ou si vous voulez simplement comprendre comment fonctionne l'analyse binlog, vous pouvez vous entraîner à ajouter vos propres types.

Tout d'abord, réfléchissez à la manière de remplir les données du champ modèle en utilisant l'exemple d'un champ Id de type int:

 type User struct { Id int `gorm:"column:id"` } 

Par réflexion, nous obtenons le nom du type. La méthode parseTagSetting convertit les annotations en une structure plus pratique:

 element := User{} //     ,      v := reflect.ValueOf(element) s := reflect.Indirect(v) t := s.Type() num := t.NumField() parsedTag := parseTagSetting(t.Field(k).Tag) if columnName, ok = parsedTag["COLUMN"]; !ok || columnName == "COLUMN" { continue } for k := 0; k < num; k++ { name := s.Field(k).Type().Name() switch name { case "int": //     } } 

Après avoir reçu le type int, vous pouvez définir sa valeur via la méthode de réflexion:

 func (v Value) SetInt(x int64) {//... 

Méthode d'analyse des annotations:

 func parseTagSetting(tags reflect.StructTag) map[string]string { setting := map[string]string{} for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} { tags := strings.Split(str, ";") for _, value := range tags { v := strings.Split(value, ":") k := strings.TrimSpace(strings.ToUpper(v[0])) if len(v) >= 2 { setting[k] = strings.Join(v[1:], ":") } else { setting[k] = k } } } return setting } 

Il accepte l'int64 comme entrée. Créons une méthode qui traduit les données reçues du binlog en int64:

 func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER { return 0 } switch e.Rows[n][columnId].(type) { case int8: return int64(e.Rows[n][columnId].(int8)) case int32: return int64(e.Rows[n][columnId].(int32)) case int64: return e.Rows[n][columnId].(int64) case int: return int64(e.Rows[n][columnId].(int)) case uint8: return int64(e.Rows[n][columnId].(uint8)) case uint16: return int64(e.Rows[n][columnId].(uint16)) case uint32: return int64(e.Rows[n][columnId].(uint32)) case uint64: return int64(e.Rows[n][columnId].(uint64)) case uint: return int64(e.Rows[n][columnId].(uint)) } return 0 } 

Tout semble logique à l'exception de la méthode getBinlogIdByName ().

Cette aide triviale est nécessaire pour travailler avec des noms de colonnes au lieu de leurs numéros de série, ce qui vous permet de:

  • prendre les noms des colonnes des annotations gorm;
  • pas besoin de faire des modifications lors de l'ajout de colonnes au début ou au milieu;
  • Il est banal plus pratique de travailler avec le champ du nom qu'avec la colonne numéro 3.

En conséquence, nous ajoutons le gestionnaire lui-même:

 s.Field(k).SetInt(m.intHelper(e, n, columnName)) 

Regardons deux autres exemples.
ENUM: ici les valeurs viennent comme index - c'est-à-dire que le statut "actif" viendra comme 1. Dans la plupart des cas, nous avons besoin d'une représentation sous forme de chaîne d'énum. Il peut être obtenu à partir de la description du champ. Lors de l'analyse des valeurs enum, elle commence à partir de 1, mais le tableau de valeurs possibles lui-même commence à 0.

Un gestionnaire Enum pourrait ressembler à ceci:

 func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type == schema.TYPE_ENUM { values := e.Table.Columns[columnId].EnumValues //  if len(values) == 0 || e.Rows[n][columnId] == nil {{ return "" } return values[e.Rows[n][columnId].(int64)-1] //     0    } 

Je veux stocker JSON

Bonne idée pourquoi pas. JSON en termes de mysql est une chaîne. Il est nécessaire d'indiquer d'une manière ou d'une autre que ces données sont sérialisées - pour cela, nous ajouterons l'annotation non canonique «fromJson» à gorm.

Imaginez qu'une telle structure soit envisagée:

 type JsonData struct { Int int `gorm:"column:int"` StructData TestData `gorm:"column:struct_data;fromJson"` MapData map[string]string `gorm:"column:map_data;fromJson"` SliceData []int `gorm:"column:slice_data;fromJson"` } type TestData struct { Test string `json:"test"` Int int `json:"int"` } 

Vous pouvez écrire beaucoup de conditions et réussir probablement. Mais chaque nouveau type de données tuera tous les efforts. Bien que la tentative de trouver des réponses à stackoverflow - "comment apporter et désérialiser un type de structure inconnu" commence par la phrase: "On ne sait pas pourquoi vous en avez besoin, mais essayez ...".

Après avoir casté le type souhaité sur l'interface, nous pouvons le faire:

 if _, ok := parsedTag["FROMJSON"]; ok { newObject := reflect.New(s.Field(k).Type()).Interface() json := m.stringHelper(e, n, columnName) jsoniter.Unmarshal([]byte(json), &newObject) s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type())) } 

Si vous avez des questions sur les types de données, vous pouvez consulter les tests ou les poser dans les commentaires.

Qu'est-il arrivé à la fin .

Source: https://habr.com/ru/post/fr413329/


All Articles