2. Elastic Stack: Analyse von Sicherheitsprotokollen. Logstash



Im letzten Artikel haben wir uns mit dem ELK-Stack darüber getroffen, aus welchen Softwareprodukten er besteht. Die erste Aufgabe, vor der ein Ingenieur bei der Arbeit mit dem ELK-Stack steht, ist das Senden von Protokollen zur Speicherung in Elasticsearch zur weiteren Analyse. Dies ist jedoch nur in Worten ausgedrückt: elasticsearch speichert die Protokolle in Form von Dokumenten mit bestimmten Feldern und Werten. Dies bedeutet, dass der Ingenieur verschiedene Tools verwenden muss, um die von den Endsystemen gesendete Nachricht zu analysieren. Dies kann auf verschiedene Arten geschehen - indem Sie das Programm selbst schreiben, wodurch der Datenbank mithilfe der API Dokumente hinzugefügt werden, oder indem Sie vorgefertigte Lösungen verwenden. In diesem Kurs werden wir uns die Logstash- Lösung ansehen , die Teil des ELK-Stacks ist. Wir werden sehen, wie Sie Protokolle von Endsystemen an Logstash senden können, und dann werden wir die Konfigurationsdatei für das Parsen und die Umleitung zur Elasticsearch-Datenbank konfigurieren. Nehmen Sie dazu die Protokolle von der Check Point-Firewall als eingehendes System.

Im Rahmen des Kurses wird die Installation des ELK-Stacks nicht berücksichtigt. Da es zu diesem Thema eine Vielzahl von Artikeln gibt, werden wir die Konfigurationskomponente berücksichtigen.

Lassen Sie uns einen Aktionsplan für die Konfiguration von Logstash erstellen:

  1. Überprüfung, dass elasticsearch Protokolle akzeptiert (Überprüfung der Portintegrität und Offenheit).
  2. Wir überlegen, wie wir Ereignisse an Logstash senden, eine Methode auswählen und implementieren können.
  3. Konfigurieren Sie die Eingabe in der Logstash-Konfigurationsdatei.
  4. Wir konfigurieren die Ausgabe in der Logstash-Konfigurationsdatei im Debug-Modus, um zu verstehen, wie die Protokollmeldung aussieht.
  5. Filter konfigurieren.
  6. Konfigurieren Sie die richtige Ausgabe in ElasticSearch.
  7. Logstash wird gestartet.
  8. Überprüfen Sie die Protokolle in Kibana.

Betrachten wir jeden Punkt genauer:

Wenn Sie diese Option aktivieren, werden Protokolle akzeptiert


Dazu können Sie den Befehl curl verwenden, um den Zugriff auf Elasticsearch von dem System aus zu überprüfen, auf dem Logstash bereitgestellt ist. Wenn Sie die Authentifizierung konfiguriert haben, geben Sie den Port 9200 an, wenn Sie das Kennwort nicht geändert haben. Wenn eine Antwort wie die folgende angezeigt wird, ist alles in Ordnung.

