рдирдорд╕реНрдХрд╛рд░, рд╣реЗрдмреНрд░! рдЗрд╕ рдкреЛрд╕реНрдЯ рдореЗрдВ, рд╣рдо JRE рдХреЛ рдПрдХ рдХрд╛рдо рдХрд░ рд░рд╣реЗ рдорд╛рдЗрдХреНрд░реЛрд╕реИрд╕рд╡рд░реНрдХ рдПрдкреНрд▓рд┐рдХреЗрд╢рди рдХреЛ рд╕реНрдерд╛рдкрд┐рдд рдХрд░рдиреЗ рд╕реЗ, рд▓рд┐рдирдХреНрд╕ рдХреЗ рддрд╣рдд рдЕрдкрд╛рдЪреЗ рдХрд╛рдлреНрдХрд╛ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рд╕реНрдкреНрд░рд┐рдВрдЧ рдмреВрдЯ 2 рдкрд░ рдПрдХ рдЖрд╡реЗрджрди рд▓рд┐рдЦреЗрдВрдЧреЗред
рдлреНрд░рдВрдЯ-рдПрдВрдб рдбреЗрд╡рд▓рдкрдореЗрдВрдЯ рдбрд┐рдкрд╛рд░реНрдЯрдореЗрдВрдЯ рдХреЗ рд╕рд╣рдХрд░реНрдореА рдЬрд┐рдиреНрд╣реЛрдВрдиреЗ рдЗрд╕ рд▓реЗрдЦ рдХреЛ рджреЗрдЦрд╛ рдХрд┐ рдореИрдВ рдпрд╣ рдирд╣реАрдВ рд╕рдордЭрд╛ рд░рд╣рд╛ рд╣реВрдВ рдХрд┐ рдЕрдкрд╛рдЪреЗ рдХрд╛рдлреНрдХрд╛ рдФрд░ рд╕реНрдкреНрд░рд┐рдВрдЧ рдмреВрдЯ рдХреНрдпрд╛ рд╣реИрдВред рдореЗрд░рд╛ рдорд╛рдирдирд╛ тАЛтАЛрд╣реИ рдХрд┐ рдЬрд┐рд╕ рдХрд┐рд╕реА рдХреЛ рднреА рдЙрдкрд░реЛрдХреНрдд рддрдХрдиреАрдХреЛрдВ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдПрдХ рддреИрдпрд╛рд░ рдкрд░рд┐рдпреЛрдЬрдирд╛ рдХреЛ рдЗрдХрдЯреНрдард╛ рдХрд░рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИ, рд╡рд╣ рдЬрд╛рдирддрд╛ рд╣реИ рдХрд┐ рдпрд╣ рдХреНрдпрд╛ рд╣реИ рдФрд░ рдЙрдиреНрд╣реЗрдВ рдЗрд╕рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рдХреНрдпреЛрдВ рд╣реИред рдпрджрд┐ рдкрд╛рдардХ рдХреЗ рд▓рд┐рдП рдпрд╣ рд╕рд╡рд╛рд▓ рдмреЗрдХрд╛рд░ рдирд╣реАрдВ рд╣реИ, рддреЛ рдпрд╣рд╛рдВ рдкрд░
рдЕрдкрд╛рдЪреЗ рдХрд╛рдлреНрдХрд╛ рдФрд░
рд╕реНрдкреНрд░рд┐рдВрдЧ рдмреВрдЯ рдХреНрдпрд╛ рд╣реИрдВ, рд╣рдмрд░ рдкрд░ рдмреЗрд╣рддрд░реАрди рд▓реЗрдЦ
рд╣реИрдВ ред
рд╣рдо рдХрдлрдХрд╛, рд╕реНрдкреНрд░рд┐рдВрдЧ рдмреВрдЯ рдФрд░ рд▓рд┐рдирдХреНрд╕ рдХреНрдпрд╛ рд╣реИрдВ рдХреА рд▓рдВрдмреА рд╡реНрдпрд╛рдЦреНрдпрд╛ рдХреЗ рдмрд┐рдирд╛ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ, рдФрд░ рдЗрд╕рдХреЗ рдмрдЬрд╛рдп, рд▓рд┐рдирдХреНрд╕ рдорд╢реАрди рдкрд░ рдЦрд░реЛрдВрдЪ рд╕реЗ рдХрд╛рдлреНрдХрд╛ рд╕рд░реНрд╡рд░ рдХреЛ рдЪрд▓рд╛рдПрдВ, рджреЛ рдорд╛рдЗрдХреНрд░реЛрд╕рд░реЛрд╡рд░реНрд╕ рд▓рд┐рдЦреЗрдВ рдФрд░ рдЙрдирдореЗрдВ рд╕реЗ рдПрдХ рдХреЛ рджреВрд╕рд░реЗ рдХреЛ рд╕рдВрджреЗрд╢ рднреЗрдЬреЗрдВ - рд╕рд╛рдорд╛рдиреНрдп рд░реВрдк рд╕реЗ рдХреЙрдиреНрдлрд╝рд┐рдЧрд░ рдХрд░реЗрдВ рдкреВрд░реНрдг microservice рд╡рд╛рд╕реНрддреБрдХрд▓рд╛ред

