Integração contínua no Yandex. Parte 2

No artigo anterior , falamos sobre a transferência de desenvolvimento para um único repositório com uma abordagem de desenvolvimento baseada em tronco, com sistemas unificados para montagem, teste, implantação e monitoramento, sobre quais tarefas um sistema de integração contínua deve resolver para funcionar efetivamente nessas condições.


Hoje contaremos aos leitores da Habr sobre o dispositivo do sistema de integração contínua.


imagem


Um sistema de integração contínua deve funcionar de maneira confiável e rápida. O sistema deve responder rapidamente aos eventos recebidos e não deve introduzir atrasos adicionais no processo de entrega dos resultados da execução de teste ao usuário. Os resultados da montagem e teste devem ser entregues ao usuário em tempo real.


O sistema de integração contínua é um sistema de processamento de dados de streaming com atrasos mínimos.


Depois de enviar todos os resultados em um determinado estágio (configurar, criar, modelar, pequenos testes, testes médios etc.), o sistema de build sinaliza isso para o sistema de integração contínua ("fecha" o estágio) e o usuário vê isso para essa verificação e Nesta fase, todos os resultados são conhecidos. Cada estágio fecha independentemente. O usuário recebe um sinal útil mais rapidamente. Após o fechamento de todas as etapas, a verificação é considerada concluída.


Para implementar o sistema, escolhemos a arquitetura Kappa . O sistema consiste em 2 subsistemas:


  • O processamento de eventos e dados ocorre em um circuito em tempo real. Quaisquer dados de entrada são tratados como fluxos de dados (fluxos). Primeiro, os eventos são registrados no fluxo e somente depois são processados.
  • Os resultados do processamento de dados são gravados continuamente no banco de dados, para onde as chamadas pela API passam. Na arquitetura Kappa, isso é chamado de camada de veiculação.

Todas as solicitações de modificação de dados devem passar pelo circuito em tempo real, porque você sempre precisa ter o estado atual do sistema. Os pedidos de leitura vão apenas para o banco de dados.




Sempre que possível, seguimos a regra de somente anexar. Nenhuma modificação ou exclusão de objetos, exceto a exclusão de dados antigos e desnecessários.


Mais de 2 TB de dados brutos passam pelo serviço por dia.


Vantagens:


  • Os fluxos contêm todos os eventos e mensagens. Sempre podemos entender o que e quando aconteceu. O fluxo pode ser percebido como um grande log.
  • Alta eficiência e sobrecarga mínima. Acontece que um sistema totalmente orientado a eventos, sem qualquer perda nas pesquisas. Não há evento - não estamos fazendo nada extra.
  • O código do aplicativo praticamente não lida com as primitivas da sincronização de threads e a memória compartilhada entre threads. Isso torna o sistema mais confiável.
  • Os processadores estão bem isolados um do outro, porque não interaja diretamente, apenas através de fluxos. Pode ser fornecida uma boa cobertura de teste.

Mas o processamento de dados de streaming não é tão simples:


  • É necessário um bom entendimento do modelo computacional. Você precisará repensar os algoritmos de processamento de dados existentes. Nem todos os algoritmos caem imediatamente no modelo de fluxo e você precisa esmagar um pouco a cabeça.
  • É necessário garantir que a ordem de recebimento e processamento de eventos seja mantida.
  • Você precisa gerenciar eventos inter-relacionados, ou seja, tenha acesso rápido a todos os dados necessários ao processar uma nova mensagem.
  • Você também precisa lidar com eventos duplicados.

Processamento de fluxo


Enquanto trabalhava no projeto, a biblioteca Stream Processor foi escrita, o que nos ajudou a implementar e lançar rapidamente algoritmos de processamento de dados de streaming na produção.


Stream Processor é uma biblioteca para a construção de sistemas de processamento de dados de streaming. O fluxo é uma sequência potencialmente interminável de dados (mensagens) na qual apenas é possível adicionar novas mensagens; as mensagens já gravadas não são alteradas e não são excluídas do fluxo. Os conversores de um fluxo para outro (processadores de fluxo) consistem funcionalmente em três partes: um provedor de mensagens recebidas, que geralmente lê mensagens de um ou mais fluxos e as coloca em uma fila de processamento, um processador de mensagens que converte mensagens recebidas em mensagens enviadas e as coloca em uma fila para o registro e o gravador, onde as mensagens de saída agrupadas na janela de tempo caem no fluxo de saída. As mensagens de dados geradas por um processador de fluxo podem ser usadas por outros posteriormente. Assim, fluxos e processadores formam um gráfico direcionado no qual são possíveis loops, em particular, um processador de fluxo pode até gerar mensagens no mesmo fluxo de onde recebe dados.


