Analisando as configurações do ELK 7.5 para análise de log do Mikrotik

Há muito tempo é uma ideia ver o que você pode fazer com o ELK e fontes improvisadas de logs e estatísticas. Nas páginas do Habr, pretendo mostrar um exemplo prático de como, usando um mini-servidor doméstico, você pode criar, por exemplo, honeypot com um sistema de análise de logs baseado na pilha ELK. Neste artigo, mostrarei o exemplo mais simples de análise de logs de firewall usando a pilha ELK. No futuro, gostaria de descrever as configurações do ambiente para analisar o tráfego do Netflow e os dumps de pcap do Zeek.



Se você tiver um endereço IP público e um dispositivo mais ou menos inteligente como gateway / firewall, poderá organizar um honeypot passivo configurando solicitações de entrada para “deliciosas” portas TCP e UDP. Há um exemplo de configuração de um roteador Mikrotik sob um gato, mas se você tiver um roteador de fornecedor diferente (ou algum outro sistema de segurança) em mãos, precisará descobrir alguns formatos de dados e configurações específicas do fornecedor, e obterá o mesmo resultado.

Isenção de responsabilidade


O artigo não pretende ser original, não trata de questões de tolerância a falhas de serviços, segurança, práticas recomendadas etc. É necessário considerar este material como acadêmico, adequado para familiarizar-se com a funcionalidade básica da pilha ELK e o mecanismo de análise de log do dispositivo de rede. No entanto, também pode ser interessante para um iniciante.

O projeto é iniciado a partir do arquivo docker-compose, e é muito fácil implantar seu ambiente semelhante, mesmo que você tenha um roteador de outro fornecedor disponível, você só precisa entender um pouco sobre os formatos de dados e as configurações específicas do fornecedor. De resto, tentei descrever o máximo possível todas as nuances associadas à configuração dos pipelines do Logstash e dos mapeamentos do Elasticsearch na versão atual do ELK. Todos os componentes deste sistema estão hospedados no github , incluindo configurações de serviço. No final do artigo, farei a seção Solução de problemas, que descreverá as etapas para diagnosticar os problemas populares recém-chegados a esse negócio.

1. Introdução


No próprio servidor, instalei o sistema de virtualização Proxmox, nele, na máquina KVM, os contêineres Docker são iniciados. Supõe-se que você saiba como o docker e o docker-componham funcionam, pois há exemplos de configuração suficientes sobre o uso da Internet. Não falarei sobre os problemas de instalação do Docker, vou escrever um pouco sobre o docker-compone.

A idéia de lançar o honeypot surgiu no processo de estudo do Elasticsearch, Logstash e Kibana. Na minha carreira profissional, nunca estive envolvido na administração e no uso geral dessa pilha, mas tenho projetos de hobby, graças aos quais desenvolvi um grande interesse em explorar as possibilidades oferecidas pelo mecanismo de pesquisa Elasticsearch e Kibana, com o qual é possível analisar e visualizar dados.

Meu não é o mais novo servidor mini NUC com 8 GB de RAM é suficiente para iniciar a pilha ELK com um nó Elastic. Em ambientes de produção, isso, obviamente, não é recomendado, mas apenas adequado para treinamento. Em relação à questão da segurança, há uma observação no final do artigo.

A Internet está cheia de instruções para instalar e configurar a pilha ELK para tarefas semelhantes (por exemplo, analisar ataques de força bruta no ssh usando o Logstash versão 2 , analisando os logs do Suricata usando o Filebeat versão 6 ), mas, na maioria dos casos, não é dada a devida atenção aos detalhes. 90% do material será das versões 1 a 6 (no momento da redação deste documento, a versão atual do ELK é 7.5.0). Isso é importante porque, a partir da versão 6, o Elasticsearch decidiu remover a entidade do tipo de mapeamento, alterando a sintaxe da consulta e a estrutura de mapeamento. O modelo de mapeamento no Elastic geralmente é um objeto muito importante e, para que mais tarde não haja problemas com a amostragem e visualização de dados, aconselho que você não se envolva em copiar e colar e tente entender o que está fazendo. Além disso, tentarei explicar claramente o significado das operações e configurações descritas.

