Aplicação prática do ELK. Configurar logstash

1. Introdução


Implantando o próximo sistema, diante da necessidade de processar um grande número de vários logs. Como a ferramenta escolhida ELK. Este artigo discutirá nossa experiência em ajustar essa pilha.

Não estabelecemos uma meta para descrever todas as suas possibilidades, mas queremos nos concentrar precisamente na solução de problemas práticos. Isso é causado pelo fato de que, na presença de uma quantidade suficientemente grande de documentação e imagens prontas, existem muitas armadilhas, pelo menos as encontramos.

Implementamos a pilha via docker-compose. Além disso, tínhamos um docker-compose.yml bem escrito, que nos permitia aumentar a pilha quase sem problemas. E pareceu-nos que a vitória já estava próxima, agora vamos torcer um pouco para atender às nossas necessidades e é isso.

Infelizmente, uma tentativa de ajustar o sistema para receber e processar logs de nosso aplicativo não foi coroada com sucesso. Portanto, decidimos que valia a pena explorar cada componente separadamente e retornar ao relacionamento.

Então, começamos com o logstash.

Ambiente, implantação, inicie o Logstash no contêiner


Para a implantação, usamos o docker-compose, os experimentos descritos aqui foram realizados no MacOS e no Ubuntu 18.0.4.

A imagem do logstash que foi registrada conosco no docker-compose.yml original é docker.elastic.co/logstash/logstash:6.3.2

Vamos usá-lo para experimentos.

Para executar o logstash, escrevemos um docker-compose.yml separado. Obviamente, foi possível iniciar a imagem a partir da linha de comando, mas resolvemos um problema específico, onde tudo, desde o docker-compose é iniciado.

Brevemente sobre arquivos de configuração


Como segue a descrição, o logstash pode ser executado tanto para um canal, neste caso, ele precisa transferir o arquivo * .conf ou para vários canais; nesse caso, precisa ser transferido o arquivo pipelines.yml, que, por sua vez, será vinculado aos arquivos .conf para cada canal.
Fomos pelo segundo caminho. Pareceu-nos mais universal e escalável. Portanto, criamos pipelines.yml e criamos o diretório pipelines no qual colocaremos os arquivos .conf para cada canal.

Dentro do contêiner, há outro arquivo de configuração - logstash.yml. Nós não tocamos nele, usamos como está.

Portanto, a estrutura de nossos diretórios:



Para obter a entrada, por enquanto, acreditamos que seja tcp na porta 5046, e para saída usaremos stdout.

Aqui está uma configuração tão simples para a primeira execução. Desde que a tarefa inicial é iniciar.

Então, temos este docker-compose.yml

version: '3' networks: elk: volumes: elasticsearch: driver: local services: logstash: container_name: logstash_one_channel image: docker.elastic.co/logstash/logstash:6.3.2 networks: - elk ports: - 5046:5046 volumes: - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro - ./config/pipelines:/usr/share/logstash/config/pipelines:ro 

O que vemos aqui?

  1. Redes e volumes foram retirados do docker-compose.yml original (aquele em que toda a pilha é lançada) e acho que eles não afetam significativamente a imagem geral aqui.
  2. Criamos um serviço de logstash a partir da imagem docker.elastic.co/logstash/logstash:6.3.2 e denominamos logstash_one_channel.
  3. Encaminhamos a porta 5046 dentro do contêiner para a mesma porta interna.
  4. Mapeamos nosso arquivo de configurações de canal ./config/pipelines.yml para o arquivo /usr/share/logstash/config/pipelines.yml dentro do contêiner, onde o logstash o coleta e o torna somente leitura, apenas por precaução.
  5. Exibimos o diretório ./config/pipelines, onde temos os arquivos com as configurações do canal, no diretório / usr / share / logstash / config / pipelines e também o tornamos somente leitura.



Arquivo Pipelines.yml

 - pipeline.id: HABR pipeline.workers: 1 pipeline.batch.size: 1 path.config: "./config/pipelines/habr_pipeline.conf" 

