كيفية طهي الخلية الخلية مع الذهاب



اسمي Artyom ، أعمل في مجموعة Rambler في مشروع "Stream" في منصب Go developer developer.
لقد أمضينا الكثير من الوقت في ترويض mysql binlog. هذه المقالة هي قصة حول كيفية تنفيذ آلية العمل مع bin binss بسرعة وبحد أدنى من المزالق.

لماذا نحتاج هذا؟


تحت غطاء الدفق هناك وحدات تحميل عالية ، حيث يؤدي كل استعلام إلى قاعدة البيانات إلى نقل المستخدم بعيدًا عن تلقي النتيجة. يعد التخزين المؤقت حلاً جيدًا ، ولكن متى يتم مسح ذاكرة التخزين المؤقت؟ دع البيانات نفسها تخبرنا أنه قد تم تحديثها.

في mysql هناك شيء مثل النسخ العبد الرئيسي. يمكن أن يتظاهر خدمتنا بكوننا عبداً ويستقبلون البيانات عبر binlog. يجب تكوين Binlog في تنسيق الصف. يحتوي على جميع أوامر تغيير قاعدة البيانات ، يتم تنفيذ الأوامر تحت المعاملة فقط بعد الالتزام. عند الوصول إلى الحجم الأقصى المسموح به (1 أزعج افتراضيًا) ، يتم إنشاء الملف التالي. يحتوي كل ملف جديد على رقم تسلسلي بعد الاسم.

مزيد من المعلومات هنا أو هنا .

تحتوي المقالة على جزأين:

1. كيفية البدء بسرعة في معالجة الإدخالات الواردة في السجل.
2. كيفية تخصيص وتوسيع ما تحت غطاء محرك السيارة.

الجزء 1. نبدأ في أقرب وقت ممكن.


للعمل مع binlog ، سنستخدم مكتبة github.com/siddontang/go-mysql
اتصل بالقناة الجديدة (للعمل مع القنوات ، يلزم تنسيق ROW لـ binlog ).

func binLogListener() { c, err := getDefaultCanal() if err == nil { coords, err := c.GetMasterPos() if err == nil { c.SetEventHandler(&binlogHandler{}) c.RunFrom(coords) } } } func getDefaultCanal() (*canal.Canal, error) { cfg := canal.NewDefaultConfig() cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306) cfg.User = "root" cfg.Password = "root" cfg.Flavor = "mysql" cfg.Dump.ExecutionPath = "" return canal.NewCanal(cfg) } 

قم بإنشاء مجمّع فوق binlog:

 type binlogHandler struct { canal.DummyEventHandler //     BinlogParser //      } func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {return nil} func (h *binlogHandler) String() string {return "binlogHandler"} 

Binlogparser

سنقوم بتوسيع منطق العمل مع سطر binlog الناتج عن طريق إضافة منطق إلى أسلوب OnRow ().

 func (h *binlogHandler) OnRow(e *canal.RowsEvent) error { var n int //  var k int //  switch e.Action { case canal.DeleteAction: return nil //    case canal.UpdateAction: n = 1 k = 2 case canal.InsertAction: n = 0 k = 1 } for i := n; i < len(e.Rows); i += k { key := e.Table.Schema + "." + e.Table.Name switch key { case User{}.SchemaName() + "." + User{}.TableName(): /*    */ } } return nil } 

جوهر هذا المجمع هو تحليل البيانات المستلمة. تأتي البيانات إلينا في سجلين لتحديث السطر (سيحتوي السطر الأول على البيانات الأصلية ، والثاني - محدث). هنا نأخذ في الاعتبار أيضًا إمكانية الإدخالات المتعددة والتحديثات المتعددة. في هذه الحالة ، سنحتاج إلى تسجيل كل ثاني ثانية لـ UPDATE. لهذا ، في المثال أعلاه ، أدخلنا n و k.

لنقم بإنشاء نموذج لتلقي البيانات من binlog. سنقرأ فيه البيانات من الصفوف المستلمة. في التعليقات التوضيحية نشير إلى أسماء الأعمدة:

 type User struct { Id int `gorm:"column:id"` Name string `gorm:"column:name"` Status string `gorm:"column:status"` Created time.Time `gorm:"column:created"` } func (User) TableName() string { return "User" } func (User) SchemaName() string { return "Test" } 

هيكل الجدول في MYSQL:

 CREATE TABLE Test.User ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(40) NULL , status ENUM("active","deleted") DEFAULT "active", created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP ) ENGINE =InnoDB; 

دعونا نحلل التحليل نفسه - أضف إلى مكان التحضير لتحليل البيانات:

 user := User{} h.GetBinLogData(&user, e, i) 

في الجوهر ، هذا يكفي - سيكون لدينا بيانات السجل الجديد في نموذج المستخدم ، ولكن للتوضيح ، سنعرضها:

  if e.Action == canal.UpdateAction { oldUser := User{} h.GetBinLogData(&oldUser, e, i-1) fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name, ) } else { fmt.Printf("User %d is created with name %s\n", user.Id, user.Name, ) } 

