Halo, Habr! Dalam posting ini, kita akan menulis sebuah aplikasi pada Spring Boot 2 menggunakan Apache Kafka di Linux, dari menginstal JRE ke aplikasi microservice yang berfungsi.
Kolega dari departemen pengembangan front-end yang melihat artikel mengeluh bahwa saya tidak menjelaskan apa itu Apache Kafka dan Spring Boot. Saya percaya bahwa siapa pun yang perlu merakit proyek selesai menggunakan teknologi di atas tahu apa itu dan mengapa mereka membutuhkannya. Jika bagi pembaca pertanyaannya bukan iseng, berikut adalah artikel-artikel bagus tentang Habr, apa itu
Apache Kafka dan
Spring Boot .
Kita dapat melakukannya tanpa penjelasan panjang lebar tentang apa itu Kafka, Spring Boot dan Linux, dan sebaliknya, menjalankan server Kafka dari awal pada mesin Linux, menulis dua layanan microser dan membuat salah satu dari mereka mengirim pesan ke yang lain - secara umum, konfigurasikan arsitektur microservice penuh.

Pos akan terdiri dari dua bagian. Pada bagian pertama kita mengkonfigurasi dan menjalankan Apache Kafka pada mesin Linux, pada bagian kedua kita menulis dua layanan microser di Java.
Di startup, di mana saya memulai karir profesional saya sebagai seorang programmer, ada microservices di Kafka, dan salah satu microservices saya juga bekerja dengan orang lain melalui Kafka, tetapi saya tidak tahu bagaimana server itu sendiri bekerja, apakah itu ditulis sebagai aplikasi atau sudah benar-benar kotak. produk. Apa yang mengejutkan dan mengecewakan saya ketika ternyata bahwa Kafka masih merupakan produk kotak, dan tugas saya tidak hanya untuk menulis klien di Jawa (yang saya suka lakukan), serta menyebarkan dan mengkonfigurasi aplikasi jadi sebagai devOps (yang saya benci melakukannya). Namun, bahkan jika saya bisa meningkatkannya di server virtual Kafka dalam waktu kurang dari sehari, itu benar-benar sangat sederhana untuk melakukan ini. Jadi
Aplikasi kita akan memiliki struktur interaksi berikut:
Di akhir posting, seperti biasa, akan ada tautan ke git dengan kode kerja.
Menyebarkan Apache Kafka + Zookeeper pada mesin virtual
Saya mencoba menaikkan Kafka di Linux lokal, di atas poppy dan di Linux jarak jauh. Dalam dua kasus (Linux), saya berhasil dengan cukup cepat. Dengan poppy namun tidak ada yang terjadi. Karena itu, kami akan meningkatkan Kafka di Linux. Saya memilih Ubuntu 18.04.
Agar Kafka bekerja, dia membutuhkan penjaga kebun binatang. Untuk melakukan ini, Anda harus mengunduh dan menjalankannya sebelum meluncurkan Kafka.
Jadi
0. Pasang JRE
Ini dilakukan dengan perintah berikut:
sudo apt-get update sudo apt-get install default-jre
Jika semuanya berjalan ok, maka Anda dapat memasukkan perintah
java -version
dan pastikan Java diinstal.
1. Unduh Zookeeper
Saya tidak suka tim sulap di Linux, terutama ketika mereka hanya memberikan beberapa perintah dan tidak jelas apa yang mereka lakukan. Oleh karena itu, saya akan menjelaskan setiap tindakan - apa tepatnya yang dilakukannya. Jadi, kita perlu mengunduh Zookeeper dan unzip ke folder yang nyaman. Dianjurkan jika semua aplikasi disimpan di folder / opt, yaitu, dalam kasus kami, itu akan menjadi / opt / zookeeper.
Saya menggunakan perintah di bawah ini. Jika Anda tahu perintah Linux lain yang, menurut pendapat Anda, akan memungkinkan Anda untuk melakukan ini secara lebih rasial, gunakanlah. Saya seorang pengembang, bukan devoop, dan saya berkomunikasi dengan server di tingkat "kambing itu sendiri". Jadi, unduh aplikasi:
wget -P /home/xpendence/downloads/ "http://apache-mirror.rbc.ru/pub/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz"
Aplikasi diunduh ke folder yang Anda tentukan, saya membuat folder / home / xpendence / unduhan untuk mengunduh semua aplikasi yang saya butuhkan di sana.
2. Buka paket Zookeeper
Saya menggunakan perintah:
tar -xvzf /home/xpendence/downloads/zookeeper-3.4.12.tar.gz
Perintah ini membongkar arsip ke dalam folder di mana Anda berada. Anda mungkin perlu mentransfer aplikasi ke / opt / zookeeper. Dan Anda dapat langsung masuk ke sana dan dari sana sudah membongkar arsip.
3. Edit pengaturan
Di folder / zookeeper / conf / ada file zoo-sample.cfg, saya sarankan untuk mengganti nama ke zoo.conf, ini adalah file yang akan dicari JVM saat startup. Yang berikut harus ditambahkan ke file ini di akhir:
tickTime=2000 dataDir=/var/zookeeper clientPort=2181
Juga, buat direktori / var / zookeeper.
4. Luncurkan Zookeeper
Buka folder / opt / zookeeper dan mulai server dengan perintah:
bin/zkServer.sh start
"MULAI" akan muncul.
Setelah itu, saya mengusulkan untuk memeriksa apakah server berfungsi. Kami menulis:
telnet localhost 2181
Akan muncul pesan bahwa koneksi berhasil. Jika Anda memiliki server yang lemah dan pesan tidak muncul, coba lagi - bahkan ketika MULAI muncul, aplikasi mulai mendengarkan port jauh kemudian. Ketika saya mencoba semua ini di server yang lemah, itu terjadi pada saya setiap saat. Jika semuanya terhubung, masukkan perintah
ruok
Apa artinya: "Apakah kamu baik-baik saja?" Server harus merespons:
imok ( !)
dan lepaskan. Jadi, semuanya sesuai rencana. Kami melanjutkan untuk meluncurkan Apache Kafka.
5. Buat pengguna di bawah Kafka
Untuk bekerja dengan Kafka, kami membutuhkan pengguna yang terpisah.
sudo adduser --system --no-create-home --disabled-password --disabled-login kafka
6. Unduh Apache Kafka
Ada dua distribusi - biner dan sumber. Kami membutuhkan biner. Secara tampilan, arsip dengan biner memiliki ukuran yang berbeda. Biner memiliki berat 59 MB, dan berat 6,5 MB.
Unduh biner ke direktori di sana, menggunakan tautan di bawah ini:
wget -P /home/xpendence/downloads/ "http://mirror.linux-ia64.org/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz"
7. Buka paket Apache Kafka
Prosedur membongkar tidak berbeda dari yang sama untuk Zookeeper. Kami juga membongkar arsip ke direktori / opt dan ganti namanya menjadi kafka sehingga path ke folder / bin adalah / opt / kafka / bin
tar -xvzf /home/xpendence/downloads/kafka_2.11-2.1.0.tgz
8. Edit pengaturan
Pengaturan ada di /opt/kafka/config/server.properties. Tambahkan satu baris:
delete.topic.enable = true
Pengaturan ini tampaknya opsional, berfungsi tanpa itu. Pengaturan ini memungkinkan Anda untuk menghapus topik. Jika tidak, Anda tidak bisa menghapus topik melalui baris perintah.
9. Kami memberikan akses ke direktori kafka pengguna Kafka
chown -R kafka:nogroup /opt/kafka chown -R kafka:nogroup /var/lib/kafka
10. Peluncuran Apache Kafka yang telah lama ditunggu-tunggu
Kami memasukkan perintah, setelah itu Kafka harus memulai:
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
Jika tindakan biasa (Kafka ditulis dalam Java dan Scala) tidak meluas ke log, maka semuanya bekerja dan Anda dapat menguji layanan kami.
10.1. Masalah server yang lemah
Untuk percobaan di Apache Kafka, saya mengambil server yang lemah dengan satu inti dan RAM 512 MB (hanya 99 rubel), yang ternyata menjadi beberapa masalah bagi saya.
Kehabisan memori. Tentu saja, Anda tidak dapat melakukan overclock dengan 512 MB, dan server tidak dapat menggunakan Kafka karena kurangnya memori. Faktanya adalah bahwa secara default, Kafka mengkonsumsi 1 GB memori. Tidak heran dia hilang :)
Kami pergi ke kafka-server-start.sh, zookeeper-server-start.sh. Sudah ada garis yang mengatur memori:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Ubah ke:
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
Ini akan mengurangi selera Kafka dan memungkinkan Anda untuk memulai server.
Masalah kedua dengan komputer yang lemah adalah kurangnya waktu untuk terhubung ke Zookeeper. Secara default, ini diberikan 6 detik. Jika setrika lemah, ini tentu saja tidak cukup. Di server.properties, kami menambah waktu koneksi ke zukipper:
zookeeper.connection.timeout.ms=30000
Saya menetapkan setengah menit.
11. Tes server Kafka
Untuk melakukan ini, kita akan membuka dua terminal, di satu kita akan meluncurkan produsen, di sisi lain - konsumen.
Di konsol pertama, masukkan satu baris:
/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Ikon ini akan muncul, menunjukkan bahwa produsen siap untuk pesan spam:
>
Di konsol kedua, masukkan perintah:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Sekarang, ketikkan konsol produsen, ketika Anda menekan Enter, itu akan muncul di konsol konsumen.