рдкреЛрд╕реНрдЯ рдореЗрдВ рджреЛ рдЦрдВрдб рд╣реЛрдВрдЧреЗред рдкрд╣рд▓реЗ рд╣рдо рдПрдХ рд▓рд┐рдирдХреНрд╕ рдорд╢реАрди рдкрд░ рдЕрдкрд╛рдЪреЗ рдХрд╛рдлреНрдХрд╛ рдХреЛ рдХреЙрдиреНрдлрд╝рд┐рдЧрд░ рдХрд░рддреЗ рд╣реИрдВ рдФрд░ рдЪрд▓рд╛рддреЗ рд╣реИрдВ, рджреВрд╕рд░реЗ рдореЗрдВ рд╣рдо рдЬрд╛рд╡рд╛ рдореЗрдВ рджреЛ рдорд╛рдЗрдХреНрд░реЛрд╕рд┐рд╕реНрдЯреНрд░рд┐рд╕ рд▓рд┐рдЦрддреЗ рд╣реИрдВред
рд╕реНрдЯрд╛рд░реНрдЯрдЕрдк рдореЗрдВ, рдЬрд┐рд╕рдореЗрдВ рдореИрдВрдиреЗ рдПрдХ рдкреНрд░реЛрдЧреНрд░рд╛рдорд░ рдХреЗ рд░реВрдк рдореЗрдВ рдЕрдкрдиреЗ рдкреЗрд╢реЗрд╡рд░ рдХрд░рд┐рдпрд░ рдХреА рд╢реБрд░реБрдЖрдд рдХреА, рдХрд╛рдлреНрдХрд╛ рдкрд░ рдорд╛рдЗрдХреНрд░реЛрд╕рд░реНрд╡рд┐рд╕ рдереЗ, рдФрд░ рдореЗрд░реЗ рдПрдХ рдорд╛рдЗрдХреНрд░реЛрд╕реЗрд╡рд╛ рдиреЗ рднреА рдХрд╛рдлреНрдХрд╛ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рджреВрд╕рд░реЛрдВ рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд┐рдпрд╛ рдерд╛, рд▓реЗрдХрд┐рди рдореБрдЭреЗ рдирд╣реАрдВ рдкрддрд╛ рдерд╛ рдХрд┐ рд╕рд░реНрд╡рд░ рдиреЗ рдХреИрд╕реЗ рдХрд╛рдо рдХрд┐рдпрд╛, рдЪрд╛рд╣реЗ рд╡рд╣ рдПрдХ рдПрдкреНрд▓реАрдХреЗрд╢рди рдХреЗ рд░реВрдк рдореЗрдВ рд▓рд┐рдЦрд╛ рдЧрдпрд╛ рд╣реЛ рдпрд╛ рдпрд╣ рдкрд╣рд▓реЗ рд╕реЗ рд╣реА рдкреВрд░реА рддрд░рд╣ рд╕реЗ рдмреЙрдХреНрд╕рд┐рдВрдЧ рд╣реЛред рдЙрддреНрдкрд╛рджред рдореЗрд░рд╛ рдЖрд╢реНрдЪрд░реНрдп рдФрд░ рдирд┐рд░рд╛рд╢рд╛ рдХреНрдпрд╛ рдерд╛ рдЬрдм рдпрд╣ рдкрддрд╛ рдЪрд▓рд╛ рдХрд┐ рдХрд╛рдлреНрдХрд╛ рдЕрднреА рднреА рдПрдХ рдмреЙрдХреНрд╕рд┐рдВрдЧ рдЙрддреНрдкрд╛рдж рдерд╛, рдФрд░ рдореЗрд░рд╛ рдХрд╛рдо рди рдХреЗрд╡рд▓ рдЬрд╛рд╡рд╛ (рдЬреЛ рдореБрдЭреЗ рдХрд░рдирд╛ рдкрд╕рдВрдж рд╣реИ) рдореЗрдВ рдПрдХ рдХреНрд▓рд╛рдЗрдВрдЯ рд▓рд┐рдЦрдирд╛ рд╣реЛрдЧрд╛, рд╕рд╛рде рд╣реА рд╕рд╛рде рддреИрдпрд╛рд░ рдПрдкреНрд▓рд┐рдХреЗрд╢рди рдХреЛ devOps рдХреЗ рд░реВрдк рдореЗрдВ рддреИрдирд╛рдд рдХрд░рдирд╛ рдФрд░ рдХреЙрдиреНрдлрд╝рд┐рдЧрд░ рдХрд░рдирд╛ рд╣реЛрдЧрд╛ (рдЬреЛ рдореИрдВ) рдШреГрдгрд╛ рдХрд░рдирд╛)ред рд╣рд╛рд▓рд╛рдВрдХрд┐, рдпрд╣рд╛рдВ рддрдХ тАЛтАЛрдХрд┐ рдЕрдЧрд░ рдореИрдВ рдПрдХ рджрд┐рди рд╕реЗ рднреА рдХрдо рд╕рдордп рдореЗрдВ рдЗрд╕реЗ рдХрд╛рдлреНрдХрд╛ рд╡рд░реНрдЪреБрдЕрд▓ рд╕рд░реНрд╡рд░ рдкрд░ рдЙрдард╛ рд╕рдХрддрд╛ рдерд╛, рддреЛ рдпрд╣ рд╡рд╛рд╕реНрддрд╡ рдореЗрдВ рдРрд╕рд╛ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдХрд╛рдлреА рд╕рд░рд▓ рд╣реИред So.
рд╣рдорд╛рд░реЗ рдЖрд╡реЗрджрди рдореЗрдВ рдирд┐рдореНрдирд▓рд┐рдЦрд┐рдд рдмрд╛рддрдЪреАрдд рд╕рдВрд░рдЪрдирд╛ рд╣реЛрдЧреА:
рдкреЛрд╕реНрдЯ рдХреЗ рдЕрдВрдд рдореЗрдВ, рд╣рдореЗрд╢рд╛ рдХреА рддрд░рд╣, рд╡рд░реНрдХрд┐рдВрдЧ рдХреЛрдб рдХреЗ рд╕рд╛рде git рдХреЗ рд▓рд┐рдВрдХ рд╣реЛрдВрдЧреЗред
рдПрдХ рд╡рд░реНрдЪреБрдЕрд▓ рдорд╢реАрди рдкрд░ Apache Kafka + Zookeeper рддреИрдирд╛рдд рдХрд░реЗрдВ
рдореИрдВрдиреЗ рдХрд╛рдлреНрдХрд╛ рдХреЛ рд╕реНрдерд╛рдиреАрдп рд▓рд┐рдирдХреНрд╕ рдкрд░, рдЦрд╕рдЦрд╕ тАЛтАЛрдкрд░ рдФрд░ рджреВрд░рд╕реНрде рд▓рд┐рдирдХреНрд╕ рдкрд░ рдЙрдард╛рдиреЗ рдХреА рдХреЛрд╢рд┐рд╢ рдХреАред рджреЛ рдорд╛рдорд▓реЛрдВ рдореЗрдВ (рд▓рд┐рдирдХреНрд╕), рдореИрдВ рдмрд╣реБрдд рдЬрд▓реНрджреА рд╕рдлрд▓ рд╣реБрдЖред рдЦрд╕рдЦрд╕ рдХреЗ рд╕рд╛рде рдЕрднреА рддрдХ рдХреБрдЫ рдирд╣реАрдВ рд╣реБрдЖред рдЗрд╕рд▓рд┐рдП, рд╣рдо рд▓рд┐рдирдХреНрд╕ рдкрд░ рдХрд╛рдлреНрдХрд╛ рдмрдврд╝рд╛рдПрдВрдЧреЗред рдореИрдВрдиреЗ рдЙрдмрдВрдЯреВ рдХреЛ 18.04 рдЪреБрдирд╛ред
рдХрд╛рдлреНрдХрд╛ рдХреЛ рдХрд╛рдо рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдЙрд╕реЗ рдПрдХ рдЬрд╝реВрдХреАрдкрд░ рдХреА рдЬрд░реВрд░рдд рд╣реИред рдРрд╕рд╛ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рдЖрдкрдХреЛ рдХрд╛рдлреНрдХрд╛ рд▓реЙрдиреНрдЪ рдХрд░рдиреЗ рд╕реЗ рдкрд╣рд▓реЗ рдЗрд╕реЗ рдбрд╛рдЙрдирд▓реЛрдб рдФрд░ рдЪрд▓рд╛рдирд╛ рд╣реЛрдЧрд╛ред
So.
0. рдЬреЗрдЖрд░рдИ рд╕реНрдерд╛рдкрд┐рдд рдХрд░реЗрдВ
рдпрд╣ рдирд┐рдореНрдирд▓рд┐рдЦрд┐рдд рдЖрджреЗрд╢реЛрдВ рджреНрд╡рд╛рд░рд╛ рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИ:
sudo apt-get update sudo apt-get install default-jre
рдпрджрд┐ рд╕рдм рдХреБрдЫ рдареАрдХ рд░рд╣рд╛, рддреЛ рдЖрдк рдХрдорд╛рдВрдб рджрд░реНрдЬ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ
java -version
рдФрд░ рд╕реБрдирд┐рд╢реНрдЪрд┐рдд рдХрд░реЗрдВ рдХрд┐ рдЬрд╛рд╡рд╛ рд╕реНрдерд╛рдкрд┐рдд рд╣реИред
1. рдбрд╛рдЙрдирд▓реЛрдб Zookeeper
рдореБрдЭреЗ рд▓рд┐рдирдХреНрд╕ рдкрд░ рдЬрд╛рджреВ рдХреА рдЯреАрдореЗрдВ рдкрд╕рдВрдж рдирд╣реАрдВ рд╣реИрдВ, рдЦрд╛рд╕рдХрд░ рдЬрдм рд╡реЗ рдмрд╕ рдХреБрдЫ рдХрдорд╛рдВрдб рджреЗрддреЗ рд╣реИрдВ рдФрд░ рдпрд╣ рд╕реНрдкрд╖реНрдЯ рдирд╣реАрдВ рд╣реИ рдХрд┐ рд╡реЗ рдХреНрдпрд╛ рдХрд░ рд░рд╣реЗ рд╣реИрдВред рдЗрд╕рд▓рд┐рдП, рдореИрдВ рдкреНрд░рддреНрдпреЗрдХ рдХреНрд░рд┐рдпрд╛ рдХрд╛ рд╡рд░реНрдгрди рдХрд░реВрдВрдЧрд╛ - рдпрд╣ рд╡рд╛рд╕реНрддрд╡ рдореЗрдВ рдХреНрдпрд╛ рдХрд░рддрд╛ рд╣реИред рдЗрд╕рд▓рд┐рдП, рд╣рдореЗрдВ Zookeeper рдХреЛ рдбрд╛рдЙрдирд▓реЛрдб рдХрд░рдиреЗ рдФрд░ рдЗрд╕реЗ рдПрдХ рд╕реБрд╡рд┐рдзрд╛рдЬрдирдХ рдлрд╝реЛрд▓реНрдбрд░ рдореЗрдВ рдЕрдирдЬрд╝рд┐рдк рдХрд░рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИред рдпрд╣ рд╕рд▓рд╛рд╣ рджреА рдЬрд╛рддреА рд╣реИ рдХрд┐ рдпрджрд┐ рд╕рднреА рдПрдкреНрд▓рд┐рдХреЗрд╢рди / рдСрдкреНрдЯ рдлрд╝реЛрд▓реНрдбрд░ рдореЗрдВ рд╕рдВрдЧреНрд░рд╣реАрдд рд╣реИрдВ, рдЕрд░реНрдерд╛рдд, рд╣рдорд╛рд░реЗ рдорд╛рдорд▓реЗ рдореЗрдВ, рдпрд╣ / рдСрдкреНрдЯ / рдЬрд╝реБрдХреАрдкрд░ рд╣реЛрдЧрд╛ред
рдореИрдВрдиреЗ рдиреАрдЪреЗ рдХрдорд╛рдВрдб рдХрд╛ рдЗрд╕реНрддреЗрдорд╛рд▓ рдХрд┐рдпрд╛ред рдпрджрд┐ рдЖрдк рдЕрдиреНрдп рд▓рд┐рдирдХреНрд╕ рдХрдорд╛рдВрдб рдЬрд╛рдирддреЗ рд╣реИрдВ, рдЬреЛ рдЖрдкрдХреА рд░рд╛рдп рдореЗрдВ, рдЖрдкрдХреЛ рдпрд╣ рдЕрдзрд┐рдХ рдирд╕реНрд▓реАрдп рд░реВрдк рд╕реЗ рд╕рд╣реА рдврдВрдЧ рд╕реЗ рдХрд░рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрдЧрд╛, рддреЛ рдЙрдирдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░реЗрдВред рдореИрдВ рдПрдХ рдбреЗрд╡рд▓рдкрд░ рд╣реВрдВ, рдПрдХ рднрдХреНрдд рдирд╣реАрдВ рд╣реВрдВ, рдФрд░ рдореИрдВ "рдмрдХрд░реА рд╣реА" рдХреЗ рд╕реНрддрд░ рдкрд░ рд╕рд░реНрд╡рд░ рдХреЗ рд╕рд╛рде рд╕рдВрд╡рд╛рдж рдХрд░рддрд╛ рд╣реВрдВред рддреЛ, рдЖрд╡реЗрджрди рдбрд╛рдЙрдирд▓реЛрдб рдХрд░реЗрдВ:
wget -P /home/xpendence/downloads/ "http://apache-mirror.rbc.ru/pub/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz"
рдПрдкреНрд▓рд┐рдХреЗрд╢рди рдХреЛ рдЖрдкрдХреЗ рджреНрд╡рд╛рд░рд╛ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдлрд╝реЛрд▓реНрдбрд░ рдореЗрдВ рдбрд╛рдЙрдирд▓реЛрдб рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИ, рдореИрдВрдиреЗ рдЙрди рд╕рднреА рдПрдкреНрд▓рд┐рдХреЗрд╢рдиреЛрдВ рдХреЛ рдбрд╛рдЙрдирд▓реЛрдб рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдлрд╝реЛрд▓реНрдбрд░ / рд╣реЛрдо / xpendence / рдбрд╛рдЙрдирд▓реЛрдб рдмрдирд╛рдпрд╛ рд╣реИ рдЬрд┐рдирдХреА рдореБрдЭреЗ рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИред
2. рдЕрдирдкреИрдХ рдЬрд╝реБрдХрд╛рдЗрдкрд░
рдореИрдВрдиреЗ рдХрдорд╛рдВрдб рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд┐рдпрд╛:
tar -xvzf /home/xpendence/downloads/zookeeper-3.4.12.tar.gz
рдпрд╣ рдХрдорд╛рдВрдб рдЙрд╕ рдлрд╝реЛрд▓реНрдбрд░ рдореЗрдВ рд╕рдВрдЧреНрд░рд╣ рдХреЛ рдЕрдирдкреИрдХ рдХрд░рддрд╛ рд╣реИ рдЬрд┐рд╕рдореЗрдВ рдЖрдк рд╕реНрдерд┐рдд рд╣реИрдВред рдлрд┐рд░ рдЖрдкрдХреЛ рдПрдкреНрд▓рд┐рдХреЗрд╢рди рдХреЛ / рдСрдкреНрдЯ / рдЬрд╝реБрдХреАрдкрд░ рдХреЛ рд╕реНрдерд╛рдирд╛рдВрддрд░рд┐рдд рдХрд░рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реЛ рд╕рдХрддреА рд╣реИред рдФрд░ рдЖрдк рддреБрд░рдВрдд рдЗрд╕рдореЗрдВ рдЬрд╛ рд╕рдХрддреЗ рд╣реИрдВ рдФрд░ рд╡рд╣рд╛рдВ рд╕реЗ рдкрд╣рд▓реЗ рд╕реЗ рд╣реА рд╕рдВрдЧреНрд░рд╣ рдХреЛ рдЕрдирдкреИрдХ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВред
3. рд╕реЗрдЯрд┐рдВрдЧреНрд╕ рд╕рдВрдкрд╛рджрд┐рдд рдХрд░реЗрдВ
рдлрд╝реЛрд▓реНрдбрд░ рдореЗрдВ / zookeeper / conf / рдПрдХ рдлрд╝рд╛рдЗрд▓ zoo-sample.cfg рд╣реИ, рдореИрдВ рдЗрд╕реЗ zoo.conf рдореЗрдВ рдирд╛рдо рдмрджрд▓рдиреЗ рдХрд╛ рдкреНрд░рд╕реНрддрд╛рд╡ рджреЗрддрд╛ рд╣реВрдВ, рдпрд╣ рд╡рд╣ рдлрд╛рдЗрд▓ рд╣реИ рдЬрд┐рд╕реЗ JVM рд╕реНрдЯрд╛рд░реНрдЯрдЕрдк рдХреЗ рд▓рд┐рдП рджреЗрдЦреЗрдЧрд╛ред рдирд┐рдореНрдирд▓рд┐рдЦрд┐рдд рдХреЛ рдЗрд╕ рдлрд╝рд╛рдЗрд▓ рдореЗрдВ рдЕрдВрдд рдореЗрдВ рдЬреЛрдбрд╝рд╛ рдЬрд╛рдирд╛ рдЪрд╛рд╣рд┐рдП:
tickTime=2000 dataDir=/var/zookeeper clientPort=2181
рдЗрд╕рдХреЗ рдЕрд▓рд╛рд╡рд╛, / var / zookeeper рдирд┐рд░реНрджреЗрд╢рд┐рдХрд╛ рдмрдирд╛рдПрдВред
4. рдЬрд╝реВрдХреАрдкрд░ рд▓реЙрдиреНрдЪ рдХрд░реЗрдВ
/ Opt / zookeeper рдлрд╝реЛрд▓реНрдбрд░ рдореЗрдВ рдЬрд╛рдПрдВ рдФрд░ рдХрдорд╛рдВрдб рдХреЗ рд╕рд╛рде рд╕рд░реНрд╡рд░ рд╢реБрд░реВ рдХрд░реЗрдВ:
bin/zkServer.sh start
"рд╢реБрд░реВ" рджрд┐рдЦрд╛рдИ рджреЗрдирд╛ рдЪрд╛рд╣рд┐рдПред
рдЬрд┐рд╕рдХреЗ рдмрд╛рдж, рдореИрдВ рдпрд╣ рдЬрд╛рдВрдЪрдиреЗ рдХрд╛ рдкреНрд░рд╕реНрддрд╛рд╡ рдХрд░рддрд╛ рд╣реВрдВ рдХрд┐ рд╕рд░реНрд╡рд░ рдХрд╛рдо рдХрд░ рд░рд╣рд╛ рд╣реИред рд╣рдо рд▓рд┐рдЦрддреЗ рд╣реИрдВ:
telnet localhost 2181
рдПрдХ рд╕рдВрджреЗрд╢ рджрд┐рдЦрд╛рдИ рджреЗрдирд╛ рдЪрд╛рд╣рд┐рдП рдХрд┐ рдХрдиреЗрдХреНрд╢рди рдПрдХ рд╕рдлрд▓рддрд╛ рдереАред рдпрджрд┐ рдЖрдкрдХреЗ рдкрд╛рд╕ рдПрдХ рдХрдордЬреЛрд░ рд╕рд░реНрд╡рд░ рд╣реИ рдФрд░ рд╕рдВрджреЗрд╢ рджрд┐рдЦрд╛рдИ рдирд╣реАрдВ рджрд┐рдпрд╛, рддреЛ рдлрд┐рд░ рд╕реЗ рдкреНрд░рдпрд╛рд╕ рдХрд░реЗрдВ - рдЬрдм рднреА STARTED рджрд┐рдЦрд╛рдИ рджреЗрддрд╛ рд╣реИ, рддреЛ рдПрдкреНрд▓рд┐рдХреЗрд╢рди рдмрд╣реБрдд рдмрд╛рдж рдореЗрдВ рдкреЛрд░реНрдЯ рдХреЛ рд╕реБрдирдирд╛ рд╢реБрд░реВ рдХрд░рддрд╛ рд╣реИред рдЬрдм рдореИрдВрдиреЗ рдХрдордЬреЛрд░ рд╕рд░реНрд╡рд░ рдкрд░ рдпрд╣ рд╕рдм рдХрд░рдиреЗ рдХреА рдХреЛрд╢рд┐рд╢ рдХреА, рддреЛ рдпрд╣ рд╣рд░ рдмрд╛рд░ рдореЗрд░реЗ рд╕рд╛рде рд╣реБрдЖред рдпрджрд┐ рд╕рдм рдХреБрдЫ рдЬреБрдбрд╝рд╛ рд╣реБрдЖ рд╣реИ, рддреЛ рдХрдорд╛рдВрдб рджрд░реНрдЬ рдХрд░реЗрдВ
ruok
рдЗрд╕рдХрд╛ рдХреНрдпрд╛ рдорддрд▓рдм рд╣реИ: "рдХреНрдпрд╛ рдЖрдк рдареАрдХ рд╣реИрдВ?" рд╕рд░реНрд╡рд░ рдХреЛ рдЬрд╡рд╛рдм рджреЗрдирд╛ рдЪрд╛рд╣рд┐рдП:
imok ( !)
рдФрд░ рдбрд┐рд╕реНрдХрдиреЗрдХреНрдЯ рдХрд░реЗрдВред рддреЛ, рд╕рдм рдХреБрдЫ рдпреЛрдЬрдирд╛ рдХреЗ рдЕрдиреБрд╕рд╛рд░ рд╣реИред рд╣рдо рдЕрдкрд╛рдЪреЗ рдХрд╛рдлреНрдХрд╛ рд▓реЙрдиреНрдЪ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдЖрдЧреЗ рдмрдврд╝реЗрдВред
5. рдХрд╛рдлреНрдХрд╛ рдХреЗ рддрд╣рдд рдПрдХ рдЙрдкрдпреЛрдЧрдХрд░реНрддрд╛ рдмрдирд╛рдПрдВ
рдХрд╛рдлреНрдХрд╛ рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рд╣рдореЗрдВ рдПрдХ рдЕрд▓рдЧ рдЙрдкрдпреЛрдЧрдХрд░реНрддрд╛ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИред
sudo adduser --system --no-create-home --disabled-password --disabled-login kafka
6. рдЕрдкрд╛рдЪреЗ рдХрд╛рдлреНрдХрд╛ рдбрд╛рдЙрдирд▓реЛрдб рдХрд░реЗрдВ
рджреЛ рд╡рд┐рддрд░рдг рд╣реИрдВ - рджреНрд╡рд┐рдЖрдзрд╛рд░реА рдФрд░ рд╕реНрд░реЛрддред рд╣рдореЗрдВ рдПрдХ рдмрд╛рдЗрдирд░реА рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИред рдЙрдкрд╕реНрдерд┐рддрд┐ рдореЗрдВ, рдмрд╛рдЗрдирд░реА рдХреЗ рд╕рд╛рде рд╕рдВрдЧреНрд░рд╣ рдЖрдХрд╛рд░ рдореЗрдВ рдЕрд▓рдЧ рд╣реИред рдмрд╛рдЗрдирд░реА рдХрд╛ рд╡рдЬрди 59 рдПрдордмреА, рдФрд░ 6.5 рдПрдордмреА рдХрд╛ рд╡рдЬрди рд╣реИред
рдиреАрдЪреЗ рджреА рдЧрдИ рд▓рд┐рдВрдХ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ, рдмрд╛рдЗрдирд░реА рдХреЛ рдбрд╛рдЙрдирд▓реЛрдб рдХрд░реЗрдВ
wget -P /home/xpendence/downloads/ "http://mirror.linux-ia64.org/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz"
7. рдЕрдкрд╛рдЪреЗ рдХрд╛рдлреНрдХрд╛
рдЕрдирдкреИрдХрд┐рдВрдЧ рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдЬрд╝реБрдХрд╛рдЗрдкрд░ рдХреЗ рд▓рд┐рдП рд╕рдорд╛рди рд╕реЗ рдЕрд▓рдЧ рдирд╣реАрдВ рд╣реИред рд╣рдо рдЖрд░реНрдХ рдХреЛ / рдСрдкреНрдЯ рдбрд╛рдпрд░реЗрдХреНрдЯрд░реА рдореЗрдВ рднреА рдЕрдирдкреИрдХ рдХрд░рддреЗ рд╣реИрдВ рдФрд░ рдЗрд╕реЗ рдХрд╛рдХрд╛рдХрд╛ рдХрд╛ рдирд╛рдо рджреЗрддреЗ рд╣реИрдВ рддрд╛рдХрд┐ / рдмрд┐рди рдлреЛрд▓реНрдбрд░ рдХрд╛ рд░рд╛рд╕реНрддрд╛ / рдСрдкреНрдЯ / рдХрд╛рдлреНрдХрд╛ / рдмрд┐рди рд╣реЛ рдЬрд╛рдП
tar -xvzf /home/xpendence/downloads/kafka_2.11-2.1.0.tgz
8. рд╕реЗрдЯрд┐рдВрдЧреНрд╕ рд╕рдВрдкрд╛рджрд┐рдд рдХрд░реЗрдВ
рд╕реЗрдЯрд┐рдВрдЧреНрд╕ рдореЗрдВ / рдЕрдкрдирд╛рдиреЗ /kafka/config/server.properties рд╣реИрдВред рдПрдХ рдкрдВрдХреНрддрд┐ рдЬреЛрдбрд╝реЗрдВ:
delete.topic.enable = true
рдпрд╣ рд╕реЗрдЯрд┐рдВрдЧ рд╡реИрдХрд▓реНрдкрд┐рдХ рдкреНрд░рддреАрдд рд╣реЛрддреА рд╣реИ, рдпрд╣ рдЗрд╕рдХреЗ рдмрд┐рдирд╛ рдХрд╛рдо рдХрд░рддреА рд╣реИред рдпрд╣ рд╕реЗрдЯрд┐рдВрдЧ рдЖрдкрдХреЛ рд╡рд┐рд╖рдпреЛрдВ рдХреЛ рд╣рдЯрд╛рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрддреА рд╣реИред рдЕрдиреНрдпрдерд╛, рдЖрдк рдмрд╕ рдХрдорд╛рдВрдб рд▓рд╛рдЗрди рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рд╡рд┐рд╖рдпреЛрдВ рдХреЛ рд╣рдЯрд╛ рдирд╣реАрдВ рд╕рдХрддреЗ рд╣реИрдВред
9. рд╣рдо рдЙрдкрдпреЛрдЧрдХрд░реНрддрд╛ рдХрд╛рдлреНрдХрд╛ рдирд┐рд░реНрджреЗрд╢рд┐рдХрд╛ рдХрд╛рдлреНрдХрд╛ рдХреЛ рдПрдХреНрд╕реЗрд╕ рджреЗрддреЗ рд╣реИрдВ
chown -R kafka:nogroup /opt/kafka chown -R kafka:nogroup /var/lib/kafka
10. рдЕрдкрд╛рдЪреЗ рдХрд╛рдлреНрдХрд╛ рдХрд╛ рд▓рдВрдмреЗ рд╕рдордп рд╕реЗ рдкреНрд░рддреАрдХреНрд╖рд┐рдд рд▓реЙрдиреНрдЪ
рд╣рдо рдХрдорд╛рдВрдб рджрд░реНрдЬ рдХрд░рддреЗ рд╣реИрдВ, рдЬрд┐рд╕рдХреЗ рдмрд╛рдж рдХрд╛рдлреНрдХрд╛ рд╢реБрд░реВ рд╣реЛрдирд╛ рдЪрд╛рд╣рд┐рдП:
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
рдпрджрд┐ рд╕рд╛рдорд╛рдиреНрдп рдХреНрд░рд┐рдпрд╛рдУрдВ (рдХрд╛рдлреНрдХрд╛ рдЬрд╛рд╡рд╛ рдФрд░ рд╕реНрдХрд╛рд▓рд╛ рдореЗрдВ рд▓рд┐рдЦреА рдЧрдИ рд╣реИ) рд▓реЙрдЧ рдореЗрдВ рдЦрддреНрдо рдирд╣реАрдВ рд╣реБрдИ, рддреЛ рд╕рдм рдХреБрдЫ рдХрд╛рдо рдХрд░ рдЧрдпрд╛ рдФрд░ рдЖрдк рд╣рдорд╛рд░реА рд╕реЗрд╡рд╛ рдХрд╛ рдкрд░реАрдХреНрд╖рдг рдХрд░ рд╕рдХрддреЗ рд╣реИрдВред
10.1ред рдХрдордЬреЛрд░ рд╕рд░реНрд╡рд░ рд╕рдорд╕реНрдпрд╛рдПрдБ
рдЕрдкрд╛рдЪреЗ рдХрд╛рдлреНрдХрд╛ рдкрд░ рдкреНрд░рдпреЛрдЧреЛрдВ рдХреЗ рд▓рд┐рдП, рдореИрдВрдиреЗ рдПрдХ рдХреЛрд░ рдФрд░ 512 рдПрдордмреА рд░реИрдо (рд╕рд┐рд░реНрдл 99 рд░реВрдмрд▓ рдХреЗ рд▓рд┐рдП) рдХреЗ рд╕рд╛рде рдПрдХ рдХрдордЬреЛрд░ рд╕рд░реНрд╡рд░ рд▓рд┐рдпрд╛, рдЬреЛ рдореЗрд░реЗ рд▓рд┐рдП рдХрдИ рд╕рдорд╕реНрдпрд╛рдПрдВ рдмрди рдЧрдпрд╛ред
рд╕реНрдореГрддрд┐ рд╕реЗ рдмрд╛рд╣рд░ред рдмреЗрд╢рдХ, рдЖрдк 512 рдПрдордмреА рдХреЗ рд╕рд╛рде рдУрд╡рд░рдХреНрд▓реЙрдХ рдирд╣реАрдВ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ, рдФрд░ рд╕рд░реНрд╡рд░ рдореЗрдореЛрд░реА рдХреА рдХрдореА рдХреЗ рдХрд╛рд░рдг рдХрд╛рдлреНрдХрд╛ рдХреЛ рддреИрдирд╛рдд рдирд╣реАрдВ рдХрд░ рд╕рдХрддрд╛ рд╣реИред рддрдереНрдп рдпрд╣ рд╣реИ рдХрд┐ рдбрд┐рдлрд╝реЙрд▓реНрдЯ рд░реВрдк рд╕реЗ, рдХрд╛рдлреНрдХрд╛ 1 рдЬреАрдмреА рдореЗрдореЛрд░реА рдХреА рдЦрдкрдд рдХрд░рддрд╛ рд╣реИред рдХреЛрдИ рдЖрд╢реНрдЪрд░реНрдп рдирд╣реАрдВ рдХрд┐ рд╡рд╣ рдЧрд╛рдпрдм рдерд╛ :)
рд╣рдо kafka-server-start.sh, zookeeper-server-start.sh рдкрд░ рдЬрд╛рддреЗ рд╣реИрдВред рд╕реНрдореГрддрд┐ рдХреЛ рдирд┐рдпрдВрддреНрд░рд┐рдд рдХрд░рдиреЗ рд╡рд╛рд▓реА рдПрдХ рдкрдВрдХреНрддрд┐ рдкрд╣рд▓реЗ рд╕реЗ рд╣реА рд╣реИ:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
рдЗрд╕реЗ рдЗрд╕рдореЗрдВ рдмрджрд▓реЗрдВ:
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
рдпрд╣ рдХрд╛рдлреНрдХрд╛ рдХреА рднреВрдЦ рдХреЛ рдХрдо рдХрд░реЗрдЧрд╛ рдФрд░ рдЖрдкрдХреЛ рд╕рд░реНрд╡рд░ рд╢реБрд░реВ рдХрд░рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрдЧрд╛ред
рдПрдХ рдХрдордЬреЛрд░ рдХрдВрдкреНрдпреВрдЯрд░ рдХреЗ рд╕рд╛рде рджреВрд╕рд░реА рд╕рдорд╕реНрдпрд╛ рдЬрд╝реБрдХрд╛рдЗрдкрд░ рд╕реЗ рдХрдиреЗрдХреНрдЯ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рд╕рдордп рдХреА рдХрдореА рд╣реИред рдбрд┐рдлрд╝реЙрд▓реНрдЯ рд░реВрдк рд╕реЗ, рдпрд╣ 6 рд╕реЗрдХрдВрдб рджрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИред рдпрджрд┐ рд▓реЛрд╣рд╛ рдХрдордЬреЛрд░ рд╣реИ, рддреЛ рдпрд╣, рдирд┐рд╢реНрдЪрд┐рдд рд░реВрдк рд╕реЗ, рдкрд░реНрдпрд╛рдкреНрдд рдирд╣реАрдВ рд╣реИред Server.properties рдореЗрдВ рд╣рдо zukipper рдХреЛ рдХрдиреЗрдХреНрд╢рди рд╕рдордп рдмрдврд╝рд╛рддреЗ рд╣реИрдВ:
zookeeper.connection.timeout.ms=30000
рдореИрдВрдиреЗ рдЖрдзрд╛ рдорд┐рдирдЯ рд╕реЗрдЯ рдХрд┐рдпрд╛ред
11. рдХрд╛рдлреНрдХрд╛-рд╕рд░реНрд╡рд░ рдХрд╛ рдкрд░реАрдХреНрд╖рдг рдХрд░реЗрдВ
рдРрд╕рд╛ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рд╣рдо рджреЛ рдЯрд░реНрдорд┐рдирд▓ рдЦреЛрд▓реЗрдВрдЧреЗ, рдПрдХ рдкрд░ рд╣рдо рдирд┐рд░реНрдорд╛рддрд╛ рдХреЛ рд▓реЙрдиреНрдЪ рдХрд░реЗрдВрдЧреЗ, рджреВрд╕рд░реЗ рдкрд░ - рдЙрдкрднреЛрдХреНрддрд╛ред
рдкрд╣рд▓реЗ рдХрдВрд╕реЛрд▓ рдореЗрдВ, рдПрдХ рдкрдВрдХреНрддрд┐ рджрд░реНрдЬ рдХрд░реЗрдВ:
/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
рдпрд╣ рдЖрдЗрдХрди рдкреНрд░рджрд░реНрд╢рд┐рдд рд╣реЛрдирд╛ рдЪрд╛рд╣рд┐рдП, рдпрд╣ рджрд░реНрд╢рд╛рддрд╛ рд╣реИ рдХрд┐ рдирд┐рд░реНрдорд╛рддрд╛ рд╕реНрдкреИрдо рд╕рдВрджреЗрд╢реЛрдВ рдХреЗ рд▓рд┐рдП рддреИрдпрд╛рд░ рд╣реИ:
>
рджреВрд╕рд░реЗ рдХрдВрд╕реЛрд▓ рдореЗрдВ, рдХрдорд╛рдВрдб рджрд░реНрдЬ рдХрд░реЗрдВ:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
рдЕрдм, рдирд┐рд░реНрдорд╛рддрд╛ рдХрдВрд╕реЛрд▓ рдореЗрдВ рдЯрд╛рдЗрдк рдХрд░рддреЗ рд╕рдордп, рдЬрдм рдЖрдк Enter рджрдмрд╛рддреЗ рд╣реИрдВ, рддреЛ рдпрд╣ рдЙрдкрднреЛрдХреНрддрд╛ рдХрдВрд╕реЛрд▓ рдореЗрдВ рджрд┐рдЦрд╛рдИ рджреЗрдЧрд╛ред