Configuração do roteador


Para a rede doméstica, eu uso o Mikrotik como roteador, então um exemplo será para ele. Mas quase qualquer sistema pode ser configurado para enviar syslog para um servidor remoto, seja um roteador, um servidor ou algum outro sistema de segurança que possa registrar.

Enviando mensagens syslog para um servidor remoto


No Mikrotik, para configurar o registro em um servidor remoto através da CLI, basta digitar alguns comandos:

/system logging action add remote=192.168.88.130 remote-port=5145 src-address=192.168.88.1 name=logstash target=remote /system logging add action=logstash topics=info 

Configurando regras de firewall com log


Estamos interessados ​​apenas em determinados dados (nome do host, endereço IP, nome de usuário, URL etc.), dos quais você pode obter uma bela visualização ou seleção. No caso mais simples, para obter informações sobre varreduras de portas e tentativas de acesso, você precisa configurar o componente do firewall para registrar os acionadores de regras. No Mikrotik, configurei as regras na tabela NAT, não no Filter, já que no futuro vou colocar chanipots que emularão o trabalho dos serviços, isso permitirá que eu investigue mais informações sobre o comportamento das redes bot, mas esse é um cenário mais avançado e não sobre esse momento.

Atenção! Na configuração abaixo, a porta TCP padrão do serviço SSH (22) é colocada em loop na rede local. Se você usar o SSH para acessar o roteador de fora e as configurações tiverem a porta 22 ( serviço IP impresso nos serviços CLI e ip> no Winbox), você deve atribuir novamente a porta ao SSH de gerenciamento ou não inserir a última regra na tabela.
Além disso, dependendo do nome da interface WAN (se a ponte WAN não for usada), você precisará alterar o parâmetro in-interface para o apropriado.

 /ip firewall nat add action=netmap chain=dstnat comment="HONEYPOT RDP" dst-port=3389 in-interface=bridge-wan log=yes log-prefix=honeypot_rdp protocol=tcp to-addresses=192.168.88.201 to-ports=3389 add action=netmap chain=dstnat comment="HONEYPOT ELASTIC" dst-port=9200 in-interface=bridge-wan log=yes log-prefix=honeypot_elastic protocol=tcp to-addresses=192.168.88.201 to-ports=9211 add action=netmap chain=dstnat comment=" HONEYPOT TELNET" dst-port=23 in-interface=bridge-wan log=yes log-prefix=honeypot_telnet protocol=tcp to-addresses=192.168.88.201 to-ports=2325 add action=netmap chain=dstnat comment="HONEYPOT DNS" dst-port=53 in-interface=bridge-wan log=yes log-prefix=honeypot_dns protocol=udp to-addresses=192.168.88.201 to-ports=9953 add action=netmap chain=dstnat comment="HONEYPOT FTP" dst-port=21 in-interface=bridge-wan log=yes log-prefix=honeypot_ftp protocol=tcp to-addresses=192.168.88.201 to-ports=9921 add action=netmap chain=dstnat comment="HONEYPOT SMTP" dst-port=25 in-interface=bridge-wan log=yes log-prefix=honeypot_smtp protocol=tcp to-addresses=192.168.88.201 to-ports=9925 add action=netmap chain=dstnat comment="HONEYPOT SMB" dst-port=445 in-interface=bridge-wan log=yes log-prefix=honeypot_smb protocol=tcp to-addresses=192.168.88.201 to-ports=9445 add action=netmap chain=dstnat comment="HONEYPOT MQTT" dst-port=1883 in-interface=bridge-wan log=yes log-prefix=honeypot_mqtt protocol=tcp to-addresses=192.168.88.201 to-ports=9883 add action=netmap chain=dstnat comment="HONEYPOT SIP" dst-port=5060 in-interface=bridge-wan log=yes log-prefix=honeypot_sip protocol=tcp to-addresses=192.168.88.201 to-ports=9060 add action=dst-nat chain=dstnat comment="HONEYPOT SSH" dst-port=22 in-interface=bridge-wan log=yes log-prefix=honeypot_ssh protocol=tcp to-addresses=192.168.88.201 to-ports=9922 