[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200" { "name" : "elastic-1", "cluster_name" : "project", "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A", "version" : { "number" : "7.4.1", "build_flavor" : "default", "build_type" : "rpm", "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e", "build_date" : "2019-10-22T17:16:35.176724Z", "build_snapshot" : false, "lucene_version" : "8.2.0", "minimum_wire_compatibility_version" : "6.8.0", "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" } [elastic@elasticsearch ~]$ 

Kommt die Antwort nicht, kann es verschiedene Arten von Fehlern geben: Der Elasticsearch-Prozess funktioniert nicht, der falsche Port wurde angegeben oder der Port wurde von einer Firewall auf dem Server blockiert, auf dem Elasticsearch installiert ist.

Überlegen Sie, wie Sie Protokolle von einer Checkpoint-Firewall an Logstash senden können


Mit dem Check Point-Verwaltungsserver können Sie mit dem Dienstprogramm log_exporter Protokolle über syslog an Logstash senden. Sie können sich in diesem Artikel ausführlicher damit vertraut machen. Hier belassen wir nur den Befehl, der den Stream erstellt:

cp_log_export add name check_point_syslog target-server << ip_address_logstash >> target-port 5555 protokoll tcp format generischer lesemodus semi-unified

<< ip_address_logstash >> - die Adresse des Servers, auf dem Logstash ausgeführt wird, Ziel-Port 5555 - der Port, an den wir Protokolle senden, das Senden von Protokollen über TCP kann den Server laden, daher ist es in einigen Fällen richtiger, udp festzulegen.

Konfigurieren Sie INPUT in der Logstash-Konfigurationsdatei




Standardmäßig befindet sich die Konfigurationsdatei im Verzeichnis /etc/logstash/conf.d/. Die Konfigurationsdatei besteht aus 3 sinnvollen Teilen: EINGANG, FILTER, AUSGANG. In INPUT geben wir an, woher das System die Protokolle bezieht , in FILTER analysieren wir das Protokoll - konfigurieren Sie, wie die Nachricht in Felder und Werte unterteilt wird, und in OUTPUT konfigurieren wir den Ausgabestream - wohin die analysierten Protokolle gesendet werden.

Konfigurieren Sie zunächst INPUT, und betrachten Sie einige der möglichen Typen: file, tcp und exe.

Tcp:

 input { tcp { port => 5555 host => “10.10.1.205” type => "checkpoint" mode => "server" } } 

mode => "server"
Sagt, dass Logstash Verbindungen akzeptiert.

port => 5555
host => "10.10.1.205"
Wir akzeptieren Verbindungen zur IP-Adresse 10.10.1.205 (Logstash), Port 5555 - der Port muss von der Firewall-Richtlinie zugelassen werden.

Typ => "Checkpoint"
Wir kennzeichnen das Dokument, es ist sehr praktisch, wenn Sie mehrere eingehende Verbindungen haben. Im Folgenden können Sie für jede Verbindung einen eigenen Filter mit der if-Logikkonstruktion schreiben.

Datei:

 input { file { path => "/var/log/openvas_report/*" type => "openvas" start_position => "beginning" } } 

Beschreibung der Einstellungen:
path => "/ var / log / openvas_report / *"
Geben Sie das Verzeichnis an, in das die Dateien gelesen werden müssen.

Typ => "openvas"
Art der Veranstaltung.

start_position => "beginn"
Beim Ändern der Datei wird die gesamte Datei gelesen, und wenn Sie "end" einstellen, wartet das System auf neue Einträge am Ende der Datei.

Exec:

 input { exec { command => "ls -alh" interval => 30 } } 

Bei dieser Eingabe wird (nur!) Ein Shell-Befehl gestartet und seine Ausgabe in eine Protokollnachricht eingeschlossen.

Befehl => "ls -alh"
Das Team, an dessen Leistung wir interessiert sind.

Intervall => 30
Befehlsaufrufintervall in Sekunden.

Um Protokolle von der Firewall zu erhalten, schreiben wir einen TCP- oder UDP- Filter vor, je nachdem, wie die Protokolle an Logstash gesendet werden.

Wir konfigurieren die Ausgabe in der Logstash-Konfigurationsdatei im Debug-Modus, um zu verstehen, wie die Protokollmeldung aussieht


Nachdem wir INPUT konfiguriert haben, müssen wir verstehen, wie die Protokollnachricht aussehen wird und welche Methoden zum Konfigurieren des Filters (Parsers) der Protokolle verwendet werden sollten.

Dazu verwenden wir einen Filter, der das Ergebnis in stdout anzeigt, um die ursprüngliche Nachricht anzuzeigen. Die vollständige Konfigurationsdatei für den aktuellen Moment sieht folgendermaßen aus:

 input { tcp { port => 5555 type => "checkpoint" mode => "server" host => “10.10.1.205” } } output { if [type] == "checkpoint" { stdout { codec=> json } } } 

Führt den Befehl aus, um Folgendes zu überprüfen:
sudo / usr / share / logstash / bin / logstash -f /etc/logstash/conf.d/checkpoint.conf
Wir sehen das Ergebnis, das Bild ist anklickbar:



Wenn Sie es kopieren, sieht es so aus:

 action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}\" origin=\"10.10.10.254\" originsicname=\"CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb\" sequencenum=\"8\" time=\"1576766483\" version=\"5\" context_num=\"1\" dst=\"10.10.10.10\" dst_machine_name=\"ts-spb-dc-01@tssolution.local\" layer_name=\"TSS-Standard Security\" layer_name=\"TSS-Standard Application\" layer_uuid=\"dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0\" layer_uuid=\"dbee3718-cf2f-4de0-8681-529cb75be9a6\" match_id=\"8\" match_id=\"33554431\" parent_rule=\"0\" parent_rule=\"0\" rule_action=\"Accept\" rule_action=\"Accept\" rule_name=\"Implicit Cleanup\" rule_uid=\"6dc2396f-9644-4546-8f32-95d98a3344e6\" product=\"VPN-1 & FireWall-1\" proto=\"17\" s_port=\"37317\" service=\"53\" service_id=\"domain-udp\" src=\"10.10.1.180\" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time=\"1576766483\" action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13, 