É garantido que cada mensagem do fluxo de entrada seja processada por cada processador associado a ele pelo menos uma vez (semântica pelo menos uma vez). Também é garantido que todas as mensagens serão processadas na ordem em que chegaram neste fluxo. Para fazer isso, os processadores de fluxo são distribuídos por todos os nós de serviço em funcionamento, para que, ao mesmo tempo, não mais de uma instância de cada processador registrado esteja funcionando.


O processamento de eventos inter-relacionados é um dos principais problemas na construção de sistemas para processamento de dados de streaming. Como regra, ao transmitir mensagens, os processadores de fluxo criam incrementalmente um determinado estado que era válido no momento em que a mensagem atual foi processada. Esses objetos de estado geralmente não estão associados ao fluxo inteiro como um todo, mas a um determinado subconjunto de mensagens, que é determinado pelo valor da chave nesse fluxo. O armazenamento eficiente de riqueza é a chave do sucesso. Ao processar a próxima mensagem, é importante que o processador possa obter rapidamente esse estado e, com base nele e na mensagem atual, gerar mensagens de saída. Esses objetos de estado são acessíveis aos processadores em L1 (não confunda com o cache da CPU), cache LRU, localizado na memória. No caso de não haver estado no cache L1, ele é restaurado a partir do cache L2 localizado no mesmo armazenamento em que os fluxos são armazenados e onde é periodicamente armazenado durante a operação do processador. Se não houver estado no cache L2, ele será restaurado a partir das mensagens originais do fluxo, como se o processador tivesse processado todas as mensagens originais associadas à chave de mensagem atual. A técnica de armazenamento em cache também permite que você lide com o problema de alta latência do armazenamento, já que geralmente o processamento seqüencial não depende do desempenho do servidor, mas do atraso de solicitações e respostas ao se comunicar com o data warehouse.




Para armazenar efetivamente dados em caches L1 e dados de mensagens na memória, além de estruturas com eficiência de memória, usamos pools de objetos que permitem ter apenas uma cópia de um objeto (ou mesmo partes dele) na memória. Essa técnica já é usada no JDK para cadeias de caracteres internas e se estende de maneira semelhante a outros tipos de objetos, que devem ser imutáveis.


Para armazenamento compacto de dados no armazenamento de fluxo, alguns dados são normalizados antes de serem gravados no fluxo, ou seja, se transformar em números. Algoritmos de compactação eficazes podem ser aplicados aos números (identificadores de objeto). Os números são classificados, os deltas são contados e, em seguida, codificados com ZigZag Encoding e, em seguida, compactados pelo arquivador. A normalização não é uma técnica muito padrão para sistemas de processamento de dados de streaming. Mas essa técnica de compactação é muito eficaz e a quantidade de dados no fluxo mais carregado é reduzida em cerca de 1.000 vezes.




Para cada fluxo e processador, rastreamos o ciclo de vida do processamento de mensagens: a aparência de novas mensagens no fluxo de entrada, o tamanho da fila de mensagens não processadas, o tamanho da fila para gravação no fluxo resultante, o tempo de processamento de mensagens e a distribuição do tempo pelos estágios de processamento de mensagens:




Data warehouse


Os resultados do processamento de dados de streaming devem estar disponíveis para o usuário o mais rápido possível. Os dados processados ​​dos fluxos devem ser registrados continuamente no banco de dados, onde você pode buscar os dados (por exemplo, mostrar um relatório com os resultados do teste, mostrar o histórico do teste).


Características dos dados armazenados e consultas.
A maioria dos dados é executada em teste. Ao longo de um mês, mais de 1,5 bilhão de compilações e testes são lançados. Uma quantidade bastante grande de informações é armazenada para cada inicialização: o resultado e o tipo de erro, uma breve descrição do erro (trecho), vários links para os logs, duração do teste, um conjunto de valores numéricos, métricas, no formato name = value, etc. Alguns desses dados - por exemplo, métricas e duração - são muito difíceis de compactar, pois, na verdade, são valores aleatórios. A outra parte - por exemplo, o resultado, tipo de erro, logs - pode ser salva com mais eficiência, pois quase não muda no mesmo teste de execução para execução.


