RedisPipe - M谩s diversi贸n juntos

Cuando pienso en c贸mo funcionan los clientes ingenuos de RPC, recuerdo una broma:


El tribunal
"Demandado, 驴por qu茅 mataste a una mujer?"
- Estoy en el autob煤s, el conductor se acerca a la mujer y le exige que compre un boleto. La mujer abri贸 su bolso, sac贸 su bolso, cerr贸 su bolso, abri贸 su bolso, cerr贸 su bolso, abri贸 su bolso, puso su bolso all铆, cerr贸 su bolso, abri贸 su bolso, tom贸 su dinero, abri贸 su bolso, tom贸 su bolso, cerr贸 su bolso, abri贸 su bolso y puso su bolso all铆 , cerr贸 su bolso, abri贸 su bolso, puso su bolso all铆.
- Y que?
- El controlador le dio un boleto. Una mujer abri贸 su bolso, sac贸 su bolso, cerr贸 su bolso, abri贸 su bolso, cerr贸 su bolso, abri贸 su bolso, puso su bolso all铆, cerr贸 su bolso, abri贸 su bolso, coloc贸 su boleto, cerr贸 su bolso, abri贸 su bolso, sac贸 su bolso, cerr贸 su bolso, abri贸 su bolso , puse el bolso all铆, cerr茅 el bolso, abr铆 el bolso, puse el bolso all铆, cerr茅 el bolso.
"Toma el cambio", lleg贸 la voz del controlador. La mujer ... abri贸 su bolso ...
"S铆, no es suficiente matarla", el fiscal no puede soportar.
"As铆 que lo hice".
漏 S. Altov


Aproximadamente lo mismo sucede en el proceso de solicitud-respuesta, si lo aborda fr铆volamente:


  • el proceso del usuario escribe una solicitud serializada en el socket, de hecho la copia en el buffer del socket en el nivel del sistema operativo;
    Esta es una operaci贸n bastante dif铆cil, porque es necesario hacer un cambio de contexto (incluso si puede ser "f谩cil");
  • cuando al sistema operativo le parece que se puede escribir algo en la red, se forma un paquete (la solicitud se copia nuevamente) y se env铆a a la tarjeta de red;
  • la tarjeta de red escribe el paquete en la red (posiblemente despu茅s del almacenamiento en b煤fer);
  • (en el camino, un paquete puede almacenarse varias veces en los enrutadores);
  • finalmente, el paquete llega al host de destino y se almacena en la tarjeta de red;
  • la tarjeta de red env铆a una notificaci贸n al sistema operativo, y cuando el sistema operativo encuentra la hora, copia el paquete en su b煤fer y establece el indicador de listo en el descriptor de archivo;
  • (a煤n debe recordar enviar el ACK en respuesta);
  • despu茅s de un tiempo, la aplicaci贸n del servidor se da cuenta de que el descriptor est谩 listo (usando epoll) y alg煤n d铆a copia la solicitud en el b煤fer de la aplicaci贸n;
  • y finalmente, la aplicaci贸n del servidor procesa la solicitud.

Como saben, la respuesta se transmite exactamente de la misma manera solo en la direcci贸n opuesta. Por lo tanto, cada solicitud pasa un tiempo notable en el sistema operativo para su mantenimiento, y cada respuesta pasa el mismo tiempo nuevamente.


Esto se hizo especialmente notable despu茅s de Meltdown / Spectre, ya que los parches lanzados condujeron a un aumento significativo en el costo de las llamadas al sistema. A principios de enero de 2018, nuestro cl煤ster Redis de repente comenz贸 a consumir una CPU y media o dos veces m谩s, porque Amazon ha aplicado parches de kernel apropiados para cubrir estas vulnerabilidades. (Es cierto que Amazon aplic贸 una nueva versi贸n del parche un poco m谩s tarde, y el consumo de CPU cay贸 casi a los niveles anteriores. Pero el conector ya ha comenzado a nacer).


