بداية سريعة: Go + Apache Kafka + Redis

لقد نظرت مؤخرًا ، بسبب الضرورة ، في جميع إعلانات الوظائف الشاغرة لمطوري Go ، وذكر نصفهم (على الأقل) نظام معالجة رسائل Apache Kafka وقاعدة بيانات Nois Redis. حسنًا ، الكل ، بالطبع ، يريد أن يعرف المرشح دوكر وآخرين مثله. كل هذه المتطلبات بالنسبة لنا ، الذين شاهدوا آراء مهندسي النظام ، تبدو تافهة إلى حد ما أو شيء من هذا القبيل. حسنًا ، في الواقع ، كيف يختلف سطر واحد عن الآخر؟ الموقف مع قواعد بيانات NoSQL ، بطبيعة الحال ، أكثر تنوعًا ، ولكن لا يزال يبدو أبسط من أي MS SQL Server. كل هذا ، بالطبع ، هو شخصيتي ، تأثير Dunning - Kruger ، الذي تم ذكره عدة مرات على Habré.
لذا ، بما أن جميع أصحاب العمل يطلبون ، فمن الضروري دراسة هذه التقنيات. لكن البدء بقراءة جميع الوثائق من البداية إلى النهاية ليس مثيرا للاهتمام. في رأيي ، من الأفضل قراءة المقدمة وتقديم نموذج عمل أولي وإصلاح الأخطاء ومواجهة المشكلات وحلها. وبعد كل هذا ، بفهم ، اقرأ الوثائق ، أو حتى كتاب منفصل.



أولئك الذين يرغبون في وقت قصير للتعرف على القدرات الأساسية لهذه المنتجات ، يرجى قراءة.

سوف برنامج التدريب عامل في الأرقام. وستتألف من مولد عدد كبير ومعالج أرقام وقائمة انتظار وتخزين أعمدة وخادم ويب.

أثناء التطوير ، سيتم تطبيق أنماط التصميم التالية:


ستبدو بنية النظام كما يلي:



في الصورة ، تشير البيضاوي إلى نمط تصميم ناقل. سوف أتناولها بمزيد من التفصيل.

يفترض قالب "الناقل" أن المعلومات تأتي في شكل دفق وتتم معالجتها على مراحل. عادة ما يكون هناك بعض المولد (مصدر المعلومات) ومعالج واحد أو أكثر (معالجات المعلومات). في هذه الحالة ، سيكون المولد برنامجًا على "Go" يصطف بأعداد كبيرة عشوائية. وسيكون المعالج (الوحيد) هو البرنامج الذي يأخذ البيانات من قائمة الانتظار وينفذ عامل. على Go النقي ، يكون هذا النمط سهل التنفيذ باستخدام القنوات (chan). أعلاه هناك رابط لجيثب بلدي مع مثال. هنا ، سوف تلعب قائمة انتظار الرسائل دور القنوات.

عادةً ما تستخدم قوالب المعجبين معًا ، وكما هو مطبق على Go ، يعني موازاة الحسابات باستخدام goroutines ، تليها تلخيص النتائج ونقلها ، على سبيل المثال ، مزيد من خط الأنابيب. ويرد رابط لمثال أيضا أعلاه. مرة أخرى ، تم استبدال القناة بقائمة الانتظار ، وظلت goroutines في مكانها.

الآن بضع كلمات عن أباتشي كافكا. Kafka هو نظام لإدارة الرسائل مع أدوات تجميع ممتازة تستخدم سجل معاملات (مثلما هو الحال تمامًا في RDBMS) لتخزين الرسائل ، ويدعم كلاً من نموذج قائمة الانتظار ونموذج الناشر / المشترك. يتم تحقيق الأخير من خلال مجموعات من مستلمي الرسائل. تستقبل كل رسالة عضوًا واحدًا فقط من المجموعة (معالجة متوازية) ، ولكن سيتم تسليم الرسالة مرة واحدة إلى كل مجموعة. يمكن أن يكون هناك العديد من هذه المجموعات ، وكذلك المستلمين داخل كل مجموعة.