Anteriormente, usamos o MySQL para armazenar dados processados. Gradualmente, começamos a nos apoiar nos recursos do banco de dados:


  • A quantidade de dados processados ​​dobra a cada seis meses.
  • Só conseguimos armazenar dados nos últimos 2 meses, mas queríamos armazenar dados por pelo menos um ano.
  • Problemas com a velocidade de execução de algumas consultas pesadas (quase analíticas).
  • Esquema de banco de dados complicado. Muitas tabelas (normalização), o que complica a gravação no banco de dados. O esquema base é muito diferente do esquema de objetos usados ​​no circuito em tempo real.
  • Não está ocorrendo um desligamento do servidor. A falha de um servidor separado ou o desligamento do datacenter pode levar à falha do sistema.
  • Operação bastante complicada.

Como candidatos ao novo data warehouse, consideramos várias opções: PostgreSQL, MongoDB e várias soluções internas, incluindo ClickHouse .


Algumas soluções não nos permitem armazenar nossos dados com mais eficiência do que a antiga solução baseada em MySQL. Outros não permitem a implementação de consultas rápidas e complexas (quase analíticas). Por exemplo, temos uma solicitação bastante pesada que mostra confirmações que afetam um projeto específico (algum conjunto de testes). Em todos os casos em que não podemos executar consultas SQL rápidas, teríamos que forçar o usuário a esperar muito tempo ou fazer alguns cálculos com perda de flexibilidade. Se você contar algo com antecedência, precisará escrever mais código e, ao mesmo tempo, perder flexibilidade - não há como mudar rapidamente o comportamento e recontar qualquer coisa. É muito mais conveniente e rápido escrever uma consulta SQL que retornará os dados de que o usuário precisa e poderá modificá-los rapidamente se você quiser alterar o comportamento do sistema.


Clickhouse


Optamos pelo ClickHouse . O ClickHouse é um sistema de gerenciamento de banco de dados colunar (DBMS) para processamento de consultas analíticas online (OLAP).


Ao mudar para o ClickHouse, abandonamos deliberadamente algumas das oportunidades oferecidas por outros DBMSs, recebendo uma compensação mais do que digna por isso na forma de consultas analíticas muito rápidas e um armazém de dados compacto.


Nos DBMSs relacionais, os valores relacionados a uma linha são fisicamente armazenados lado a lado. No ClickHouse, os valores de diferentes colunas são armazenados separadamente e os dados de uma coluna são armazenados juntos. Essa ordem de armazenamento de dados permite fornecer um alto grau de compactação de dados com a escolha certa da chave primária. Também afeta em quais cenários o DBMS funcionará bem. O ClickHouse funciona melhor com consultas, em que um pequeno número de colunas é lido e a consulta usa uma tabela grande e o restante das tabelas é pequeno. Mas mesmo em consultas não analíticas, o ClickHouse pode mostrar bons resultados.


Os dados nas tabelas são classificados por chave primária. A classificação é realizada em segundo plano. Isso permite criar um índice esparso de um pequeno volume, o que permite localizar dados rapidamente. ClickHouse não possui índices secundários. A rigor, existe um índice secundário - a chave da partição (o ClickHouse corta os dados da partição onde a chave da partição é especificada na solicitação). Mais detalhes .


O esquema de dados com normalização não é funcional; pelo contrário, é preferível desnormalizar os dados, dependendo das solicitações. É preferível criar tabelas "amplas" com um grande número de colunas. Este item também está relacionado ao anterior, porque a ausência de índices secundários às vezes faz cópias das tabelas usando uma chave primária diferente.


O ClickHouse não possui UPDATE e DELETE no sentido clássico, mas existe a possibilidade de emulação.


Os dados precisam ser inseridos em blocos grandes e não com muita frequência (uma vez a cada poucos segundos). O carregamento de dados linha a linha é praticamente inoperante em volumes de dados reais.


O ClickHouse não suporta transações, o sistema se torna eventualmente consistente .


No entanto, alguns recursos do ClickHouse, semelhantes a outros DBMSs, facilitam a transferência de sistemas existentes para ele.


  • O ClickHouse usa SQL, mas com pequenas diferenças, útil para consultas típicas de sistemas OLAP. Existe um sistema poderoso de funções agregadas, ALL / ANY JOIN, expressões lambda em funções e outras extensões SQL que permitem escrever praticamente qualquer consulta analítica.
  • ClickHouse suporta replicação, gravação de quorum, leitura de quorum. Uma gravação de quorum é necessária para o armazenamento confiável de dados: INSERT é bem-sucedido apenas se o ClickHouse conseguiu gravar dados em um determinado número de réplicas sem erro.

