
No último
artigo, encontramos
a pilha ELK de quais produtos de software ela consiste. E a primeira tarefa que um engenheiro enfrenta ao trabalhar com a pilha ELK é enviar logs para armazenamento na elasticsearch para análises adicionais. No entanto, isso é apenas por palavras: a elasticsearch armazena os logs na forma de documentos com determinados campos e valores, o que significa que o engenheiro deve usar várias ferramentas para analisar a mensagem enviada pelos sistemas finais. Isso pode ser feito de várias maneiras - escrevendo você mesmo o programa, que adicionará documentos ao banco de dados usando a API ou usará soluções prontas. Neste curso, veremos a solução
Logstash , que faz parte da pilha ELK. Veremos como você pode enviar logs dos sistemas finais para o Logstash e, em seguida, configuraremos o arquivo de configuração para analisar e redirecionar para o banco de dados Elasticsearch. Para fazer isso, pegue os logs do firewall do Check Point como um sistema recebido.
Como parte do curso, a instalação da pilha ELK não é considerada, uma vez que há um grande número de artigos sobre esse tópico, consideraremos o componente de configuração.
Vamos fazer um plano de ação para configurar o Logstash:
- Verificação de que o elasticsearch aceitará logs (verificando a integridade e a abertura da porta).
- Consideramos como podemos enviar eventos para o Logstash, escolher um método e implementá-lo.
- Configure a entrada no arquivo de configuração do Logstash.
- Configuramos a Saída no arquivo de configuração do Logstash no modo de depuração para entender como é a mensagem de log.
- Configure o filtro.
- Configure a saída correta no ElasticSearch.
- Lançamento do Logstash.
- Verifique os logs em Kibana.
Vamos considerar cada item com mais detalhes:
Verificando se o elasticsearch aceitará logs
Para fazer isso, você pode usar o comando curl para verificar o acesso ao Elasticsearch a partir do sistema no qual o Logstash está implantado. Se você configurou a autenticação, também passe o usuário / senha através de curl, especifique a porta 9200 se você não a alterou. Se uma resposta como a abaixo aparecer, tudo está em ordem.
[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200" { "name" : "elastic-1", "cluster_name" : "project", "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A", "version" : { "number" : "7.4.1", "build_flavor" : "default", "build_type" : "rpm", "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e", "build_date" : "2019-10-22T17:16:35.176724Z", "build_snapshot" : false, "lucene_version" : "8.2.0", "minimum_wire_compatibility_version" : "6.8.0", "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" } [elastic@elasticsearch ~]$
Se a resposta não vier, pode haver vários tipos de erros: o processo elasticsearch não funciona, a porta errada é especificada ou a porta é bloqueada por um firewall no servidor em que o elasticsearch está instalado.
Considere como você pode enviar logs para o Logstash a partir de um firewall do ponto de verificação
Com o servidor de gerenciamento Check Point, você pode enviar logs para o Logstash via syslog usando o utilitário log_exporter. Você pode se familiarizar com ele neste
artigo com mais detalhes. Aqui, deixaremos apenas o comando que cria o fluxo:
cp_log_export adicione o nome check_point_syslog servidor de destino <<endereço_ip_logstash>> protocolo de porta-destino 5555 formato tcp protocolo modo de leitura genérico semi-unificado
<<ip_address_logstash>> - o endereço do servidor em que o Logstash está sendo executado, porta de destino 5555 - a porta para a qual enviaremos logs, o envio de logs via tcp pode carregar o servidor, portanto, em alguns casos, é mais correto configurar o udp.
Configure INPUT no arquivo de configuração do Logstash

Por padrão, o arquivo de configuração está localizado no diretório /etc/logstash/conf.d/. O arquivo de configuração consiste em 3 partes significativas: INPUT, FILTER, OUTPUT. Em
INPUT , indicamos de onde o sistema tirará os logs; em
FILTER, analisamos o log - configuramos como dividir a mensagem em campos e valores; em
OUTPUT, configuramos o fluxo de saída - para onde os logs analisados serão enviados.
Primeiro, configure INPUT, considere alguns tipos que podem ser - file, tcp e exe.
Tcp: input { tcp { port => 5555 host => “10.10.1.205” type => "checkpoint" mode => "server" } }
mode => "servidor"
Diz que o Logstash aceita conexões.
port => 5555
host => "10.10.1.205"
Aceitamos conexões com o endereço IP 10.10.1.205 (Logstash), porta 5555 - a porta deve ser permitida pela política de firewall.
tipo => "ponto de verificação"
Marcamos o documento, é muito conveniente se você tiver várias conexões de entrada. A seguir, para cada conexão, você pode gravar seu próprio filtro usando a construção lógica.
Arquivo: input { file { path => "/var/log/openvas_report/*" type => "openvas" start_position => "beginning" } }
Descrição das configurações:
caminho => "/ var / log / openvas_report / *"
Especifique o diretório em que os arquivos devem ser lidos.
tipo => "openvas"
Tipo de evento.
start_position => "começo"
Ao alterar o arquivo, ele lê o arquivo inteiro; se você definir "final", o sistema aguardará a entrada de novas entradas no final do arquivo.
Exec: input { exec { command => "ls -alh" interval => 30 } }
Nesta entrada, (apenas!) Um comando shell é iniciado e sua saída é agrupada em uma mensagem de log.
comando => "ls -alh"
A equipe cuja produção estamos interessados.
intervalo => 30
Intervalo de chamada de comando em segundos.
Para receber logs do firewall, prescrevemos um filtro
tcp ou
udp , dependendo de como os logs são enviados ao Logstash.
Configuramos Saída no arquivo de configuração do Logstash no modo de depuração para entender como é a mensagem de log
Depois de configurarmos o INPUT, precisamos entender como será a mensagem de log, quais métodos devem ser usados para configurar o filtro (analisador) dos logs.
Para fazer isso, usaremos um filtro que exibe o resultado em stdout para visualizar a mensagem original; o arquivo de configuração completo do momento atual será semelhante a este:
input { tcp { port => 5555 type => "checkpoint" mode => "server" host => “10.10.1.205” } } output { if [type] == "checkpoint" { stdout { codec=> json } } }
Executa o comando para verificar:
sudo / usr / share / logstash / bin // logstash -f /etc/logstash/conf.d/checkpoint.conf
Vemos o resultado, a imagem é clicável:

Se você copiá-lo, ficará assim:
action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}\" origin=\"10.10.10.254\" originsicname=\"CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb\" sequencenum=\"8\" time=\"1576766483\" version=\"5\" context_num=\"1\" dst=\"10.10.10.10\" dst_machine_name=\"ts-spb-dc-01@tssolution.local\" layer_name=\"TSS-Standard Security\" layer_name=\"TSS-Standard Application\" layer_uuid=\"dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0\" layer_uuid=\"dbee3718-cf2f-4de0-8681-529cb75be9a6\" match_id=\"8\" match_id=\"33554431\" parent_rule=\"0\" parent_rule=\"0\" rule_action=\"Accept\" rule_action=\"Accept\" rule_name=\"Implicit Cleanup\" rule_uid=\"6dc2396f-9644-4546-8f32-95d98a3344e6\" product=\"VPN-1 & FireWall-1\" proto=\"17\" s_port=\"37317\" service=\"53\" service_id=\"domain-udp\" src=\"10.10.1.180\" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time=\"1576766483\" action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13,
Observando essas mensagens, entendemos que os logs têm o formato: campo = valor ou chave = valor, o que significa que um filtro chamado kv é adequado. Para escolher o filtro certo para cada caso específico, seria bom se familiarizar com eles na documentação técnica ou pedir a um amigo.
Personalizar filtro
No último estágio, eles escolheram kv e a configuração desse filtro é apresentada:
filter { if [type] == "checkpoint"{ kv { value_split => "=" allow_duplicate_values => false } } }
Selecionamos o símbolo pelo qual dividiremos o campo e o valor - “=”. Se tivermos as mesmas entradas no log, salvaremos apenas uma instância no banco de dados; caso contrário, você obterá uma matriz com os mesmos valores, ou seja, se tivermos a mensagem “foo = some foo = some”, escreva apenas foo = some.
Configure a saída correta no ElasticSearch
Após a configuração do Filtro, é possível fazer upload dos logs no banco de dados
elasticsearch :
output { if [type] == "checkpoint" { elasticsearch { hosts => ["10.10.1.200:9200"] index => "checkpoint-%{+YYYY.MM.dd}" user => "tssolution" password => "cool" } } }
Se o documento for assinado com o tipo de ponto de verificação, salve o evento no banco de dados elasticsearch, que aceita conexões em 10.10.1.200 à porta 9200 por padrão. Cada documento é salvo em um índice específico; nesse caso, salvamos no índice “checkpoint-” + a data atual. Cada índice pode ter um determinado conjunto de campos ou é criado automaticamente quando um novo campo aparece na mensagem, as configurações de campo e seu tipo podem ser visualizados nos mapeamentos.
Se você configurou a autenticação (consideraremos mais adiante), os créditos para gravar em um índice específico devem ser especificados; neste exemplo, é "tssolution" com a senha "cool". Você pode distinguir entre direitos do usuário para gravar logs apenas em um índice específico e não mais.
Inicie o Logstash.
Arquivo de configuração do logstash:
input { tcp { port => 5555 type => "checkpoint" mode => "server" host => “10.10.1.205” } } filter { if [type] == "checkpoint"{ kv { value_split => "=" allow_duplicate_values => false } } } output { if [type] == "checkpoint" { elasticsearch { hosts => ["10.10.1.200:9200"] index => "checkpoint-%{+YYYY.MM.dd}" user => "tssolution" password => "cool" } } }
Verifique o arquivo de configuração para obter a compilação correta:
/ usr / share / logstash / bin // logstash -f ponto de verificação.conf

Iniciamos o processo Logstash:
sudo systemctl iniciar logstash
Verifique se o processo foi iniciado:
sudo systemctl status logstash

Verifique se o soquete aumentou:
netstat -nat | grep 5555

Verifique os logs em Kibana.
Depois de tudo começar, vá para Kibana - Discover, verifique se tudo está configurado corretamente, se a imagem é clicável!

Todos os logs estão no lugar e podemos ver todos os campos e seus valores!
Conclusão
Examinamos como gravar um arquivo de configuração do Logstash e, como resultado, obtivemos um analisador de todos os campos e valores. Agora podemos trabalhar com a pesquisa e gráficos para determinados campos. Além disso, no curso, consideraremos a visualização no Kibana, crie um painel simples. Vale ressaltar que o arquivo de configuração do Logstash deve ser constantemente adicionado em determinadas situações, por exemplo, quando queremos substituir o valor do campo de um dígito para uma palavra. Nos artigos subsequentes, faremos isso o tempo todo.
Portanto, fique atento (
Telegram ,
Facebook ,
VK ,
TS Solution Blog ),
Yandex.Zen .