للعمل مع Kafka ، سأستخدم الحزمة "github.com/segmentio/kafka-go".
Redis ، من ناحية أخرى ، هي قاعدة بيانات عمود القيمة في الذاكرة التي تدعم القدرة على تخزين البيانات بشكل دائم. نوع البيانات الرئيسي للمفاتيح والقيم هو السلاسل ، ولكن هناك بعض الأنواع الأخرى. تعتبر Redis واحدة من أسرع (أو معظم) قواعد البيانات في فئتها. من الجيد تخزين جميع أنواع الإحصاءات والمقاييس وتدفقات الرسائل وما إلى ذلك.
للعمل مع Redis ، سأستخدم الحزمة "github.com/go-redis/redis".

نظرًا لأن هذه المقالة هي بداية سريعة ، سنقوم بنشر كلا النظامين باستخدام Docker باستخدام صور جاهزة من DockerHub. أستخدم docker-compose على Windows 10 في وضع الحاوية على Linux VM (يتم إنشاؤه تلقائيًا بواسطة Docker VM) مع ملف docker-compose.yml هذا مثل:

version: '2' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka:latest ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_CREATE_TOPICS: "Generated:1:1,Solved:1:1,Unsolved:1:1" KAFKA_DELETE_TOPIC_ENABLE: "true" volumes: - /var/run/docker.sock:/var/run/docker.sock redis: image: redis ports: - "6379:6379" 

احفظ هذا الملف ، اذهب إلى الدليل به وقم بتنفيذ:

 docker-compose up -d 

يجب تنزيل ثلاث حاويات وبدء تشغيلها: Kafka (قائمة الانتظار) ، Zookeeper (خادم التكوين لـ Kafka) و (Redis).

يمكنك التحقق من أن الحاويات تعمل باستخدام الأمر:

 docker-compose ps 

يجب أن يكون شيء مثل:

 Name State Ports -------------------------------------------------------------------------------------- docker-compose_kafka_1 Up 0.0.0.0:9092->9092/tcp docker-compose_redis_1 Up 0.0.0.0:6379->6379/tcp docker-compose_zookeeper_1 Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp 

وفقًا لملف yml ، يجب إنشاء ثلاث قوائم انتظار تلقائيًا ، ويمكنك رؤيتها باستخدام الأمر:

 docker exec kafka-container_kafka_1 /opt/kafka_2.12-2.1.0/bin/kafka-topics.sh --list --zookeeper zookeeper:2181 

يجب أن تكون هناك قوائم انتظار (موضوعات - مواضيع من حيث كافكا) يتم إنشاؤها وحلها وغير محلولة.

منشئ البيانات بلا حدود قوائم الانتظار الأرقام مع تأخير عشوائي. رمزها بسيط للغاية. يمكنك التحقق من وجود الرسائل في قائمة انتظار الإنشاء باستخدام الأمر:

 docker exec kafka-container_kafka_1 /opt/kafka_2.12-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Generated --from-beginning 

التالي هو المعالج - هنا يجب الانتباه إلى موازاة معالجة القيم من قائمة الانتظار في كتلة التعليمات البرمجية التالية:

  var wg sync.WaitGroup c := 0 //counter for { //       15     ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() //      //    -     m, err := r.ReadMessage(ctx) if err != nil { fmt.Println("3") fmt.Println(err) break } wg.Add(1) //       10      goCtx, goCcancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer goCcancel() //     () go process(goCtx, c, &wg, m) c++ } //     wg.Wait() 

بما أن القراءة من قائمة انتظار الرسائل تحظر البرنامج ، فقد قمتُ بإنشاء كائن سياق. سياق مع مهلة 15 ثانية. ستنتهي مهلة البرنامج إذا كانت قائمة الانتظار فارغة لفترة طويلة.

أيضا ، لكل gorutin الذي يعامل الرقم ، يتم تعيين الحد الأقصى لوقت التشغيل أيضا. أردت أن الأرقام التي كانت قادرة على عامل أن تكون مكتوبة في قاعدة بيانات واحدة. والأرقام التي لا يمكن معالجتها في الوقت المخصص تم نقلها إلى قاعدة بيانات أخرى.

لتحديد الوقت التقريبي ، تم استخدام المعيار:

 func BenchmarkFactorize(b *testing.B) { ch := make(chan []int) var factors []int for i := 1; i < bN; i++ { num := 2345678901234 go factorize(num, ch) factors = <-ch b.Logf("\n%d   %+v\n\n", num, factors) } } 