Desafortunadamente, todos los conectores Go ampliamente conocidos para Redis y Memcached funcionan de la siguiente manera: el conector crea un grupo de conexiones, y cuando necesita enviar una solicitud, extrae una conexi贸n del grupo, escribe una solicitud y luego espera una respuesta. (Es especialmente triste que el conector a Memcached haya sido escrito por el propio Brad Fitzpatrick). Y algunos conectores tienen una implementaci贸n tan infructuosa de este grupo que el proceso de eliminar la conexi贸n del grupo se convierte en una botnet en s铆 mismo.


Hay dos formas de facilitar este arduo trabajo de reenv铆o de solicitud / respuesta:


  1. Utilice el acceso directo a la tarjeta de red: DPDK, netmap, PF_RING, etc.
  2. No env铆e cada solicitud / respuesta como un paquete separado, sino comb铆nelas, si es posible, en paquetes m谩s grandes, es decir, extienda la sobrecarga de trabajar con la red para varias solicitudes. Juntos m谩s divertido!

La primera opci贸n, por supuesto, es posible. Pero, en primer lugar, esto es para los valientes, ya que debe escribir la implementaci贸n de TCP / IP usted mismo (por ejemplo, como en ScyllaDB). Y en segundo lugar, de esta manera facilitamos la situaci贸n solo en un lado, en el que nosotros mismos escribimos. No quiero volver a escribir Redis (todav铆a), por lo que los servidores consumir谩n la misma cantidad, incluso si el cliente usa el genial DPDK.


La segunda opci贸n es mucho m谩s simple y, lo m谩s importante, facilita la situaci贸n inmediatamente en el cliente y en el servidor. Por ejemplo, una base de datos en memoria se jacta de que puede servir a millones de RPS, mientras que Redis no puede atender a un par de cientos de miles . Sin embargo, este 茅xito no es tanto la implementaci贸n de esa base en memoria como la decisi贸n una vez aceptada de que el protocolo ser谩 completamente as铆ncrono, y los clientes deben usar esta asincron铆a siempre que sea posible. Lo que muchos clientes (especialmente los utilizados en los puntos de referencia) implementan con 茅xito enviando solicitudes a trav茅s de una conexi贸n TCP y, si es posible, envi谩ndolas juntas a la red.


Un art铆culo bien conocido muestra que Redis tambi茅n puede entregar un mill贸n de respuestas por segundo si se utiliza la canalizaci贸n. La experiencia personal en el desarrollo de historias en memoria tambi茅n indica que la canalizaci贸n reduce significativamente el consumo de CPU SYS y le permite usar el procesador y la red de manera mucho m谩s eficiente.


La 煤nica pregunta es c贸mo usar la canalizaci贸n, si en la solicitud las solicitudes en Redis a menudo llegan de una en una. Y si falta un servidor y Redis Cluster se usa con una gran cantidad de fragmentos, incluso cuando se encuentra un paquete de solicitudes, se divide en solicitudes individuales para cada fragmento.


La respuesta, por supuesto, es "obvia": realice un trazado de tuber铆a impl铆cito, recopile las solicitudes de todas las gorutinas que se ejecutan en paralelo a un servidor Redis y las env铆e a trav茅s de una conexi贸n.


Por cierto, el tendido impl铆cito de tuber铆as no es tan raro en los conectores en otros idiomas: nodejs node_redis , C # RedisBoost , python aioredis y muchos otros. Muchos de estos conectores est谩n escritos en la parte superior de los bucles de eventos y, por lo tanto, parece natural recopilar solicitudes de "flujos de c谩lculo" paralelos. En Go, se promueve el uso de interfaces s铆ncronas y, aparentemente, porque pocas personas deciden organizar su propio "bucle".


Quer铆amos usar Redis de la manera m谩s eficiente posible y, por lo tanto, decidimos escribir un nuevo conector "mejor" (tm): RedisPipe .


驴C贸mo hacer un tendido de tuber铆a impl铆cito?