рдпрджрд┐ рдЖрдк рд╕реНрдХреНрд░реАрди рдкрд░ рд▓рдЧрднрдЧ рдореЗрд░реЗ рдЬреИрд╕реЗ рд╣реА рджрд┐рдЦрддреЗ рд╣реИрдВ - рдмрдзрд╛рдИ, рд╕рдмрд╕реЗ рдЦрд░рд╛рдм рдЦрддреНрдо рд╣реЛ рдЧрдпрд╛ рд╣реИ!
рдЕрдм рд╣рдореЗрдВ рдмрд╕ рд╕реНрдкреНрд░рд┐рдВрдЧ рдмреВрдЯ рдкрд░ рдХреБрдЫ рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреЛ рд▓рд┐рдЦрдирд╛ рд╣реИ рдЬреЛ рдЕрдкрд╛рдЪреЗ рдХрд╛рдлреНрдХрд╛ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рдПрдХ рджреВрд╕рд░реЗ рдХреЗ рд╕рд╛рде рд╕рдВрд╡рд╛рдж рдХрд░реЗрдВрдЧреЗред
рд╕реНрдкреНрд░рд┐рдВрдЧ рдмреВрдЯ рдкрд░ рдПрдХ рдПрдкреНрд▓реАрдХреЗрд╢рди рд▓рд┐рдЦрдирд╛
рд╣рдо рджреЛ рдПрдкреНрд▓рд┐рдХреЗрд╢рди рд▓рд┐рдЦреЗрдВрдЧреЗ рдЬреЛ рдЕрдкрд╛рдЪреЗ рдХрд╛рдлреНрдХрд╛ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рд╕рдВрджреЗрд╢реЛрдВ рдХрд╛ рдЖрджрд╛рди-рдкреНрд░рджрд╛рди рдХрд░реЗрдВрдЧреЗред рдкрд╣рд▓реЗ рд╕рдВрджреЗрд╢ рдХреЛ рдХрд╛рдлреНрдХрд╛-рд╕рд░реНрд╡рд░ рдХрд╣рд╛ рдЬрд╛рдПрдЧрд╛ рдФрд░ рдЗрд╕рдореЗрдВ рдирд┐рд░реНрдорд╛рддрд╛ рдФрд░ рдЙрдкрднреЛрдХреНрддрд╛ рджреЛрдиреЛрдВ рд╢рд╛рдорд┐рд▓ рд╣реЛрдВрдЧреЗред рджреВрд╕рд░реЗ рдХреЛ рдХрд╛рдлреНрдХрд╛-рдкрд░реАрдХреНрд╖рдХ рдХрд╣рд╛ рдЬрд╛рдПрдЧрд╛, рдЗрд╕реЗ рдЗрд╕ рддрд░рд╣ рд╕реЗ рдбрд┐рдЬрд╝рд╛рдЗрди рдХрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИ рдХрд┐ рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рдПрдХ рдорд╛рдЗрдХреНрд░реЛ-рдЖрд░реНрдХрд┐рдЯреЗрдХреНрдЪрд░ рд╣реЛред
рдХрд╛рдлреНрдХрд╛-рд╕рд░реНрд╡рд░
рд╕реНрдкреНрд░рд┐рдВрдЧ рдкреНрд░рд┐рдирд┐рдЬрд╝рд┐рдЬрд╝реНрд░ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рдмрдирд╛рдИ рдЧрдИ рд╣рдорд╛рд░реА рдкрд░рд┐рдпреЛрдЬрдирд╛рдУрдВ рдХреЗ рд▓рд┐рдП, рд╣рдореЗрдВ рдХрд╛рдлреНрдХрд╛ рдореЙрдбреНрдпреВрд▓ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИред рдореИрдВрдиреЗ рд▓реЛрдореНрдмреЛрдХ рдФрд░ рд╡реЗрдм рдХреЛ рдЬреЛрдбрд╝рд╛, рд▓реЗрдХрд┐рди рдпрд╣ рд╕реНрд╡рд╛рдж рдХрд╛ рдорд╛рдорд▓рд╛ рд╣реИред
рдХрдлрд╝реНрдХрд╛-рдХреНрд▓рд╛рдЗрдВрдЯ рдореЗрдВ рджреЛ рдШрдЯрдХ рд╣реЛрддреЗ рд╣реИрдВ - рдирд┐рд░реНрдорд╛рддрд╛ (рд╡рд╣ рдХрдлрд╝реНрдХрд╛-рд╕рд░реНрд╡рд░ рдХреЛ рд╕рдВрджреЗрд╢ рднреЗрдЬрддрд╛ рд╣реИ) рдФрд░ рдЙрдкрднреЛрдХреНрддрд╛ (рд╡рд╣ рдХрдлрд╝реНрдХрд╛-рд╕рд░реНрд╡рд░ рдХреЛ рд╕реБрдирддрд╛ рд╣реИ рдФрд░ рд╡рд╣рд╛рдБ рд╕реЗ рд╕рдмреНрд╕реНрдХреНрд░рд╛рдЗрдм рдХрд┐рдП рдЧрдП рд╡рд┐рд╖рдпреЛрдВ рдкрд░ рдирдП рд╕рдВрджреЗрд╢ рд▓реЗрддрд╛ рд╣реИ)ред рд╣рдорд╛рд░рд╛ рдХрд╛рдо рджреЛрдиреЛрдВ рдШрдЯрдХреЛрдВ рдХреЛ рд▓рд┐рдЦрдирд╛ рдФрд░ рдЙрдиреНрд╣реЗрдВ рдХрд╛рдо рдХрд░рдирд╛ рд╣реИред
рдЙрдкрднреЛрдХреНрддрд╛:
@Configuration public class KafkaConsumerConfig { @Value("${kafka.server}") private String kafkaServer; @Value("${kafka.group.id}") private String kafkaGroupId; @Bean public KafkaListenerContainerFactory<?> batchFactory() { ConcurrentKafkaListenerContainerFactory<Long, AbstractDto> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); factory.setMessageConverter(new BatchMessagingMessageConverter(converter())); return factory; } @Bean public KafkaListenerContainerFactory<?> singleFactory() { ConcurrentKafkaListenerContainerFactory<Long, AbstractDto> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(false); factory.setMessageConverter(new StringJsonMessageConverter()); return factory; } @Bean public ConsumerFactory<Long, AbstractDto> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() { return new ConcurrentKafkaListenerContainerFactory<>(); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); return props; } @Bean public StringJsonMessageConverter converter() { return new StringJsonMessageConverter(); } }
рд╣рдореЗрдВ kafka.properties рд╕реЗ рд╕реНрдерд┐рд░ рдбреЗрдЯрд╛ рдХреЗ рд╕рд╛рде рдЖрд░рдВрднрд┐рдХ 2 рдлрд╝реАрд▓реНрдб рдЪрд╛рд╣рд┐рдПред
kafka.server=localhost:9092 kafka.group.id=server.broadcast
kafka.server рд╡рд╣ рдкрддрд╛ рд╣реИ рдЬрд┐рд╕ рдкрд░ рд╣рдорд╛рд░рд╛ рд╕рд░реНрд╡рд░ рд▓рдЯрдХрд╛ рд╣реИ, рдЗрд╕ рдорд╛рдорд▓реЗ рдореЗрдВ, рд╕реНрдерд╛рдиреАрдпред рдбрд┐рдлрд╝реЙрд▓реНрдЯ рд░реВрдк рд╕реЗ, рдХрд╛рдлреНрдХрд╛ рдкреЛрд░реНрдЯ 9092 рдкрд░ рд╕реБрдирддрд╛ рд╣реИред
kafka.group.id рдЙрдкрднреЛрдХреНрддрд╛рдУрдВ рдХрд╛ рдПрдХ рд╕рдореВрд╣ рд╣реИ, рдЬрд┐рд╕рдХреЗ рднреАрддрд░ рд╕рдВрджреЗрд╢ рдХрд╛ рдПрдХ рдЙрджрд╛рд╣рд░рдг рджрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИред рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдЖрдкрдХреЗ рдкрд╛рд╕ рдПрдХ рд╕рдореВрд╣ рдореЗрдВ рддреАрди рдХреЛрд░рд┐рдпрд░ рд╣реИрдВ, рдФрд░ рд╡реЗ рд╕рднреА рдПрдХ рд╣реА рд╡рд┐рд╖рдп рдХреЛ рд╕реБрдирддреЗ рд╣реИрдВред рдЬреИрд╕реЗ рд╣реА рдЗрд╕ рд╡рд┐рд╖рдп рдХреЗ рд╕рд╛рде рд╕рд░реНрд╡рд░ рдкрд░ рдПрдХ рдирдпрд╛ рд╕рдВрджреЗрд╢ рджрд┐рдЦрд╛рдИ рджреЗрддрд╛ рд╣реИ, рдЙрд╕реЗ рд╕рдореВрд╣ рдореЗрдВ рдХрд┐рд╕реА рдХреЛ рджрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИред рд╢реЗрд╖ рджреЛ рдЙрдкрднреЛрдХреНрддрд╛рдУрдВ рдХреЛ рд╕рдВрджреЗрд╢ рдкреНрд░рд╛рдкреНрдд рдирд╣реАрдВ рд╣реЛ рд░рд╣рд╛ рд╣реИред
рдЕрдЧрд▓рд╛, рд╣рдо рдЙрдкрднреЛрдХреНрддрд╛рдУрдВ рдХреЗ рд▓рд┐рдП рдПрдХ рдХрд╛рд░рдЦрд╛рдирд╛ рдмрдирд╛ рд░рд╣реЗ рд╣реИрдВ - ConsumerFactoryред
@Bean public ConsumerFactory<Long, AbstractDto> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); }
рд╣рдореЗрдВ рдЬрд┐рди рдЧреБрдгреЛрдВ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИ, рдЙрдирдХреЗ рд╕рд╛рде рдЖрд░рдВрдн рдореЗрдВ, рдпрд╣ рднрд╡рд┐рд╖реНрдп рдореЗрдВ рдЙрдкрднреЛрдХреНрддрд╛рдУрдВ рдХреЗ рд▓рд┐рдП рдПрдХ рдорд╛рдирдХ рдХрд╛рд░рдЦрд╛рдиреЗ рдХреЗ рд░реВрдк рдореЗрдВ рдХрд╛рдо рдХрд░реЗрдЧрд╛ред
@Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); return props; }
ConsumerConfigs рд╕рд┐рд░реНрдл рдореИрдк рдХреЙрдиреНтАНрдлрд╝рд┐рдЧрд░ рд╣реИрдВред рд╣рдо рд╕рд░реНрд╡рд░ рдХрд╛ рдкрддрд╛, рд╕рдореВрд╣ рдФрд░ рдбрд┐рд╕рдПрд░рд┐рд▓рд╛рдЗрдЬрд╝рд░ рдкреНрд░рджрд╛рди рдХрд░рддреЗ рд╣реИрдВред
рдЗрд╕рдХреЗ рдЕрд▓рд╛рд╡рд╛, рдПрдХ рдЙрдкрднреЛрдХреНрддрд╛ рдХреЗ рд▓рд┐рдП рд╕рдмрд╕реЗ рдорд╣рддреНрд╡рдкреВрд░реНрдг рдмрд┐рдВрджреБрдУрдВ рдореЗрдВ рд╕реЗ рдПрдХред рдЙрдкрднреЛрдХреНрддрд╛ рдПрдХрд▓ рдСрдмреНрдЬреЗрдХреНрдЯ рдФрд░ рд╕рдВрдЧреНрд░рд╣ рджреЛрдиреЛрдВ рдкреНрд░рд╛рдкреНрдд рдХрд░ рд╕рдХрддрд╛ рд╣реИ - рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, StarshipDto рдФрд░ List рджреЛрдиреЛрдВред рдФрд░ рдЕрдЧрд░ рд╣рдореЗрдВ JSON рдХреЗ рд░реВрдк рдореЗрдВ StarshipDto рдорд┐рд▓рддрд╛ рд╣реИ, рддреЛ рд╣рдореЗрдВ JSON рд╕рд░рдгреА рдХреЗ рд░реВрдк рдореЗрдВ, рдореЛрдЯреЗ рддреМрд░ рдкрд░ рдмреЛрд▓рдХрд░ рд╕реВрдЪреА рдорд┐рд▓рддреА рд╣реИред рдЗрд╕рд▓рд┐рдП, рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рдХрдо рд╕реЗ рдХрдо рджреЛ рд╕рдВрджреЗрд╢ рдХрд╛рд░рдЦрд╛рдиреЗ рд╣реИрдВ - рдПрдХрд▓ рд╕рдВрджреЗрд╢реЛрдВ рдХреЗ рд▓рд┐рдП рдФрд░ рд╕рд░рдгрд┐рдпреЛрдВ рдХреЗ рд▓рд┐рдПред
@Bean public KafkaListenerContainerFactory<?> singleFactory() { ConcurrentKafkaListenerContainerFactory<Long, AbstractDto> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(false); factory.setMessageConverter(new StringJsonMessageConverter()); return factory; }
рд╣рдо ConcurrentKafkaListenerContainerFactory, рдЯрд╛рдЗрдк рдХрд┐рдпрд╛ рд╣реБрдЖ рд▓рдВрдмрд╛ (рд╕рдВрджреЗрд╢ рдХреБрдВрдЬреА) рдФрд░ AbstractDto (рд╕рд╛рд░ рд╕рдВрджреЗрд╢ рдорд╛рди) рдЯрд╛рдЗрдк рдХрд░рддреЗ рд╣реИрдВ, рдФрд░ рдЧреБрдгреЛрдВ рдХреЗ рд╕рд╛рде рдЕрдкрдиреЗ рдХреНрд╖реЗрддреНрд░реЛрдВ рдХреЛ рдЗрдирд┐рд╢рд┐рдпрд▓рд╛рдЗрдЬрд╝ рдХрд░рддреЗ рд╣реИрдВред рд╣рдо, рдирд┐рд╢реНрдЪрд┐рдд рд░реВрдк рд╕реЗ, рд╣рдорд╛рд░реЗ рдорд╛рдирдХ рдХрд╛рд░рдЦрд╛рдиреЗ (рдЬрд┐рд╕рдореЗрдВ рдкрд╣рд▓реЗ рд╕реЗ рд╣реА рдорд╛рдирдЪрд┐рддреНрд░ рдХреЙрдиреНрдлрд╝рд┐рдЧрд░ рд╣реИрдВ) рдХреЗ рд╕рд╛рде рдХрд╛рд░рдЦрд╛рдиреЗ рдХреЛ рдЖрд░рдВрднреАрдХреГрдд рдХрд░рддреЗ рд╣реИрдВ, рдлрд┐рд░ рд╣рдо рдЪрд┐рд╣реНрдирд┐рдд рдХрд░рддреЗ рд╣реИрдВ рдХрд┐ рд╣рдо рдкреИрдХреЗрдЯ (рд╕рдорд╛рди рд╕рд░рдгрд┐рдпреЛрдВ) рдХреЛ рдирд╣реАрдВ рд╕реБрдирддреЗ рд╣реИрдВ рдФрд░ рдХрдирд╡рд░реНрдЯрд░ рдХреЗ рд░реВрдк рдореЗрдВ рдПрдХ рд╕рд╛рдзрд╛рд░рдг JSON рдХрдирд╡рд░реНрдЯрд░ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░рддреЗ рд╣реИрдВред
рдЬрдм рд╣рдо рд╕рдВрдХреБрд▓ / рд╕рд░рдгрд┐рдпреЛрдВ (рдмреИрдЪ) рдХреЗ рд▓рд┐рдП рдПрдХ рдХрд╛рд░рдЦрд╛рдирд╛ рдмрдирд╛рддреЗ рд╣реИрдВ, рддреЛ рдореБрдЦреНрдп рдЕрдВрддрд░ (рдЗрд╕ рддрдереНрдп рдХреЗ рдЕрд▓рд╛рд╡рд╛ рдХрд┐ рд╣рдо рдЪрд┐рд╣реНрдирд┐рдд рдХрд░рддреЗ рд╣реИрдВ рдХрд┐ рд╣рдо рдкреИрдХреЗрдЬ рд╕реБрди рд░рд╣реЗ рд╣реИрдВ) рдпрд╣ рд╣реИ рдХрд┐ рд╣рдо рдПрдХ рд╡рд┐рд╢реЗрд╖ рдкреИрдХреЗрдЬ рдХрдирд╡рд░реНрдЯрд░ рдХреЗ рд░реВрдк рдореЗрдВ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░рддреЗ рд╣реИрдВ рдЬреЛ рдкреИрдХреЗрдЬреЛрдВ рдХреЛ рдкрд░рд┐рд╡рд░реНрддрд┐рдд рдХрд░реЗрдЧрд╛ред JSON рд╕реНрдЯреНрд░рд┐рдВрдЧреНрд╕ рд╕реЗред
@Bean public KafkaListenerContainerFactory<?> batchFactory() { ConcurrentKafkaListenerContainerFactory<Long, AbstractDto> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); factory.setMessageConverter(new BatchMessagingMessageConverter(converter())); return factory; } @Bean public StringJsonMessageConverter converter() { return new StringJsonMessageConverter(); }
рдФрд░ рдПрдХ рдмрд╛рддред рд╕реНрдкреНрд░рд┐рдВрдЧ рдмреАрдиреНрд╕ рдХреЛ рдЗрдирд┐рд╢рд┐рдпрд▓рд╛рдЗрдЬрд╝ рдХрд░рддреЗ рд╕рдордп, kafkaListenerContainerFactory рдирд╛рдо рдХреЗ рддрд╣рдд рдмрд┐рди рдХреЛ рдирд╣реАрдВ рдЧрд┐рдирд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ рдФрд░ рдПрдкреНрд▓рд┐рдХреЗрд╢рди рдХреЛ рдмрд░реНрдмрд╛рдж рдХрд░ рджрд┐рдпрд╛ рдЬрд╛рдПрдЧрд╛ред рдирд┐рд╢реНрдЪрд┐рдд рд░реВрдк рд╕реЗ рд╕рдорд╕реНрдпрд╛ рдХреЛ рд╣рд▓ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдФрд░ рдЕрдзрд┐рдХ рд╕реБрд░реБрдЪрд┐рдкреВрд░реНрдг рд╡рд┐рдХрд▓реНрдк рд╣реИрдВ, рдЙрдирдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдЯрд┐рдкреНрдкрдгрд┐рдпреЛрдВ рдореЗрдВ рд▓рд┐рдЦреЗрдВ, рдЕрднреА рдХреЗ рд▓рд┐рдП рдореИрдВрдиреЗ рдмрд╕ рдПрдХ рд╣реА рдирд╛рдо рдХреЗ рд╕рд╛рде рдХрд╛рд░реНрдпрдХреНрд╖рдорддрд╛ рдХреЗ рд╕рд╛рде рдПрдХ рдмрд┐рди рд▓реЛрдб рдХрд┐рдпрд╛ рд╣реИ:
@Bean public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() { return new ConcurrentKafkaListenerContainerFactory<>(); }
рдЙрдкрднреЛрдХреНрддрд╛ рд╕реНрдерд╛рдкрд┐рдд рд╣реИред рд╣рдо рдирд┐рд░реНрдорд╛рддрд╛ рдХреЗ рдкрд╛рд╕ рдЬрд╛рддреЗ рд╣реИрдВред
@Configuration public class KafkaProducerConfig { @Value("${kafka.server}") private String kafkaServer; @Value("${kafka.producer.id}") private String kafkaProducerId; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerId); return props; } @Bean public ProducerFactory<Long, StarshipDto> producerStarshipFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<Long, StarshipDto> kafkaTemplate() { KafkaTemplate<Long, StarshipDto> template = new KafkaTemplate<>(producerStarshipFactory()); template.setMessageConverter(new StringJsonMessageConverter()); return template; } }
рд╕реНрдЯреИрдЯрд┐рдХ рд╡реИрд░рд┐рдПрдмрд▓ рдореЗрдВ рд╕реЗ рд╣рдореЗрдВ рдХрд╛рдлреНрдХрд╛ рд╕рд░реНрд╡рд░ рдФрд░ рдкреНрд░реЛрдбреНрдпреВрд╕рд░ рдЖрдИрдбреА рдХрд╛ рдкрддрд╛ рдЪрд╛рд╣рд┐рдПред рд╡рд╣ рдХреБрдЫ рднреА рд╣реЛ рд╕рдХрддрд╛ рд╣реИред
рдХреЙрдиреНрдлрд╝рд┐рдЧрд░реЗрд╢рди рдореЗрдВ, рдЬреИрд╕рд╛ рдХрд┐ рд╣рдо рджреЗрдЦрддреЗ рд╣реИрдВ, рдХреБрдЫ рдЦрд╛рд╕ рдирд╣реАрдВ рд╣реИред рд▓рдЧрднрдЧ рдпрд╣реА рдмрд╛рддред рд▓реЗрдХрд┐рди рдХрд╛рд░рдЦрд╛рдиреЛрдВ рдХреЗ рд╕рдВрдмрдВрдз рдореЗрдВ, рдПрдХ рдорд╣рддреНрд╡рдкреВрд░реНрдг рдЕрдВрддрд░ рд╣реИред рд╣рдореЗрдВ рдкреНрд░рддреНрдпреЗрдХ рд╡рд░реНрдЧ рдХреЗ рд▓рд┐рдП рдПрдХ рдЯреЗрдореНрдкреНрд▓реЗрдЯ рдкрдВрдЬреАрдХреГрдд рдХрд░рдирд╛ рд╣реЛрдЧрд╛ рдЬрд┐рд╕рдХреА рд╡рд╕реНрддреБрдПрдВ рд╣рдо рд╕рд░реНрд╡рд░ рдХреЛ рднреЗрдЬреЗрдВрдЧреЗ, рд╕рд╛рде рд╣реА рдЗрд╕рдХреЗ рд▓рд┐рдП рдПрдХ рдХрд╛рд░рдЦрд╛рдирд╛ рднреАред рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рдРрд╕реА рдПрдХ рдЬреЛрдбрд╝реА рд╣реИ, рд▓реЗрдХрд┐рди рдЙрдирдореЗрдВ рд╕реЗ рджрд░реНрдЬрдиреЛрдВ рд╣реЛ рд╕рдХрддреЗ рд╣реИрдВред
рдЯреЗрдореНрдкреНрд▓реЗрдЯ рдореЗрдВ, рд╣рдо рдЪрд┐рд╣реНрдирд┐рдд рдХрд░рддреЗ рд╣реИрдВ рдХрд┐ рд╣рдо JSON рдореЗрдВ рдСрдмреНрдЬреЗрдХреНрдЯреНрд╕ рдХреЛ рдЕрдиреБрдХреНрд░рдорд┐рдд рдХрд░реЗрдВрдЧреЗ, рдФрд░ рдпрд╣, рд╢рд╛рдпрдж, рдкрд░реНрдпрд╛рдкреНрдд рд╣реИред
рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рдПрдХ рдЙрдкрднреЛрдХреНрддрд╛ рдФрд░ рдПрдХ рдирд┐рд░реНрдорд╛рддрд╛ рд╣реИ, рдпрд╣ рдПрдХ рд╕реЗрд╡рд╛ рд▓рд┐рдЦрдиреЗ рдХреЗ рд▓рд┐рдП рдмрдирд╛ рд╣реБрдЖ рд╣реИ рдЬреЛ рд╕рдВрджреЗрд╢ рднреЗрдЬреЗрдЧрд╛ рдФрд░ рдЙрдиреНрд╣реЗрдВ рдкреНрд░рд╛рдкреНрдд рдХрд░реЗрдЧрд╛ред
@Service @Slf4j public class StarshipServiceImpl implements StarshipService { private final KafkaTemplate<Long, StarshipDto> kafkaStarshipTemplate; private final ObjectMapper objectMapper; @Autowired public StarshipServiceImpl(KafkaTemplate<Long, StarshipDto> kafkaStarshipTemplate, ObjectMapper objectMapper) { this.kafkaStarshipTemplate = kafkaStarshipTemplate; this.objectMapper = objectMapper; } @Override public void send(StarshipDto dto) { kafkaStarshipTemplate.send("server.starship", dto); } @Override @KafkaListener(id = "Starship", topics = {"server.starship"}, containerFactory = "singleFactory") public void consume(StarshipDto dto) { log.info("=> consumed {}", writeValueAsString(dto)); } private String writeValueAsString(StarshipDto dto) { try { return objectMapper.writeValueAsString(dto); } catch (JsonProcessingException e) { e.printStackTrace(); throw new RuntimeException("Writing value to JSON failed: " + dto.toString()); } } }
рд╣рдорд╛рд░реА рд╕реЗрд╡рд╛ рдореЗрдВ рдХреЗрд╡рд▓ рджреЛ рд╡рд┐рдзрд┐рдпрд╛рдВ рд╣реИрдВ, рд╡реЗ рд╣рдорд╛рд░реЗ рд▓рд┐рдП рдХреНрд▓рд╛рдЗрдВрдЯ рдХреЗ рдХрд╛рдо рдХреЛ рд╕рдордЭрд╛рдиреЗ рдХреЗ рд▓рд┐рдП рдкрд░реНрдпрд╛рдкреНрдд рд╣реИрдВред рд╣рдо рдЙрди рдкреИрдЯрд░реНрди рдХреЛ рд╕реНрд╡рд╛рд╣рд╛ рдХрд░рддреЗ рд╣реИрдВ рдЬрд┐рдирдХреА рд╣рдореЗрдВ рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИ:
private final KafkaTemplate<Long, StarshipDto> kafkaStarshipTemplate;
рдирд┐рд░реНрдорд╛рддрд╛ рд╡рд┐рдзрд┐:
@Override public void send(StarshipDto dto) { kafkaStarshipTemplate.send("server.starship", dto); }
рд╕рд░реНрд╡рд░ рдХреЛ рдПрдХ рд╕рдВрджреЗрд╢ рднреЗрдЬрдиреЗ рдХреЗ рд▓рд┐рдП рд╕рднреА рдЖрд╡рд╢реНрдпрдХ рд╣реИ рдХрд┐ рдЯреЗрдореНрдкрд▓реЗрдЯ рдкрд░ рднреЗрдЬреЗрдВ рд╡рд┐рдзрд┐ рдХреЛ рдХреЙрд▓ рдХрд░реЗрдВ рдФрд░ рд╡рд┐рд╖рдп (рд╡рд┐рд╖рдп) рдФрд░ рд╣рдорд╛рд░реА рд╡рд╕реНрддреБ рдХреЛ рд╡рд╣рд╛рдВ рд╕реНрдерд╛рдирд╛рдВрддрд░рд┐рдд рдХрд░реЗрдВред рдСрдмреНрдЬреЗрдХреНрдЯ рдХреЛ JSON рдореЗрдВ рдХреНрд░рдордмрджреНрдз рдХрд┐рдпрд╛ рдЬрд╛рдПрдЧрд╛ рдФрд░ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рд╡рд┐рд╖рдп рдХреЗ рддрд╣рдд рд╕рд░реНрд╡рд░ рдкрд░ рдЙрдбрд╝рд╛рди рднрд░реЗрдЧрд╛ред
рд╕реБрдирдиреЗ рдХреА рд╡рд┐рдзрд┐ рдЗрд╕ рдкреНрд░рдХрд╛рд░ рд╣реИ:
@Override @KafkaListener(id = "Starship", topics = {"server.starship"}, containerFactory = "singleFactory") public void consume(StarshipDto dto) { log.info("=> consumed {}", writeValueAsString(dto)); }
рд╣рдо рдЗрд╕ рд╡рд┐рдзрд┐ рдХреЛ @KafkaListener рдПрдиреЛрдЯреЗрд╢рди рдХреЗ рд╕рд╛рде рдЪрд┐рд╣реНрдирд┐рдд рдХрд░рддреЗ рд╣реИрдВ, рдЬрд╣рд╛рдВ рд╣рдо рдХрд┐рд╕реА рднреА рдЖрдИрдбреА рдХреЛ рдЗрдВрдЧрд┐рдд рдХрд░рддреЗ рд╣реИрдВ рдЬрд┐рд╕реЗ рд╣рдо рдкрд╕рдВрдж рдХрд░рддреЗ рд╣реИрдВ, рд╕реБрдиреЗ рд╣реБрдП рд╡рд┐рд╖рдп рдФрд░ рдПрдХ рдХрд╛рд░рдЦрд╛рдирд╛ рдЬреЛ рд╣рдореЗрдВ рдкреНрд░рд╛рдкреНрдд рд╕рдВрджреЗрд╢ рдХреЛ рд╣рдорд╛рд░реА рдЖрд╡рд╢реНрдпрдХрддрд╛ рдХреЗ рдЕрдиреБрд╕рд╛рд░ рдкрд░рд┐рд╡рд░реНрддрд┐рдд рдХрд░ рджреЗрдЧрд╛ред рдЗрд╕ рдорд╛рдорд▓реЗ рдореЗрдВ, рдЪреВрдВрдХрд┐ рд╣рдо рдХрд┐рд╕реА рдПрдХ рд╡рд╕реНрддреБ рдХреЛ рд╕реНрд╡реАрдХрд╛рд░ рдХрд░рддреЗ рд╣реИрдВ, рдЗрд╕рд▓рд┐рдП рд╣рдореЗрдВ рдПрдХ рдПрдХрд▓ рдХрд╛рд░реНрдп рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИред рд╕реВрдЪреА рдХреЗ рд▓рд┐рдП <?>, рдмреИрдЪ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░реЗрдВред рдкрд░рд┐рдгрд╛рдорд╕реНрд╡рд░реВрдк, рд╣рдо рдСрдмреНрдЬреЗрдХреНрдЯ рдХреЛ рднреЗрдЬрдиреЗ рдХреА рд╡рд┐рдзрд┐ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдХрд╛рдлреНрдХрд╛ рд╕рд░реНрд╡рд░ рдХреЛ рднреЗрдЬрддреЗ рд╣реИрдВ рдФрд░ рдЙрдкрднреЛрдЧ рд╡рд┐рдзрд┐ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдЗрд╕реЗ рдкреНрд░рд╛рдкреНрдд рдХрд░рддреЗ рд╣реИрдВред
рдЖрдк 5 рдорд┐рдирдЯ рдореЗрдВ рдПрдХ рдкрд░реАрдХреНрд╖рдг рд▓рд┐рдЦ рд╕рдХрддреЗ рд╣реИрдВ рдЬреЛ рдХрд╛рдлреНрдХрд╛ рдХреА рдкреВрд░реА рддрд╛рдХрдд рдХрд╛ рдкреНрд░рджрд░реНрд╢рди рдХрд░реЗрдЧрд╛, рд▓реЗрдХрд┐рди рд╣рдо рдЖрдЧреЗ рдмрдврд╝реЗрдВрдЧреЗ - 10 рдорд┐рдирдЯ рдмрд┐рддрд╛рдПрдВрдЧреЗ рдФрд░ рдПрдХ рдФрд░ рдПрдкреНрд▓рд┐рдХреЗрд╢рди рд▓рд┐рдЦреЗрдВрдЧреЗ рдЬреЛ рд╕рд░реНрд╡рд░ рдХреЛ рд╕рдВрджреЗрд╢ рднреЗрдЬреЗрдЧрд╛ рдХрд┐ рд╣рдорд╛рд░рд╛ рдкрд╣рд▓рд╛ рдЖрд╡реЗрджрди рд╕реБрдиреЗрдВрдЧреЗред
рдХрд╛рдлреНрдХрд╛-рдкрд░реАрдХреНрд╖рдХ
рдкрд╣рд▓рд╛ рдЖрд╡реЗрджрди рд▓рд┐рдЦрдиреЗ рдХрд╛ рдЕрдиреБрднрд╡ рд╣реЛрдиреЗ рдХреЗ рдмрд╛рдж, рд╣рдо рдЖрд╕рд╛рдиреА рд╕реЗ рджреВрд╕рд░рд╛ рд▓рд┐рдЦ тАЛтАЛрд╕рдХрддреЗ рд╣реИрдВ, рдЦрд╛рд╕рдХрд░ рдпрджрд┐ рд╣рдо рдкреЗрд╕реНрдЯ рдФрд░ рдбреЛрдЯреЛ рдкреИрдХреЗрдЬ рдХреА рдирдХрд▓ рдХрд░рддреЗ рд╣реИрдВ, рдХреЗрд╡рд▓ рдирд┐рд░реНрдорд╛рддрд╛ рдХреЛ рдкрдВрдЬреАрдХреГрдд рдХрд░реЗрдВ (рд╣рдо рдХреЗрд╡рд▓ рд╕рдВрджреЗрд╢ рднреЗрдЬреЗрдВрдЧреЗ) рдФрд░ рд╕реЗрд╡рд╛ рдореЗрдВ рдПрдХрдорд╛рддреНрд░ рднреЗрдЬрдиреЗ рдХреА рд╡рд┐рдзрд┐ рдЬреЛрдбрд╝реЗрдВред рдиреАрдЪреЗ рджрд┐рдП рдЧрдП рд▓рд┐рдВрдХ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ, рдЖрдк рдЖрд╕рд╛рдиреА рд╕реЗ рдкреНрд░реЛрдЬреЗрдХреНрдЯ рдХреЛрдб рдбрд╛рдЙрдирд▓реЛрдб рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ рдФрд░ рд╕реБрдирд┐рд╢реНрдЪрд┐рдд рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ рдХрд┐ рд╡рд╣рд╛рдБ рдХреБрдЫ рднреА рдЬрдЯрд┐рд▓ рдирд╣реАрдВ рд╣реИред
@Scheduled(initialDelay = 10000, fixedDelay = 5000) @Override public void produce() { StarshipDto dto = createDto(); log.info("<= sending {}", writeValueAsString(dto)); kafkaStarshipTemplate.send("server.starship", dto); } private StarshipDto createDto() { return new StarshipDto("Starship " + (LocalTime.now().toNanoOfDay() / 1000000)); }
рдкрд╣рд▓реЗ 10 рд╕реЗрдХрдВрдб рдХреЗ рдмрд╛рдж, рдХрд╛рдлреНрдХрд╛-рдкрд░реАрдХреНрд╖рдХ рд╣рд░ 5 рд╕реЗрдХрдВрдб (рдЪрд┐рддреНрд░ рдХреНрд▓рд┐рдХ рдХрд░рдиреЗ рдпреЛрдЧреНрдп рд╣реИ) рдХреЛ рдХрд╛рдлреНрдХрд╛ рд╕рд░реНрд╡рд░ рдХреЛ рд╕реНрдЯрд╛рд░рд╢рд┐рдк рдХреЗ рдирд╛рдо рдХреЗ рд╕рд╛рде рд╕рдВрджреЗрд╢ рднреЗрдЬрдирд╛ рд╢реБрд░реВ рдХрд░ рджреЗрддрд╛ рд╣реИред