No Winbox, o mesmo está configurado na guia IP> Firewall> NAT .

Agora, o roteador redirecionará os pacotes recebidos para o endereço local 192.168.88.201 e a porta personalizada. No momento, ninguém está ouvindo essas portas, portanto as conexões serão interrompidas. No futuro, na janela de encaixe, você poderá executar o honeypot, dos quais existem muitos para cada serviço. Se isso não for planejado, em vez das regras NAT, você deve escrever uma regra com a ação de queda na cadeia Filtro.

Iniciando o ELK com docker-compose


Em seguida, você pode começar a configurar o componente que processará os logs. Eu aconselho você a praticar imediatamente e clonar o repositório para ver os arquivos de configuração completamente. Todas as configurações descritas podem ser vistas lá. No texto do artigo, copiarei apenas parte das configurações.

 ❯❯ git clone https://github.com/mekhanme/elk-mikrot.git 



Em um ambiente de teste ou desenvolvimento, é mais conveniente executar contêineres do docker usando o docker-compose. Neste projeto, eu uso o arquivo docker-compose da versão mais recente 3.7 no momento, ele requer o mecanismo docker versão 18.06.0+, portanto, vale a pena atualizar o docker , bem como o docker-compose .

 ❯❯ curl -L "https://github.com/docker/compose/releases/download/1.25.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose ❯❯ chmod +x /usr/local/bin/docker-compose 

Como nas versões recentes do docker-compose, o parâmetro mem_limit foi cortado e a implementação foi adicionada, que é executada apenas no modo enxame ( implantação da pilha do docker ), o lançamento da configuração do docker-compose com limites leva a um erro. Como não uso o enxame e quero ter limites de recursos, tenho que iniciá-lo com a opção --compatibility , que converte os limites de novas versões do docker-componha para o equivalente não-soldável.

Execução de teste de todos os contêineres (no segundo plano -d):

 ❯❯ docker-compose --compatibility up -d 

Você terá que esperar até que todas as imagens sejam baixadas e, após a conclusão da inicialização, você pode verificar o status dos contêineres com o comando:

 ❯❯ docker-compose --compatibility ps 

Devido ao fato de todos os contêineres estarem na mesma rede (se você não especificar explicitamente a rede, uma nova ponte será criada, o que é adequado nesse cenário) e o docker-compose.yml contém o parâmetro container_name para todos os contêineres , os contêineres já terão conectividade por meio do DNS interno janela de encaixe. Como resultado, não é necessário registrar endereços IP nas configurações de contêiner. Na configuração do Logstash, a sub-rede 192.168.88.0/24 é registrada como local. Além disso, na configuração, haverá explicações mais detalhadas, segundo as quais você pode desviar o exemplo da configuração antes de iniciar.

Configurar serviços ELK


Além disso, haverá explicações sobre como configurar a funcionalidade dos componentes ELK, além de mais algumas ações que precisarão ser executadas no Elasticsearch.

Para determinar as coordenadas geográficas por endereço IP, você precisará fazer o download do banco de dados GeoLite2 gratuito do MaxMind:

 ❯❯ cd elk-mikrot && mkdir logstash/geoip_db ❯❯ curl -O https://geolite.maxmind.com/download/geoip/database/GeoLite2-City-CSV.zip && unzip GeoLite2-City-CSV.zip -d logstash/geoip_db && rm -f GeoLite2-City-CSV.zip ❯❯ curl -O https://geolite.maxmind.com/download/geoip/database/GeoLite2-ASN-CSV.zip && unzip GeoLite2-ASN-CSV.zip -d logstash/geoip_db && rm -f GeoLite2-ASN-CSV.zip 

Configuração do Logstash