El esquema b谩sico:


  • Las rutinas de la l贸gica de la aplicaci贸n no escriben solicitudes directamente en la red, sino que las pasan al recolector de rutinas;
  • si es posible, el recolector recopila un mont贸n de solicitudes, las escribe en la red y las pasa al lector de rutina;
  • Goroutine-reader lee las respuestas de la red, las compara con las solicitudes correspondientes y notifica a los goroutines de la l贸gica acerca de la respuesta recibida.

Algo debe ser notificado sobre la respuesta. Un astuto programador de Go sin duda dir谩: "隆A trav茅s del canal!"
Pero esta no es la 煤nica primitiva de sincronizaci贸n posible y no la m谩s eficiente incluso en el entorno Go. Y dado que diferentes personas tienen diferentes necesidades, haremos que el mecanismo sea extensible, permitiendo al usuario implementar la interfaz (llam茅mosla Future ):


 type Future interface { Resolve(val interface{}) } 

Y luego el esquema b谩sico se ver谩 as铆:


 type future struct { req Request fut Future } type Conn struct { c net.Conn futmtx sync.Mutex wfutures []future futtimer *time.Timer rfutures chan []future } func (c *Conn) Send(r Request, f Future) { c.futmtx.Lock() defer c.futmtx.Unlock() c.wfutures = append(c.wfutures, future{req: r, fut: f}) if len(c.wfutures) == 1 { futtimer.Reset(100*time.Microsecond) } } func (c *Conn) writer() { for range c.futtimer.C { c.futmtx.Lock() futures, c.wfutures = c.wfutures, nil c.futmtx.Unlock() var b []byte for _, ft := range futures { b = AppendRequest(b, ft.req) } _, _err := ccWrite(b) c.rfutures <- futures } } func (c *Conn) reader() { rd := bufio.NewReader(cc) var futures []future for { response, _err := ParseResponse(rd) if len(futures) == 0 { futures = <- c.rfutures } futures[0].fut.Resolve(response) futures = futures[1:] } } 

Por supuesto, este es un c贸digo muy simplificado. Omitido


  • establecimiento de conexi贸n;
  • Tiempos de espera de E / S
  • manejo de errores de lectura / escritura;
  • restablecer la conexi贸n;
  • la capacidad de cancelar la solicitud antes de enviarla a la red;
  • optimizando la asignaci贸n de memoria (reutilizando el buffer y las matrices de futuros).

Cualquier error de E / S (incluido un tiempo de espera) en el c贸digo real conduce a una resoluci贸n de todos los errores futuros correspondientes a las solicitudes enviadas y en espera de ser enviadas.
La capa de conexi贸n no est谩 involucrada en el reintento de la solicitud, y si es necesario (y posible) volver a intentar la solicitud, se puede hacer en un nivel superior de abstracci贸n (por ejemplo, en la implementaci贸n del soporte de Redis Cluster que se describe a continuaci贸n).


Observaci贸n Inicialmente, el circuito parec铆a un poco m谩s complicado. Pero en el proceso de experimentos simplificado a tal opci贸n.


Observaci贸n 2. Se imponen requisitos muy estrictos al futuro. M茅todo de resoluci贸n: debe ser lo m谩s r谩pido posible, pr谩cticamente sin bloqueo y en ning煤n caso de p谩nico. Esto se debe al hecho de que se llama sincr贸nicamente en el ciclo del lector, y cualquier "freno" inevitablemente conducir谩 a la degradaci贸n. La implementaci贸n de Future.Resolve debe hacer el m铆nimo necesario de acciones lineales: despertar al expectante; tal vez maneje el error y env铆e un reintento asincr贸nico (utilizado en la implementaci贸n del soporte de cl煤ster).


Efecto


隆Un buen punto de referencia es la mitad del art铆culo!


Un buen punto de referencia es uno que est茅 lo m谩s cerca posible de combatir el uso en t茅rminos de efectos observados. Y esto no es f谩cil de hacer.


La opci贸n de referencia , que me parece bastante similar a la real:


  • el "script" principal emula 5 clientes paralelos,
  • en cada "cliente", por cada 300-1000 rps "deseadas", se activa la gorutina (se activan 3 gorutinas por 1000 rps, se ejecutan 124 gorutinas por 128000 rps),
  • gorutin usa una instancia separada del limitador de velocidad y env铆a solicitudes en series aleatorias, de 5 a 15 solicitudes.

