
рдореЗрд░рд╛ рдирд╛рдо рдПрд░реНрдЯреЛрдо рд╣реИ, рдореИрдВ рдЧреЛ рд▓реАрдб рдбреЗрд╡рд▓рдкрд░ рдХреА рд╕реНрдерд┐рддрд┐ рдореЗрдВ рдкреНрд░реЛрдЬреЗрдХреНрдЯ "рд╕реНрдЯреНрд░реАрдо" рдореЗрдВ рд░реИрдореНрдмрд▓рд░ рдЧреНрд░реБрдк рдореЗрдВ рдХрд╛рдо рдХрд░рддрд╛ рд╣реВрдВред
рд╣рдордиреЗ рдмрд╣реБрдд рд╕рдордп рдмрд┐рддрд╛рдпрд╛, рдореИрдХреНрдХрд▓ рдмрд┐рдирд▓реЙрдЧ рдХреЛ рддреИрдпрд╛рд░ рдХрд░рдиреЗ рдореЗрдВред рдпрд╣ рдЖрд▓реЗрдЦ рдПрдХ рдХрд╣рд╛рдиреА рд╣реИ рдХрд┐ рдХреИрд╕реЗ рдЬрд▓реНрджреА рд╕реЗ рдФрд░ рдиреНрдпреВрдирддрдо рд╕рдВрдЦреНрдпрд╛ рдореЗрдВ рдиреБрдХрд╕рд╛рди рдХреЗ рд╕рд╛рде рдЧреЛ рдмрд┐рдирд▓реЙрдЧ рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рддрдВрддреНрд░ рдХреЛ рд▓рд╛рдЧреВ рдХрд░реЗрдВред
рд╣рдореЗрдВ рдЗрд╕рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рдХреНрдпреЛрдВ рд╣реИ?
рд╕реНрдЯреНрд░реАрдо рдХреЗ рд╣реБрдб рдХреЗ рддрд╣рдд рдЕрддреНрдпрдзрд┐рдХ рд▓реЛрдб рдХрд┐рдП рдЧрдП рдореЙрдбреНрдпреВрд▓ рд╣реИрдВ, рдЬрд╣рд╛рдВ рдбреЗрдЯрд╛рдмреЗрд╕ рдореЗрдВ рдкреНрд░рддреНрдпреЗрдХ рдХреНрд╡реЗрд░реА рдЙрдкрдпреЛрдЧрдХрд░реНрддрд╛ рдХреЛ рдкрд░рд┐рдгрд╛рдо рдкреНрд░рд╛рдкреНрдд рдХрд░рдиреЗ рд╕реЗ рджреВрд░ рд▓реЗ рдЬрд╛рддреА рд╣реИред рдХреИрд╢рд┐рдВрдЧ рдПрдХ рдЕрдЪреНрдЫрд╛ рд╕рдорд╛рдзрд╛рди рд╣реИ, рд▓реЗрдХрд┐рди рдХреИрд╢ рдХреЛ рдХрдм рдлреНрд▓рд╢ рдХрд░рдирд╛ рд╣реИ? рдбреЗрдЯрд╛ рдЦреБрдж рдмрддрд╛рдПрдВ рдХрд┐ рд╡реЗ рдЕрдкрдбреЗрдЯ рдХрд┐рдП рдЧрдП рд╣реИрдВред
Mysql рдореЗрдВ рдорд╛рд╕реНрдЯрд░-рд╕реНрд▓реЗрд╡ рдкреНрд░рддрд┐рдХреГрддрд┐ рдХреЗ рд░реВрдк рдореЗрдВ рдРрд╕реА рдЪреАрдЬ рд╣реИред рд╣рдорд╛рд░рд╛ рдбреЗрдорди рдПрдХ рджрд╛рд╕ рд╣реЛрдиреЗ рдХрд╛ рджрд┐рдЦрд╛рд╡рд╛ рдХрд░ рд╕рдХрддрд╛ рд╣реИ рдФрд░ рдмрд┐рдирд▓реЛрдЧ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рдбреЗрдЯрд╛ рдкреНрд░рд╛рдкреНрдд рдХрд░ рд╕рдХрддрд╛ рд╣реИред рдмрд┐рдирд▓реЙрдЧ рдХреЛ рдкрдВрдХреНрддрд┐ рдкреНрд░рд╛рд░реВрдк рдореЗрдВ рдХреЙрдиреНрдлрд╝рд┐рдЧрд░ рдХрд┐рдпрд╛ рдЬрд╛рдирд╛ рдЪрд╛рд╣рд┐рдПред рдЗрд╕рдореЗрдВ рд╕рднреА рдбреЗрдЯрд╛рдмреЗрд╕ рдкрд░рд┐рд╡рд░реНрддрди рдХрдорд╛рдВрдб рд╢рд╛рдорд┐рд▓ рд╣реИрдВ, рд▓реЗрди-рджреЗрди рдХреЗ рддрд╣рдд рдЖрджреЗрд╢ рдкреНрд░рддрд┐рдмрджреНрдз рд╣реЛрдиреЗ рдХреЗ рдмрд╛рдж рд╣реА рдирд┐рд╖реНрдкрд╛рджрд┐рдд рдХрд┐рдП рдЬрд╛рддреЗ рд╣реИрдВред рдЕрдзрд┐рдХрддрдо рдЕрдиреБрдордд рдЖрдХрд╛рд░ (рдбрд┐рдлрд╝реЙрд▓реНрдЯ рд░реВрдк рд╕реЗ 1 рдЧреАрдЧрд╛) рддрдХ рдкрд╣реБрдВрдЪрдиреЗ рдкрд░, рдирд┐рдореНрди рдлрд╝рд╛рдЗрд▓ рдмрдирд╛рдИ рдЬрд╛рддреА рд╣реИред рдкреНрд░рддреНрдпреЗрдХ рдирдИ рдлрд╝рд╛рдЗрд▓ рдореЗрдВ рдирд╛рдо рдХреЗ рдмрд╛рдж рдПрдХ рд╕реАрд░рд┐рдпрд▓ рдирдВрдмрд░ рд╣реЛрддрд╛ рд╣реИред
рдпрд╣рд╛рдБ рдпрд╛
рдпрд╣рд╛рдБ рдереЛрдбрд╝реА рдЕрдзрд┐рдХ рдЬрд╛рдирдХрд╛рд░реАред
рд▓реЗрдЦ рдХреЗ рджреЛ рднрд╛рдЧ рд╣реИрдВ:
1. рд▓реЙрдЧ рдореЗрдВ рдкреНрд░рд╛рдкреНрдд рдкреНрд░рд╡рд┐рд╖реНрдЯрд┐рдпреЛрдВ рдХреЛ рдХреИрд╕реЗ рдЬрд▓реНрджреА рд╕реЗ рд╕рдВрд╕рд╛рдзрд┐рдд рдХрд░рдирд╛ рд╢реБрд░реВ рдХрд░реЗрдВред
2. рд╣реБрдб рдХреЗ рддрд╣рдд рдХреНрдпрд╛ рдЕрдиреБрдХреВрд▓рд┐рдд рдФрд░ рд╡рд┐рд╕реНрддрд╛рд░рд┐рдд рдХрд░рдирд╛ рд╣реИред
рднрд╛рдЧ 1. рд╣рдо рдЬрд▓реНрдж рд╕реЗ рдЬрд▓реНрдж рд╢реБрд░реВ рдХрд░рддреЗ рд╣реИрдВред
рдмрд┐рдирд▓реЙрдЧ рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рд╣рдо
github.com/siddontang/go-mysql рд▓рд╛рдЗрдмреНрд░реЗрд░реА рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░реЗрдВрдЧреЗ
рдирдП рдЪреИрдирд▓ рд╕реЗ рдХрдиреЗрдХреНрдЯ рдХрд░реЗрдВ (рдЪреИрдирд▓реЛрдВ рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд░рдиреЗ рдХреЗ
рд▓рд┐рдП, рдмрд┐рдирд▓реЙрдЧ рдХреЗ рд▓рд┐рдП рдЖрд░рдУрдбрдмреНрд▓реНрдпреВ рдкреНрд░рд╛рд░реВрдк рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИ )ред
func binLogListener() { c, err := getDefaultCanal() if err == nil { coords, err := c.GetMasterPos() if err == nil { c.SetEventHandler(&binlogHandler{}) c.RunFrom(coords) } } } func getDefaultCanal() (*canal.Canal, error) { cfg := canal.NewDefaultConfig() cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306) cfg.User = "root" cfg.Password = "root" cfg.Flavor = "mysql" cfg.Dump.ExecutionPath = "" return canal.NewCanal(cfg) }
рдмрд┐рдирд▓реЛрдЧ рдХреЗ рдКрдкрд░ рдПрдХ рдЖрд╡рд░рдг рдмрдирд╛рдПрдБ:
type binlogHandler struct { canal.DummyEventHandler // BinlogParser // } func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {return nil} func (h *binlogHandler) String() string {return "binlogHandler"}
BinlogParserрд╣рдо OnRow () рд╡рд┐рдзрд┐ рдореЗрдВ рддрд░реНрдХ рдЬреЛрдбрд╝рдХрд░ рдкрд░рд┐рдгрд╛рдореА Binlog рд▓рд╛рдЗрди рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд░рдиреЗ рдХреЗ рддрд░реНрдХ рдХрд╛ рд╡рд┐рд╕реНрддрд╛рд░ рдХрд░реЗрдВрдЧреЗред
func (h *binlogHandler) OnRow(e *canal.RowsEvent) error { var n int // var k int // switch e.Action { case canal.DeleteAction: return nil // case canal.UpdateAction: n = 1 k = 2 case canal.InsertAction: n = 0 k = 1 } for i := n; i < len(e.Rows); i += k { key := e.Table.Schema + "." + e.Table.Name switch key { case User{}.SchemaName() + "." + User{}.TableName(): } } return nil }
рдЗрд╕ рдЖрд╡рд░рдг рдХрд╛ рд╕рд╛рд░ рдкреНрд░рд╛рдкреНрдд рдЖрдВрдХрдбрд╝реЛрдВ рдХреЛ рдкрд╛рд░реНрд╕ рдХрд░рдирд╛ рд╣реИред рд▓рд╛рдЗрди рдХреЛ рдЕрдкрдбреЗрдЯ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдбреЗрдЯрд╛ рджреЛ рд░рд┐рдХреЙрд░реНрдб рдореЗрдВ рдЖрддрд╛ рд╣реИ (рдкрд╣рд▓реА рдкрдВрдХреНрддрд┐ рдореЗрдВ рдореВрд▓ рдбреЗрдЯрд╛ рд╣реЛрдЧрд╛, рджреВрд╕рд░рд╛ - рдЕрдкрдбреЗрдЯ рдХрд┐рдпрд╛ рдЧрдпрд╛)ред рдпрд╣рд╛рдВ рд╣рдо рдмрд╣реБ-рдЖрд╡реЗрд╖рдг рдФрд░ рдмрд╣реБ-рдЕрджреНрдпрддрди рдХреА рд╕рдВрднрд╛рд╡рдирд╛ рдкрд░ рднреА рд╡рд┐рдЪрд╛рд░ рдХрд░рддреЗ рд╣реИрдВред рдЗрд╕ рд╕реНрдерд┐рддрд┐ рдореЗрдВ, рд╣рдореЗрдВ UPDATE рдХреЗ рд▓рд┐рдП рд╣рд░ рджреВрд╕рд░рд╛ рд░рд┐рдХреЙрд░реНрдб рд▓реЗрдирд╛ рд╣реЛрдЧрд╛ред рдЗрд╕рдХреЗ рд▓рд┐рдП, рдКрдкрд░ рдХреЗ рдЙрджрд╛рд╣рд░рдг рдореЗрдВ, рд╣рдордиреЗ n рдФрд░ k рдореЗрдВ рдкреНрд░рд╡реЗрд╢ рдХрд┐рдпрд╛ред
рдЖрдЗрдП, Binlog рд╕реЗ рдбреЗрдЯрд╛ рдкреНрд░рд╛рдкреНрдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдПрдХ рдореЙрдбрд▓ рдмрдирд╛рдПрдВред рдЗрд╕рдореЗрдВ рд╣рдо рдкреНрд░рд╛рдкреНрдд рдкрдВрдХреНрддрд┐рдпреЛрдВ рд╕реЗ рдбреЗрдЯрд╛ рдкрдврд╝реЗрдВрдЧреЗред рдПрдиреЛрдЯреЗрд╢рди рдореЗрдВ рд╣рдо рдХреЙрд▓рдо рдХреЗ рдирд╛рдо рджрд░реНрд╢рд╛рддреЗ рд╣реИрдВ:
type User struct { Id int `gorm:"column:id"` Name string `gorm:"column:name"` Status string `gorm:"column:status"` Created time.Time `gorm:"column:created"` } func (User) TableName() string { return "User" } func (User) SchemaName() string { return "Test" }
MYSQL рдореЗрдВ рддрд╛рд▓рд┐рдХрд╛ рд╕рдВрд░рдЪрдирд╛:
CREATE TABLE Test.User ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(40) NULL , status ENUM("active","deleted") DEFAULT "active", created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP ) ENGINE =InnoDB;
рдЪрд▓реЛ рд╕реНрд╡рдпрдВ рдкрд╛рд░реНрд╕рд┐рдВрдЧ рдХрд╛ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдХрд░реЗрдВ - рдбреЗрдЯрд╛ рдкрд╛рд░реНрд╕рд┐рдВрдЧ рдХреЗ рд▓рд┐рдП рддреИрдпрд╛рд░реА рдХреА рдЬрдЧрд╣ рдореЗрдВ рдЬреЛрдбрд╝реЗрдВ:
user := User{} h.GetBinLogData(&user, e, i)
рд╕рдВрдХреНрд╖реЗрдк рдореЗрдВ, рдпрд╣ рдкрд░реНрдпрд╛рдкреНрдд рд╣реИ - рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рдЙрдкрдпреЛрдЧрдХрд░реНрддрд╛ рдореЙрдбрд▓ рдореЗрдВ рдирдП рд░рд┐рдХреЙрд░реНрдб рдХрд╛ рдбреЗрдЯрд╛ рд╣реЛрдЧрд╛, рд▓реЗрдХрд┐рди рд╕реНрдкрд╖реНрдЯрддрд╛ рдХреЗ рд▓рд┐рдП, рд╣рдо рдЙрдиреНрд╣реЗрдВ рдкреНрд░рджрд░реНрд╢рд┐рдд рдХрд░реЗрдВрдЧреЗ:
if e.Action == canal.UpdateAction { oldUser := User{} h.GetBinLogData(&oldUser, e, i-1) fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name, ) } else { fmt.Printf("User %d is created with name %s\n", user.Id, user.Name, ) }
рдореБрдЦреНрдп рдмрд┐рдВрджреБ рдЬрд┐рд╕рдХреЗ рд▓рд┐рдП рд╣рдо рдкреНрд░рдпрд╛рд╕ рдХрд░ рд░рд╣реЗ рдереЗ, рд╡рд╣ рд╣реИ рд╣рдорд╛рд░реА "рд╣реИрд▓реЛ рдмрд┐рдирд▓реЙрдЧ рд╡рд░реНрд▓реНрдб" рдХреЛ рд▓реЙрдиреНрдЪ рдХрд░рдирд╛:
func main() { go binLogListener()
рдЕрдЧрд▓рд╛, рдорд╛рди рдЬреЛрдбрд╝реЗрдВ рдФрд░ рдЕрдкрдбреЗрдЯ рдХрд░реЗрдВ:
INSERT INTO Test.User (`id`,`name`) VALUE (1,"Jack"); UPDATE Test.User SET name="Jonh" WHERE id=1;
рд╣рдо рджреЗрдЦреЗрдВрдЧреЗ:
User 1 is created with name Jack User 1 name changed from Jack to Jonh
рдкрд░рд┐рдгрд╛рдореА рдХреЛрдб рдмрд┐рдирд▓реЙрдЧ рдФрд░ рдкрд╛рд░реНрд╕ рдирдИ рд▓рд╛рдЗрдиреЛрдВ рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд░рддрд╛ рд╣реИред рдЬрдм рд╣рдореЗрдВ рдЬрд┐рд╕ рддрд╛рд▓рд┐рдХрд╛ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реЛрддреА рд╣реИ, рдЙрд╕рд╕реЗ рд░рд┐рдХреЙрд░реНрдб рдкреНрд░рд╛рдкреНрдд рдХрд░рддреЗ рд╣реИрдВ, рддреЛ рдХреЛрдб рдбреЗрдЯрд╛ рдХреЛ рд╕рдВрд░рдЪрдирд╛ рдореЗрдВ рдкрдврд╝рддрд╛ рд╣реИ рдФрд░ рдкрд░рд┐рдгрд╛рдо рдкреНрд░рджрд░реНрд╢рд┐рдд рдХрд░рддрд╛ рд╣реИред рдкрд░реНрджреЗ рдХреЗ рдкреАрдЫреЗ рдбреЗрдЯрд╛ рдкрд╛рд░реНрд╕рд░ (BinlogParser) рдерд╛, рдЬрд┐рд╕рдиреЗ рдореЙрдбрд▓ рдХреЛ рдЖрдмрд╛рдж рдХрд┐рдпрд╛ред
рднрд╛рдЧ 2. рдЬреИрд╕рд╛ рдХрд┐ рдХреЙрдм рдиреЗ рдХрд╣рд╛, рд╣рдореЗрдВ рдПрдХ рд╕реНрддрд░ рдХрдо рдЪрд╛рд╣рд┐рдП
рдкрд╛рд░реНрд╕рд░ рдХреЗ рдЖрдВрддрд░рд┐рдХ рдХрд╛рдо рдкрд░ рд╡рд┐рдЪрд╛рд░ рдХрд░реЗрдВ, рдЬреЛ рдкреНрд░рддрд┐рдмрд┐рдВрдм рдкрд░ рдЖрдзрд╛рд░рд┐рдд рд╣реИред
рдореЙрдбрд▓ рдХреЛ рдбреЗрдЯрд╛ рд╕реЗ рднрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рд╣рдордиреЗ рд╣реИрдВрдбрд▓рд░ рд╡рд┐рдзрд┐ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд┐рдпрд╛:
h.GetBinLogData(&user, e, i)
рдпрд╣ рд╕рд░рд▓ рдбреЗрдЯрд╛ рдкреНрд░рдХрд╛рд░реЛрдВ рдХреЛ рдкрд╛рд░реНрд╕ рдХрд░рддрд╛ рд╣реИ:
bool int float64 string time.Time
рдФрд░ json рд╕реЗ рдЬрдЯрд┐рд▓ рд╕рдВрд░рдЪрдирд╛рдУрдВ рдХреЛ рдкрд╛рд░реНрд╕ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВред
рдпрджрд┐ рд╕рдорд░реНрдерд┐рдд рдкреНрд░рдХрд╛рд░ рдЖрдкрдХреЗ рд▓рд┐рдП рдкрд░реНрдпрд╛рдкреНрдд рдирд╣реАрдВ рд╣реИрдВ, рдпрд╛ рдЖрдк рдмрд╕ рдпрд╣ рд╕рдордЭрдирд╛ рдЪрд╛рд╣рддреЗ рд╣реИрдВ рдХрд┐ рдмрд┐рдирд▓реЙрдЧ рдкрд╛рд░реНрд╕рд┐рдВрдЧ рдХреИрд╕реЗ рдХрд╛рдо рдХрд░рддрд╛ рд╣реИ, рддреЛ рдЖрдк рдЕрдкрдиреЗ рд╕реНрд╡рдпрдВ рдХреЗ рдкреНрд░рдХрд╛рд░реЛрдВ рдХреЛ рдЬреЛрдбрд╝рдиреЗ рдХрд╛ рдЕрднреНрдпрд╛рд╕ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВред
рдкрд╣рд▓реЗ, рд╡рд┐рдЪрд╛рд░ рдХрд░реЗрдВ рдХрд┐ рдЖрджрд░реНрд╢ рдлрд╝реАрд▓реНрдб рдХреЗ Id рдлрд╝реАрд▓реНрдб рдХреЗ рдЙрджрд╛рд╣рд░рдг рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдореЙрдбрд▓ рдлрд╝реАрд▓реНрдб рдХреЗ рд▓рд┐рдП рдбреЗрдЯрд╛ рдХреИрд╕реЗ рднрд░реЗрдВ:
type User struct { Id int `gorm:"column:id"` }
рдкреНрд░рддрд┐рдмрд┐рдВрдм рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рд╣рдореЗрдВ рдЯрд╛рдЗрдк рдирд╛рдо рдорд┐рд▓рддрд╛ рд╣реИред ParseTagSetting рд╡рд┐рдзрд┐ рдПрдиреЛрдЯреЗрд╢рди рдХреЛ рдЕрдзрд┐рдХ рд╕реБрд╡рд┐рдзрд╛рдЬрдирдХ рд╕рдВрд░рдЪрдирд╛ рдореЗрдВ рдкрд░рд┐рд╡рд░реНрддрд┐рдд рдХрд░рддреА рд╣реИ:
element := User{} // , v := reflect.ValueOf(element) s := reflect.Indirect(v) t := s.Type() num := t.NumField() parsedTag := parseTagSetting(t.Field(k).Tag) if columnName, ok = parsedTag["COLUMN"]; !ok || columnName == "COLUMN" { continue } for k := 0; k < num; k++ { name := s.Field(k).Type().Name() switch name { case "int": // } }
рдЕрдВрддрд░ рдкреНрд░рдХрд╛рд░ рдкреНрд░рд╛рдкреНрдд рдХрд░рдиреЗ рдХреЗ рдмрд╛рдж, рдЖрдк рдкреНрд░рддрд┐рдмрд┐рдВрдм рд╡рд┐рдзрд┐ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рдЗрд╕рдХрд╛ рдорд╛рди рдирд┐рд░реНрдзрд╛рд░рд┐рдд рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ:
func (v Value) SetInt(x int64) {
рдПрдиреЛрдЯреЗрд╢рди рдХреЛ рдкрд╛рд░реНрд╕ рдХрд░рдиреЗ рдХреА рд╡рд┐рдзрд┐:
func parseTagSetting(tags reflect.StructTag) map[string]string { setting := map[string]string{} for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} { tags := strings.Split(str, ";") for _, value := range tags { v := strings.Split(value, ":") k := strings.TrimSpace(strings.ToUpper(v[0])) if len(v) >= 2 { setting[k] = strings.Join(v[1:], ":") } else { setting[k] = k } } } return setting }
рдпрд╣ рдЗрдирдкреБрдЯ рдХреЗ рд░реВрдк рдореЗрдВ int64 рдХреЛ рд╕реНрд╡реАрдХрд╛рд░ рдХрд░рддрд╛ рд╣реИред рдЖрдЗрдП рдПрдХ рдРрд╕реА рд╡рд┐рдзрд┐ рдмрдирд╛рдПрдВ рдЬреЛ рдкреНрд░рд╛рдкреНрдд рдбреЗрдЯрд╛ рдХреЛ рдмрд┐рдирд▓реЙрдЧ рд╕реЗ рдЗрдВрдЯ64 рдореЗрдВ рдЕрдиреБрд╡рд╛рдж рдХрд░рддрд╛ рд╣реИ:
func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER { return 0 } switch e.Rows[n][columnId].(type) { case int8: return int64(e.Rows[n][columnId].(int8)) case int32: return int64(e.Rows[n][columnId].(int32)) case int64: return e.Rows[n][columnId].(int64) case int: return int64(e.Rows[n][columnId].(int)) case uint8: return int64(e.Rows[n][columnId].(uint8)) case uint16: return int64(e.Rows[n][columnId].(uint16)) case uint32: return int64(e.Rows[n][columnId].(uint32)) case uint64: return int64(e.Rows[n][columnId].(uint64)) case uint: return int64(e.Rows[n][columnId].(uint)) } return 0 }
GetBinlogIdByName () рдкрджреНрдзрддрд┐ рдХреЛ рдЫреЛрдбрд╝рдХрд░ рд╕рдм рдХреБрдЫ рддрд╛рд░реНрдХрд┐рдХ рджрд┐рдЦрддрд╛ рд╣реИред
рдЗрд╕ рддреБрдЪреНрдЫ рд╕рд╣рд╛рдпрдХ рдХреЛ рдЙрдирдХреЗ рд╕реАрд░рд┐рдпрд▓ рдирдВрдмрд░реЛрдВ рдХреЗ рдмрдЬрд╛рдп рдХреЙрд▓рдо рдирд╛рдореЛрдВ рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд░рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИ, рдЬреЛ рдЖрдкрдХреЛ рдирд┐рдореНрди рдХреА рдЕрдиреБрдорддрд┐ рджреЗрддрд╛ рд╣реИ:
- рдЧреЙрд░реНрдо рдПрдиреЛрдЯреЗрд╢рди рд╕реЗ рдХреЙрд▓рдо рдирд╛рдо рд▓реЗрдВ;
- рд╢реБрд░реБрдЖрдд рдпрд╛ рдордзреНрдп рдореЗрдВ рдХреЙрд▓рдо рдЬреЛрдбрд╝рддреЗ рд╕рдордп рд╕рдВрдкрд╛рджрди рдХрд░рдиреЗ рдХреА рдХреЛрдИ рдЖрд╡рд╢реНрдпрдХрддрд╛ рдирд╣реАрдВ рд╣реИ;
- рдХреЙрд▓рдо рдирдВрдмрд░ 3 рдХреА рддреБрд▓рдирд╛ рдореЗрдВ рдирд╛рдо рдХреНрд╖реЗрддреНрд░ рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд░рдирд╛ рдЕрдзрд┐рдХ рд╕реБрд╡рд┐рдзрд╛рдЬрдирдХ рд╣реИред
рдкрд░рд┐рдгрд╛рдорд╕реНрд╡рд░реВрдк, рд╣рдо рд╣реИрдВрдбрд▓рд░ рдХреЛ рд╕реНрд╡рдпрдВ рдЬреЛрдбрд╝рддреЗ рд╣реИрдВ:
s.Field(k).SetInt(m.intHelper(e, n, columnName))
рдЖрдЗрдП рджреЛ рдФрд░ рдЙрджрд╛рд╣рд░рдг рджреЗрдЦреЗрдВредENUM: рдпрд╣рд╛рдВ рдореВрд▓реНрдп рд╕реВрдЪрдХрд╛рдВрдХ рдХреЗ рд░реВрдк рдореЗрдВ рдЖрддреЗ рд╣реИрдВ - рдЕрд░реНрдерд╛рдд, "рд╕рдХреНрд░рд┐рдп" рд╕реНрдерд┐рддрд┐ 1 рдХреЗ рд░реВрдк рдореЗрдВ рдЖрдПрдЧреАред рдЬреНрдпрд╛рджрд╛рддрд░ рдорд╛рдорд▓реЛрдВ рдореЗрдВ, рд╣рдореЗрдВ рдПрдирдо рдХреЗ рдПрдХ рд╕реНрдЯреНрд░рд┐рдВрдЧ рдкреНрд░рддрд┐рдирд┐рдзрд┐рддреНрд╡ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИред рдЗрд╕реЗ рдХреНрд╖реЗрддреНрд░ рд╡рд┐рд╡рд░рдг рд╕реЗ рдкреНрд░рд╛рдкреНрдд рдХрд┐рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИред рдПрдирдо рдореВрд▓реНрдпреЛрдВ рдХреЛ рдкрд╛рд░реНрд╕ рдХрд░рддреЗ рд╕рдордп, рдпрд╣ 1 рд╕реЗ рд╢реБрд░реВ рд╣реЛрддрд╛ рд╣реИ, рд▓реЗрдХрд┐рди рд╕рдВрднрд╛рд╡рд┐рдд рдорд╛рдиреЛрдВ рдХреА рд╕рд░рдгреА рд╕реНрд╡рдпрдВ 0 рд╕реЗ рд╢реБрд░реВ рд╣реЛрддреА рд╣реИред
рдПрдХ Enum рд╣реИрдВрдбрд▓рд░ рдЗрд╕ рддрд░рд╣ рд▓рдЧ рд╕рдХрддрд╛ рд╣реИ:
func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type == schema.TYPE_ENUM { values := e.Table.Columns[columnId].EnumValues // if len(values) == 0 || e.Rows[n][columnId] == nil {{ return "" } return values[e.Rows[n][columnId].(int64)-1] // 0 }
рдореИрдВ JSON рд╕реНрдЯреЛрд░ рдХрд░рдирд╛ рдЪрд╛рд╣рддрд╛ рд╣реВрдВрдЕрдЪреНрдЫрд╛ рд╡рд┐рдЪрд╛рд░ рдХреНрдпреЛрдВ рдирд╣реАрдВред JSON mysql рдХреЗ рд╕рдВрджрд░реНрдн рдореЗрдВ рдПрдХ рд╕реНрдЯреНрд░рд┐рдВрдЧ рд╣реИред рдХрд┐рд╕реА рддрд░рд╣ рдпрд╣ рдЗрдВрдЧрд┐рдд рдХрд░рдирд╛ рдЖрд╡рд╢реНрдпрдХ рд╣реИ рдХрд┐ рдпрд╣ рдбреЗрдЯрд╛ рдХреНрд░рдордмрджреНрдз рд╣реИ - рдЗрд╕рдХреЗ рд▓рд┐рдП рд╣рдо рдЧреИрд░-рд╡реИрдЬреНрдЮрд╛рдирд┐рдХ рдПрдиреЛрдЯреЗрд╢рди "Json "рд╕реЗ рдЧреЙрд░реНрдо рддрдХ рдЬреЛрдбрд╝ рджреЗрдВрдЧреЗред
рдХрд▓реНрдкрдирд╛ рдХрд░реЗрдВ рдХрд┐ рдЗрд╕ рддрд░рд╣ рдХреА рд╕рдВрд░рдЪрдирд╛ рдкрд░ рд╡рд┐рдЪрд╛рд░ рдХрд┐рдпрд╛ рдЬрд╛рдирд╛ рдЪрд╛рд╣рд┐рдП:
type JsonData struct { Int int `gorm:"column:int"` StructData TestData `gorm:"column:struct_data;fromJson"` MapData map[string]string `gorm:"column:map_data;fromJson"` SliceData []int `gorm:"column:slice_data;fromJson"` } type TestData struct { Test string `json:"test"` Int int `json:"int"` }
рдЖрдк рдмрд╣реБрдд рд╕рд╛рд░реА рд╢рд░реНрддреЗрдВ рд▓рд┐рдЦ рд╕рдХрддреЗ рд╣реИрдВ рдФрд░ рд╢рд╛рдпрдж рд╕рдлрд▓ рд╣реЛ рд╕рдХрддреЗ рд╣реИрдВред рд▓реЗрдХрд┐рди рдкреНрд░рддреНрдпреЗрдХ рдирдпрд╛ рдбреЗрдЯрд╛ рдкреНрд░рдХрд╛рд░ рд╕рднреА рдкреНрд░рдпрд╛рд╕реЛрдВ рдХреЛ рдорд╛рд░ рджреЗрдЧрд╛ред рдпрджреНрдпрдкрд┐ рд╕реНрдЯреИрдХрдУрд╡рд░рдлрд╝реНрд▓реЛ рдХреЗ рдЙрддреНрддрд░ рдЦреЛрдЬрдиреЗ рдХрд╛ рдкреНрд░рдпрд╛рд╕ - "рдХреИрд╕реЗ рдПрдХ рдЕрдЬреНрдЮрд╛рдд рдкреНрд░рдХрд╛рд░ рдХреА рд╕рдВрд░рдЪрдирд╛ рдХреЛ рд▓рд╛рдиреЗ рдФрд░ рдбрд┐рд╕реНрдХреНрд░рд╛рдЗрдм рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП" рд╡рд╛рдХреНрдпрд╛рдВрд╢ рдХреЗ рд╕рд╛рде рд╢реБрд░реВ рд╣реЛрддрд╛ рд╣реИ: "рдпрд╣ рд╕реНрдкрд╖реНрдЯ рдирд╣реАрдВ рд╣реИ рдХрд┐ рдЖрдкрдХреЛ рдЗрд╕рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рдХреНрдпреЛрдВ рд╣реИ, рд▓реЗрдХрд┐рди рдХреЛрд╢рд┐рд╢ рдХрд░реЗрдВ ..."ред
рдЗрдВрдЯрд░рдлрд╝реЗрд╕ рдХреЛ рд╡рд╛рдВрдЫрд┐рдд рдкреНрд░рдХрд╛рд░ рджреЗрдиреЗ рдХреЗ рдмрд╛рдж, рд╣рдо рдпрд╣ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ:
if _, ok := parsedTag["FROMJSON"]; ok { newObject := reflect.New(s.Field(k).Type()).Interface() json := m.stringHelper(e, n, columnName) jsoniter.Unmarshal([]byte(json), &newObject) s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type())) }
рдпрджрд┐ рдЖрдкрдХреЗ рдкрд╛рд╕ рдбреЗрдЯрд╛ рдкреНрд░рдХрд╛рд░реЛрдВ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдкреНрд░рд╢реНрди рд╣реИрдВ, рддреЛ рдЖрдк
рдкрд░реАрдХреНрд╖рдгреЛрдВ рдХреЛ рджреЗрдЦ рд╕рдХрддреЗ рд╣реИрдВ рдпрд╛ рдЯрд┐рдкреНрдкрдгрд┐рдпреЛрдВ рдореЗрдВ рдкреВрдЫ рд╕рдХрддреЗ рд╣реИрдВред
рдЖрдЦрд┐рд░ рдореЗрдВ рдХреНрдпрд╛ рд╣реБрдЖ ред