المقاييس في Go هي أنواع مختلفة من الاختبارات ويتم وضعها في ملف مع الاختبارات. بناءً على هذا القياس ، تم اختيار الحد الأقصى لعدد مولد الأرقام العشوائية. على جهاز الكمبيوتر الخاص بي ، كان لدى جزء من الأرقام وقت للتخلص ، وجزء - لا.

هذه الأرقام التي يمكن أن تتحلل كانت مكتوبة في قاعدة بيانات رقم 0 ، أرقام غير منقوصة في قاعدة بيانات رقم 1.
هنا يجب أن أقول أنه في Redis لا توجد الجداول والجداول بالمعنى الكلاسيكي. افتراضيًا ، تحتوي قواعد البيانات على 16 قاعدة بيانات متاحة للمبرمج. هذه القواعد تختلف في أعدادها - من 0 إلى 15.

تم توفير الحد الزمني للكوروتينات في المعالج باستخدام السياق وبيان التحديد:

  //   go factorize(n, outChan) var item data select { case factors = <-outChan: { fmt.Printf("\ngoroutine #%d, input: %d, factors: %+v\n", counter, n, factors) item.Number = n item.Factors = factors err = storeSolved(item) if err != nil { fmt.Println("6") log.Fatal(err) } } case <-ctx.Done(): { fmt.Printf("\ngoroutine #%d, input: %d, exited on context timeout\n", counter, n) err = storeUnsolved(n) if err != nil { fmt.Println("7") log.Fatal(err) } return nil } } 

هذا هو آخر من الحيل التنمية النموذجية على الذهاب. معناها أن عبارة التحديد تتكرر عبر القنوات وتنفذ الشفرة المقابلة لأول قناة نشطة. في هذه الحالة ، إما أن يقوم goroutine بإخراج النتيجة إلى قناته ، أو سيتم إغلاق قناة السياق ذات المهلة المحددة. بدلاً من السياق ، يمكنك استخدام قناة تعسفية ستعمل كمدير وتوفر الإنهاء القسري لل goroutines.

تنفيذ الروتين الفرعي للكتابة إلى قاعدة البيانات الأمر لتحديد قاعدة البيانات المطلوبة (0 أو 1) وكتابة أزواج من النموذج (الأرقام - العوامل) للأرقام التي تم تحليلها أو (الرقم - الرقم) للأرقام غير المخلوقة.

 func storeSolved(item data) (err error) { //    0 cmd := redis.NewStringCmd("select", 0) err = client.Process(cmd) b, err := json.Marshal(item.Factors) err = client.Set(strconv.Itoa(item.Number), string(b), 0).Err() return err } 

سيكون الجزء الأخير هو خادم ويب ، والذي سيعرض قائمة بالأرقام المتحللة وغير المخلوقة في شكل json. سيكون لديه نقطتي النهاية:

  http.HandleFunc("/solved", solvedHandler) http.HandleFunc("/unsolved", unsolvedHandler) 

يبدو أن معالج طلب http الذي يستقبل البيانات من Redis ويعيدها إلى json كما يلي:

 func solvedHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET") w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization") //   №0 -   cmd := redis.NewStringCmd("select", 0) err := client.Process(cmd) if err != nil { w.WriteHeader(http.StatusInternalServerError) return } //      keys := client.Keys("*") var solved []data var item data //          for _, key := range keys.Val() { item.Key = key val, err := client.Get(key).Result() if err != nil { w.WriteHeader(http.StatusInternalServerError) return } item.Val = val solved = append(solved, item) } //    JSON err = json.NewEncoder(w).Encode(solved) if err != nil { w.WriteHeader(http.StatusInternalServerError) return } } 

نتيجة الطلب على: المضيف المحلي / حلها

 [{ "Key": "1604388558816", "Val": "[1,2,3,227]" }, { "Key": "545232916387", "Val": "[1,545232916387]" }, { "Key": "1786301239076", "Val": "[1,2]" }, { "Key": "698495534061", "Val": "[1,3,13,641,165331]" }] 

الآن يمكنك الخوض في الوثائق والأدب المتخصص. آمل أن المقالة كانت مفيدة.

أطلب من المتخصصين ألا يكونوا كسالى وأن يشيروا إلى أخطائي.

Source: https://habr.com/ru/post/ar441250/


All Articles