如何用go烹饪mysql binlog



我叫Artyom,我在Rambler Group的Go Stream开发人员项目“ Stream”中工作。
我们花了很多时间来驯服mysql binlog。 本文是有关如何快速且以最少的陷阱实施Go binlogs机制的故事。

我们为什么需要这个?


在Stream的内部有很多模块,在该模块中对数据库的每个查询都使用户远离接收结果。 缓存是一个很好的解决方案,但是何时刷新缓存? 让数据本身告诉我们它们已被更新。

在mysql中有诸如主从复制之类的东西。 我们的守护程序可以假装为从属服务器,并通过binlog接收数据。 Binlog必须以行格式配置。 它包含所有数据库更改命令,事务下的命令仅在提交后执行。 达到最大允许大小(默认为1 gig)后,将创建以下文件。 每个新文件在名称后都有一个序列号。

在这里这里多一点信息。

本文分为两个部分:

1.如何快速开始处理日志中收到的条目。
2.如何自定义和扩展内部功能。

第1部分。我们尽快开始。


要使用binlog,我们将使用github.com/siddontang/go-mysql
连接到新通道(要使用通道, 需要binlogROW格式 )。

func binLogListener() { c, err := getDefaultCanal() if err == nil { coords, err := c.GetMasterPos() if err == nil { c.SetEventHandler(&binlogHandler{}) c.RunFrom(coords) } } } func getDefaultCanal() (*canal.Canal, error) { cfg := canal.NewDefaultConfig() cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306) cfg.User = "root" cfg.Password = "root" cfg.Flavor = "mysql" cfg.Dump.ExecutionPath = "" return canal.NewCanal(cfg) } 

在binlog上创建包装器:

 type binlogHandler struct { canal.DummyEventHandler //     BinlogParser //      } func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {return nil} func (h *binlogHandler) String() string {return "binlogHandler"} 

Binlogparser

我们将通过向OnRow()方法添加逻辑来扩展使用结果binlog行的逻辑。

 func (h *binlogHandler) OnRow(e *canal.RowsEvent) error { var n int //  var k int //  switch e.Action { case canal.DeleteAction: return nil //    case canal.UpdateAction: n = 1 k = 2 case canal.InsertAction: n = 0 k = 1 } for i := n; i < len(e.Rows); i += k { key := e.Table.Schema + "." + e.Table.Name switch key { case User{}.SchemaName() + "." + User{}.TableName(): /*    */ } } return nil } 

包装器的本质是解析接收到的数据。 数据以两条记录的形式提供给我们以更新该行(第一行将包含原始数据,第二行-已更新)。 在这里,我们还考虑了多次插入和多次更新的可能性。 在这种情况下,我们将需要获取第二条记录以进行UPDATE。 为此,在上面的示例中,我们输入了n和k。

让我们创建一个用于从binlog接收数据的模型。 在其中,我们将从接收到的行中读取数据。 在注释中,我们指示列的名称:

 type User struct { Id int `gorm:"column:id"` Name string `gorm:"column:name"` Status string `gorm:"column:status"` Created time.Time `gorm:"column:created"` } func (User) TableName() string { return "User" } func (User) SchemaName() string { return "Test" } 

MYSQL中的表结构:

 CREATE TABLE Test.User ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(40) NULL , status ENUM("active","deleted") DEFAULT "active", created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP ) ENGINE =InnoDB; 

让我们分析一下解析本身-添加到数据解析的准备位置:

 user := User{} h.GetBinLogData(&user, e, i) 

从本质上讲,这就足够了-我们将在用户模型中拥有新记录的数据,但为清楚起见,我们将显示它们:

  if e.Action == canal.UpdateAction { oldUser := User{} h.GetBinLogData(&oldUser, e, i-1) fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name, ) } else { fmt.Printf("User %d is created with name %s\n", user.Id, user.Name, ) } 

