2. Pile élastique: analyse des journaux de sécurité. Logstash



Dans le dernier article, nous avons rencontré la pile ELK de quels produits logiciels elle se compose. Et la première tâche à laquelle un ingénieur est confronté lorsqu'il travaille avec la pile ELK est d'envoyer des journaux pour le stockage dans elasticsearch pour une analyse plus approfondie. Cependant, c'est juste en mots, elasticsearch stocke les journaux sous forme de documents avec certains champs et valeurs, ce qui signifie que l'ingénieur doit utiliser divers outils pour analyser le message envoyé depuis les systèmes finaux. Cela peut être fait de plusieurs manières - en écrivant le programme vous-même, qui ajoutera des documents à la base de données à l'aide de l'API ou utilisera des solutions prêtes à l'emploi. Dans ce cours, nous examinerons la solution Logstash , qui fait partie de la pile ELK. Nous verrons comment vous pouvez envoyer des journaux des systèmes finaux à Logstash, puis nous configurerons le fichier de configuration pour l'analyse et la redirection vers la base de données Elasticsearch. Pour ce faire, prenez les journaux du pare-feu Check Point en tant que système entrant.

Dans le cadre du cours, l'installation de la pile ELK n'est pas envisagée, car il existe un grand nombre d'articles sur ce sujet, nous considérerons le composant de configuration.

Faisons un plan d'action pour configurer Logstash:

  1. Vérification que elasticsearch acceptera les journaux (vérification de l'intégrité et de l'ouverture du port).
  2. Nous considérons comment envoyer des événements à Logstash, choisir une méthode et l'implémenter.
  3. Configurez Input dans le fichier de configuration Logstash.
  4. Nous configurons Output dans le fichier de configuration Logstash en mode débogage afin de comprendre à quoi ressemble le message du journal.
  5. Configurer le filtre.
  6. Configurez la sortie correcte dans ElasticSearch.
  7. Logstash se lance.
  8. Vérifiez les journaux dans Kibana.

Examinons chaque élément plus en détail:

Vérifier qu'elasticsearch accepte les journaux


Pour ce faire, vous pouvez utiliser la commande curl pour vérifier l'accès à Elasticsearch à partir du système sur lequel Logstash est déployé. Si vous avez configuré l'authentification, passez également l'utilisateur / le mot de passe via curl, spécifiez le port 9200 si vous ne l'avez pas modifié. Si une réponse comme celle ci-dessous apparaît, alors tout est en ordre.