рд╡рд╣рд╛рдВ, рдЙрдиреНрд╣реЗрдВ рдХрд╛рдлреНрдХрд╛-рд╕рд░реНрд╡рд░ рджреНрд╡рд╛рд░рд╛ рд╕реБрдиреА рдФрд░ рдкреНрд░рд╛рдкреНрдд рдХреА рдЬрд╛рддреА рд╣реИ (рдЪрд┐рддреНрд░ рднреА рдХреНрд▓рд┐рдХ рдХрд░рдиреЗ рдпреЛрдЧреНрдп рд╣реИ)ред

рдореБрдЭреЗ рдЙрдореНрдореАрдж рд╣реИ рдХрд┐ рдЬреЛ рд▓реЛрдЧ рдХрд╛рдлреНрдХрд╛ рдкрд░ рдорд╛рдЗрдХреНрд░реЛрд╕рд░реНрд╡рд┐рд╕реЗрдЬ рд▓рд┐рдЦрдирд╛ рд╢реБрд░реВ рдХрд░рдиреЗ рдХрд╛ рд╕рдкрдирд╛ рджреЗрдЦрддреЗ рд╣реИрдВ, рд╡реЗ рдЙрддрдиреЗ рд╣реА рдЖрд╕рд╛рдиреА рд╕реЗ рд╕рдлрд▓ рд╣реЛрдВрдЧреЗ рдЬрд┐рддрдирд╛ рдореИрдВрдиреЗ рдХрд┐рдпрд╛ рдерд╛ред рдФрд░ рдпрд╣рд╛рдБ рдкрд░рд┐рдпреЛрдЬрдирд╛рдУрдВ рдХреЗ рд▓рд┐рдП рд▓рд┐рдВрдХ рд╣реИрдВ:
тЖТ
рдХрдлрд╝реНрдХрд╛-рд╕рд░реНрд╡рд░тЖТ
рдХрд╛рдлреНрдХрд╛-рдкрд░реАрдХреНрд╖рдХ