Wenn wir uns diese Meldungen ansehen, verstehen wir, dass die Protokolle die Form haben: Feld = Wert oder Schlüssel = Wert, was bedeutet, dass ein Filter mit dem Namen kv geeignet ist. Um für jeden Einzelfall den richtigen Filter auszuwählen, ist es hilfreich, sich in der technischen Dokumentation mit diesen vertraut zu machen oder einen Freund zu fragen.

Filter anpassen


In der letzten Phase wählten sie kv, dann wird die Konfiguration dieses Filters vorgestellt:

 filter { if [type] == "checkpoint"{ kv { value_split => "=" allow_duplicate_values => false } } } 

Wir wählen das Symbol, durch das wir das Feld und den Wert teilen - "=". Wenn wir dieselben Einträge im Protokoll haben, speichern wir nur eine Instanz in der Datenbank. Andernfalls erhalten Sie ein Array mit denselben Werten. Wenn also die Meldung "foo = some foo = some" angezeigt wird, schreiben Sie nur foo = some.

Konfigurieren Sie die richtige Ausgabe in ElasticSearch


Nachdem der Filter konfiguriert wurde, können Sie die Protokolle in die elasticsearch- Datenbank hochladen:

 output { if [type] == "checkpoint" { elasticsearch { hosts => ["10.10.1.200:9200"] index => "checkpoint-%{+YYYY.MM.dd}" user => "tssolution" password => "cool" } } } 

Wenn das Dokument mit dem Checkpoint-Typ signiert ist, speichern Sie das Ereignis in der Datenbank elasticsearch, die standardmäßig Verbindungen zum Port 9200 am 10.10.1.200 akzeptiert. Jedes Dokument wird in einem bestimmten Index gespeichert. In diesem Fall speichern wir im Index „Checkpoint-“ + das aktuelle Zeitdatum. Jeder Index kann über einen bestimmten Satz von Feldern verfügen, oder er wird automatisch erstellt, wenn in der Nachricht ein neues Feld angezeigt wird. Die Feldeinstellungen und deren Typ können in Zuordnungen angezeigt werden.

Wenn Sie die Authentifizierung konfiguriert haben (wir werden später darüber nachdenken), müssen Credits zum Schreiben in einen bestimmten Index angegeben werden. In diesem Beispiel ist dies "tssolution" mit dem Kennwort "cool". Sie können zwischen Benutzerrechten zum Schreiben von Protokollen nur auf einen bestimmten Index und nicht mehr unterscheiden.

Starten Sie Logstash.


Logstash-Konfigurationsdatei:

 input { tcp { port => 5555 type => "checkpoint" mode => "server" host => “10.10.1.205” } } filter { if [type] == "checkpoint"{ kv { value_split => "=" allow_duplicate_values => false } } } output { if [type] == "checkpoint" { elasticsearch { hosts => ["10.10.1.200:9200"] index => "checkpoint-%{+YYYY.MM.dd}" user => "tssolution" password => "cool" } } } 

Überprüfen Sie die Konfigurationsdatei auf korrekte Kompilierung:
/usr/share/logstash/bin/logstash -f checkpoint.conf


Wir starten den Logstash-Prozess:
sudo systemctl starte logstash

Stellen Sie sicher, dass der Prozess gestartet wurde:
sudo systemctl status logstash



Überprüfen Sie, ob die Steckdose gestiegen ist:
netstat -nat | grep 5555



Überprüfen Sie die Protokolle in Kibana.


Nachdem alles gestartet ist, gehen Sie zu Kibana - Discover, stellen Sie sicher, dass alles richtig konfiguriert ist, das Bild ist anklickbar!



Alle Protokolle sind vorhanden und wir können alle Felder und ihre Werte sehen!

Fazit


Wir haben untersucht, wie eine Logstash-Konfigurationsdatei geschrieben wird. Als Ergebnis haben wir einen Parser aller Felder und Werte erhalten. Jetzt können wir mit der Suche und dem Charting für bestimmte Felder arbeiten. Im weiteren Verlauf des Kurses werden wir uns mit der Visualisierung in Kibana befassen und ein einfaches Dashboard erstellen. Es ist erwähnenswert, dass die Logstash-Konfigurationsdatei in bestimmten Situationen ständig hinzugefügt werden muss, z. B. wenn der Feldwert von einer Ziffer in ein Wort ersetzt werden soll. In den folgenden Artikeln werden wir dies ständig tun.

Also bleibt dran ( Telegramm , Facebook , VK , TS Solution Blog ), Yandex.Zen .

Source: https://habr.com/ru/post/de481960/


All Articles