[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200" { "name" : "elastic-1", "cluster_name" : "project", "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A", "version" : { "number" : "7.4.1", "build_flavor" : "default", "build_type" : "rpm", "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e", "build_date" : "2019-10-22T17:16:35.176724Z", "build_snapshot" : false, "lucene_version" : "8.2.0", "minimum_wire_compatibility_version" : "6.8.0", "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" } [elastic@elasticsearch ~]$ 

Si la réponse ne vient pas, il peut y avoir plusieurs types d'erreurs: le processus elasticsearch ne fonctionne pas, le mauvais port est spécifié ou le port est bloqué par un pare-feu sur le serveur où elasticsearch est installé.

Considérez comment envoyer des journaux à Logstash à partir d'un pare-feu de point de contrôle


Avec le serveur de gestion Check Point, vous pouvez envoyer des journaux à Logstash via syslog à l'aide de l'utilitaire log_exporter, vous pouvez vous familiariser avec celui-ci dans cet article plus en détail, ici nous ne laisserons que la commande qui crée le flux:

cp_log_export add name check_point_syslog target-server <<ip_address_logstash>> target-port 5555 protocol tcp format generic read-mode semi-unified

<<ip_address_logstash>> - l'adresse du serveur sur lequel Logstash s'exécute, port cible 5555 - le port auquel nous enverrons les journaux, l'envoi de journaux via tcp peut charger le serveur, donc dans certains cas, il est plus correct de définir udp.

Configurer INPUT dans le fichier de configuration Logstash




Par défaut, le fichier de configuration se trouve dans le répertoire /etc/logstash/conf.d/. Le fichier de configuration se compose de 3 parties significatives: INPUT, FILTER, OUTPUT. Dans INPUT, nous indiquons d'où le système prendra les journaux, dans FILTER nous analysons le journal - configurons comment diviser le message en champs et valeurs, dans OUTPUT nous configurons le flux de sortie - où les journaux analysés seront envoyés.

Tout d'abord, configurez INPUT, considérez certains types qui peuvent être - file, tcp et exe.

Tcp:

 input { tcp { port => 5555 host => “10.10.1.205” type => "checkpoint" mode => "server" } } 

mode => "serveur"
Dit que Logstash accepte les connexions.

port => 5555
hôte => «10.10.1.205»
Nous acceptons les connexions à l'adresse IP 10.10.1.205 (Logstash), port 5555 - le port doit être autorisé par la politique de pare-feu.

type => "checkpoint"
Nous marquons le document, c'est très pratique si vous avez plusieurs connexions entrantes. Dans ce qui suit, pour chaque connexion, vous pouvez écrire votre propre filtre à l'aide de la construction logique if.

Fichier:

 input { file { path => "/var/log/openvas_report/*" type => "openvas" start_position => "beginning" } } 

Description des paramètres:
path => "/ var / log / openvas_report / *"
Spécifiez le répertoire dans lequel les fichiers doivent être lus.

type => "openvas"
Type d'événement.

start_position => "début"
Lors de la modification du fichier, il lit l'intégralité du fichier, si vous définissez «fin», le système attend que de nouvelles entrées apparaissent à la fin du fichier.

Exec:

 input { exec { command => "ls -alh" interval => 30 } } 

Sur cette entrée, (seulement!) Une commande shell est lancée et sa sortie est enveloppée dans un message de journal.

commande => "ls -alh"
L'équipe dont la production nous intéresse.

intervalle => 30
Intervalle d'invocation de commande en secondes.

Afin de recevoir les journaux du pare-feu, nous prescrivons un filtre tcp ou udp , selon la façon dont les journaux sont envoyés à Logstash.

Nous configurons Output dans le fichier de configuration Logstash en mode débogage afin de comprendre à quoi ressemble le message du journal


Après avoir configuré INPUT, nous devons comprendre à quoi ressemblera le message du journal, quelles méthodes devraient être utilisées pour configurer le filtre (analyseur) des journaux.

Pour ce faire, nous utiliserons un filtre qui affiche le résultat dans stdout afin d'afficher le message d'origine, le fichier de configuration complet pour le moment actuel ressemblera à ceci:

 input { tcp { port => 5555 type => "checkpoint" mode => "server" host => “10.10.1.205” } } output { if [type] == "checkpoint" { stdout { codec=> json } } } 

Exécute la commande pour vérifier:
sudo / usr / share / logstash / bin // logstash -f /etc/logstash/conf.d/checkpoint.conf
On voit le résultat, l'image est cliquable:



Si vous le copiez, il ressemblera à ceci:

 action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}\" origin=\"10.10.10.254\" originsicname=\"CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb\" sequencenum=\"8\" time=\"1576766483\" version=\"5\" context_num=\"1\" dst=\"10.10.10.10\" dst_machine_name=\"ts-spb-dc-01@tssolution.local\" layer_name=\"TSS-Standard Security\" layer_name=\"TSS-Standard Application\" layer_uuid=\"dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0\" layer_uuid=\"dbee3718-cf2f-4de0-8681-529cb75be9a6\" match_id=\"8\" match_id=\"33554431\" parent_rule=\"0\" parent_rule=\"0\" rule_action=\"Accept\" rule_action=\"Accept\" rule_name=\"Implicit Cleanup\" rule_uid=\"6dc2396f-9644-4546-8f32-95d98a3344e6\" product=\"VPN-1 & FireWall-1\" proto=\"17\" s_port=\"37317\" service=\"53\" service_id=\"domain-udp\" src=\"10.10.1.180\" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time=\"1576766483\" action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13, 

