2.弹性堆栈:分析安全日志。 Logstash



在上一篇文章中,我们遇到了ELK堆栈 ,其中包含其组成的软件产品。 工程师使用ELK堆栈时面临的首要任务是发送日志以存储在Elasticsearch中,以进行进一步分析。 但是,这只是字面上的意思,elasticsearch以具有某些字段和值的文档形式存储日志,这意味着工程师必须使用各种工具来解析从终端系统发送的消息。 这可以通过多种方式完成-自己编写程序,这将使用API​​或使用现成的解决方案将文档添加到数据库中。 在本课程中,我们将研究Logstash解决方案,它是ELK堆栈的一部分。 我们将看到如何将日志从终端系统发送到Logstash,然后我们将配置配置文件以解析并重定向到Elasticsearch数据库。 为此,请将Check Point防火墙中的日志作为传入系统。

作为课程的一部分,不考虑安装ELK堆栈,因为有大量关于此主题的文章,我们将考虑配置组件。

让我们制定一个配置Logstash的行动计划:

  1. 验证elasticsearch将接受日志(验证端口运行状况和开放性)。
  2. 我们考虑如何将事件发送到Logstash,选择一种方法并实现它。
  3. 在Logstash配置文件中配置Input。
  4. 我们以调试方式在Logstash配置文件中配置Output,以了解日志消息的外观。
  5. 配置过滤器。
  6. 在ElasticSearch中配置正确的输出。
  7. Logstash启动。
  8. 检查Kibana中的日志。

让我们更详细地考虑每个项目:

检查elasticsearch将接受日志


为此,您可以使用curl命令来验证从部署Logstash的系统对Elasticsearch的访问。 如果已配置身份验证,则还通过curl传递用户名/密码,如果未更改,请指定端口9200。 如果出现以下答案,则说明一切正常。

[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200" { "name" : "elastic-1", "cluster_name" : "project", "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A", "version" : { "number" : "7.4.1", "build_flavor" : "default", "build_type" : "rpm", "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e", "build_date" : "2019-10-22T17:16:35.176724Z", "build_snapshot" : false, "lucene_version" : "8.2.0", "minimum_wire_compatibility_version" : "6.8.0", "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" } [elastic@elasticsearch ~]$ 

如果没有找到答案,则可能存在多种类型的错误:Elasticsearch进程不起作用,指定了错误的端口,或者该端口被安装Elasticsearch的服务器上的防火墙阻止。

考虑如何将日志从检查点防火墙发送到Logstash


使用Check Point管理服务器,您可以使用log_exporter实用程序通过syslog将日志发送到Logstash,您可以在本文中更详细地了解它,这里我们仅保留创建流的命令:

cp_log_export添加名称check_point_syslog目标服务器<< ip_address_logstash >>目标端口5555协议tcp格式通用读取模式半统一

<< ip_address_logstash >>-运行Logstash的服务器的地址,目标端口5555-我们将日志发送到的端口,通过tcp发送日志可以加载服​​务器,因此在某些情况下,设置udp更正确。

在Logstash配置文件中配置INPUT




默认情况下,配置文件位于/etc/logstash/conf.d/目录中。 配置文件包含3个有意义的部分:INPUT,FILTER,OUTPUT。 在INPUT中,我们指示系统将从何处获取日志;在FILTER中,我们分析日志-配置如何将消息划分为字段和值;在OUTPUT中,我们配置输出流-将解析的日志发送到哪里。

首先,配置INPUT,考虑一些可能的类型-文件,tcp和exe。

Tcp:

 input { tcp { port => 5555 host => “10.10.1.205” type => "checkpoint" mode => "server" } } 

模式=>“服务器”
说Logstash接受连接。

端口=> 5555
主机=>“ 10.10.1.205”
我们接受IP地址10.10.1.205(Logstash)端口5555的连接-防火墙策略必须允许该端口。

类型=>“检查点”
我们标记了文档,如果您有多个传入连接,这将非常方便。 在下面,对于每个连接,您可以使用if逻辑构造编写自己的过滤器。

档案:

 input { file { path => "/var/log/openvas_report/*" type => "openvas" start_position => "beginning" } } 

设置说明:
路径=>“ / var / log / openvas_report / *”
指定必须在其中读取文件的目录。

类型=>“ openvas”
事件类型。

start_position =>“开始”
更改文件时,它将读取整个文件;如果您设置“ end”,则系统等待新条目出现在文件末尾。

执行:

 input { exec { command => "ls -alh" interval => 30 } } 

在此输入上(仅!),将启动shell命令并将其输出包装在日志消息中。

命令=>“ ls -alh”
我们感兴趣的输出团队。

间隔=> 30
命令调用间隔(以秒为单位)。

为了从防火墙接收日志,我们规定了tcpudp过滤器,具体取决于将日志发送到Logstash的方式。

我们以调试方式在Logstash配置文件中配置Output,以了解日志消息的外观


配置INPUT之后,我们需要了解日志消息的外观,应使用什么方法来配置日志的过滤器(解析器)。

为此,我们将使用过滤器在stdout中显示结果,以查看原始消息,此刻的完整配置文件如下所示:

 input { tcp { port => 5555 type => "checkpoint" mode => "server" host => “10.10.1.205” } } output { if [type] == "checkpoint" { stdout { codec=> json } } } 

运行命令以验证:
sudo / usr / share / logstash / bin // logstash -f /etc/logstash/conf.d/checkpoint.conf
我们看到结果,图片是可点击的:



如果复制它,它将如下所示:

 action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}\" origin=\"10.10.10.254\" originsicname=\"CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb\" sequencenum=\"8\" time=\"1576766483\" version=\"5\" context_num=\"1\" dst=\"10.10.10.10\" dst_machine_name=\"ts-spb-dc-01@tssolution.local\" layer_name=\"TSS-Standard Security\" layer_name=\"TSS-Standard Application\" layer_uuid=\"dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0\" layer_uuid=\"dbee3718-cf2f-4de0-8681-529cb75be9a6\" match_id=\"8\" match_id=\"33554431\" parent_rule=\"0\" parent_rule=\"0\" rule_action=\"Accept\" rule_action=\"Accept\" rule_name=\"Implicit Cleanup\" rule_uid=\"6dc2396f-9644-4546-8f32-95d98a3344e6\" product=\"VPN-1 & FireWall-1\" proto=\"17\" s_port=\"37317\" service=\"53\" service_id=\"domain-udp\" src=\"10.10.1.180\" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time=\"1576766483\" action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13, 