La aleatoriedad de la serie de consultas nos permite lograr una distribuci贸n aleatoria de la serie en la l铆nea de tiempo, que refleja m谩s correctamente la carga real.


Texto oculto

Las opciones incorrectas fueron:
a) utilice un limitador de velocidad para todas las gorutinas del "cliente" y recurra a 茅l para cada solicitud; esto lleva a un consumo excesivo de CPU por el propio limitador de velocidad, as铆 como a una mayor rotaci贸n de tiempo de goroutin, lo que degrada el rendimiento de RedisPipe a rps medias (pero inexplicablemente mejora a la altura);
b) utilice un limitador de velocidad para todas las gorutinas del "cliente" y env铆e solicitudes en serie: el limitador de velocidad no est谩 consumiendo tanto la CPU, pero la alternancia de gorutinas en el tiempo solo aumenta;
c) use un limitador de velocidad para cada goroutine, pero env铆e la misma serie de 10 solicitudes; en este escenario, las goroutines se despiertan demasiado simult谩neamente, lo que mejora injustamente los resultados de RedisPipe.


Las pruebas se realizaron en una instancia de AWS c5-2xlarge de cuatro n煤cleos. La versi贸n de Redis es 5.0.


La relaci贸n de la intensidad de consulta deseada, la intensidad total resultante y consumida por el r谩bano de la CPU:


rps previstosredigo
rps /% cpu
redispipe no espere
rps /% cpu
redispipe 50碌s
rps /% cpu
redispipe 150碌s
rps /% cpu
1000 * 55015/7%5015/6%5015/6%5015/6%
2000 * 510022/11%10022/10%10022/10%10022/10%
4000 * 520036/21%20036/18%20035/17%20035/15%
8000 * 540020/45%40062/37%40060/26%40056/19%
16000 * 579994/71%80102/58%80096/33%80090/23%
32000 * 5159590/96%160 180/80%160,167 / 39%160150/29%
64000 * 5187774/99%320 313/98%320,283 / 47%320,258 / 37%
92000 * 5183206/99%480,443 / 97%480,407 / 52%480 366/42%
128000 * 5179744/99%640,484 / 97%640,488 / 55%640,428 / 46%

Solicitar tarifaCPU Redis


Puede notar que con un conector que funciona de acuerdo con el esquema cl谩sico (solicitud / respuesta + grupo de conexiones), Redis consume r谩pidamente el n煤cleo del procesador, despu茅s de lo cual se convierte en una tarea imposible obtener m谩s de 190 krps.


RedisPipe le permite exprimir toda la potencia requerida de Redis. Y cuanto m谩s nos detenemos para recopilar solicitudes paralelas, menos Redis consume CPU. Ya se obtiene un beneficio tangible a 4krps del cliente (20krps en total) si se usa una pausa de 150 microsegundos.


Incluso si la pausa no se usa expl铆citamente cuando Redis descansa en la CPU, el retraso aparece por s铆 solo. Adem谩s, las solicitudes comienzan a ser almacenadas por el sistema operativo. Esto permite que RedisPipe aumente el n煤mero de solicitudes ejecutadas con 茅xito cuando el conector cl谩sico ya est谩 bajando sus patas.


Este es el resultado principal, para lo cual fue necesario crear un nuevo conector.


驴Qu茅 sucede entonces con el consumo de CPU en el cliente y con las solicitudes retrasadas?


rps previstosredigo
% cpu / ms
redispipe nowait
% cpu / ms
redispipe 50ms
% cpu / ms
redispipe 150ms
% cpu / ms
1000 * 513 / 0.0320 / 0.0446 / 0.1644 / 0.26
2000 * 525 / 0.0333 / 0.0477 / 0.1671 / 0.26
4000 * 548 / 0.0360 / 0.04124 / 0.16107 / 0.26
8000 * 594 / 0.03119 / 0.04178 / 0.15141 / 0.26
16000 * 5184 / 0.04206 / 0.04228 / 0.15177 / 0.25
32000 * 5341 / 0.08322 / 0.05280 / 0.15226 / 0.26
64000 * 5316 / 1.88469 / 0.08345 / 0.16307 / 0.26
92000 * 5313 / 2,88511 / 0.16398 / 0.17366 / 0.27
128000 * 5312 / 3.54509 / 0.37441 / 0,19418 / 0.29