En regardant ces messages, nous comprenons que les journaux ont la forme: champ = valeur ou clé = valeur, ce qui signifie qu'un filtre appelé kv convient. Afin de choisir le bon filtre pour chaque cas spécifique, il serait bon de vous familiariser avec eux dans la documentation technique, ou de demander à un ami.

Personnaliser le filtre


Lors de la dernière étape, ils ont choisi kv, puis la configuration de ce filtre est présentée:

 filter { if [type] == "checkpoint"{ kv { value_split => "=" allow_duplicate_values => false } } } 

Nous sélectionnons le symbole par lequel nous diviserons le champ et la valeur - «=». Si nous avons les mêmes entrées dans le journal, nous n'enregistrons qu'une seule instance dans la base de données, sinon vous obtiendrez un tableau des mêmes valeurs, c'est-à-dire que si nous avons le message «foo = some foo = some», nous écrivons seulement foo = some.

Configurer la sortie correcte dans ElasticSearch


Une fois le filtre configuré, vous pouvez télécharger les journaux dans la base de données elasticsearch :

 output { if [type] == "checkpoint" { elasticsearch { hosts => ["10.10.1.200:9200"] index => "checkpoint-%{+YYYY.MM.dd}" user => "tssolution" password => "cool" } } } 

Si le document est signé avec le type de point de contrôle, enregistrez l'événement dans la base de données elasticsearch, qui accepte les connexions le 10.10.1.200 au port 9200 par défaut. Chaque document est enregistré dans un index spécifique, dans ce cas nous enregistrons dans l'index «checkpoint-» + la date actuelle. Chaque index peut avoir un ensemble spécifique de champs, ou il est créé automatiquement lorsqu'un nouveau champ apparaît dans le message, les paramètres de champ et leur type peuvent être affichés dans les mappages.

Si vous avez configuré l'authentification (nous y reviendrons plus tard), les crédits pour l'écriture dans un index spécifique doivent être spécifiés, dans cet exemple c'est «tssolution» avec le mot de passe «cool». Vous pouvez faire la distinction entre les droits d'utilisateur pour écrire des journaux uniquement sur un index spécifique et pas plus.

Lancez Logstash.


Fichier de configuration de Logstash:

 input { tcp { port => 5555 type => "checkpoint" mode => "server" host => “10.10.1.205” } } filter { if [type] == "checkpoint"{ kv { value_split => "=" allow_duplicate_values => false } } } output { if [type] == "checkpoint" { elasticsearch { hosts => ["10.10.1.200:9200"] index => "checkpoint-%{+YYYY.MM.dd}" user => "tssolution" password => "cool" } } } 

Vérifiez le fichier de configuration pour une compilation correcte:
/ usr / share / logstash / bin // logstash -f checkpoint.conf


Nous commençons le processus Logstash:
sudo systemctl start logstash

Vérifiez que le processus a commencé:
sudo systemctl logstash d'état



Vérifiez si la prise a augmenté:
netstat -nat | grep 5555



Vérifiez les journaux dans Kibana.


Une fois que tout a commencé, rendez-vous sur Kibana - Découvrir, assurez-vous que tout est correctement configuré, l'image est cliquable!



Tous les journaux sont en place et nous pouvons voir tous les champs et leurs valeurs!

Conclusion


Nous avons examiné comment écrire un fichier de configuration Logstash, nous avons donc obtenu un analyseur de tous les champs et valeurs. Nous pouvons maintenant travailler avec la recherche et la cartographie de certains champs. Plus loin dans le cours, nous considérerons la visualisation dans Kibana, créer un tableau de bord simple. Il convient de mentionner que le fichier de configuration Logstash doit être constamment ajouté dans certaines situations, par exemple lorsque nous voulons remplacer la valeur du champ d'un chiffre par un mot. Dans les articles suivants, nous le ferons tout le temps.

Alors restez à l'écoute ( Telegram , Facebook , VK , TS Solution Blog ), Yandex.Zen .

Source: https://habr.com/ru/post/fr481960/


All Articles