Aqui, um canal com o identificador HABR e o caminho para seu arquivo de configuração são descritos.

E finalmente o arquivo "./config/pipelines/habr_pipeline.conf"

 input { tcp { port => "5046" } } filter { mutate { add_field => [ "habra_field", "Hello Habr" ] } } output { stdout { } } 

Não vamos entrar na descrição dele por enquanto, tente executar:

 docker-compose up 

O que nós vemos?

O contêiner foi iniciado. Podemos verificar seu funcionamento:

 echo '13123123123123123123123213123213' | nc localhost 5046 

E vemos a resposta no console do contêiner:



Mas, ao mesmo tempo, também vemos:

logstash_one_channel | [2019-04-29T11: 28: 59.790] [ERRO] [logstash.licensechecker.licensereader] Não é possível recuperar as informações da licença do servidor de licenças {: message => "Elasticsearch inacessível: [http: // elasticsearch: 9200 /] [Manticore :: ResolutionFailure] elasticsearch ", ...

logstash_one_channel | [2019-04-29T11: 28: 59.894] [INFO] [logstash.pipeline] O pipeline foi iniciado com êxito {: pipeline_id => ". Monitoring-logstash" ,: thread => "# <Thread: 0x119abb86 run>"}

logstash_one_channel | [2019-04-29T11: 28: 59.988] [INFO] [logstash.agent] Pipelines executando {: count => 2 ,: running_pipelines => [: HABR ,: ". Monitoring-logstash"],: non_running_pipelines => [ ]}
logstash_one_channel | [2019-04-29T11: 29: 00,015] [ERRO] [logstash.inputs.metrics] O X-Pack está instalado no Logstash, mas não no Elasticsearch. Instale o X-Pack no Elasticsearch para usar o recurso de monitoramento. Outros recursos podem estar disponíveis.
logstash_one_channel | [2019-04-29T11: 29: 00.526] [INFO] [logstash.agent] Iniciou com êxito o terminal da API do Logstash {: port => 9600}
logstash_one_channel | [2019-04-29T11: 29: 04,478] [INFO] [logstash.outputs.elasticsearch] Executando verificação de integridade para verificar se uma conexão do Elasticsearch está funcionando {: healthcheck_url => http: // elasticsearch: 9200 / ,: path => "/"}
l ogstash_one_channel | [2019-04-29T11: 29: 04,487] [WARN] [logstash.outputs.elasticsearch] Tentou ressuscitar a conexão com a instância morta do ES, mas obteve um erro. {: url => “ elasticsearch : 9200 /” ,: error_type => LogStash :: Saídas :: ElasticSearch :: HttpClient :: Pool :: HostUnreachableError ,: error => “Elasticsearch Inacessível: [http: // elasticsearch: 9200 / ] [Manticore :: ResolutionFailure] elasticsearch ”}
logstash_one_channel | [2019-04-29T11: 29: 04,704] [INFO] [logstash.licensechecker.licensereader] Executando a verificação de integridade para verificar se uma conexão do Elasticsearch está funcionando {: healthcheck_url => http: // elasticsearch: 9200 / ,: path => "/"}
logstash_one_channel | [2019-04-29T11: 29: 04,710] [WARN] [logstash.licensechecker.licensereader] Tentou ressuscitar a conexão com a instância ES morta, mas obteve um erro. {: url => “ elasticsearch : 9200 /” ,: error_type => LogStash :: Saídas :: ElasticSearch :: HttpClient :: Pool :: HostUnreachableError ,: error => “Elasticsearch Inacessível: [http: // elasticsearch: 9200 / ] [Manticore :: ResolutionFailure] elasticsearch ”}

E nosso registro está subindo o tempo todo.

Aqui, destaquei em verde uma mensagem de que o pipeline foi iniciado com êxito, vermelho - uma mensagem de erro e amarelo - uma mensagem sobre uma tentativa de entrar em contato com elasticsearch : 9200.
Isso acontece porque o logstash.conf incluído na imagem verifica a disponibilidade da elasticsearch. Afinal, o logstash supõe que ele funcione como parte da pilha Elk e a separamos.

Você pode trabalhar, mas não é conveniente.

A solução é desativar essa verificação através da variável de ambiente XPACK_MONITORING_ENABLED.

Faça uma alteração no docker-compose.yml e execute-o novamente:

 version: '3' networks: elk: volumes: elasticsearch: driver: local services: logstash: container_name: logstash_one_channel image: docker.elastic.co/logstash/logstash:6.3.2 networks: - elk environment: XPACK_MONITORING_ENABLED: "false" ports: - 5046:5046 volumes: - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro - ./config/pipelines:/usr/share/logstash/config/pipelines:ro 

Agora está tudo bem. O recipiente está pronto para experimentação.

Podemos novamente digitar o próximo console:

 echo '13123123123123123123123213123213' | nc localhost 5046 

E veja:

 logstash_one_channel | { logstash_one_channel | "message" => "13123123123123123123123213123213", logstash_one_channel | "@timestamp" => 2019-04-29T11:43:44.582Z, logstash_one_channel | "@version" => "1", logstash_one_channel | "habra_field" => "Hello Habr", logstash_one_channel | "host" => "gateway", logstash_one_channel | "port" => 49418 logstash_one_channel | } 

Trabalhe em um canal


Então começamos. Agora você pode realmente reservar um tempo para configurar o logstash diretamente. Por enquanto, não tocaremos no arquivo pipelines.yml; veremos o que você pode obter trabalhando com um canal.

Devo dizer que o princípio geral de trabalhar com o arquivo de configuração de canal está bem descrito no guia oficial, aqui
Se você quiser ler em russo, usamos este artigo aqui (mas a sintaxe da consulta é antiga por lá, devemos levar isso em consideração).

Vamos sequencialmente na seção Entrada. Já vimos trabalho no tcp. O que mais poderia ser interessante aqui?

Testar mensagens usando pulsação


Existe uma oportunidade tão interessante para gerar mensagens de teste automáticas.
Para fazer isso, você precisa incluir o plug-in heartbean na seção de entrada.

 input { heartbeat { message => "HeartBeat!" } } 

Ligue, comece uma vez por minuto para receber

 logstash_one_channel | { logstash_one_channel | "@timestamp" => 2019-04-29T13:52:04.567Z, logstash_one_channel | "habra_field" => "Hello Habr", logstash_one_channel | "message" => "HeartBeat!", logstash_one_channel | "@version" => "1", logstash_one_channel | "host" => "a0667e5c57ec" logstash_one_channel | } 

Queremos obter com mais frequência, precisamos adicionar o parâmetro interval.
É assim que receberemos uma mensagem a cada 10 segundos.

 input { heartbeat { message => "HeartBeat!" interval => 10 } } 

Recuperando dados de um arquivo


Também decidimos ver o modo de arquivo. Se funcionar normalmente com o arquivo, é possível que nenhum agente seja necessário, pelo menos para uso local.

De acordo com a descrição, o modo de operação deve ser semelhante ao tail -f, ou seja, lê novas linhas ou, como opção, lê o arquivo inteiro.

Então, o que queremos obter:

  1. Queremos obter linhas que são anexadas a um arquivo de log.
  2. Queremos receber dados gravados em vários arquivos de log, enquanto podemos compartilhar o que veio.
  3. Queremos verificar se, ao reiniciar o logstash, ele não receberá esses dados novamente.
  4. Queremos verificar se, se o logstash estiver desativado, e os dados continuarem sendo gravados nos arquivos, quando executá-los, obteremos esses dados.

Para conduzir o experimento, adicione outra linha ao docker-compose.yml, abrindo o diretório em que colocamos os arquivos.

 version: '3' networks: elk: volumes: elasticsearch: driver: local services: logstash: container_name: logstash_one_channel image: docker.elastic.co/logstash/logstash:6.3.2 networks: - elk environment: XPACK_MONITORING_ENABLED: "false" ports: - 5046:5046 volumes: - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro - ./config/pipelines:/usr/share/logstash/config/pipelines:ro - ./logs:/usr/share/logstash/input 

E mude a seção de entrada em habr_pipeline.conf

 input { file { path => "/usr/share/logstash/input/*.log" } } 

Começamos:

 docker-compose up 

Para criar e registrar arquivos de log, usaremos o comando:

  echo '1' >> logs/number1.log 

 { logstash_one_channel | "host" => "ac2d4e3ef70f", logstash_one_channel | "habra_field" => "Hello Habr", logstash_one_channel | "@timestamp" => 2019-04-29T14:28:53.876Z, logstash_one_channel | "@version" => "1", logstash_one_channel | "message" => "1", logstash_one_channel | "path" => "/usr/share/logstash/input/number1.log" logstash_one_channel | } 

Sim, funciona!

Ao mesmo tempo, vemos que adicionamos automaticamente o campo path. Portanto, no futuro, podemos filtrar os registros por ele.

Vamos tentar novamente:

 echo '2' >> logs/number1.log 

 { logstash_one_channel | "host" => "ac2d4e3ef70f", logstash_one_channel | "habra_field" => "Hello Habr", logstash_one_channel | "@timestamp" => 2019-04-29T14:28:59.906Z, logstash_one_channel | "@version" => "1", logstash_one_channel | "message" => "2", logstash_one_channel | "path" => "/usr/share/logstash/input/number1.log" logstash_one_channel | } 


E agora para outro arquivo:

  echo '1' >> logs/number2.log 

 { logstash_one_channel | "host" => "ac2d4e3ef70f", logstash_one_channel | "habra_field" => "Hello Habr", logstash_one_channel | "@timestamp" => 2019-04-29T14:29:26.061Z, logstash_one_channel | "@version" => "1", logstash_one_channel | "message" => "1", logstash_one_channel | "path" => "/usr/share/logstash/input/number2.log" logstash_one_channel | } 

Ótimo! O arquivo foi escolhido, o caminho estava correto, está tudo bem.

Pare o logstash e reinicie. Vamos esperar O silêncio. I.e. Não recebemos esses registros novamente.

E agora o experimento mais ousado.

Colocamos logstash e executamos:

 echo '3' >> logs/number2.log echo '4' >> logs/number1.log 

Execute o logstash novamente e consulte:

 logstash_one_channel | { logstash_one_channel | "host" => "ac2d4e3ef70f", logstash_one_channel | "habra_field" => "Hello Habr", logstash_one_channel | "message" => "3", logstash_one_channel | "@version" => "1", logstash_one_channel | "path" => "/usr/share/logstash/input/number2.log", logstash_one_channel | "@timestamp" => 2019-04-29T14:48:50.589Z logstash_one_channel | } logstash_one_channel | { logstash_one_channel | "host" => "ac2d4e3ef70f", logstash_one_channel | "habra_field" => "Hello Habr", logstash_one_channel | "message" => "4", logstash_one_channel | "@version" => "1", logstash_one_channel | "path" => "/usr/share/logstash/input/number1.log", logstash_one_channel | "@timestamp" => 2019-04-29T14:48:50.856Z logstash_one_channel | } 

Viva! Tudo foi recolhido.

Mas, devemos advertir sobre o seguinte. Se o contêiner com logstash for excluído (docker stop logstash_one_channel && docker rm logstash_one_channel), nada será captado. Dentro do contêiner, a posição do arquivo na qual foi lida foi salva. Se executado do zero, ele aceitará apenas novas linhas.

Ler arquivos existentes


Suponha que executemos logstash pela primeira vez, mas já temos logs e gostaríamos de processá-los.
Se executarmos o logstash com a seção de entrada que usamos acima, não obteremos nada. Somente novas linhas serão processadas pelo logstash.

Para obter linhas de arquivos existentes, adicione uma linha adicional à seção de entrada:

 input { file { start_position => "beginning" path => "/usr/share/logstash/input/*.log" } } 

Além disso, há uma nuance, isso afeta apenas novos arquivos que o logstash ainda não viu. Para os mesmos arquivos que já caíam no campo de visão do logstash, ele já se lembrava do tamanho e agora só aceita novas entradas.

Vamos nos concentrar no estudo da seção de comentários. Há muito mais opções, mas para nós, para mais experiências por enquanto, é suficiente.

Roteamento e conversão de dados


Vamos tentar resolver o seguinte problema, digamos que temos mensagens de um canal, alguns deles são informativos e parcialmente uma mensagem de erro. Diferem na etiqueta. Alguns INFO, outros ERRO.

Precisamos separá-los na saída. I.e. Escrevemos mensagens informativas em um canal e mensagens de erro em outro.

Para fazer isso, vá da seção de entrada para filtro e saída.

Usando a seção de filtro, analisaremos a mensagem recebida, obtendo dela o hash (pares de chave-valor), com o qual você já pode trabalhar, ou seja, desmonte por condições. E na seção de saída, selecionamos mensagens e enviamos cada uma para o nosso canal.

Analisando uma mensagem usando grok


Para analisar as seqüências de texto e obter um conjunto de campos, existe um plug-in especial na seção de filtro - grok.

Não pretendendo fornecer aqui uma descrição detalhada aqui (para isso me refiro à documentação oficial ), darei meu exemplo simples.

Para fazer isso, você precisa decidir sobre o formato das linhas de entrada. Eu os tenho:

1 mensagem INFO
2 Mensagem de ERRO2

I.e. O identificador vem primeiro, depois INFO / ERROR, depois alguma palavra sem espaços.
Não é difícil, mas o suficiente para entender como funciona.

Portanto, na seção de filtro, no plug-in grok, precisamos definir um padrão para analisar nossas linhas.

Ficará assim:

 filter { grok { match => { "message" => ["%{INT:message_id} %{LOGLEVEL:message_type} %{WORD:message_text}"] } } } 

Esta é essencialmente uma expressão regular. Padrões prontos são usados, como INT, LOGLEVEL, WORD. Sua descrição, assim como outros padrões, pode ser encontrada aqui.

Agora, passando por esse filtro, nossa string se transformará em um hash de três campos: message_id, message_type, message_text.

Eles serão exibidos na seção de saída.

Roteando mensagens na seção de saída usando o comando if


Na seção de saída, como lembramos, íamos dividir as mensagens em dois fluxos. Alguns - que iNFO, produziremos para o console e, com erros, produziremos para um arquivo.

Como dividimos essas postagens? A condição do problema já solicita a solução - já temos o campo message_type selecionado, que pode receber apenas dois valores INFO e ERROR. É para ele que faremos uma escolha usando a instrução if.

 if [message_type] == "ERROR" { #     } else { #    stdout } 

A descrição do trabalho com campos e operadores pode ser encontrada nesta seção do manual oficial .

Agora, sobre a própria conclusão real.

A saída para o console, tudo está claro aqui - stdout {}

E aqui está a saída do arquivo - lembre-se de que executamos tudo a partir do contêiner e, para que o arquivo para o qual escrevemos o resultado seja acessível externamente, precisamos abrir esse diretório no docker-compose.yml.

Total:

A seção de saída do nosso arquivo fica assim:

  output { if [message_type] == "ERROR" { file { path => "/usr/share/logstash/output/test.log" codec => line { format => "custom format: %{message}"} } } else {stdout { } } } 

No docker-compose.yml, adicione outro volume à saída:

 version: '3' networks: elk: volumes: elasticsearch: driver: local services: logstash: container_name: logstash_one_channel image: docker.elastic.co/logstash/logstash:6.3.2 networks: - elk environment: XPACK_MONITORING_ENABLED: "false" ports: - 5046:5046 volumes: - ./config/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro - ./config/pipelines:/usr/share/logstash/config/pipelines:ro - ./logs:/usr/share/logstash/input - ./output:/usr/share/logstash/output 

Começamos, tentamos, vemos a divisão em dois fluxos.

Source: https://habr.com/ru/post/pt451264/


All Articles