CPU del clientelatencia


Puede notar que en peque帽os rps RedisPipe consume m谩s CPU que el "competidor", especialmente si se usa una peque帽a pausa. Esto se debe principalmente a la implementaci贸n de temporizadores en Go y la implementaci贸n de las llamadas del sistema utilizadas en el sistema operativo (en Linux es futexsleep), ya que en el modo "sin pausa" la diferencia es significativamente menor.


A medida que aumenta el rps, la sobrecarga para los temporizadores se compensa con una sobrecarga menor para la conectividad de red, y despu茅s de 16 krps por cliente, el uso de RedisPipe con una pausa de 150 microsegundos comienza a brindar beneficios tangibles.


Por supuesto, despu茅s de que Redis descansaba en la CPU, la latencia de las solicitudes que usan el conector cl谩sico comienza a aumentar. Sin embargo, no estoy seguro de que, en la pr谩ctica, a menudo alcances 180 krps en una instancia de Redis. Pero si es as铆, tenga en cuenta que puede tener problemas.


Mientras Redis no se ejecute en la CPU, la latencia de la solicitud, por supuesto, sufre el uso de una pausa. Este compromiso se establece intencionalmente en el conector. Sin embargo, esta compensaci贸n solo se nota si Redis y el cliente est谩n en el mismo host f铆sico. Dependiendo de la topolog铆a de la red, un viaje de ida y vuelta a un host vecino puede ser de cien microsegundos a un milisegundo. En consecuencia, la diferencia en el retraso ya en lugar de nueve veces (0.26 / 0.03) se convierte en tres veces (0.36 / 0.13) o se mide solo por un par de decenas de por ciento (1.26 / 1.03).


En nuestra carga de trabajo, cuando Redis se usa como cach茅, la expectativa total de respuestas de la base de datos con una p茅rdida de cach茅 es mayor que la expectativa total de respuestas de Redis, porque se cree que el aumento en el retraso no es significativo.


El principal resultado positivo es una tolerancia al crecimiento de la carga: si de repente la carga en el servicio aumenta N veces, Redis no consumir谩 la CPU la misma N veces m谩s. Para soportar cuadruplicar la carga de 160 krps a 640 krps, Redis gast贸 solo 1.6 veces m谩s CPU, aumentando el consumo del 29 al 46%. Esto nos permite no tener miedo de que Redis se doblegue de repente. La escalabilidad de la aplicaci贸n tampoco estar谩 determinada por el funcionamiento del conector y el costo de la conectividad de la red (l茅ase: costos de CPU de SYS).


Observaci贸n El c贸digo de referencia opera con valores peque帽os. Para despejar mi conciencia, repet铆 la prueba con valores de tama帽o 768 bytes. El consumo de CPU por r谩bano aument贸 notablemente (hasta un 66% en una pausa de 150 碌s), y el techo para un conector cl谩sico cae a 170 krps. Pero todas las proporciones observadas permanecieron iguales, y de ah铆 las conclusiones.


Racimo


Para escalar usamos Redis Cluster . Esto nos permite usar Redis no solo como cach茅, sino tambi茅n como almacenamiento vol谩til y, al mismo tiempo, no perder datos al expandir / comprimir un cl煤ster.


Redis Cluster utiliza el principio de cliente inteligente, es decir el cliente debe monitorear el estado del cl煤ster y tambi茅n responder a los errores auxiliares que devuelve el "r谩bano" cuando el "ramo" se mueve de una instancia a otra.


