2. Tumpukan elastis: analisis log keamanan. Logstash



Pada artikel terakhir , kami bertemu dengan tumpukan ELK dari produk perangkat lunak apa itu terdiri. Dan tugas pertama yang dihadapi seorang insinyur ketika bekerja dengan tumpukan ELK adalah mengirim log untuk penyimpanan di elasticsearch untuk analisis lebih lanjut. Namun, ini hanya dengan kata-kata, elasticsearch menyimpan log dalam bentuk dokumen dengan bidang dan nilai-nilai tertentu, yang berarti insinyur harus menggunakan berbagai alat untuk mengurai pesan yang dikirim dari sistem akhir. Ini dapat dilakukan dengan beberapa cara - dengan menulis sendiri program, yang akan menambahkan dokumen ke database menggunakan API atau menggunakan solusi yang sudah jadi. Dalam kursus ini, kita akan melihat solusi Logstash , yang merupakan bagian dari tumpukan ELK. Kami akan melihat bagaimana Anda dapat mengirim log dari sistem akhir ke Logstash, dan kemudian kami akan mengonfigurasi file konfigurasi untuk penguraian dan pengalihan ke basis data Elasticsearch. Untuk melakukan ini, ambil log dari firewall Check Point sebagai sistem masuk.

Sebagai bagian dari kursus, pemasangan tumpukan ELK tidak dipertimbangkan, karena ada sejumlah besar artikel tentang topik ini, kami akan mempertimbangkan komponen konfigurasi.

Mari kita membuat rencana tindakan untuk mengkonfigurasi Logstash:

  1. Verifikasi bahwa elasticsearch akan menerima log (memverifikasi kesehatan dan keterbukaan pelabuhan).
  2. Kami mempertimbangkan bagaimana kami dapat mengirim acara ke Logstash, memilih metode, dan mengimplementasikannya.
  3. Konfigurasikan Input dalam file konfigurasi Logstash.
  4. Kami mengonfigurasi Output dalam file konfigurasi Logstash dalam mode debug untuk memahami seperti apa pesan log itu.
  5. Konfigurasikan Filter.
  6. Konfigurasikan Output yang benar dalam ElasticSearch.
  7. Peluncuran logstash.
  8. Periksa log di Kibana.

Mari kita perhatikan setiap item secara lebih rinci:

Memeriksa bahwa elasticsearch akan menerima log


Untuk melakukan ini, Anda dapat menggunakan perintah curl untuk memverifikasi akses ke Elasticsearch dari sistem tempat Logstash digunakan. Jika Anda telah mengonfigurasi otentikasi, lalu lewati pengguna / kata sandi melalui curl, tentukan port 9200 jika Anda tidak mengubahnya. Jika jawaban seperti yang di bawah ini muncul, maka semuanya beres.