النقطة الرئيسية التي كنا نسعى إليها هي إطلاق "Hello binlog world":

 func main() { go binLogListener() //      time.Sleep(2 * time.Minute) fmt.Print("Thx for watching, goodbuy") } 

بعد ذلك ، قم بإضافة القيم وتحديثها:

 INSERT INTO Test.User (`id`,`name`) VALUE (1,"Jack"); UPDATE Test.User SET name="Jonh" WHERE id=1; 

سنرى:

 User 1 is created with name Jack User 1 name changed from Jack to Jonh 

يعمل الكود الناتج مع binlog ويوزع الأسطر الجديدة. عند استلام سجل من الجدول الذي نحتاجه ، يقرأ الرمز البيانات في الهيكل ويعرض النتيجة. خلف الكواليس كان محلل البيانات (BinlogParser) ، الذي ملأ النموذج.

الجزء 2. كما قال كوب ، نحن بحاجة إلى مستوى أقل


خذ بعين الاعتبار العمل الداخلي للمحلل ، الذي يعتمد على التفكير.

لملء النموذج بالبيانات ، استخدمنا طريقة المعالج:

 h.GetBinLogData(&user, e, i) 

يوزع أنواع البيانات البسيطة:
 bool int float64 string time.Time 

ويمكن تحليل الهياكل المعقدة من جسون.

إذا لم تكن الأنواع المدعومة كافية بالنسبة لك ، أو كنت ترغب فقط في فهم كيفية عمل تحليل binlog ، فيمكنك ممارسة إضافة أنواع خاصة بك.

أولاً ، ضع في اعتبارك كيفية تعبئة البيانات لحقل النموذج باستخدام مثال حقل معرف من النوع int:

 type User struct { Id int `gorm:"column:id"` } 

من خلال الانعكاس نحصل على اسم النوع. تحول طريقة parseTagSetting التعليقات التوضيحية إلى هيكل أكثر ملاءمة:

 element := User{} //     ,      v := reflect.ValueOf(element) s := reflect.Indirect(v) t := s.Type() num := t.NumField() parsedTag := parseTagSetting(t.Field(k).Tag) if columnName, ok = parsedTag["COLUMN"]; !ok || columnName == "COLUMN" { continue } for k := 0; k < num; k++ { name := s.Field(k).Type().Name() switch name { case "int": //     } } 

بعد تلقي النوع int ، يمكنك تعيين قيمته من خلال طريقة الانعكاس:

 func (v Value) SetInt(x int64) {//... 

طريقة تحليل التعليقات التوضيحية:

 func parseTagSetting(tags reflect.StructTag) map[string]string { setting := map[string]string{} for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} { tags := strings.Split(str, ";") for _, value := range tags { v := strings.Split(value, ":") k := strings.TrimSpace(strings.ToUpper(v[0])) if len(v) >= 2 { setting[k] = strings.Join(v[1:], ":") } else { setting[k] = k } } } return setting } 

يقبل int64 كمدخل. لنقم بإنشاء طريقة تترجم البيانات المستلمة من binlog إلى int64:

 func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER { return 0 } switch e.Rows[n][columnId].(type) { case int8: return int64(e.Rows[n][columnId].(int8)) case int32: return int64(e.Rows[n][columnId].(int32)) case int64: return e.Rows[n][columnId].(int64) case int: return int64(e.Rows[n][columnId].(int)) case uint8: return int64(e.Rows[n][columnId].(uint8)) case uint16: return int64(e.Rows[n][columnId].(uint16)) case uint32: return int64(e.Rows[n][columnId].(uint32)) case uint64: return int64(e.Rows[n][columnId].(uint64)) case uint: return int64(e.Rows[n][columnId].(uint)) } return 0 } 