En consecuencia, el cliente debe mantener conexiones a todas las instancias de Redis en el cl煤ster y emitir una conexi贸n a la requerida para cada solicitud. Y fue en este lugar que el cliente us贸 antes (no se帽alaremos con el dedo) que se fastidi贸 mucho. El autor, que sobrestim贸 la comercializaci贸n de Go (CSP, canales, gorutinas), implement贸 la sincronizaci贸n del trabajo con el estado del cl煤ster enviando devoluciones de llamada a la gorutina central. Esto se ha convertido en un botnek serio para nosotros. Como parche temporal, tuvimos que lanzar cuatro clientes en un cl煤ster, cada uno, a su vez, elevando hasta cientos de conexiones en el grupo para cada instancia de Redis.


En consecuencia, el nuevo conector se encarg贸 de evitar este error. Toda interacci贸n con el estado del cl煤ster en la ruta de ejecuci贸n de la consulta se realiza de la manera m谩s libre posible:


  • el estado del cl煤ster se hace pr谩cticamente inmutable, y no hay numerosas mutaciones aromatizadas por 谩tomos
  • el acceso al estado se produce mediante atomic.StorePointer / atomic.LoadPointer y, por lo tanto, se puede obtener sin bloquear.

Por lo tanto, incluso durante la actualizaci贸n del estado del cl煤ster, las solicitudes pueden usar el estado anterior sin temor a esperar en un bloqueo.


 // storeConfig atomically stores config func (c *Cluster) storeConfig(cfg *clusterConfig) { p := (*unsafe.Pointer)(unsafe.Pointer(&c.config)) atomic.StorePointer(p, unsafe.Pointer(cfg)) } // getConfig loads config atomically func (c *Cluster) getConfig() *clusterConfig { p := (*unsafe.Pointer)(unsafe.Pointer(&c.config)) return (*clusterConfig)(atomic.LoadPointer(p)) } func (cfg *clusterConfig) slot2shardno(slot uint16) uint16 { return uint16(atomic.LoadUint32(&cfg.slots[slot])) } func (cfg *clusterConfig) slotSetShard(slot, shard uint16) { atomic.StoreUint32(&cfg.slots[slot], shard) } 