[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200" { "name" : "elastic-1", "cluster_name" : "project", "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A", "version" : { "number" : "7.4.1", "build_flavor" : "default", "build_type" : "rpm", "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e", "build_date" : "2019-10-22T17:16:35.176724Z", "build_snapshot" : false, "lucene_version" : "8.2.0", "minimum_wire_compatibility_version" : "6.8.0", "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" } [elastic@elasticsearch ~]$ 

Jika jawabannya tidak datang, maka mungkin ada beberapa jenis kesalahan: proses elasticsearch tidak berfungsi, port yang salah ditentukan, atau port diblokir oleh firewall di server tempat elasticsearch diinstal.

Pertimbangkan bagaimana Anda dapat mengirim log ke Logstash dari firewall titik pemeriksaan


Dengan server manajemen Titik Periksa, Anda dapat mengirim log ke Logstash melalui syslog menggunakan utilitas log_exporter, Anda dapat membiasakan diri dengannya dalam artikel ini secara lebih rinci, di sini kami hanya akan meninggalkan perintah yang menciptakan aliran:

cp_log_export menambahkan nama check_point_syslog target-server <<ip_address_logstash>> target-port 5555 protokol tcp format generik mode baca semi-unified

<<ip_address_logstash>> - alamat server tempat Logstash berjalan, port-target 5555 - port tempat kami akan mengirim log, mengirimkan log melalui tcp dapat memuat server, jadi dalam beberapa kasus lebih tepat untuk mengatur udp.

Konfigurasikan INPUT dalam file konfigurasi Logstash




Secara default, file konfigurasi terletak di direktori /etc/logstash/conf.d/. File konfigurasi terdiri dari 3 bagian yang bermakna: INPUT, FILTER, OUTPUT. Dalam INPUT kami menunjukkan dari mana sistem akan mengambil log, di FILTER kami mengurai log - mengonfigurasi cara membagi pesan ke dalam bidang dan nilai, di OUTPUT kami mengonfigurasi aliran output - tempat log parsing akan dikirim.

Pertama, konfigurasikan INPUT, pertimbangkan beberapa jenis yang mungkin - file, tcp dan exe.

Tcp:

 input { tcp { port => 5555 host => β€œ10.10.1.205” type => "checkpoint" mode => "server" } } 

mode => "server"
Mengatakan bahwa Logstash menerima koneksi.

port => 5555
host => β€œ10.10.1.205”
Kami menerima koneksi ke alamat IP 10.10.1.205 (Logstash), port 5555 - port harus diizinkan oleh kebijakan firewall.

type => "checkpoint"
Kami menandai dokumen itu, sangat nyaman jika Anda memiliki beberapa koneksi masuk. Berikut ini, untuk setiap koneksi, Anda dapat menulis filter Anda sendiri menggunakan konstruksi if logis.

File:

 input { file { path => "/var/log/openvas_report/*" type => "openvas" start_position => "beginning" } } 

Deskripsi pengaturan:
path => "/ var / log / openvas_report / *"
Tentukan direktori di mana file harus dibaca.

type => "openvas"
Jenis acara.

start_position => "awal"
Ketika mengubah file, itu membaca seluruh file, jika Anda mengatur "akhir", sistem menunggu entri baru muncul di akhir file.

Exec:

 input { exec { command => "ls -alh" interval => 30 } } 

Pada input ini, (hanya!) Perintah shell diluncurkan dan hasilnya dibungkus dengan pesan log.

command => "ls -alh"
Tim yang hasilnya menarik bagi kami.

Interval => 30
Interval perintah doa dalam hitungan detik.

Untuk menerima log dari firewall, kami meresepkan filter tcp atau udp , tergantung pada bagaimana log dikirim ke Logstash.

Kami mengkonfigurasi Output dalam file konfigurasi Logstash dalam mode debug untuk memahami seperti apa pesan log itu


Setelah mengonfigurasi INPUT, kita perlu memahami seperti apa tampilan pesan log, metode apa yang harus digunakan untuk mengonfigurasi filter (pengurai) log.

Untuk melakukan ini, kita akan menggunakan filter yang menampilkan hasilnya di stdout untuk melihat pesan asli, file konfigurasi penuh untuk saat ini akan terlihat seperti ini:

 input { tcp { port => 5555 type => "checkpoint" mode => "server" host => β€œ10.10.1.205” } } output { if [type] == "checkpoint" { stdout { codec=> json } } } 

Jalankan perintah untuk memverifikasi:
sudo / usr / share / logstash / bin // logstash -f /etc/logstash/conf.d/checkpoint.conf
Kami melihat hasilnya, gambar dapat diklik:



Jika Anda menyalinnya akan terlihat seperti ini:

 action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}\" origin=\"10.10.10.254\" originsicname=\"CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb\" sequencenum=\"8\" time=\"1576766483\" version=\"5\" context_num=\"1\" dst=\"10.10.10.10\" dst_machine_name=\"ts-spb-dc-01@tssolution.local\" layer_name=\"TSS-Standard Security\" layer_name=\"TSS-Standard Application\" layer_uuid=\"dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0\" layer_uuid=\"dbee3718-cf2f-4de0-8681-529cb75be9a6\" match_id=\"8\" match_id=\"33554431\" parent_rule=\"0\" parent_rule=\"0\" rule_action=\"Accept\" rule_action=\"Accept\" rule_name=\"Implicit Cleanup\" rule_uid=\"6dc2396f-9644-4546-8f32-95d98a3344e6\" product=\"VPN-1 & FireWall-1\" proto=\"17\" s_port=\"37317\" service=\"53\" service_id=\"domain-udp\" src=\"10.10.1.180\" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time=\"1576766483\" action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13, 

