
Meu nome é Artyom, trabalho no Rambler Group no projeto "Stream" na posição de desenvolvedor líder da Go.
Passamos muito tempo domando o mysql binlog. Este artigo é uma história sobre como implementar rápida e com um número mínimo de armadilhas o mecanismo para trabalhar com os binlogs Go.
Por que precisamos disso?
Sob o capô do Stream, existem módulos altamente carregados, nos quais cada consulta ao banco de dados afasta o usuário do recebimento do resultado. O cache é uma boa solução, mas quando liberar o cache? Deixe os próprios dados nos dizerem que foram atualizados.
No mysql existe algo como replicação mestre-escravo. Nosso daemon pode fingir ser escravo e receber dados via binlog. O binlog deve ser configurado no formato de linha. Ele contém todos os comandos de alteração do banco de dados, os comandos sob a transação são executados somente após a confirmação. Ao atingir o tamanho máximo permitido (1 gig por padrão), o seguinte arquivo é criado. Cada novo arquivo possui um número de série após o nome.
Um pouco mais de informação
aqui ou
aqui .
O artigo tem duas partes:
1. Como iniciar rapidamente o processamento das entradas recebidas no log.
2. Como personalizar e expandir o que há sob o capô.
Parte 1. Começamos o mais rápido possível.
Para trabalhar com o
binlog , usaremos a biblioteca
github.com/siddontang/go-mysqlConecte-se ao novo canal (para trabalhar com canais,
é necessário o
formato ROW para o binlog ).
func binLogListener() { c, err := getDefaultCanal() if err == nil { coords, err := c.GetMasterPos() if err == nil { c.SetEventHandler(&binlogHandler{}) c.RunFrom(coords) } } } func getDefaultCanal() (*canal.Canal, error) { cfg := canal.NewDefaultConfig() cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306) cfg.User = "root" cfg.Password = "root" cfg.Flavor = "mysql" cfg.Dump.ExecutionPath = "" return canal.NewCanal(cfg) }
Crie um wrapper sobre o binlog:
type binlogHandler struct { canal.DummyEventHandler // BinlogParser // } func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {return nil} func (h *binlogHandler) String() string {return "binlogHandler"}
BinlogparserExpandiremos a lógica de trabalhar com a linha binlog resultante adicionando lógica ao método OnRow ().
func (h *binlogHandler) OnRow(e *canal.RowsEvent) error { var n int // var k int // switch e.Action { case canal.DeleteAction: return nil // case canal.UpdateAction: n = 1 k = 2 case canal.InsertAction: n = 0 k = 1 } for i := n; i < len(e.Rows); i += k { key := e.Table.Schema + "." + e.Table.Name switch key { case User{}.SchemaName() + "." + User{}.TableName(): } } return nil }
A essência deste wrapper é analisar os dados recebidos. Os dados chegam até nós em dois registros para atualizar a linha (a primeira linha conterá os dados originais, a segunda - atualizada). Aqui também consideramos a possibilidade de inserções múltiplas e atualizações múltiplas. Nesse caso, precisaremos fazer cada segundo registro para UPDATE. Para isso, no exemplo acima, inserimos n e k.
Vamos criar um modelo para receber dados do binlog. Nele, leremos dados das linhas recebidas. Nas anotações, indicamos os nomes das colunas:
type User struct { Id int `gorm:"column:id"` Name string `gorm:"column:name"` Status string `gorm:"column:status"` Created time.Time `gorm:"column:created"` } func (User) TableName() string { return "User" } func (User) SchemaName() string { return "Test" }
Estrutura da tabela no MYSQL:
CREATE TABLE Test.User ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(40) NULL , status ENUM("active","deleted") DEFAULT "active", created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP ) ENGINE =InnoDB;
Vamos analisar a análise em si - adicione ao local de preparação para a análise de dados:
user := User{} h.GetBinLogData(&user, e, i)
Em essência, isso é suficiente - teremos os dados do novo registro no modelo do usuário, mas, para maior clareza, os exibiremos:
if e.Action == canal.UpdateAction { oldUser := User{} h.GetBinLogData(&oldUser, e, i-1) fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name, ) } else { fmt.Printf("User %d is created with name %s\n", user.Id, user.Name, ) }
O ponto principal pelo qual nos esforçamos é lançar nosso "Hello binlog world":
func main() { go binLogListener()
Em seguida, adicione e atualize os valores:
INSERT INTO Test.User (`id`,`name`) VALUE (1,"Jack"); UPDATE Test.User SET name="Jonh" WHERE id=1;
Vamos ver:
User 1 is created with name Jack User 1 name changed from Jack to Jonh
O código resultante funciona com o binlog e analisa novas linhas. Ao receber um registro da tabela que precisamos, o código lê os dados na estrutura e exibe o resultado. Nos bastidores, estava o analisador de dados (BinlogParser), que preenchia o modelo.
Parte 2. Como Cobb disse, precisamos de um nível mais baixo
Considere o trabalho interno do analisador, que é baseado na reflexão.
Para preencher o modelo com dados, usamos o método manipulador:
h.GetBinLogData(&user, e, i)
Ele analisa tipos de dados simples:
bool int float64 string time.Time
e pode analisar estruturas complexas do json.
Se os tipos suportados não forem suficientes para você, ou você quiser apenas entender como a análise de binlog funciona, poderá praticar a adição de seus próprios tipos.
Primeiro, considere como preencher os dados para o campo modelo usando o exemplo de um campo Id do tipo int:
type User struct { Id int `gorm:"column:id"` }
Através da reflexão, obtemos o nome do tipo. O método parseTagSetting converte anotações em uma estrutura mais conveniente:
element := User{} // , v := reflect.ValueOf(element) s := reflect.Indirect(v) t := s.Type() num := t.NumField() parsedTag := parseTagSetting(t.Field(k).Tag) if columnName, ok = parsedTag["COLUMN"]; !ok || columnName == "COLUMN" { continue } for k := 0; k < num; k++ { name := s.Field(k).Type().Name() switch name { case "int": // } }
Após receber o tipo int, é possível definir seu valor através do método de reflexão:
func (v Value) SetInt(x int64) {
Método para analisar anotações:
func parseTagSetting(tags reflect.StructTag) map[string]string { setting := map[string]string{} for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} { tags := strings.Split(str, ";") for _, value := range tags { v := strings.Split(value, ":") k := strings.TrimSpace(strings.ToUpper(v[0])) if len(v) >= 2 { setting[k] = strings.Join(v[1:], ":") } else { setting[k] = k } } } return setting }
Ele aceita int64 como entrada. Vamos criar um método que converte os dados recebidos do binlog para int64:
func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER { return 0 } switch e.Rows[n][columnId].(type) { case int8: return int64(e.Rows[n][columnId].(int8)) case int32: return int64(e.Rows[n][columnId].(int32)) case int64: return e.Rows[n][columnId].(int64) case int: return int64(e.Rows[n][columnId].(int)) case uint8: return int64(e.Rows[n][columnId].(uint8)) case uint16: return int64(e.Rows[n][columnId].(uint16)) case uint32: return int64(e.Rows[n][columnId].(uint32)) case uint64: return int64(e.Rows[n][columnId].(uint64)) case uint: return int64(e.Rows[n][columnId].(uint)) } return 0 }
Tudo parece lógico, exceto o método getBinlogIdByName ().
Esse auxiliar trivial é necessário para trabalhar com nomes de colunas em vez de seus números de série, o que permite:
- pegue nomes de colunas de anotações de gorm;
- não é necessário fazer edições ao adicionar colunas ao início ou ao meio;
- É banal mais conveniente trabalhar com o campo de nome do que com a coluna número 3.
Como resultado, adicionamos o próprio manipulador:
s.Field(k).SetInt(m.intHelper(e, n, columnName))
Vejamos mais dois exemplos.ENUM: aqui os valores vêm como índice - ou seja, o status "ativo" será como 1. Na maioria dos casos, precisamos de uma representação em string de enum. Pode ser obtido na descrição do campo. Ao analisar valores de enumeração, ele começa a partir de 1, mas a matriz de possíveis valores começa a partir de 0.
Um manipulador de Enum pode ter esta aparência:
func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type == schema.TYPE_ENUM { values := e.Table.Columns[columnId].EnumValues // if len(values) == 0 || e.Rows[n][columnId] == nil {{ return "" } return values[e.Rows[n][columnId].(int64)-1] // 0 }
Eu quero armazenar JSONBoa ideia porque não. JSON em termos de mysql é uma string. É necessário indicar de alguma forma que esses dados são serializados - para isso, adicionaremos a anotação não-canônica “fromJson” ao gorm.
Imagine que essa estrutura deva ser considerada:
type JsonData struct { Int int `gorm:"column:int"` StructData TestData `gorm:"column:struct_data;fromJson"` MapData map[string]string `gorm:"column:map_data;fromJson"` SliceData []int `gorm:"column:slice_data;fromJson"` } type TestData struct { Test string `json:"test"` Int int `json:"int"` }
Você pode escrever muitas condições e provavelmente terá sucesso. Mas cada novo tipo de dados mata todos os esforços. Embora a tentativa de encontrar respostas para o stackoverflow - "como trazer e desserializar um tipo desconhecido de estrutura" comece com a frase: "Não está claro por que você precisa disso, mas tente ...".
Depois de converter o tipo desejado para a interface, podemos fazer o seguinte:
if _, ok := parsedTag["FROMJSON"]; ok { newObject := reflect.New(s.Field(k).Type()).Interface() json := m.stringHelper(e, n, columnName) jsoniter.Unmarshal([]byte(json), &newObject) s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type())) }
Se você tiver dúvidas sobre os tipos de dados, poderá ver os
testes ou perguntar nos comentários.
O que aconteceu no final .