我们一直在努力的重点是启动“ Hello Binlog世界”:

 func main() { go binLogListener() //      time.Sleep(2 * time.Minute) fmt.Print("Thx for watching, goodbuy") } 

接下来,添加并更新值:

 INSERT INTO Test.User (`id`,`name`) VALUE (1,"Jack"); UPDATE Test.User SET name="Jonh" WHERE id=1; 

我们将看到:

 User 1 is created with name Jack User 1 name changed from Jack to Jonh 

结果代码与binlog一起使用并解析新行。 从表中接收记录时,代码将数据读入结构并显示结果。 幕后是数据解析器(BinlogParser),用于填充模型。

第2部分。正如Cobb所说,我们需要降低水平


考虑解析器的内部工作,该工作基于反射。

为了用数据填充模型,我们使用了handler方法:

 h.GetBinLogData(&user, e, i) 

它解析简单的数据类型:
 bool int float64 string time.Time 

并可以解析json中的复杂结构。

如果受支持的类型对您来说还不够,或者您只想了解binlog解析的工作原理,则可以练习添加自己的类型。

首先,考虑如何使用类型为int的Id字段的示例为model字段填充数据:

 type User struct { Id int `gorm:"column:id"` } 

通过反射,我们得到类型名称。 parseTagSetting方法将注释转换为更方便的结构:

 element := User{} //     ,      v := reflect.ValueOf(element) s := reflect.Indirect(v) t := s.Type() num := t.NumField() parsedTag := parseTagSetting(t.Field(k).Tag) if columnName, ok = parsedTag["COLUMN"]; !ok || columnName == "COLUMN" { continue } for k := 0; k < num; k++ { name := s.Field(k).Type().Name() switch name { case "int": //     } } 

收到int类型后,您可以通过反射方法设置其值:

 func (v Value) SetInt(x int64) {//... 

注释解析方法:

 func parseTagSetting(tags reflect.StructTag) map[string]string { setting := map[string]string{} for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} { tags := strings.Split(str, ";") for _, value := range tags { v := strings.Split(value, ":") k := strings.TrimSpace(strings.ToUpper(v[0])) if len(v) >= 2 { setting[k] = strings.Join(v[1:], ":") } else { setting[k] = k } } } return setting } 

它接受int64作为输入。 让我们创建一个将接收到的数据从binlog转换为int64的方法:

 func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER { return 0 } switch e.Rows[n][columnId].(type) { case int8: return int64(e.Rows[n][columnId].(int8)) case int32: return int64(e.Rows[n][columnId].(int32)) case int64: return e.Rows[n][columnId].(int64) case int: return int64(e.Rows[n][columnId].(int)) case uint8: return int64(e.Rows[n][columnId].(uint8)) case uint16: return int64(e.Rows[n][columnId].(uint16)) case uint32: return int64(e.Rows[n][columnId].(uint32)) case uint64: return int64(e.Rows[n][columnId].(uint64)) case uint: return int64(e.Rows[n][columnId].(uint)) } return 0 } 

除了getBinlogIdByName()方法外,其他所有内容看起来都很合理。

需要使用这个简单的帮助程序来处理列名而不是其序列号,这使您可以:

  • 从gorm注释中获取列名;
  • 将列添加到开头或中间时无需进行编辑;
  • 与使用第3列相比,使用name字段更方便。

结果,我们添加了处理程序本身:

 s.Field(k).SetInt(m.intHelper(e, n, columnName)) 

让我们再看两个示例。
枚举:这里的值作为索引-即“活动”状态为1。在大多数情况下,我们需要用字符串表示枚举。 可以从字段描述中获得。 解析枚举值时,它从1开始,但是可能值数组本身从0开始。

枚举处理程序可能看起来像这样:

 func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type == schema.TYPE_ENUM { values := e.Table.Columns[columnId].EnumValues //  if len(values) == 0 || e.Rows[n][columnId] == nil {{ return "" } return values[e.Rows[n][columnId].(int64)-1] //     0    } 

我想存储JSON

好主意,为什么不呢。 就mysql而言,JSON是一个字符串。 有必要以某种方式指示此数据已序列化-为此,我们将向gorm添加非规范注释“ fromJson”。

想象一下应该考虑这样的结构:

 type JsonData struct { Int int `gorm:"column:int"` StructData TestData `gorm:"column:struct_data;fromJson"` MapData map[string]string `gorm:"column:map_data;fromJson"` SliceData []int `gorm:"column:slice_data;fromJson"` } type TestData struct { Test string `json:"test"` Int int `json:"int"` } 

您可以编写很多条件,并且可能成功。 但是每种新的数据类型都会扼杀所有努力。 尽管试图找到stackoverflow答案的尝试-“如何带来和反序列化未知类型的结构”始于短语:“目前尚不清楚为什么需要这样做,但请尝试……”。

将所需的类型转换为接口后,我们可以执行以下操作:

 if _, ok := parsedTag["FROMJSON"]; ok { newObject := reflect.New(s.Field(k).Type()).Interface() json := m.stringHelper(e, n, columnName) jsoniter.Unmarshal([]byte(json), &newObject) s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type())) } 

如果您对数据类型有疑问,可以查看测试或在评论中询问。

最后发生了什么

Source: https://habr.com/ru/post/zh-CN413329/


All Articles