Melihat pesan-pesan ini, kami memahami bahwa log memiliki bentuk: bidang = nilai atau kunci = nilai, yang berarti filter yang disebut kv cocok. Untuk memilih filter yang tepat untuk setiap kasus tertentu, akan lebih baik untuk membiasakan diri dengan mereka dalam dokumentasi teknis, atau bertanya kepada teman.

Sesuaikan Filter


Pada tahap terakhir, mereka memilih kv, kemudian konfigurasi filter ini disajikan:

 filter { if [type] == "checkpoint"{ kv { value_split => "=" allow_duplicate_values => false } } } 

Kami memilih simbol yang akan digunakan untuk membagi bidang dan nilainya - β€œ=”. Jika kita memiliki entri yang sama dalam log, kita hanya menyimpan satu instance dalam database, jika tidak, Anda akan mendapatkan array dengan nilai yang sama, yaitu, jika kita memiliki pesan β€œfoo = some foo = some”, tulis saja foo = some.

Konfigurasikan Output yang benar dalam ElasticSearch


Setelah Filter dikonfigurasikan, Anda dapat mengunggah log ke database elasticsearch :

 output { if [type] == "checkpoint" { elasticsearch { hosts => ["10.10.1.200:9200"] index => "checkpoint-%{+YYYY.MM.dd}" user => "tssolution" password => "cool" } } } 

Jika dokumen ditandatangani dengan jenis pos pemeriksaan, simpan acara ke database elasticsearch, yang menerima koneksi pada 10.10.1.200 ke port 9200 secara default. Setiap dokumen disimpan dalam indeks tertentu, dalam hal ini kami menyimpan dalam indeks "pos pemeriksaan-" + tanggal waktu saat ini. Setiap indeks dapat memiliki kumpulan bidang tertentu, atau dibuat secara otomatis ketika bidang baru muncul di pesan, pengaturan bidang dan jenisnya dapat dilihat dalam pemetaan.

Jika Anda telah mengonfigurasi otentikasi (kami akan pertimbangkan nanti), kredit untuk penulisan indeks tertentu harus ditentukan, dalam contoh ini adalah "tssolution" dengan kata sandi "cool". Anda dapat membedakan antara hak pengguna untuk menulis log hanya untuk indeks tertentu dan tidak lebih.

Luncurkan Logstash.


File konfigurasi logstash:

 input { tcp { port => 5555 type => "checkpoint" mode => "server" host => β€œ10.10.1.205” } } filter { if [type] == "checkpoint"{ kv { value_split => "=" allow_duplicate_values => false } } } output { if [type] == "checkpoint" { elasticsearch { hosts => ["10.10.1.200:9200"] index => "checkpoint-%{+YYYY.MM.dd}" user => "tssolution" password => "cool" } } } 

Periksa file konfigurasi untuk kompilasi yang benar:
/ usr / share / logstash / bin // logstash -f checkpoint.conf


Kami memulai proses Logstash:
sudo systemctl mulai logstash

Pastikan proses telah dimulai:
sudo systemctl status logstash



Periksa apakah soket telah naik:
netstat -nat | grep 5555



Periksa log di Kibana.


Setelah semuanya dimulai, buka Kibana - Discover, pastikan semuanya terkonfigurasi dengan benar, gambarnya dapat diklik!



Semua log ada di tempat dan kita bisa melihat semua bidang dan nilainya!

Kesimpulan


Kami memeriksa cara menulis file konfigurasi Logstash, sebagai hasilnya kami mendapatkan parser semua bidang dan nilai. Sekarang kita dapat bekerja dengan pencarian dan pembuatan bagan untuk bidang tertentu. Selanjutnya dalam kursus kita akan mempertimbangkan visualisasi di Kibana, buat dasbor sederhana. Perlu disebutkan bahwa file konfigurasi Logstash harus terus ditambahkan dalam situasi tertentu, misalnya, ketika kita ingin mengganti nilai bidang dari angka ke kata. Dalam artikel selanjutnya, kami akan melakukan ini terus-menerus.

Jadi tunggu saja ( Telegram , Facebook , VK , TS Solution Blog ), Yandex.Zen .

Source: https://habr.com/ru/post/id481960/


All Articles