يبدو كل شيء منطقيًا باستثناء طريقة getBinlogIdByName ().

مطلوب هذا المساعد التافه للعمل مع أسماء الأعمدة بدلاً من أرقامها التسلسلية ، مما يتيح لك:

  • أخذ أسماء الأعمدة من التعليقات التوضيحية الجماعية ؛
  • لا حاجة لإجراء تعديلات عند إضافة الأعمدة إلى البداية أو الوسط ؛
  • من المألوف العمل مع حقل الاسم أكثر من العمل مع العمود رقم 3.

نتيجة لذلك ، نضيف المعالج نفسه:

 s.Field(k).SetInt(m.intHelper(e, n, columnName)) 

دعونا نلقي نظرة على مثالين آخرين.
تعداد: هنا تأتي القيم كمؤشر - أي أن الحالة "نشطة" ستظهر كـ 1. في معظم الحالات ، نحتاج إلى تمثيل سلسلة من التعداد. يمكن الحصول عليه من وصف الحقل. عند تحليل قيم التعداد ، فإنه يبدأ من 1 ، ولكن مجموعة القيم المحتملة نفسها تبدأ من 0.

قد يبدو معالج التعداد مثل هذا:

 func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string { columnId := m.getBinlogIdByName(e, columnName) if e.Table.Columns[columnId].Type == schema.TYPE_ENUM { values := e.Table.Columns[columnId].EnumValues //  if len(values) == 0 || e.Rows[n][columnId] == nil {{ return "" } return values[e.Rows[n][columnId].(int64)-1] //     0    } 

أريد تخزين JSON

فكرة جيدة لما لا. JSON من حيث الخلية هو سلسلة. من الضروري الإشارة بطريقة أو بأخرى إلى أن هذه البيانات متسلسلة - لهذا سنضيف التعليقات التوضيحية غير القانونية "fromJson" إلى gorm.

تخيل أنه ينبغي النظر في مثل هذا الهيكل:

 type JsonData struct { Int int `gorm:"column:int"` StructData TestData `gorm:"column:struct_data;fromJson"` MapData map[string]string `gorm:"column:map_data;fromJson"` SliceData []int `gorm:"column:slice_data;fromJson"` } type TestData struct { Test string `json:"test"` Int int `json:"int"` } 

يمكنك كتابة الكثير من الشروط وربما تنجح. لكن كل نوع بيانات جديد سيقتل كل الجهود. على الرغم من أن محاولة العثور على إجابات لـ stackoverflow - "كيفية إحضار نوع غير معروف من البنية وإلغاء تسلسله" تبدأ بالعبارة: "ليس من الواضح سبب حاجتك إلى ذلك ، ولكن حاول ...".

بعد إرسال النوع المطلوب إلى الواجهة ، يمكننا القيام بذلك:

 if _, ok := parsedTag["FROMJSON"]; ok { newObject := reflect.New(s.Field(k).Type()).Interface() json := m.stringHelper(e, n, columnName) jsoniter.Unmarshal([]byte(json), &newObject) s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type())) } 

إذا كانت لديك أسئلة حول أنواع البيانات ، يمكنك إلقاء نظرة على الاختبارات أو طرحها في التعليقات.

ما حدث في النهاية .

Source: https://habr.com/ru/post/ar413329/


All Articles