O arquivo de configuração principal é logstash.yml , onde registrei a opção de recarregar automaticamente a configuração, o restante das configurações do ambiente de teste não são significativas. A configuração do processamento de dados (logs) no Logstash é descrita em arquivos conf separados, geralmente armazenados no diretório de pipeline . No esquema, quando vários pipelines são usados, o arquivo pipelines.yml descreve os pipelines ativados. Um pipeline é uma cadeia de ações em dados não estruturados para receber dados com uma estrutura específica na saída. Um esquema com pipelines.yml configurado separadamente é opcional, você pode fazer isso sem fazer o download de todas as configurações do diretório de pipeline montado, no entanto, com um arquivo pipelines.yml específico, a configuração é mais flexível, pois é possível ativar e desativar os arquivos conf do diretório de pipeline configurações necessárias. Além disso, recarregar configurações só funciona no esquema de vários pipelines.

 ❯❯ cat logstash/config/pipelines.yml - pipeline.id: logstash-mikrot path.config: "pipeline/logstash-mikrot.conf" 

Em seguida, vem a parte mais importante da configuração do Logstash. A descrição do pipeline consiste em várias seções - no início, os plugins são indicados na seção Entrada com a ajuda da qual o Logstash recebe dados. A maneira mais fácil de coletar o syslog de um dispositivo de rede é usar os plugins de entrada tcp / udp . O único parâmetro necessário para esses plug-ins é porta , ele deve ser especificado da mesma forma que nas configurações do roteador.

A segunda seção é Filtro , que prescreve ações adicionais com dados que ainda não foram estruturados. No meu exemplo, mensagens syslog desnecessárias de um roteador com determinado texto são excluídas. Isso é feito usando a condição e a ação de descarte padrão, que descarta a mensagem inteira se a condição for atendida. Na condição , o campo da mensagem é verificado quanto à presença de determinado texto.



Se a mensagem não cair, ela desce mais a corrente e entra no filtro do grok . Como a documentação diz, o grok é uma ótima maneira de analisar dados de log não estruturados em algo estruturado e consultável . Este filtro é usado para processar logs de vários sistemas (syslog linux, servidor web, banco de dados, dispositivos de rede, etc.). Com base em padrões prontos, você pode, sem gastar muito tempo, criar um analisador para qualquer sequência mais ou menos repetitiva. É conveniente usar um analisador online para validação (na versão mais recente do Kibana, funcionalidade semelhante está na seção Ferramentas de Desenvolvimento ).



O volume "./logstash/patterns:/usr/share/logstash/patterns" está registrado no docker-compose.yml , no diretório patterns existe um arquivo com padrões da comunidade padrão (apenas por conveniência, veja se eu esqueci), bem como um arquivo com padrões de vários tipos de mensagens Mikrotik (módulos Firewall e Auth) , por analogia, você pode adicionar seus próprios modelos para mensagens de uma estrutura diferente.

As opções padrão add_field e remove_field permitem adicionar ou remover campos da mensagem que está sendo processada dentro de qualquer filtro. Nesse caso, o campo host é excluído, que contém o nome do host do qual a mensagem foi recebida. No meu exemplo, há apenas um host, portanto não há nenhum ponto nesse campo.

Além disso, na mesma seção Filtro , registrei o filtro cidr , que verifica o campo com o endereço IP quanto à conformidade com a condição de entrada na sub-rede fornecida e coloca a tag. Com base na tag na cadeia adicional, as ações serão executadas ou não (se especificamente, isso é para não fazer uma pesquisa geográfica para endereços locais no futuro).

Pode haver qualquer número de seções Filtro , para que haja menos condições em uma seção. Na nova seção, defini ações para mensagens sem a tag src_local , ou seja, os eventos do firewall são processados ​​aqui, nos quais estamos interessados ​​no endereço de origem.

Agora precisamos conversar um pouco mais sobre de onde o Logstash obtém as informações do GeoIP. O Logstash suporta bancos de dados GeoLite2. Existem várias opções de banco de dados, eu uso dois bancos de dados: GeoLite2 City (que contém informações sobre o país, cidade, fuso horário) e GeoLite2 ASN (informações sobre o sistema autônomo ao qual o endereço IP pertence).



