
我叫Artyom,我在Rambler Group的Go Stream开发人员项目“ Stream”中工作。
我们花了很多时间来驯服mysql binlog。 本文是有关如何快速且以最少的陷阱实施Go binlogs机制的故事。
我们为什么需要这个?
在Stream的内部有很多模块,在该模块中对数据库的每个查询都使用户远离接收结果。 缓存是一个很好的解决方案,但是何时刷新缓存? 让数据本身告诉我们它们已被更新。
在mysql中有诸如主从复制之类的东西。 我们的守护程序可以假装为从属服务器,并通过binlog接收数据。 Binlog必须以行格式配置。 它包含所有数据库更改命令,事务下的命令仅在提交后执行。 达到最大允许大小(默认为1 gig)后,将创建以下文件。 每个新文件在名称后都有一个序列号。
在这里或
这里多一点信息。
本文分为两个部分:
1.如何快速开始处理日志中收到的条目。
2.如何自定义和扩展内部功能。
第1部分。我们尽快开始。
要使用binlog,我们将使用
github.com/siddontang/go-mysql库
连接到新通道(要使用通道,
需要binlog的
ROW格式 )。
func binLogListener() { c, err := getDefaultCanal() if err == nil { coords, err := c.GetMasterPos() if err == nil { c.SetEventHandler(&binlogHandler{}) c.RunFrom(coords) } } } func getDefaultCanal() (*canal.Canal, error) { cfg := canal.NewDefaultConfig() cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306) cfg.User = "root" cfg.Password = "root" cfg.Flavor = "mysql" cfg.Dump.ExecutionPath = "" return canal.NewCanal(cfg) }
在binlog上创建包装器:
type binlogHandler struct { canal.DummyEventHandler // BinlogParser // } func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {return nil} func (h *binlogHandler) String() string {return "binlogHandler"}
Binlogparser我们将通过向OnRow()方法添加逻辑来扩展使用结果binlog行的逻辑。
func (h *binlogHandler) OnRow(e *canal.RowsEvent) error { var n int // var k int // switch e.Action { case canal.DeleteAction: return nil // case canal.UpdateAction: n = 1 k = 2 case canal.InsertAction: n = 0 k = 1 } for i := n; i < len(e.Rows); i += k { key := e.Table.Schema + "." + e.Table.Name switch key { case User{}.SchemaName() + "." + User{}.TableName(): } } return nil }
包装器的本质是解析接收到的数据。 数据以两条记录的形式提供给我们以更新该行(第一行将包含原始数据,第二行-已更新)。 在这里,我们还考虑了多次插入和多次更新的可能性。 在这种情况下,我们将需要获取第二条记录以进行UPDATE。 为此,在上面的示例中,我们输入了n和k。
让我们创建一个用于从binlog接收数据的模型。 在其中,我们将从接收到的行中读取数据。 在注释中,我们指示列的名称:
type User struct { Id int `gorm:"column:id"` Name string `gorm:"column:name"` Status string `gorm:"column:status"` Created time.Time `gorm:"column:created"` } func (User) TableName() string { return "User" } func (User) SchemaName() string { return "Test" }
MYSQL中的表结构:
CREATE TABLE Test.User ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(40) NULL , status ENUM("active","deleted") DEFAULT "active", created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP ) ENGINE =InnoDB;
让我们分析一下解析本身-添加到数据解析的准备位置:
user := User{} h.GetBinLogData(&user, e, i)
从本质上讲,这就足够了-我们将在用户模型中拥有新记录的数据,但为清楚起见,我们将显示它们:
if e.Action == canal.UpdateAction { oldUser := User{} h.GetBinLogData(&oldUser, e, i-1) fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name, ) } else { fmt.Printf("User %d is created with name %s\n", user.Id, user.Name, ) }
我们一直在努力的重点是启动“ Hello Binlog世界”:
func main() { go binLogListener()
接下来,添加并更新值:
INSERT INTO Test.User (`id`,`name`) VALUE (1,"Jack"); UPDATE Test.User SET name="Jonh" WHERE id=1;
我们将看到:
User 1 is created with name Jack User 1 name changed from Jack to Jonh
结果代码与binlog一起使用并解析新行。 从表中接收记录时,代码将数据读入结构并显示结果。 幕后是数据解析器(BinlogParser),用于填充模型。
第2部分。正如Cobb所说,我们需要降低水平
考虑解析器的内部工作,该工作基于反射。
为了用数据填充模型,我们使用了handler方法:
h.GetBinLogData(&user, e, i)
它解析简单的数据类型:
bool int float64 string time.Time
并可以解析json中的复杂结构。
如果受支持的类型对您来说还不够,或者您只想了解binlog解析的工作原理,则可以练习添加自己的类型。
首先,考虑如何使用类型为int的Id字段的示例为model字段填充数据:
type User struct { Id int `gorm:"column:id"` }
通过反射,我们得到类型名称。 parseTagSetting方法将注释转换为更方便的结构:
element := User{} // , v := reflect.ValueOf(element) s := reflect.Indirect(v) t := s.Type() num := t.NumField() parsedTag := parseTagSetting(t.Field(k).Tag) if columnName, ok = parsedTag["COLUMN"]; !ok || columnName == "COLUMN" { continue } for k := 0; k < num; k++ { name := s.Field(k).Type().Name() switch name { case "int": // } }
收到int类型后,您可以通过反射方法设置其值:
func (v Value) SetInt(x int64) {
注释解析方法:
func parseTagSetting(tags reflect.StructTag) map[string]string { setting := map[string]string{} for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} { tags := strings.Split(str, ";") for _, value := range tags { v := strings.Split(value, ":") k := strings.TrimSpace(strings.ToUpper(v[0])) if len(v) >= 2 { setting[k] = strings.Join(v[1:], ":") } else { setting[k] = k } } } return setting }
它接受int64作为输入。 让我们创建一个将接收到的数据从binlog转换为int64的方法:
func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER { return 0 } switch e.Rows[n][columnId].(type) { case int8: return int64(e.Rows[n][columnId].(int8)) case int32: return int64(e.Rows[n][columnId].(int32)) case int64: return e.Rows[n][columnId].(int64) case int: return int64(e.Rows[n][columnId].(int)) case uint8: return int64(e.Rows[n][columnId].(uint8)) case uint16: return int64(e.Rows[n][columnId].(uint16)) case uint32: return int64(e.Rows[n][columnId].(uint32)) case uint64: return int64(e.Rows[n][columnId].(uint64)) case uint: return int64(e.Rows[n][columnId].(uint)) } return 0 }
除了getBinlogIdByName()方法外,其他所有内容看起来都很合理。
需要使用这个简单的帮助程序来处理列名而不是其序列号,这使您可以:
- 从gorm注释中获取列名;
- 将列添加到开头或中间时无需进行编辑;
- 与使用第3列相比,使用name字段更方便。
结果,我们添加了处理程序本身:
s.Field(k).SetInt(m.intHelper(e, n, columnName))
让我们再看两个示例。枚举:这里的值作为索引-即“活动”状态为1。在大多数情况下,我们需要用字符串表示枚举。 可以从字段描述中获得。 解析枚举值时,它从1开始,但是可能值数组本身从0开始。
枚举处理程序可能看起来像这样:
func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type == schema.TYPE_ENUM { values := e.Table.Columns[columnId].EnumValues // if len(values) == 0 || e.Rows[n][columnId] == nil {{ return "" } return values[e.Rows[n][columnId].(int64)-1] // 0 }
我想存储JSON好主意,为什么不呢。 就mysql而言,JSON是一个字符串。 有必要以某种方式指示此数据已序列化-为此,我们将向gorm添加非规范注释“ fromJson”。
想象一下应该考虑这样的结构:
type JsonData struct { Int int `gorm:"column:int"` StructData TestData `gorm:"column:struct_data;fromJson"` MapData map[string]string `gorm:"column:map_data;fromJson"` SliceData []int `gorm:"column:slice_data;fromJson"` } type TestData struct { Test string `json:"test"` Int int `json:"int"` }
您可以编写很多条件,并且可能成功。 但是每种新的数据类型都会扼杀所有努力。 尽管试图找到stackoverflow答案的尝试-“如何带来和反序列化未知类型的结构”始于短语:“目前尚不清楚为什么需要这样做,但请尝试……”。
将所需的类型转换为接口后,我们可以执行以下操作:
if _, ok := parsedTag["FROMJSON"]; ok { newObject := reflect.New(s.Field(k).Type()).Interface() json := m.stringHelper(e, n, columnName) jsoniter.Unmarshal([]byte(json), &newObject) s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type())) }
如果您对数据类型有疑问,可以查看
测试或在评论中询问。
最后发生了什么 。