Você pode ler mais sobre os recursos do ClickHouse na documentação .


Recursos de trabalho com ClickHouse


Escolha da chave primária e da chave de partição.


Como escolher uma chave primária e uma chave de partição? Talvez esta seja a primeira pergunta que surge ao criar uma nova tabela. A escolha da chave primária e da chave de partição geralmente é ditada pelas consultas que serão executadas nos dados. Ao mesmo tempo, as consultas que usam as duas condições são as mais eficazes: pela chave primária e pela chave de partição.


No nosso caso, as tabelas principais são as matrizes para a execução dos testes. É lógico supor que, com essa estrutura de dados, as chaves devem ser selecionadas para que a ordem de desvio de uma delas seja na ordem de aumentar o número da linha e a ordem de desvio da outra - na ordem de aumentar o número da coluna.


Também é importante ter em mente que a escolha da chave primária pode afetar drasticamente a compactação do armazenamento de dados, pois valores idênticos no desvio da chave primária em outras colunas quase não ocupam espaço na tabela. Portanto, no nosso caso, por exemplo, os estados dos testes mudam pouco de confirmação para confirmação. Esse fato predeterminou essencialmente a escolha da chave primária - um par de identificador de teste e número de confirmação. Além disso, nessa ordem.




A chave da partição tem dois propósitos. Por um lado, permite que as partições sejam “arquivadas” para que possam ser permanentemente excluídas do armazenamento, pois os dados nelas já estão desatualizados. Por outro lado, a chave da partição é um índice secundário, o que significa que permite acelerar as consultas se houver uma expressão para elas.


Para nossas matrizes, escolher o número de confirmação como chave de partição parece bastante natural. Mas se você definir o valor de revisão na expressão da chave de partição, haverá muitas partições em uma tabela irracional, o que prejudicará o desempenho das consultas. Portanto, na expressão da chave de partição, o valor da revisão pode ser dividido em um número grande para reduzir o número de partições, por exemplo, PARTITION BY intDiv (revisão, 2000). Esse número deve ser grande o suficiente para que o número de partições não exceda os valores recomendados, enquanto deve ser pequeno o suficiente para que não haja muitos dados em uma partição e o banco de dados não precise ler muitos dados.


Como implementar UPDATE e DELETE?


No sentido usual, UPDATE e DELETE não são suportados no ClickHouse. No entanto, em vez de UPDATE e DELETE, você pode adicionar uma coluna com uma versão à tabela e usar o mecanismo especial ReplacingMergeTree (remove registros duplicados com o mesmo valor de chave primária). Em alguns casos, a versão estará naturalmente presente na tabela desde o início: por exemplo, se quisermos criar uma tabela para o estado atual do teste, a versão nesta tabela será o número de confirmação.


CREATE TABLE current_tests ( test_id UInt64, value Nullable(String), version UInt64 ) ENGINE = ReplacingMergeTree(version) ORDER BY test_id 

No caso de uma alteração de registro, adicionamos a versão com um novo valor, no caso de exclusão, com um valor NULL (ou algum outro valor especial que não possa ser encontrado nos dados).


O que você conseguiu com o novo armazenamento?


Um dos principais objetivos da mudança para o ClickHouse era a capacidade de armazenar o histórico de testes por um longo período de tempo (vários anos, ou pelo menos um ano no pior caso). Já no estágio de protótipo, ficou claro que poderíamos contornar os SSDs existentes em nossos servidores para armazenar pelo menos um histórico de três anos. As consultas analíticas aceleraram significativamente, agora podemos extrair informações muito mais úteis de nossos dados. A margem RPS aumentou. Além disso, esse valor é quase linearmente dimensionado pela adição de novos servidores ao cluster ClickHouse. Criar um novo data warehouse para o banco de dados ClickHouse é apenas um passo quase imperceptível para o usuário final em direção a um objetivo mais importante - adicionar novos recursos, acelerar e simplificar o desenvolvimento, graças à capacidade de armazenar e processar grandes quantidades de dados.


Venha para nós


Nosso departamento está em constante expansão. Visite-nos se você quiser trabalhar em tarefas e algoritmos complexos e interessantes. Se você tiver dúvidas, pode me perguntar diretamente no PM.


Links úteis


Processamento de fluxo



Arquitetura Kappa



ClickHouse:


Source: https://habr.com/ru/post/pt429956/


All Articles