
Mi nombre es Artyom, trabajo en el grupo Rambler en el proyecto "Stream" en el puesto de desarrollador principal de Go.
Pasamos mucho tiempo domesticando mysql binlog. Este artículo es una historia sobre cómo implementar rápidamente y con un número mínimo de dificultades el mecanismo para trabajar con Go binlogs.
¿Por qué necesitamos esto?
Bajo el capó del Stream hay módulos altamente cargados, donde cada consulta a la base de datos aleja al usuario de recibir el resultado. El almacenamiento en caché es una buena solución, pero ¿cuándo vaciar el caché? Deje que los datos mismos nos digan que han sido actualizados.
En mysql existe la replicación maestro-esclavo. Nuestro demonio puede pretender ser un esclavo y recibir datos a través de binlog. Binlog debe configurarse en formato de fila. Contiene todos los comandos de cambio de la base de datos, los comandos de la transacción se ejecutan solo después de la confirmación. Al alcanzar el tamaño máximo permitido (1 concierto por defecto), se crea el siguiente archivo. Cada archivo nuevo tiene un número de serie después del nombre.
Un poco más de información
aquí o
aquí .
El artículo tiene dos partes:
1. Cómo comenzar a procesar rápidamente las entradas recibidas en el registro.
2. Cómo personalizar y expandir lo que hay debajo del capó.
Parte 1. Comenzamos lo antes posible.
Para trabajar con binlog, utilizaremos la biblioteca
github.com/siddontang/go-mysqlConéctese al nuevo canal (para trabajar con canales,
se requiere el
formato ROW para binlog ).
func binLogListener() { c, err := getDefaultCanal() if err == nil { coords, err := c.GetMasterPos() if err == nil { c.SetEventHandler(&binlogHandler{}) c.RunFrom(coords) } } } func getDefaultCanal() (*canal.Canal, error) { cfg := canal.NewDefaultConfig() cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306) cfg.User = "root" cfg.Password = "root" cfg.Flavor = "mysql" cfg.Dump.ExecutionPath = "" return canal.NewCanal(cfg) }
Cree un contenedor sobre el binlog:
type binlogHandler struct { canal.DummyEventHandler // BinlogParser // } func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {return nil} func (h *binlogHandler) String() string {return "binlogHandler"}
BinlogparserAmpliaremos la lógica de trabajar con la línea binlog resultante agregando lógica al método OnRow ().
func (h *binlogHandler) OnRow(e *canal.RowsEvent) error { var n int // var k int // switch e.Action { case canal.DeleteAction: return nil // case canal.UpdateAction: n = 1 k = 2 case canal.InsertAction: n = 0 k = 1 } for i := n; i < len(e.Rows); i += k { key := e.Table.Schema + "." + e.Table.Name switch key { case User{}.SchemaName() + "." + User{}.TableName(): } } return nil }
La esencia de este contenedor es analizar los datos recibidos. Los datos nos llegan en dos registros para actualizar la línea (la primera línea contendrá los datos originales, la segunda, actualizada). Aquí también consideramos la posibilidad de múltiples inserciones y actualizaciones múltiples. En este caso, necesitaremos tomar cada segundo registro para ACTUALIZAR. Para esto, en el ejemplo anterior, ingresamos n y k.
Creemos un modelo para recibir datos de binlog. En él leeremos los datos de las filas recibidas. En las anotaciones indicamos los nombres de las columnas:
type User struct { Id int `gorm:"column:id"` Name string `gorm:"column:name"` Status string `gorm:"column:status"` Created time.Time `gorm:"column:created"` } func (User) TableName() string { return "User" } func (User) SchemaName() string { return "Test" }
Estructura de tabla en MYSQL:
CREATE TABLE Test.User ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(40) NULL , status ENUM("active","deleted") DEFAULT "active", created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP ) ENGINE =InnoDB;
Analicemos el análisis en sí mismo: agregue al lugar de preparación para el análisis de datos:
user := User{} h.GetBinLogData(&user, e, i)
En esencia, esto es suficiente: tendremos los datos del nuevo registro en el modelo de usuario, pero para mayor claridad, los mostraremos:
if e.Action == canal.UpdateAction { oldUser := User{} h.GetBinLogData(&oldUser, e, i-1) fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name, ) } else { fmt.Printf("User %d is created with name %s\n", user.Id, user.Name, ) }
El punto principal por el que nos esforzamos es lanzar nuestro "Hola mundo binlog":
func main() { go binLogListener()
A continuación, agregue y actualice los valores:
INSERT INTO Test.User (`id`,`name`) VALUE (1,"Jack"); UPDATE Test.User SET name="Jonh" WHERE id=1;
Veremos:
User 1 is created with name Jack User 1 name changed from Jack to Jonh
El código resultante funciona con binlog y analiza nuevas líneas. Al recibir un registro de la tabla que necesitamos, el código lee los datos en la estructura y muestra el resultado. Detrás de escena estaba el analizador de datos (BinlogParser), que poblaba el modelo.
Parte 2. Como dijo Cobb, necesitamos un nivel más bajo
Considere el trabajo interno del analizador, que se basa en la reflexión.
Para llenar el modelo con datos, utilizamos el método del controlador:
h.GetBinLogData(&user, e, i)
Analiza tipos de datos simples:
bool int float64 string time.Time
y puede analizar estructuras complejas de json.
Si los tipos compatibles no son suficientes para usted, o si simplemente quiere entender cómo funciona el análisis de binlog, puede practicar agregando sus propios tipos.
Primero, considere cómo completar los datos para el campo modelo usando el ejemplo de un campo Id de tipo int:
type User struct { Id int `gorm:"column:id"` }
A través de la reflexión obtenemos el nombre del tipo. El método parseTagSetting convierte las anotaciones en una estructura más conveniente:
element := User{} // , v := reflect.ValueOf(element) s := reflect.Indirect(v) t := s.Type() num := t.NumField() parsedTag := parseTagSetting(t.Field(k).Tag) if columnName, ok = parsedTag["COLUMN"]; !ok || columnName == "COLUMN" { continue } for k := 0; k < num; k++ { name := s.Field(k).Type().Name() switch name { case "int": // } }
Habiendo recibido el tipo int, puede establecer su valor a través del método de reflexión:
func (v Value) SetInt(x int64) {
Método para analizar anotaciones:
func parseTagSetting(tags reflect.StructTag) map[string]string { setting := map[string]string{} for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} { tags := strings.Split(str, ";") for _, value := range tags { v := strings.Split(value, ":") k := strings.TrimSpace(strings.ToUpper(v[0])) if len(v) >= 2 { setting[k] = strings.Join(v[1:], ":") } else { setting[k] = k } } } return setting }
Acepta int64 como entrada. Creemos un método que traduzca los datos recibidos del binlog a int64:
func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER { return 0 } switch e.Rows[n][columnId].(type) { case int8: return int64(e.Rows[n][columnId].(int8)) case int32: return int64(e.Rows[n][columnId].(int32)) case int64: return e.Rows[n][columnId].(int64) case int: return int64(e.Rows[n][columnId].(int)) case uint8: return int64(e.Rows[n][columnId].(uint8)) case uint16: return int64(e.Rows[n][columnId].(uint16)) case uint32: return int64(e.Rows[n][columnId].(uint32)) case uint64: return int64(e.Rows[n][columnId].(uint64)) case uint: return int64(e.Rows[n][columnId].(uint)) } return 0 }
Todo parece lógico excepto el método getBinlogIdByName ().
Este ayudante trivial es necesario para trabajar con nombres de columna en lugar de sus números de serie, lo que le permite:
- tomar nombres de columna de anotaciones de gorm;
- no es necesario realizar modificaciones al agregar columnas al principio o al medio;
- Es banal más conveniente trabajar con el campo de nombre que con la columna número 3.
Como resultado, agregamos el controlador en sí:
s.Field(k).SetInt(m.intHelper(e, n, columnName))
Veamos dos ejemplos más.ENUM: aquí los valores vienen como índice, es decir, el estado "activo" vendrá como 1. En la mayoría de los casos, necesitamos una representación de cadena de enum. Se puede obtener de la descripción del campo. Al analizar valores de enumeración, comienza a partir de 1, pero la matriz de valores posibles en sí comienza desde 0.
Un manejador Enum podría verse así:
func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type == schema.TYPE_ENUM { values := e.Table.Columns[columnId].EnumValues // if len(values) == 0 || e.Rows[n][columnId] == nil {{ return "" } return values[e.Rows[n][columnId].(int64)-1] // 0 }
Quiero almacenar JSONBuena idea por qué no. JSON en términos de mysql es una cadena. Es necesario indicar de alguna manera que estos datos están serializados, para esto agregaremos la anotación no canónica "fromJson" a Gorm.
Imagine que tal estructura debería considerarse:
type JsonData struct { Int int `gorm:"column:int"` StructData TestData `gorm:"column:struct_data;fromJson"` MapData map[string]string `gorm:"column:map_data;fromJson"` SliceData []int `gorm:"column:slice_data;fromJson"` } type TestData struct { Test string `json:"test"` Int int `json:"int"` }
Puedes escribir muchas condiciones y probablemente tener éxito. Pero cada nuevo tipo de datos matará todos los esfuerzos. Aunque el intento de encontrar respuestas a stackoverflow - "cómo traer y deserializar un tipo desconocido de estructura" comienza con la frase: "No está claro por qué necesita esto, pero intente ...".
Después de enviar el tipo deseado a la interfaz, podemos hacer esto:
if _, ok := parsedTag["FROMJSON"]; ok { newObject := reflect.New(s.Field(k).Type()).Interface() json := m.stringHelper(e, n, columnName) jsoniter.Unmarshal([]byte(json), &newObject) s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type())) }
Si tiene preguntas sobre los tipos de datos, puede consultar las
pruebas o hacerlas en los comentarios.
Lo que sucedió al final .