Hay muchas formas de procesar mensajes de sistemas Pub-Sub: usando un servicio separado, aislando un proceso aislado, orquestando un grupo de procesos / hilos, IPC complejo, Poll-over-Http y muchos otros. Hoy quiero hablar sobre cómo usar Pub-Sub a través de HTTP y sobre mi servicio escrito específicamente para esto.
El uso de un backend de servicio HTTP ya preparado en algunos casos es una solución ideal para procesar una cola de mensajes:
- Balanceándose fuera de la caja. Por lo general, el backend ya está detrás del equilibrador y tiene una infraestructura lista para cargar, lo que simplifica enormemente el trabajo con mensajes.
- Usando un controlador REST normal (cualquier recurso HTTP). El consumo de mensajes HTTP minimiza el costo de implementar compiladores para diferentes idiomas si el backend es mixto.
- Simplificación del uso de enlaces web de otros servicios. Ahora, casi todos los servicios (Jira, Gitlab, Mattermost, Slack ...) de alguna manera admiten enlaces web para interactuar con el mundo exterior. Puede facilitarle la vida si le enseña a la cola a realizar las funciones de un despachador HTTP.
Este enfoque también tiene desventajas:
- Puedes olvidarte de la ligereza de la solución. HTTP es un protocolo pesado, y el uso de marcos del lado del consumidor aumentará instantáneamente la latencia y la carga.
- Perdemos las fortalezas del enfoque de Encuesta, obteniendo las debilidades de Push.
- El procesamiento de mensajes por las mismas instancias de servicio que procesan clientes pueden afectar la capacidad de respuesta. Esto no es significativo, ya que se trata con equilibrio y aislamiento.
Implementé la idea como un servicio Queue-Over-Http, que se discutirá más adelante. El proyecto está escrito en Kotlin usando Spring Boot 2.1. Como corredor, solo Apache Kafka está disponible actualmente.
Además en el artículo, se supone que el lector está familiarizado con Kafka y conoce los commits (commit) y los offsets (offset) de los mensajes, los principios de grupos (grupo) y consumidores (consumidor), y también comprende cómo la partición (partición) difiere del tema (tema) . Si hay lagunas, le aconsejo que lea esta sección de la documentación de Kafka antes de continuar.Contenido
Revisar
Queue-Over-Http es un servicio que actúa como intermediario entre un intermediario de mensajes y el consumidor HTTP final (el servicio facilita la implementación del soporte para enviar mensajes a los consumidores de cualquier otra forma, por ejemplo, varios * RPC). Por el momento, solo está disponible la suscripción, la cancelación de la suscripción y la visualización de la lista de consumidores. El envío de mensajes al agente (producto) a través de HTTP aún no se ha implementado debido a la imposibilidad de garantizar el orden de los mensajes sin el apoyo especial del productor.
La figura clave del servicio es un consumidor que puede suscribirse a particiones específicas o solo a temas (se admite el patrón de temas). En el primer caso, el equilibrio automático de particiones está desactivado. Después de suscribirse, el recurso HTTP especificado comienza a recibir mensajes de las particiones Kafka asignadas. Arquitectónicamente, cada suscriptor está asociado con un cliente Java Kafka nativo.
entretenida historia sobre KafkaConsumerKafka tiene un maravilloso cliente Java que puede hacer mucho. Lo uso en el adaptador de cola para recibir mensajes del intermediario y luego lo envío a las colas de servicio locales. Vale la pena mencionar que el cliente trabaja exclusivamente en el contexto de un solo hilo.
La idea del adaptador es simple. Comenzamos en un hilo, escribimos el planificador más simple de clientes nativos, centrándonos en reducir la latencia. Es decir, escribimos algo similar:
while (!Thread.interrupted()) { var hasWork = false for (consumer in kafkaConsumers) { val queueGroup = consumers[consumer] ?: continue invalidateSubscription(consumer, queueGroup) val records = consumer.poll(Duration.ZERO) if (!records.isEmpty) { hasWork = true } } val committed = doCommit() if (!hasWork && committed == 0) {
Parece que todo es maravilloso, la latencia es mínima incluso con docenas de consumidores. En la práctica, resultó que
KafkaConsumer
para este modo de operación y ofrece una tasa de asignación de aproximadamente 1.5 MB / s en tiempo de inactividad. Con 100 correos, la tasa de asignación alcanza los 150 MB / sy hace que GC a menudo piense en la aplicación. Por supuesto, toda esta basura está en el área joven, GC es bastante capaz de manejar esto, pero aún así, la solución no es perfecta.
Obviamente, debe seguir el camino típico de
KafkaConsumer
y ahora
KafkaConsumer
cada suscriptor en mi transmisión. Esto proporciona una sobrecarga para la memoria y la programación, pero no hay otra manera.
Reescribo el código de arriba, quitando el bucle interno y cambiando
Duration.ZERO
a
Duration.ofMillis(100)
. Resulta bien, la tasa de asignación cae a 80-150 KB / s aceptables por consumidor. Sin embargo, una encuesta con un tiempo de espera de 100 ms retrasa toda la cola de confirmaciones para estos mismos 100 ms, y esto es inaceptable en gran medida.
En el proceso de encontrar soluciones al problema, recuerdo
KafkaConsumer::wakeup
, que arroja una
WakeupException
e interrumpe cualquier operación de bloqueo en el consumidor. Con este método, el camino hacia la baja latencia es simple: cuando llega una nueva solicitud de confirmación, la ponemos en la cola y al consumidor nativo llamamos
wakeup
. En el ciclo de trabajo,
WakeupException
y vaya a confirmar lo que se ha acumulado. Para la transferencia de control con la ayuda de excepciones, debe entregarlo inmediatamente en sus manos, pero como nada más ...
Resulta que esta opción está lejos de ser perfecta, ya que cualquier operación en el consumidor nativo ahora arroja una
WakeupException
, incluida la confirmación en sí. El procesamiento de esta situación desordenará el código con una bandera que permite que se realice la
wakeup
.
Llegué a la conclusión de que sería bueno modificar el método
KafkaConsumer::poll
para que pueda interrumpirse normalmente, de acuerdo con un indicador adicional. Como resultado,
Frankenstein nació de la reflexión, que copia exactamente el método de encuesta original, agregando una salida del bucle por la bandera. Este indicador se establece mediante un método separado de interruptPoll, que, además, llama a la activación en el selector de clientes para liberar el bloqueo del hilo en las operaciones de E / S.
Después de implementar el cliente de esta manera, obtengo la velocidad de reacción desde el momento en que llega una solicitud de confirmación hasta que se procesa hasta 100 microsegundos, y una latencia excelente para recuperar mensajes de un intermediario, lo cual está bien.
Cada partición está representada por una cola local separada, donde el adaptador escribe mensajes del intermediario. El trabajador toma mensajes de él y los envía para su ejecución, es decir, para enviarlos a través de HTTP.
El servicio admite el procesamiento de mensajes por lotes para aumentar el rendimiento. Al suscribirse, puede especificar el
concurrencyFactor
cada tema (se aplica a cada partición asignada de forma independiente). Por ejemplo,
concurrencyFactor=1000
significa que 1000 mensajes en forma de solicitudes HTTP se pueden enviar al consumidor al mismo tiempo. Tan pronto como todos los mensajes del paquete fueron resueltos inequívocamente por el consumidor, el servicio decide la próxima confirmación de la compensación del último mensaje en Kafka. Por lo tanto, el segundo valor de
concurrencyFactor
es el número máximo de mensajes procesados por el consumidor en caso de un bloqueo de Kafka o Queue-Over-Http.
Para reducir los retrasos, la cola tiene
loadFactor = concurrencyFactor * 2
, que le permite leer el doble de mensajes del agente que se pueden enviar. Dado que la confirmación automática está deshabilitada en el cliente nativo, dicho esquema no viola las garantías de al menos una vez.
Un alto valor de
concurrencyFactor
aumenta el rendimiento de la cola al reducir el número de confirmaciones que toman hasta 10 ms en el peor de los casos. Al mismo tiempo, aumenta la carga sobre el consumidor.
El orden de envío de mensajes dentro del paquete no está garantizado, pero se puede lograr estableciendo
concurrencyFactor=1
.
Se compromete
Los commits son una parte importante del servicio. Cuando el siguiente paquete de datos está listo, el desplazamiento del último mensaje del paquete se confirma inmediatamente a Kafka, y solo después de una confirmación exitosa el siguiente paquete está disponible para su procesamiento. A menudo esto no es suficiente y se requiere una confirmación automática. Para hacer esto, existe el parámetro
autoCommitPeriodMs
, que tiene poco en común con el período clásico de confirmación automática para clientes nativos que confirman el último mensaje leído desde la partición. Imagine
concurrencyFactor=10
. El servicio ha enviado los 10 mensajes y está esperando que cada uno de ellos esté listo. El procesamiento del mensaje 3 se completa primero, luego el mensaje 1 y luego el mensaje 10. En este punto, es hora de confirmación automática. Es importante no violar la semántica de al menos una vez. Por lo tanto, solo puede confirmar el primer mensaje, es decir, el desplazamiento 2, ya que solo se procesó con éxito en ese momento. Además, hasta la próxima confirmación automática, se procesan los mensajes 2, 5, 6, 4 y 8. Ahora solo necesita confirmar la compensación 7, y así sucesivamente. La confirmación automática casi no tiene efecto en el rendimiento.
Manejo de errores
En el modo normal de operación, el servicio envía un mensaje al supervisor una vez. Si por alguna razón causó un error 4xx o 5xx, el servicio reenviará el mensaje, esperando un procesamiento exitoso. El tiempo entre intentos se puede configurar como un parámetro separado.
También es posible establecer el número de intentos después de los cuales el mensaje se marcará como procesado, lo que detendrá las retransmisiones independientemente del estado de la respuesta. No recomiendo usar esto para datos confidenciales, las situaciones de falla de los consumidores siempre deben ajustarse manualmente. Los mensajes fijos se pueden monitorear mediante registros de servicio y monitoreando el estado de la respuesta del consumidor.
sobre pegarsePor lo general, el servidor HTTP, que le da a 4xx o 5xx el estado de la respuesta, también envía la Connection: close
encabezado. Una conexión TCP que se cierra de esta manera permanece en estado TIME_WAITED
hasta que el sistema operativo la borra después de un tiempo. El problema es que tales conexiones ocupan un puerto completo que no se puede reutilizar hasta que se libere. Esto puede resultar en la ausencia de puertos libres en la máquina para establecer una conexión TCP y el servicio se lanzará con excepciones en los registros para cada envío. En la práctica, en Windows 10, los puertos terminan después de 10-20 mil enviando mensajes erróneos en 1-2 minutos. En modo estándar, esto no es un problema.
Mensajes
Cada mensaje extraído del intermediario se envía al asesor a través de HTTP al recurso especificado durante la suscripción. Por defecto, un mensaje es enviado por una solicitud POST en el cuerpo. Este comportamiento se puede cambiar especificando cualquier otro método. Si el método no admite el envío de datos en el cuerpo, puede especificar el nombre del parámetro de cadena en el que se enviará el mensaje. Además, al suscribirse, puede especificar encabezados adicionales que se agregarán a cada mensaje, lo cual es conveniente para la autorización básica mediante tokens. Los encabezados se agregan a cada mensaje con el identificador del consumidor, tema y partición, de donde se leyó el mensaje, número de mensaje, clave de partición, si corresponde, así como el nombre del intermediario.
Rendimiento
Para evaluar el rendimiento, utilicé una PC (Windows 10, OpenJDK-11 (G1 sin sintonización), i7-6700K, 16GB), que ejecuta el servicio y una computadora portátil (Windows 10, i5-8250U, 8GB), en la que giraba el productor del mensaje, HTTP Consumer Consumer y Kafka con la configuración predeterminada. La PC está conectada al enrutador a través de una conexión por cable de 1 Gb / s, la computadora portátil a través de 802.11ac. El productor escribe cada 110 ms cada 100 ms para 110 bytes de mensajes a los temas designados para los que están suscritos los consumidores (
concurrencyFactor=500
, la confirmación automática está desactivada) de diferentes grupos. El soporte está lejos de ser ideal, pero puede obtener alguna imagen.
Un parámetro clave de medición es el efecto del servicio en la latencia.
Dejar:
- t
q - marca de tiempo del servicio que recibe mensajes del cliente nativo
- d
t0 es el tiempo entre t
q y el momento en que se envió el mensaje desde la cola local al grupo de ejecutivos
- d
t es el tiempo entre t
q y el momento en que se envió la solicitud HTTP. Esa es la influencia del servicio en la latencia del mensaje.
Durante las mediciones, se obtuvieron los siguientes resultados (C - consumidores, T - temas, M - mensajes):

En el modo operativo estándar, el servicio en sí casi no afecta la latencia y el consumo de memoria es mínimo. Los valores máximos de d
t (aproximadamente 60 ms) no se indican específicamente, ya que dependen de la operación del GC y no del servicio en sí. El ajuste especial de GC o la sustitución de G1 con Shenandoah puede ayudar a suavizar la propagación de los valores máximos.
Todo cambia dramáticamente cuando el consumidor no hace frente al flujo de mensajes desde la cola y el servicio activa el modo de aceleración. En este modo, el consumo de memoria aumenta, ya que el tiempo de respuesta a las solicitudes aumenta significativamente, lo que impide la limpieza oportuna de los recursos. El efecto sobre la latencia aquí permanece al nivel de los resultados anteriores, y los valores altos de dt son causados por la carga previa de mensajes en la cola local.
Desafortunadamente, no es posible realizar pruebas con una carga mayor, ya que la computadora portátil ya se dobla a 1300 RPS. Si alguien puede ayudar con la organización de las mediciones a altas cargas, con mucho gusto proporcionaré un ensamblaje para las pruebas.
Demostración
Ahora pasemos a la demostración. Para esto necesitamos:
- Agente de Kafka, listo para salir. Tomaré la instancia planteada en 192.168.99.100:9092 de Bitnami.
- Un recurso HTTP que recibirá mensajes. Para mayor claridad, tomé ganchos web de Slack.
En primer lugar, debe generar el servicio Queue-Over-Http. Para hacer esto, cree los siguientes contenidos en un directorio vacío
application.yml
:
spring: profiles: default logging: level: com: viirrtus: queueOverHttp: DEBUG app: persistence: file: storageDirectory: "persist" brokers: - name: "Kafka" origin: "kafka" config: bootstrap.servers: "192.168.99.100:9092"
Aquí le indicamos al servicio los parámetros de conexión de un agente específico, así como dónde almacenar los suscriptores para que no se pierdan entre inicios. En `app.brokers []. Config`, puede especificar cualquier parámetro de conexión admitido por el cliente nativo de Kafka; puede encontrar una lista completa
aquí .
Dado que Spring procesa el archivo de configuración, puede escribir muchas cosas interesantes allí. Incluyendo, configurar el registro.
Ahora ejecute el servicio en sí. Usamos la forma más fácil:
docker-compose.yml
:
version: "2" services: app: image: viirrtus/queue-over-http:0.1.3 restart: unless-stopped command: --debug ports: - "8080:8080" volumes: - ./application.yml:/application.yml - ./persist:/persist
Si esta opción no le conviene, puede compilar el servicio desde la fuente. Instrucciones de montaje en el proyecto Léame, un enlace al que se encuentra al final del artículo.El siguiente paso es registrar el primer suscriptor. Para hacer esto, debe realizar una solicitud HTTP al servicio con una descripción del consumidor:
POST localhost:8080/broker/subscription Content-Type: application/json { "id": "my-first-consumer", "group": { "id": "consumers" }, "broker": "Kafka", "topics": [ { "name": "slack.test", "config": { "concurrencyFactor": 10, "autoCommitPeriodMs": 100 } } ], "subscriptionMethod": { "type": "http", "delayOnErrorMs": 1000, "retryBeforeCommit": 10, "uri": "<slack-wh-uri>", "additionalHeaders": { "Content-Type": "application/json" } } }
Si todo salió bien, la respuesta será casi el mismo contenido enviado.
Veamos cada parámetro:
Consumer.id
- ID de nuestro suscriptorConsumer.group.id
- identificador de grupoConsumer.broker
: indique a cuál de los corredores de servicios debe suscribirseConsumer.topics[0].name
: el nombre del tema del que queremos recibir mensajesConsumer.topics[0].config. concurrencyFactor
Consumer.topics[0].config. concurrencyFactor
: número máximo de mensajes enviados simultáneamenteConsumer.topics[0].config. autoCommitPeriodMs
Consumer.topics[0].config. autoCommitPeriodMs
: período de confirmación forzada para mensajes listosConsumer.subscriptionMethod.type
: tipo de suscripción. Solo HTTP está disponible actualmente.Consumer.subscriptionMethod.delayOnErrorMs
: tiempo antes de reenviar un mensaje que terminó en un errorConsumer.subscriptionMethod.retryBeforeCommit
: el número de intentos de reenviar el mensaje de error. Si es 0, el mensaje girará hasta el procesamiento exitoso. En nuestro caso, la garantía de entrega completa no es tan importante como la constancia del flujo.Consumer.subscriptionMethod.uri
: el recurso al que se enviarán los mensajesConsumer.subscriptionMethod.additionalHeader
: encabezados adicionales que se enviarán con cada mensaje. Tenga en cuenta que habrá JSON en el cuerpo de cada mensaje para que Slack pueda interpretar correctamente la solicitud.
En esta solicitud, se omite el método HTTP, ya que el valor predeterminado, POST, Slack está bastante bien.Desde este momento, el servicio monitorea las particiones asignadas del tema slack.test para nuevos mensajes.
Para escribir mensajes sobre el tema, utilizaré las utilidades integradas en Kafka que se encuentran en
/opt/bitnami/kafka/bin
imagen de Kafka lanzada (la ubicación de las utilidades en otras instancias de Kafka puede diferir):
kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test > {“text”: “Hello!”}
Al mismo tiempo, Slack le notificará de un nuevo mensaje:
Para cancelar la suscripción de un consumidor, es suficiente hacer una solicitud POST para 'intermediar / cancelar la suscripción' con el mismo contenido que estaba durante la suscripción.Conclusión
Por el momento, solo se implementa la funcionalidad básica. Además, se planea mejorar el procesamiento por lotes, tratar de implementar la semántica Exactamente una vez, agregar la capacidad de enviar mensajes al agente a través de HTTP y, lo más importante, agregar soporte para otros Pub-Sub populares.
El servicio Queue-Over-Http se encuentra actualmente en desarrollo activo. La versión 0.1.3 es lo suficientemente estable como para probar en dev y soportes de escenario. El rendimiento ha sido probado en Windows 10, Debian 9 y Ubuntu 18.04. Puede usar productos bajo su propio riesgo. Si desea ayudar con el desarrollo o dar su opinión sobre el servicio, bienvenido al proyecto
Github .