O plug-in geoip também está envolvido na adição de informações de GeoIP à mensagem. A partir dos parâmetros, você deve especificar o campo que contém o endereço IP, a base usada e o nome do novo campo no qual as informações serão gravadas. No meu exemplo, o mesmo é feito para o endereço IP de destino, mas até agora neste cenário simples essas informações não serão interessantes, pois o endereço de destino sempre será o endereço do roteador. No entanto, no futuro, será possível adicionar logs a esse pipeline não apenas do firewall, mas também de outros sistemas em que será relevante examinar os dois endereços.

O filtro mutate permite alterar os campos da mensagem e modificar o texto nos próprios campos; a documentação descreve em detalhes muitos exemplos do que você pode fazer. Nesse caso, é usado para adicionar uma tag, renomear campos (para visualização adicional de logs no Kibana, é necessário um determinado formato do objeto de ponto geográfico , tocarei nesse tópico mais adiante) e excluir campos desnecessários.



Isso encerra a seção de processamento de dados e pode indicar apenas para onde enviar uma mensagem estruturada. Nesse caso, o Elasticsearch coletará dados; você só precisará inserir o endereço IP, a porta e o nome do índice. É recomendável que você insira o índice com um campo de data variável, para que um novo índice seja criado todos os dias.



Configurando o Elasticsearch


Voltar para Elasticsearch. Primeiro, você precisa se certificar de que o servidor esteja em funcionamento. O Elastic é interagido com mais eficiência por meio da API Rest na CLI. Usando curl, você pode ver o estado do nó (substitua localhost pelo endereço IP do docker do host):

 ❯❯ curl localhost:9200 

Depois, você pode tentar abrir o Kibana em localhost : 5601. Não há necessidade de configurar nada na interface da web do Kibana (a menos que mude o tema para escuro). Estamos interessados ​​em ver se um índice foi criado.Para fazer isso, abra a seção Gerenciamento e selecione Elasticsearch Index Management no canto superior esquerdo. Aqui você pode ver quantos documentos estão indexados, quanto ocupa espaço em disco, além de informações úteis sobre mapeamento de índice.



Nesse momento, você precisa registrar o modelo de mapeamento correto. Essas informações são necessárias para o Elastic para que ele entenda quais tipos de dados e quais campos pertencem. Por exemplo, para fazer seleções especiais baseadas em endereços IP, para o campo src_ip , você deve especificar explicitamente o tipo de dados ip e para determinar a localização geográfica, é necessário definir o campo geoip.location em um formato específico e registrar o tipo geo_point . Todos os campos possíveis não precisam ser descritos, pois para novos campos o tipo de dados é determinado automaticamente com base em padrões dinâmicos ( longo para números e palavra - chave para cadeias).

Você pode escrever um novo modelo usando curl ou diretamente no console do Kibana (seção Dev Tools ).

 ❯❯ curl -X POST -H "Content-Type: application/json" -d @elasticsearch/logstash_mikrot-template.json http://192.168.88.130:9200/_template/logstash-mikrot 

Após alterar o mapeamento, você precisa excluir o índice:

 ❯❯ curl -X DELETE http://192.168.88.130:9200/logstash-mikrot-2019.12.16 

Quando pelo menos uma mensagem chegar no índice, verifique o mapeamento:

 ❯❯ curl http://192.168.88.130:9200/logstash-mikrot-2019.12.16/_mapping 

Para uso adicional dos dados no Kibana, você precisa criar um padrão em Gerenciamento> Padrão de Índice Kibana . Digite o nome do índice com o símbolo * ( logstash-mikrot *) para que todos os índices correspondam, selecione o campo de registro de data e hora como o campo com a data e a hora. No campo ID do padrão de índice customizado , é possível inserir o ID do padrão (por exemplo, logstash-mikrot ); no futuro, isso pode simplificar o acesso ao objeto.

Análise e visualização de dados em Kibana


Depois de criar o padrão de índice , você pode prosseguir para a parte mais interessante - análise e visualização de dados. O Kibana tem muitas funcionalidades e seções, mas até agora estaremos interessados ​​em apenas dois.

Descobrir


Aqui você pode visualizar documentos em índices, filtrar, pesquisar e visualizar as informações recebidas. É importante não esquecer a linha do tempo, que define o período nas condições de pesquisa.