El estado del cl煤ster se actualiza cada 5 segundos. Pero si hay una sospecha de inestabilidad del cl煤ster, la actualizaci贸n es forzada:


 func (c *Cluster) control() { t := time.NewTicker(c.opts.CheckInterval) defer t.Stop() // main control loop for { select { case <-c.ctx.Done(): // cluster closed, exit control loop c.report(LogContextClosed{Error: c.ctx.Err()}) return case cmd := <-c.commands: // execute some asynchronous "cluster-wide" actions c.execCommand(cmd) continue case <-forceReload: // forced mapping reload c.reloadMapping() case <-tC: // regular mapping reload c.reloadMapping() } } } func (c *Cluster) ForceReloading() { select { case c.forceReload <- struct{}{}: default: } } 

Si la respuesta MOVED o ASK recibida del r谩bano contiene una direcci贸n desconocida, se inicia su adici贸n asincr贸nica a la configuraci贸n. (Lo siento, no descubr铆 c贸mo simplificar el c贸digo, porque aqu铆 est谩 el enlace ). No fue sin el uso de bloqueos, pero se toman por un corto per铆odo de tiempo. La expectativa principal se realiza al guardar la devoluci贸n de llamada en la matriz: la misma vista lateral futura.


Se establecen conexiones con todas las instancias de Redis, con maestros y esclavos. Dependiendo de la pol铆tica preferida y el tipo de solicitud (lectura o escritura), la solicitud puede enviarse tanto al maestro como al esclavo. Esto tiene en cuenta la "vivacidad" de la instancia, que consiste tanto en la informaci贸n obtenida al actualizar el estado del cl煤ster como en el estado actual de la conexi贸n.


 func (c *Cluster) connForSlot(slot uint16, policy ReplicaPolicyEnum) (*redisconn.Connection, *errorx.Error) { var conn *redisconn.Connection cfg := c.getConfig() shard := cfg.slot2shard(slot) nodes := cfg.nodes var addr string switch policy { case MasterOnly: addr = shard.addr[0] // master is always first node := nodes[addr] if conn = node.getConn(c.opts.ConnHostPolicy, needConnected); conn == nil { conn = node.getConn(c.opts.ConnHostPolicy, mayBeConnected) } case MasterAndSlaves: n := uint32(len(shard.addr)) off := c.opts.RoundRobinSeed.Current() for _, needState := range []int{needConnected, mayBeConnected} { mask := atomic.LoadUint32(&shard.good) // load health information for ; mask != 0; off++ { bit := 1 << (off % n) if mask&bit == 0 { // replica isn't healthy, or already viewed continue } mask &^= bit addr = shard.addr[k] if conn = nodes[addr].getConn(c.opts.ConnHostPolicy, needState); conn != nil { return conn, nil } } } } if conn == nil { c.ForceReloading() return nil, c.err(ErrNoAliveConnection) } return conn, nil } func (n *node) getConn(policy ConnHostPolicyEnum, liveness int) *redisconn.Connection { for _, conn := range n.conns { switch liveness { case needConnected: if c.ConnectedNow() { return conn } case mayBeConnected: if c.MayBeConnected() { return conn } } } return nil } 

Hay un cr铆ptico RoundRobinSeed.Current() . Esto, por un lado, es una fuente de aleatoriedad, y por otro, aleatoriedad que no cambia con frecuencia. Si selecciona una nueva conexi贸n para cada solicitud, esto degrada la eficiencia de la canalizaci贸n. Es por eso que la implementaci贸n predeterminada cambia el valor de Current cada pocas decenas de milisegundos. Para tener menos superposiciones en el tiempo, cada host selecciona su propio intervalo.


Como recordar谩, la conexi贸n utiliza el concepto de Futuro para solicitudes asincr贸nicas. El cl煤ster utiliza el mismo concepto: un futuro personalizado se envuelve en uno agrupado, y ese se alimenta a la conexi贸n.


驴Por qu茅 envolver el futuro personalizado? En primer lugar, en el modo de cl煤ster, "r谩bano" devuelve maravillosos "errores" MOVIDO y PREGUNTA con la informaci贸n a d贸nde ir para obtener la clave que necesita, y, despu茅s de recibir dicho error, debe repetir la solicitud a otro host. En segundo lugar, dado que todav铆a necesitamos implementar la l贸gica de redireccionamiento, 驴por qu茅 no incrustar la solicitud para volver a intentarlo con un error de E / S (por supuesto, solo si la solicitud de lectura):


 type request struct { c *Cluster req Request cb Future slot uint16 policy ReplicaPolicyEnum mayRetry bool } func (c *Cluster) SendWithPolicy(policy ReplicaPolicyEnum, req Request, cb Future) { slot := redisclusterutil.ReqSlot(req) policy = c.fixPolicy(slot, req, policy) conn, err := c.connForSlot(slot, policy, nil) if err != nil { cb.Resolve(err) return } r := &request{ c: c, req: req, cb: cb, slot: slot, policy: policy, mayRetry: policy != MasterOnly || redis.ReplicaSafe(req.Cmd), } conn.Send(req, r, 0) } func (r *request) Resolve(res interface{}, _ uint64) { err := redis.AsErrorx(res) if err == nil { r.resolve(res) return } switch { case err.IsOfType(redis.ErrIO): if !r.mayRetry { // It is not safe to retry read-write operation r.resolve(err) return } fallthrough case err.HasTrait(redis.ErrTraitNotSent): // It is request were not sent at all, it is safe to retry both readonly and write requests. conn, err := rcconnForSlot(r.slot, r.policy, r.seen) if err != nil { r.resolve(err) return } conn.Send(r.req, r) return case err.HasTrait(redis.ErrTraitClusterMove): addr := movedTo(err) ask := err.IsOfType(redis.ErrAsk) rcensureConnForAddress(addr, func(conn *redisconn.Connection, cerr error) { if cerr != nil { r.resolve(cerr) } else { r.lastconn = conn conn.SendAsk(r.req, r, ask) } }) return default: // All other errors: just resolve. r.resolve(err) } } 

Este tambi茅n es un c贸digo simplificado. Se omite la restricci贸n en el n煤mero de reintentos, la memorizaci贸n de conexiones problem谩ticas, etc.


Comodidad


Solicitudes asincr贸nicas, 隆Future es una superkule! Pero terriblemente inc贸modo.


La interfaz es lo m谩s importante. Puedes vender cualquier cosa si tiene una buena interfaz. Es por eso que Redis y MongoDB han ganado popularidad.


Por lo tanto, es necesario convertir nuestras solicitudes asincr贸nicas en s铆ncronas.


 // Sync provides convenient synchronous interface over asynchronous Sender. type Sync struct { S Sender } // Do is convenient method to construct and send request. // Returns value that could be either result or error. func (s Sync) Do(cmd string, args ...interface{}) interface{} { return s.Send(Request{cmd, args}) } // Send sends request to redis. // Returns value that could be either result or error. func (s Sync) Send(r Request) interface{} { var res syncRes res.Add(1) sSSend(r, &res) res.Wait() return res.r } type syncRes struct { r interface{} sync.WaitGroup } // Resolve implements Future.Resolve func (s *syncRes) Resolve(res interface{}) { sr = res s.Done() } // Usage func get(s redis.Sender, key string) (interface{}, error) { res := redis.Sync{s}.Do("GET", key) if err := redis.AsError(res); err != nil { return nil, err } return res, nil } 

AsError no se ve como un Go-way nativo para obtener un error. Pero me gusta porque desde mi punto de vista, el resultado es Result<T,Error> y AsError es un patr贸n de coincidencia ersatz.


Desventajas


Pero, desafortunadamente, hay una mosca en la pomada en este bienestar.


El protocolo Redis no implica reordenar solicitudes. Y al mismo tiempo, tiene solicitudes de bloqueo como BLPOP, BRPOP.


Esto es un fracaso


Como sabe, si dicha solicitud est谩 bloqueada, bloquear谩 todas las solicitudes que la sigan. Y no hay nada que hacer al respecto.


Despu茅s de una larga discusi贸n, se decidi贸 prohibir el uso de estas solicitudes en RedisPipe.


Por supuesto, si realmente lo necesita, puede: poner el par谩metro ScriptMode: true , y eso est谩 en su conciencia.


Alternativas


De hecho, todav铆a hay una alternativa que no mencion茅, pero que los lectores expertos pensaron, es el rey de la twemproxy de cach茅s de cl煤ster.


脡l hace por Redis lo que hace nuestro conector: transforma una "solicitud / respuesta" grosera y desalmada en una suave "colocaci贸n de tuber铆as".


Pero la propia twemproxy sufrir谩 el hecho de que tendr谩 que trabajar en un sistema de "solicitud / respuesta". Esta vez Y en segundo lugar, usamos "r谩bano" y "almacenamiento poco confiable" y, a veces, cambiamos el tama帽o del cl煤ster. Y twemproxy no facilita la tarea de reequilibrio de ninguna manera y, adem谩s, requiere un reinicio al cambiar la configuraci贸n del cl煤ster.


Influencia


No tuve tiempo de escribir un art铆culo, y las olas de RedisPipe ya se han ido. Se ha adoptado un parche en Radix.v3 que agrega canalizaci贸n a su Pool:


Consulte RedisPipe y descubra si se puede incorporar su estrategia de canalizaci贸n / procesamiento por lotes impl铆cito
Canalizaci贸n autom谩tica para comandos en Pool


Son ligeramente inferiores en velocidad (a juzgar por sus puntos de referencia, pero no lo dir茅 con certeza). Pero su ventaja es que pueden enviar comandos de bloqueo a otras conexiones desde el grupo.


Conclusi贸n


Ya hace un a帽o que RedisPipe contribuye a la efectividad de nuestro servicio.
Y en previsi贸n de cualquier "d铆a caluroso", uno de los recursos, cuya capacidad no causa preocupaci贸n, es la CPU en los servidores Redis.


Repositorio
Punto de referencia

Source: https://habr.com/ru/post/438026/


All Articles