Jika Anda melihat di layar kurang lebih sama dengan saya - selamat, yang terburuk sudah berakhir!
Sekarang kita hanya perlu menulis beberapa klien di Spring Boot yang akan berkomunikasi satu sama lain melalui Apache Kafka.
Menulis aplikasi di Spring Boot
Kami akan menulis dua aplikasi yang akan bertukar pesan melalui Apache Kafka. Pesan pertama akan disebut kafka-server dan akan berisi produsen dan konsumen. Yang kedua akan disebut kafka-tester, itu dirancang agar kita memiliki arsitektur microservice.
server kafka
Untuk proyek kami yang dibuat melalui Spring Initializr, kami membutuhkan modul Kafka. Saya menambahkan Lombok dan Web, tapi itu masalah selera.
Klien Kafka terdiri dari dua komponen - produser (ia mengirim pesan ke server Kafka) dan konsumen (ia mendengarkan server Kafka dan mengambil pesan baru dari sana tentang topik yang menjadi langganannya). Tugas kita adalah menulis kedua komponen dan membuatnya bekerja.
Konsumen:
@Configuration public class KafkaConsumerConfig { @Value("${kafka.server}") private String kafkaServer; @Value("${kafka.group.id}") private String kafkaGroupId; @Bean public KafkaListenerContainerFactory<?> batchFactory() { ConcurrentKafkaListenerContainerFactory<Long, AbstractDto> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); factory.setMessageConverter(new BatchMessagingMessageConverter(converter())); return factory; } @Bean public KafkaListenerContainerFactory<?> singleFactory() { ConcurrentKafkaListenerContainerFactory<Long, AbstractDto> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(false); factory.setMessageConverter(new StringJsonMessageConverter()); return factory; } @Bean public ConsumerFactory<Long, AbstractDto> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() { return new ConcurrentKafkaListenerContainerFactory<>(); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); return props; } @Bean public StringJsonMessageConverter converter() { return new StringJsonMessageConverter(); } }
Kami membutuhkan 2 bidang yang diinisialisasi dengan data statis dari kafka.properties.
kafka.server=localhost:9092 kafka.group.id=server.broadcast
kafka.server adalah alamat di mana server kami hang, dalam hal ini, lokal. Secara default, Kafka mendengarkan pada port 9092.
kafka.group.id adalah sekelompok konsumen, yang di dalamnya satu pesan dikirim. Misalnya, Anda memiliki tiga kurir dalam satu grup, dan mereka semua mendengarkan topik yang sama. Segera setelah pesan baru muncul di server dengan topik ini, pesan itu dikirim ke seseorang di dalam grup. Dua konsumen yang tersisa tidak menerima pesan.
Selanjutnya, kami membuat pabrik untuk konsumen - ConsumerFactory.
@Bean public ConsumerFactory<Long, AbstractDto> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); }
Diinisialisasi dengan properti yang kita butuhkan, itu akan berfungsi sebagai pabrik standar bagi konsumen di masa depan.
@Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); return props; }
consumerConfigs hanyalah konfigurasi Peta. Kami memberikan alamat server, grup, dan deserializer.
Selanjutnya, salah satu poin terpenting bagi seorang konsumen. Konsumen dapat menerima objek dan koleksi tunggal - misalnya, StarshipDto dan Daftar. Dan jika kita mendapatkan StarshipDto sebagai JSON, maka kita mendapatkan List sebagai, secara umum, sebagai array JSON. Oleh karena itu, kami memiliki setidaknya dua pabrik pesan - untuk pesan tunggal dan untuk array.
@Bean public KafkaListenerContainerFactory<?> singleFactory() { ConcurrentKafkaListenerContainerFactory<Long, AbstractDto> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(false); factory.setMessageConverter(new StringJsonMessageConverter()); return factory; }
Kami instantiate ConcurrentKafkaListenerContainerFactory, diketik Long (kunci pesan) dan AbstractDto (nilai pesan abstrak) dan inisialisasi bidangnya dengan properti. Kami, tentu saja, menginisialisasi pabrik dengan pabrik standar kami (yang sudah berisi konfigurasi Peta), lalu kami menandai bahwa kami tidak mendengarkan paket (array yang sama) dan menetapkan konverter JSON sederhana sebagai konverter.
Ketika kami membuat pabrik untuk paket / array (batch), perbedaan utama (terlepas dari fakta bahwa kami menandai kami mendengarkan paket) adalah bahwa kami menetapkan sebagai konverter konverter paket khusus yang akan mengonversi paket yang terdiri dari dari string JSON.
@Bean public KafkaListenerContainerFactory<?> batchFactory() { ConcurrentKafkaListenerContainerFactory<Long, AbstractDto> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); factory.setMessageConverter(new BatchMessagingMessageConverter(converter())); return factory; } @Bean public StringJsonMessageConverter converter() { return new StringJsonMessageConverter(); }
Dan satu hal lagi. Saat menginisialisasi kacang Spring, nampan dengan nama kafkaListenerContainerFactory mungkin tidak dihitung dan aplikasi akan hancur. Tentunya ada opsi yang lebih elegan untuk menyelesaikan masalah, menulis tentang mereka di komentar, untuk saat ini saya baru saja membuat sebuah bin dibongkar dengan fungsionalitas dengan nama yang sama:
@Bean public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() { return new ConcurrentKafkaListenerContainerFactory<>(); }
Konsumen sudah diatur. Kami lolos ke produser.
@Configuration public class KafkaProducerConfig { @Value("${kafka.server}") private String kafkaServer; @Value("${kafka.producer.id}") private String kafkaProducerId; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerId); return props; } @Bean public ProducerFactory<Long, StarshipDto> producerStarshipFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<Long, StarshipDto> kafkaTemplate() { KafkaTemplate<Long, StarshipDto> template = new KafkaTemplate<>(producerStarshipFactory()); template.setMessageConverter(new StringJsonMessageConverter()); return template; } }
Dari variabel statis, kita memerlukan alamat server kafka dan ID produsen. Dia bisa menjadi apa saja.
Dalam konfigurasi, seperti yang kita lihat, tidak ada yang istimewa. Hal yang hampir sama. Tetapi sehubungan dengan pabrik, ada perbedaan yang signifikan. Kita harus mendaftarkan templat untuk setiap kelas, objek yang akan kita kirim ke server, serta pabrik untuk itu. Kami memiliki satu pasangan seperti itu, tetapi mungkin ada puluhan.
Dalam templat, kami menandai bahwa kami akan membuat serial objek di JSON, dan ini, mungkin, sudah cukup.
Kami memiliki konsumen dan produsen, masih menulis layanan yang akan mengirim pesan dan menerimanya.
@Service @Slf4j public class StarshipServiceImpl implements StarshipService { private final KafkaTemplate<Long, StarshipDto> kafkaStarshipTemplate; private final ObjectMapper objectMapper; @Autowired public StarshipServiceImpl(KafkaTemplate<Long, StarshipDto> kafkaStarshipTemplate, ObjectMapper objectMapper) { this.kafkaStarshipTemplate = kafkaStarshipTemplate; this.objectMapper = objectMapper; } @Override public void send(StarshipDto dto) { kafkaStarshipTemplate.send("server.starship", dto); } @Override @KafkaListener(id = "Starship", topics = {"server.starship"}, containerFactory = "singleFactory") public void consume(StarshipDto dto) { log.info("=> consumed {}", writeValueAsString(dto)); } private String writeValueAsString(StarshipDto dto) { try { return objectMapper.writeValueAsString(dto); } catch (JsonProcessingException e) { e.printStackTrace(); throw new RuntimeException("Writing value to JSON failed: " + dto.toString()); } } }
Hanya ada dua metode dalam layanan kami, mereka cukup bagi kami untuk menjelaskan pekerjaan klien. Kami mengotomatiskan pola yang kami butuhkan:
private final KafkaTemplate<Long, StarshipDto> kafkaStarshipTemplate;
Metode produsen:
@Override public void send(StarshipDto dto) { kafkaStarshipTemplate.send("server.starship", dto); }
Semua yang diperlukan untuk mengirim pesan ke server adalah memanggil metode kirim pada template dan mentransfer topik (subjek) dan objek kita di sana. Objek akan diserialisasi dalam JSON dan akan terbang ke server di bawah topik yang ditentukan.
Metode mendengarkan terlihat seperti ini:
@Override @KafkaListener(id = "Starship", topics = {"server.starship"}, containerFactory = "singleFactory") public void consume(StarshipDto dto) { log.info("=> consumed {}", writeValueAsString(dto)); }
Kami menandai metode ini dengan anotasi @KafkaListener, tempat kami menunjukkan ID apa pun yang kami suka, mendengarkan topik, dan pabrik yang akan mengonversi pesan yang diterima ke yang kami butuhkan. Dalam hal ini, karena kami menerima satu objek, kami memerlukan singleFactory. Untuk Daftar <?>, Tentukan batchFactory. Akibatnya, kami mengirim objek ke server kafka menggunakan metode kirim dan mendapatkannya menggunakan metode konsumsi.
Anda dapat menulis tes dalam 5 menit yang akan menunjukkan kekuatan penuh Kafka, tetapi kami akan melangkah lebih jauh - menghabiskan 10 menit dan menulis aplikasi lain yang akan mengirim pesan ke server yang akan didengarkan aplikasi pertama kami.
kafka-tester
Memiliki pengalaman menulis aplikasi pertama, kita dapat dengan mudah menulis yang kedua, terutama jika kita menyalin paket paste dan dto, hanya mendaftar produser (kami hanya akan mengirim pesan) dan menambahkan satu-satunya metode kirim ke layanan. Dengan menggunakan tautan di bawah ini, Anda dapat dengan mudah mengunduh kode proyek dan memastikan tidak ada yang rumit di sana.
@Scheduled(initialDelay = 10000, fixedDelay = 5000) @Override public void produce() { StarshipDto dto = createDto(); log.info("<= sending {}", writeValueAsString(dto)); kafkaStarshipTemplate.send("server.starship", dto); } private StarshipDto createDto() { return new StarshipDto("Starship " + (LocalTime.now().toNanoOfDay() / 1000000)); }
Setelah 10 detik pertama, kafka-tester mulai mengirim pesan dengan nama kapal luar angkasa ke server Kafka setiap 5 detik (gambar dapat diklik).

Di sana, mereka didengarkan dan diterima oleh server kafka (gambar juga dapat diklik).

Saya harap mereka yang bermimpi untuk mulai menulis layanan mikro di Kafka akan berhasil semudah saya. Dan di sini ada tautan ke proyek:
→
server kafka→
kafka-tester