通过查看这些消息,我们了解到日志具有以下形式:field = value或key = value,这意味着称为kv的过滤器是合适的。 为了为每种情况选择合适的过滤器,最好在技术文档中熟悉过滤器,或者询问朋友。

自定义过滤器


在最后阶段,他们选择了kv,然后显示了此过滤器的配置:

 filter { if [type] == "checkpoint"{ kv { value_split => "=" allow_duplicate_values => false } } } 

我们选择将用来划分字段和值-“ =”的符号。 如果日志中有相同的条目,则我们只会在数据库中保存一个实例,否则您将获得一个具有相同值的数组,也就是说,如果我们收到消息“ foo = some foo = some”,则仅写入foo = some。

在ElasticSearch中配置正确的输出


配置过滤器后,可以将日志上传到elasticsearch数据库:

 output { if [type] == "checkpoint" { elasticsearch { hosts => ["10.10.1.200:9200"] index => "checkpoint-%{+YYYY.MM.dd}" user => "tssolution" password => "cool" } } } 

如果文档使用检查点类型签名,则将事件保存到elasticsearch数据库,该数据库默认接受10.10.1.200到端口9200的连接。 每个文档都保存在一个特定的索引中,在这种情况下,我们将保存在索引“ checkpoint-” +当前时间中。 每个索引可以具有一组特定的字段,或者在消息中出现新字段时自动创建该索引,可以在映射中查看字段设置及其类型。

如果您配置了身份验证(我们将在以后讨论),则必须指定用于写入特定索引的信用,在本示例中为“ tssolution”,密码为“ cool”。 您可以区分只将日志写入特定索引而不能再写入更多日志的用户权限。

启动Logstash。


Logstash配置文件:

 input { tcp { port => 5555 type => "checkpoint" mode => "server" host => “10.10.1.205” } } filter { if [type] == "checkpoint"{ kv { value_split => "=" allow_duplicate_values => false } } } output { if [type] == "checkpoint" { elasticsearch { hosts => ["10.10.1.200:9200"] index => "checkpoint-%{+YYYY.MM.dd}" user => "tssolution" password => "cool" } } } 

检查配置文件以进行正确的编译:
/ usr / share / logstash / bin // logstash -f checkpoint.conf


我们开始Logstash流程:
须藤systemctl启动logstash

检查该过程是否已开始:
sudo systemctl状态logstash



检查插座是否升高:
netstat -nat | grep 5555



检查Kibana中的日志。


一切开始之后,转到Kibana-发现,确保一切配置正确,图片可单击!



所有日志都已准备就绪,我们可以看到所有字段及其值!

结论


我们研究了如何编写Logstash配置文件,结果得到了所有字段和值的解析器。 现在,我们可以对某些字段进行搜索和绘制图表。 在本课程的下一步,我们将考虑在Kibana中进行可视化,创建一个简单的仪表板。 值得一提的是,在某些情况下,例如,当我们要将字段值从数字替换为单词时,必须不断添加Logstash配置文件。 在后续文章中,我们将始终这样做。

所以请继续关注( TelegramFacebookVKTS Solution Blog ), Yandex.Zen

Source: https://habr.com/ru/post/zh-CN481960/


All Articles