Visualize


Nesta seção, você pode criar a visualização com base nos dados coletados. O mais simples é exibir as fontes das redes de bots de varredura em um mapa geográfico, pontilhado ou na forma de um mapa de calor. Também existem muitas maneiras de criar gráficos, fazer seleções etc.



No futuro, pretendo contar com mais detalhes sobre processamento de dados, possivelmente visualização, talvez algo mais interessante. No processo de estudo, tentarei complementar o tutorial.

Solução de problemas


Se o índice não aparecer no Elasticsearch, verifique primeiro os logs do Logstash:

 ❯❯ docker logs logstash --tail 100 -f 

O Logstash não funcionará se não houver conectividade com o Elasticsearch, ou se um erro na configuração do pipeline for o principal motivo e isso se tornar claro após um estudo cuidadoso dos logs que são gravados no json docker por padrão.

Se não houver erros no log, você precisará garantir que o Logstash capture mensagens no soquete configurado. Para fins de depuração, você pode usar stdout como saída :

 stdout { codec => rubydebug } 

Depois disso, o Logstash gravará informações de debag quando a mensagem for recebida diretamente no log.

A verificação do Elasticsearch é muito simples - basta fazer uma solicitação GET se curvar no endereço IP e na porta do servidor ou em um terminal de API específico. Por exemplo, observe o status dos índices em uma tabela legível por humanos:

 ❯❯ curl -s 'http://192.168.88.130:9200/_cat/indices?v' 



O Kibana também não será iniciado se não houver conexão com o Elasticsearch; é fácil ver isso pelos logs.

Se a interface da web não abrir, verifique se o firewall está configurado ou desativado corretamente no Linux (no Centos houve problemas com o iptables e o docker , eles foram resolvidos com base nas recomendações do tópico ). Também vale a pena considerar que, em equipamentos pouco produtivos, todos os componentes podem carregar por vários minutos. Com falta de memória, os serviços podem não carregar. Exibir o uso de recursos do contêiner:

 ❯❯ docker stats 

Se de repente alguém não souber como alterar corretamente a configuração de contêineres no arquivo docker-compose.yml e reiniciar os contêineres, isso será feito editando o docker-compose.yml e usando o mesmo comando com os mesmos parâmetros, reinicie:

 ❯❯ docker-compose --compatibility up -d 

Ao mesmo tempo, nas seções alteradas, objetos antigos (contêineres, redes, volumes) são apagados e novos são recriados de acordo com a configuração. Os dados dos serviços não são perdidos ao mesmo tempo, uma vez que são usados ​​volumes nomeados , que não são excluídos com o contêiner e as configurações são montadas no sistema host, o Logstash pode até monitorar os arquivos de configuração e reiniciar a configuração do pipeline quando o arquivo é alterado.

Você pode reiniciar o serviço separadamente com o comando docker restart (não é necessário estar no diretório com docker-compose.yml) :

 ❯❯ docker restart logstash 

Você pode ver a configuração do objeto docker com o comando docker inspecionar ; é mais conveniente usá-lo com jq .



Conclusão


Quero observar que a segurança neste projeto não foi relatada porque é um ambiente de teste (dev) e não está planejada para ser lançada fora do roteador. Se você implantá-lo para uso mais sério, precisará seguir as práticas recomendadas, instalar certificados para HTTPS, fazer backups, monitoramento normal (que não inicia próximo ao sistema principal). A propósito, o Traefik é executado na minha janela de encaixe no meu servidor, que é um proxy reverso para alguns serviços, e também encerra o TLS em si mesmo e faz autenticação. Ou seja, graças ao DNS configurado e ao proxy reverso, é possível acessar a interface da web do Kibana da Internet com HTTPS não configurado e uma senha (como eu o entendo, na versão da comunidade, o Kibana não suporta proteção de senha para a interface da web). Pretendo descrever melhor minha experiência na configuração do Traefik para uso em uma rede doméstica com o Docker.

Source: https://habr.com/ru/post/pt481596/


All Articles