有多种方法可以处理来自Pub-Sub系统的消息:使用单独的服务,隔离孤立的进程,编排进程/流池,复杂的IPC,Http-Poll等等。 今天,我想谈谈如何通过HTTP使用Pub-Sub以及专门为此编写的服务。
在某些情况下,使用现成的HTTP服务后端是处理消息队列的理想解决方案:
- 开箱即用的平衡。 通常,后端已经在平衡器后面,并且具有可立即加载的基础结构,这大大简化了消息处理。
- 使用常规的REST控制器(任何HTTP资源)。 如果后端混合在一起,则使用HTTP消息可以最大程度地减少针对不同语言的Compumer实现成本。
- 简化其他服务的Web挂钩的使用。 现在,几乎每个服务(Jira,Gitlab,Mattermost,Slack ...)都以某种方式支持Web挂钩,以便与外界进行交互。 如果您教队列执行HTTP调度程序的功能,则可以使生活更轻松。
这种方法也有缺点:
- 您可以忘记解决方案的轻巧性。 HTTP是一个繁重的协议,在使用者方面使用框架会立即增加延迟和负载。
- 我们失去了“投票”方法的优势,而获得了“推”的劣势。
- 由与处理客户端相同的服务实例处理消息可能会影响响应能力。 这并不重要,因为它经过了平衡和隔离处理。
我将该想法实现为Http-Over-Http服务,稍后将进行讨论。 该项目使用Spring Boot 2.1用Kotlin编写。 作为代理,当前仅提供Apache Kafka。
在本文的进一步内容中,假定读者熟悉Kafka并了解消息的提交(commit)和偏移量(offset),组(group)和使用者(consumer)的原理,并且还了解分区(partition)与主题(topic)的区别。 如果有差距,建议您先阅读Kafka文档的这一部分,然后再继续。目录内容
复习
Queue-Over-Http是一种服务,充当消息代理与最终HTTP使用者之间的中介(该服务可以轻松实现对以任何其他方式(例如,各种* RPC)向消费者发送消息的支持。 目前,只有订阅,退订和查看使用者列表可用,由于没有生产者的特殊支持,无法保证消息的顺序,因此尚未通过HTTP将消息发送到代理(产生者)。
服务的关键人物是消费者,他们可以订阅特定的分区或仅订阅主题(支持主题模式)。 在第一种情况下,将关闭分区的自动平衡。 订阅后,指定的HTTP资源开始从分配的Kafka分区接收消息。 在架构上,每个订阅者都与一个本地Kafka Java客户端关联。
关于KafkaConsumer的有趣故事Kafka有一个很棒的Java客户端,可以做很多事情。 我在队列适配器中使用它来接收来自代理的消息,然后将其发送到本地服务队列。 值得一提的是,客户端仅在单个线程的上下文中工作。
适配器的想法很简单。 我们从一个线程开始,编写最简单的本机客户端调度程序,重点是减少延迟。 也就是说,我们写类似的东西:
while (!Thread.interrupted()) { var hasWork = false for (consumer in kafkaConsumers) { val queueGroup = consumers[consumer] ?: continue invalidateSubscription(consumer, queueGroup) val records = consumer.poll(Duration.ZERO) if (!records.isEmpty) { hasWork = true } } val committed = doCommit() if (!hasWork && committed == 0) {
看起来一切都很棒,即使有几十个用户,延迟也很小。 实际上,事实证明
KafkaConsumer
为这种操作模式
KafkaConsumer
,并且在空闲时间提供了大约1.5 MB / s的分配速率。 拥有100个快递公司,分配速率达到150 MB / s,这使GC经常想到该应用程序。 当然,所有这些垃圾都在较年轻的地方,GC能够很好地处理此问题,但是解决方案仍然不是完美的。
显然,您需要按照
KafkaConsumer
典型方式进行
KafkaConsumer
,现在我将每个订户放置在我的流中。 这给内存和调度增加了开销,但是没有其他方法。
我从上面重写了代码,删除了内部循环,并将
Duration.ZERO
更改为
Duration.ofMillis(100)
。 事实证明,每个用户的分配速率下降到可接受的80-150 KB / s。 但是,具有100ms超时的轮询会将整个提交队列延迟到相同的100ms,这在很多情况下是不可接受的。
在寻找问题的解决方案的过程中,我记得
KafkaConsumer::wakeup
,它抛出
WakeupException
并中断对使用者的任何阻塞操作。 使用这种方法,低延迟的路径很简单:当新的提交请求到达时,我们将其放入队列中,而在本机使用者上,我们将其称为
wakeup
。 在工作周期中,捕获
WakeupException
并提交已累积的内容。 为了在例外的帮助下转移控制权,您必须立即将其交到您手中,但除此之外别无其他……
事实证明,此选项远非完美,因为对本机使用者的任何操作现在都将引发
WakeupException
,包括提交本身。 处理这种情况会使代码混乱并带有允许完成
wakeup
的标志。
我得出的结论是,最好修改
KafkaConsumer::poll
方法,以便根据一个附加标志可以正常中断它。 结果,
科学怪人诞生于反射,它精确地复制了原始的轮询方法,并通过标志从循环中添加了一个出口。 此标志由单独的interruptPoll方法设置,此外,该方法在客户端选择器上调用唤醒以释放I / O操作上的线程锁定。
以这种方式实现了客户端之后,从提交请求到处理到处理的整个过程中,我得到的响应速度高达100微秒,并且从代理那里获取消息的延迟非常长,这很好。
每个分区由一个单独的本地队列表示,适配器在其中写入来自代理的消息。 工作程序从中获取消息并将其发送以执行,即通过HTTP发送。
该服务支持批消息处理以增加吞吐量。 订阅时,您可以指定每个主题的
concurrencyFactor
(独立地应用于每个分配的分区)。 例如,
concurrencyFactor=1000
表示可以
concurrencyFactor=1000
将1000条HTTP请求形式的消息发送给使用者。 消费者明确确定了来自包装的所有消息后,该服务便决定下一次提交与Kafka中最后一条消息的偏移量有关的内容。 因此,
concurrencyFactor
的第二个值是在发生Kafka或Http上的队列崩溃时,使用者处理的最大消息数。
为了减少延迟,队列具有
loadFactor = concurrencyFactor * 2
,这使您可以读取来自代理的消息,该消息是发送的消息的两倍。 由于在本机客户端上禁用了自动提交,因此这种方案不会违反“至少一次”保证。
高
concurrencyFactor
值可通过减少最坏情况下长达10毫秒的提交次数来增加队列的吞吐量。 同时,消费者的负担增加。
不能保证捆绑中发送消息的顺序,但是可以通过设置
concurrencyFactor=1
来实现。
提交
提交是服务的重要组成部分。 当下一个数据包准备就绪时,该包中最后一条消息的偏移量立即提交给Kafka,只有成功提交后,下一个数据包才可用于处理。 通常这还不够,并且需要自动提交。 为此,有一个
autoCommitPeriodMs
参数,该参数与提交从分区读取的最后一条消息的本机客户端的经典自动提交期无关。 想象
concurrencyFactor=10
。 该服务已发送所有10条消息,并正在等待它们中的每条消息准备就绪。 消息3的处理首先完成,然后是消息1,然后是消息10。此时,是时候进行自动提交了。 重要的是不要违反“至少一次”语义。 因此,您只能提交第一条消息,即偏移量2,因为此时仅成功处理了它。 此外,在下一次自动提交之前,将处理消息2、5、6、4和8,现在只需要提交偏移量7,依此类推。 自动提交对吞吐量几乎没有影响。
错误处理
在正常操作模式下,服务一次向主管发送一条消息。 如果由于某种原因导致4xx或5xx错误,该服务将重新发送该消息,等待成功处理。 尝试之间的时间可以配置为单独的参数。
也可以设置尝试次数,之后将消息标记为已处理,无论响应状态如何,都将停止重发。 我不建议将其用于敏感数据,应始终手动调整使用者失败的情况。 粘性消息可以通过服务日志和使用者响应状态进行监视。
关于坚持通常,为HTTP服务器提供4xx或5xx响应状态,它还会发送Connection: close
标头。 以这种方式关闭的TCP连接将保持TIME_WAITED
状态,直到一段时间后操作系统将其清除。 问题在于这种连接占用了整个端口,直到释放该端口才能重用。 这可能会导致机器上没有可用端口来建立TCP连接,并且每次发送时都会在日志中抛出该服务异常。 实际上,在Windows 10上,端口在1-2分钟内发送了10-20 000条错误消息后终止。 在标准模式下,这不是问题。
留言内容
从代理提取的每个消息都会通过HTTP发送到预订期间指定的资源的顾问。 默认情况下,正文中的POST请求发送一条消息。 可以通过指定任何其他方法来更改此行为。 如果该方法不支持在正文中发送数据,则可以指定将在其中发送消息的字符串参数的名称。 此外,在订阅时,您可以指定将添加到每个消息的其他标题,这对于使用令牌进行基本授权很方便。 标头会添加到每条消息中,并带有使用者,主题和分区的标识符(从中读取消息),消息号,分区键(如果适用)以及代理的名称。
性能表现
为了评估性能,我使用了一台运行该服务的PC(Windows 10,OpenJDK-11(G1,未进行调整),i7-6700K,16GB)和一台笔记本电脑(Windows 10,i5-8250U,8GB),在笔记本电脑上运行了消息生成器HTTP资源使用方和Kafka具有默认设置。 PC通过1Gb / s有线连接连接到路由器,笔记本电脑通过802.11ac连接到路由器。 生产者每110毫秒每110毫秒将110字节的消息写入到不同组的订阅者所关注的指定主题(
concurrencyFactor=500
,关闭了自动提交)。 架子远不是理想的,但是您可以得到一些图片。
关键的衡量参数是服务对延迟的影响。
让:
-t
q-从本地客户端接收消息的服务时间戳
-d
t0是t
q到消息从本地队列发送到执行人员池之间的时间
-d
t是介于t
q和HTTP请求发送时间之间的时间。 d
t是服务对消息等待时间的影响。
在测量期间,获得了以下结果(C-消费者,T-主题,M-消息):

在标准操作模式下,服务本身几乎不影响延迟,并且内存消耗最小。 d
t的最大值(约60ms)没有具体说明,因为它们取决于GC的操作,而不取决于服务本身。 GC的特殊调整或用Shenandoah代替G1可以帮助平滑最大值的分布。
当使用者无法应对来自队列的消息流并且服务打开限制模式时,一切都会发生巨大变化。 在这种模式下,内存消耗增加,因为对请求的响应时间显着增加,这妨碍了及时清理资源。 这里对延迟的影响保持在与先前结果相同的水平,并且较高的dt值是由本地队列中的消息预加载引起的。
不幸的是,由于笔记本电脑已经以1300 RPS的速度弯曲,因此无法在更高的负载下进行测试。 如果有人可以帮助组织高负载下的测量,我将很乐意提供用于测试的组件。
示范
现在让我们继续演示。 为此,我们需要:
- 卡夫卡经纪人,准备出发。 我将使用Bitnami在192.168.99.100:9092上提出的实例。
- 一个将接收消息的HTTP资源。 为了清楚起见,我从Slack获得了Web挂钩。
首先,您需要提高Queue-Over-Http服务本身。 为此,请在空的
application.yml
目录中创建以下内容:
spring: profiles: default logging: level: com: viirrtus: queueOverHttp: DEBUG app: persistence: file: storageDirectory: "persist" brokers: - name: "Kafka" origin: "kafka" config: bootstrap.servers: "192.168.99.100:9092"
在这里,我们向服务指示特定代理的连接参数,以及在何处存储订户,以便在启动之间不会丢失订户。 在“ app.brokers []。Config”中,您可以指定本机Kafka客户端支持的任何连接参数;可以在
此处找到完整列表。
由于配置文件是由Spring处理的,因此您可以在其中编写很多有趣的东西。 包括配置日志记录。
现在运行服务本身。 我们使用最简单的方法
docker-compose.yml
:
version: "2" services: app: image: viirrtus/queue-over-http:0.1.3 restart: unless-stopped command: --debug ports: - "8080:8080" volumes: - ./application.yml:/application.yml - ./persist:/persist
如果此选项不适合您,则可以从源代码编译服务。 自述文件项目中的组装说明,在文章末尾提供了指向该链接的链接。下一步是注册第一个订户。 为此,您需要执行对服务的HTTP请求,并带有使用者的描述:
POST localhost:8080/broker/subscription Content-Type: application/json { "id": "my-first-consumer", "group": { "id": "consumers" }, "broker": "Kafka", "topics": [ { "name": "slack.test", "config": { "concurrencyFactor": 10, "autoCommitPeriodMs": 100 } } ], "subscriptionMethod": { "type": "http", "delayOnErrorMs": 1000, "retryBeforeCommit": 10, "uri": "<slack-wh-uri>", "additionalHeaders": { "Content-Type": "application/json" } } }
如果一切顺利,则响应将与发送的内容几乎相同。
我们来看一下每个参数:
Consumer.id
我们的订户IDConsumer.group.id
组标识符Consumer.broker
指示您需要订阅哪些服务代理Consumer.topics[0].name
我们要从中接收消息的主题的名称Consumer.topics[0].config. concurrencyFactor
Consumer.topics[0].config. concurrencyFactor
发送的最大消息数Consumer.topics[0].config. autoCommitPeriodMs
Consumer.topics[0].config. autoCommitPeriodMs
就绪消息的强制提交期Consumer.subscriptionMethod.type
订阅类型。 当前仅HTTP可用。Consumer.subscriptionMethod.delayOnErrorMs
重新发送以错误结尾的消息之前的时间Consumer.subscriptionMethod.retryBeforeCommit
重新发送错误消息的尝试次数。 如果为0-消息将旋转直到成功处理。 在我们的案例中,保证完全交付不如保持流量恒定那么重要。Consumer.subscriptionMethod.uri
将消息发送到的资源Consumer.subscriptionMethod.additionalHeader
将与每个消息一起发送的其他头。 请注意,每封邮件的正文中将包含JSON,以便Slack可以正确解释请求。
在此请求中,省略了HTTP方法,因为默认的POST,Slack非常好。从此刻起,该服务将监视slack.test主题的分配分区中是否有新消息。
为了向该主题编写消息,我将使用Kafka中的内置实用程序,它们位于启动的Kafka映像的
/opt/bitnami/kafka/bin
(其他Kafka实例中的实用程序位置可能不同):
kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test > {“text”: “Hello!”}
同时,Slack将通知您新消息:
要取消订阅用户,只需向订阅“经纪人/取消订阅”发出与订阅期间相同的内容的POST请求即可。结论
目前,仅实现了基本功能。 进一步计划改进批处理,尝试实现完全一次语义,增加通过HTTP向代理发送消息的能力,最重要的是,增加对其他流行的Pub-Sub的支持。
目前正在积极开发基于HTTP的Queue-Over服务。 0.1.3版足够稳定,可以在开发人员和舞台上进行测试。 性能已在Windows 10,Debian 9和Ubuntu 18.04上进行了测试。 您可能需要自担风险使用prod。 如果您想为开发提供帮助或对服务提出任何反馈,欢迎来到
Github项目。