解析ELK 7.5设置以进行Mikrotik日志分析

长期以来,了解使用ELK和临时日志和统计信息源可以做什么。 我计划在Habr的页面上显示一个实际示例,说明如何使用家用小型服务器使用基于ELK堆栈的日志分析系统制作蜜罐。 在本文中,我将向您介绍使用ELK堆栈分析防火墙日志的最简单示例。 将来,我将描述Zeek分析Netflow流量和pcap转储的环境设置。



如果您具有公共IP地址和或多或少的智能设备作为网关/防火墙,则可以通过为“美味的” TCP和UDP端口设置传入请求来组织被动蜜罐。 有一个在猫下配置Mikrotik路由器的示例,但是如果手头有一个不同的供应商路由器(或其他安全系统),则只需要弄清楚一些数据格式和特定​​于供应商的设置即可得到相同的结果。

免责声明


本文并非伪装成原始文章,也没有解决服务的容错性,安全性,最佳实践等问题。 有必要将此材料视为学术材料,它适合于熟悉ELK堆栈的基本功能和网络设备的日志分析机制。 但是,对于新手来说也可能很有趣。

该项目是从docker-compose文件启动的,并且可以轻松部署类似的环境,即使手头上有其他供应商的路由器,您也只需了解一些数据格式和特定​​于供应商的设置即可。 对于其余的内容,我尝试尽可能多地描述与在当前版本的ELK中配置Logstash管道和Elasticsearch映射相关的所有细微差别。 该系统的所有组件都托管在github上 ,包括服务配置。 在本文的结尾,我将进行“疑难解答”部分,其中将介绍诊断该业务新手的常见问题的步骤。

引言


在服务器本身上,我已经安装了Proxmox虚拟化系统,并在KVM计算机上启动了Docker容器。 假定您知道docker和docker-compose的工作原理,因为有足够的有关Internet使用的配置示例。 我不会涉及安装Docker的问题,我会写一些有关docker-compose的文章。

在研究Elasticsearch,Logstash和Kibana的过程中出现了启动蜜罐的想法。 在我的职业生涯中,我从未参与过管理和一般使用该堆栈的工作,但是我有一些业余项目,因此我对探索Elasticsearch和Kibana搜索引擎提供的可能性产生了浓厚的兴趣,您可以使用它们来分析和可视化数据。

我不是最新的具有8GB RAM的迷你NUC服务器,足以用一个Elastic节点启动ELK堆栈。 在生产环境中,当然不建议这样做,而只适合进行培训。 关于安全性问题,在本文的末尾有一条评论。

互联网上有关于如何为类似任务安装和配置ELK堆栈的说明(例如, 使用Logstash版本2 分析 ssh上的蛮力攻击使用Filebeat版本6分析Suricata日志 ),但是在大多数情况下,没有充分注意细节90%的材料将用于版本1至6(在撰写本文时,ELK的当前版本为7.5.0)。 这很重要,因为从版本6开始,Elasticsearch 决定删除映射类型实体,从而更改查询语法和映射结构。 Elastic中的映射模板通常是一个非常重要的对象,因此以后数据采样和可视化将不会出现问题,我建议您不要参与复制粘贴并尝试了解自己在做什么。 此外,我将尝试清楚地解释所描述的操作和配置的含义。

路由器设定


对于家庭网络,我将Mikrotik用作路由器,因此将以他为例。 但是几乎所有系统都可以配置为将syslog发送到远程服务器,无论是路由器,服务器还是其他可以记录日志的安全系统。

将系统日志消息发送到远程服务器


在Mikrotik中,要配置通过CLI到远程服务器的日志记录,只需输入几个命令:

/system logging action add remote=192.168.88.130 remote-port=5145 src-address=192.168.88.1 name=logstash target=remote /system logging add action=logstash topics=info 

使用日志记录配置防火墙规则


我们只对某些数据(主机名,ip地址,用户名,URL等)感兴趣,从中您可以得到漂亮的可视化或选择。 在最简单的情况下,为了获取有关端口扫描和访问尝试的信息,您需要配置防火墙组件以记录规则触发器。 在Mikrotik上,我是在NAT表中而不是在Filter上设置规则,因为将来我将放置模仿服务工作的角色,这将使我能够研究有关僵尸网络行为的更多信息,但这是更高级的方案,而不是这次。

注意! 在下面的配置中,SSH服务(22)的标准TCP端口循环到本地网络。 如果使用SSH从外部访问路由器,并且设置具有端口22(在CLI中打印 ip服务,在Winbox中显示ip>服务 ),则应重新分配用于管理SSH的端口,或者不要在表中输入最后一条规则。
另外,根据WAN接口的名称(如果未使用WAN网桥),您需要将in-interface参数更改为适当的参数。

 /ip firewall nat add action=netmap chain=dstnat comment="HONEYPOT RDP" dst-port=3389 in-interface=bridge-wan log=yes log-prefix=honeypot_rdp protocol=tcp to-addresses=192.168.88.201 to-ports=3389 add action=netmap chain=dstnat comment="HONEYPOT ELASTIC" dst-port=9200 in-interface=bridge-wan log=yes log-prefix=honeypot_elastic protocol=tcp to-addresses=192.168.88.201 to-ports=9211 add action=netmap chain=dstnat comment=" HONEYPOT TELNET" dst-port=23 in-interface=bridge-wan log=yes log-prefix=honeypot_telnet protocol=tcp to-addresses=192.168.88.201 to-ports=2325 add action=netmap chain=dstnat comment="HONEYPOT DNS" dst-port=53 in-interface=bridge-wan log=yes log-prefix=honeypot_dns protocol=udp to-addresses=192.168.88.201 to-ports=9953 add action=netmap chain=dstnat comment="HONEYPOT FTP" dst-port=21 in-interface=bridge-wan log=yes log-prefix=honeypot_ftp protocol=tcp to-addresses=192.168.88.201 to-ports=9921 add action=netmap chain=dstnat comment="HONEYPOT SMTP" dst-port=25 in-interface=bridge-wan log=yes log-prefix=honeypot_smtp protocol=tcp to-addresses=192.168.88.201 to-ports=9925 add action=netmap chain=dstnat comment="HONEYPOT SMB" dst-port=445 in-interface=bridge-wan log=yes log-prefix=honeypot_smb protocol=tcp to-addresses=192.168.88.201 to-ports=9445 add action=netmap chain=dstnat comment="HONEYPOT MQTT" dst-port=1883 in-interface=bridge-wan log=yes log-prefix=honeypot_mqtt protocol=tcp to-addresses=192.168.88.201 to-ports=9883 add action=netmap chain=dstnat comment="HONEYPOT SIP" dst-port=5060 in-interface=bridge-wan log=yes log-prefix=honeypot_sip protocol=tcp to-addresses=192.168.88.201 to-ports=9060 add action=dst-nat chain=dstnat comment="HONEYPOT SSH" dst-port=22 in-interface=bridge-wan log=yes log-prefix=honeypot_ssh protocol=tcp to-addresses=192.168.88.201 to-ports=9922 



在Winbox中,可以在IP>防火墙> NAT选项卡中进行配置

现在,路由器会将收到的数据包重定向到本地地址192.168.88.201和自定义端口。 现在没有人在这些端口上监听,因此连接将断开。 将来,在docker中,您可以运行honeypot,每种服务都有很多。 如果没有计划,则应在过滤器链中编写带有drop操作的规则,而不是NAT规则。

使用docker-compose启动ELK


接下来,您可以开始配置将处理日志的组件。 我建议您立即练习并克隆存储库以完整地查看配置文件。 可以在此处看到所有描述的配置,在本文中,我将仅复制部分配置。

 ❯❯ git clone https://github.com/mekhanme/elk-mikrot.git 



在测试或开发环境中,使用docker-compose运行docker容器最方便。 在此项目中,我目前使用的是最新版本3.7的docker-compose文件,它需要使用docker engine 18.06.0+版本,因此值得更新docker和docker -compose

 ❯❯ curl -L "https://github.com/docker/compose/releases/download/1.25.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose ❯❯ chmod +x /usr/local/bin/docker-compose 

由于在最新版本的docker-compose中,mem_limit参数被切出并添加 deploy ,该参数仅在群体模式下运行( docker stack deploy ),因此启动带有限制的docker-compose up配置会导致错误。 由于我不使用swarm,并且我希望有资源限制,因此必须使用--compatibility选项启动它,该选项将限制从docker -compose新版本转换为非焊接等效版本。

测试所有容器(在后台-d中):

 ❯❯ docker-compose --compatibility up -d 

您将不得不等待所有图像下载完毕,并且启动完成后,可以使用以下命令检查容器的状态:

 ❯❯ docker-compose --compatibility ps 

由于所有容器都将在同一网络上(如果您未明确指定网络,则创建一个新的网桥,此情况下适用),并且docker-compose.yml包含所有容器的container_name参数,这些容器将已经通过内置DNS进行连接码头工人 结果,不需要在容器配置中注册IP地址。 在Logstash配置中,将192.168.88.0/24子网注册为本地,进一步的配置中将有更详细的说明,根据这些说明,您可以在启动之前先偏向配置示例。

配置ELK服务


此外,还将解释如何配置ELK组件的功能,以及需要在Elasticsearch上执行的其他一些操作。

要通过IP地址确定地理坐标,您需要从MaxMind下载免费的GeoLite2数据库:

 ❯❯ cd elk-mikrot && mkdir logstash/geoip_db ❯❯ curl -O https://geolite.maxmind.com/download/geoip/database/GeoLite2-City-CSV.zip && unzip GeoLite2-City-CSV.zip -d logstash/geoip_db && rm -f GeoLite2-City-CSV.zip ❯❯ curl -O https://geolite.maxmind.com/download/geoip/database/GeoLite2-ASN-CSV.zip && unzip GeoLite2-ASN-CSV.zip -d logstash/geoip_db && rm -f GeoLite2-ASN-CSV.zip 

Logstash设置


主要配置文件是logstash.yml ,我在其中注册了自动重新加载配置的选项,测试环境的其余设置并不重要。 Logstash中数据处理(日志)的配置在单独的conf文件中描述,该文件通常存储在管道目录中。 在该方案中,当使用多个管道时, pipelines.yml文件描述了已激活的管道 。 管道是对非结构化数据的一系列动作,以便在输出端接收具有特定结构的数据。 带有单独配置pipeline.yml的方案是可选的,您可以通过从已安装的pipeline目录下载所有配置来取消配置,但是,使用特定pipelines.yml文件,配置更加灵活,因为您可以从pipeline目录打开和关闭conf文件。必要的配置。 此外,重新加载配置仅适用于多管道方案。

 ❯❯ cat logstash/config/pipelines.yml - pipeline.id: logstash-mikrot path.config: "pipeline/logstash-mikrot.conf" 

接下来是Logstash配置中最重要的部分。 管道描述由几个部分组成-在开始时,借助Logstash接收数据的方式在Input部分中指示插件。 从网络设备收集系统日志的最简单方法是使用tcp / udp输入插件。 这些插件唯一需要的参数是port ,必须将其指定为与路由器设置相同。

第二部分是Filter ,它规定了尚未构造的数据的进一步操作。 在我的示例中,删除了带有某些文本的来自路由器的不必要的syslog消息。 这是使用条件和标准丢弃操作完成的,如果满足条件,该操作将丢弃整个消息。 在这种情况下 ,将检查消息字段中是否存在某些文本。



如果消息没有丢失,它将沿着链进一步移动并进入grok过滤器。 如文档所述, grok是将非结构化日志数据解析为结构化和可查询的一种好方法 。 此过滤器用于处理各种系统(Linux syslog,Web服务器,数据库,网络设备等)的日志。 基于现成的模式,您可以花很多时间或更少的重复序列来创建解析器,而无需花费大量时间。 使用在线解析器进行验证很方便(在最新版本的Kibana中,“ 开发工具”部分中提供了类似的功能)。



“ ./logstash/patterns:/usr/share/logstash/patterns”已在docker-compose.yml中注册patterns目录中有一个带有标准社区模式的文件(只是为了方便起见,看看我是否忘记了),以及一个带有以此类推,可以将几种类型的Mikrotik消息( 防火墙Auth模块)的模式组合起来 ,可以为不同结构的消息添加自己的模板。

标准选项add_fieldremove_field允许您在任何过滤器内部的正在处理的消息中添加或删除字段。 在这种情况下, 主机字段将被删除,其中包含接收消息的主机名。 在我的示例中,只有一台主机,因此该字段没有意义。

此外,在同一“ 过滤器”部分中,我注册了cidr过滤器,该过滤器检查具有IP地址的字段是否符合给定子网中的输入条件,并放置标签。 根据进一步的链中的标签,将执行或不执行操作(如果具体而言,这样做是为了将来不对本地地址进行geoip查找)。

可以有任意数量的Filter节,因此一个节中的条件更少。在新节中,我为不带src_local标记的消息定义了操作,也就是说,在这里处理防火墙事件,我们对源地址感兴趣。

现在,我们需要更多地讨论Logstash从何处获取GeoIP信息。 Logstash支持GeoLite2数据库。 有几个数据库选项,我使用两个数据库:GeoLite2 City(包含有关国家,城市,时区的信息)和GeoLite2 ASN(有关IP地址所属的自治系统的信息)。



geoip插件还涉及将GeoIP信息添加到消息中。 从参数中,您必须指定包含IP地址,使用的基础和将在其中写入信息的新字段的名称的字段。 在我的示例中,对目标IP地址执行相同的操作,但是到目前为止,在这种简单情况下,此信息将不会引起关注,因为目标地址将始终是路由器的地址。 但是,将来,不仅可以从防火墙,而且可以从与查看两个地址有关的其他系统中向该管道添加日志。

mutate过滤器允许您更改消息字段并修改字段本身中的文本;文档详细描述了许多您可以执行的示例。 在这种情况下,它用于添加标签,重命名字段(为了进一步可视化Kibana中的日志,需要某种格式的地理对象,我将进一步讨论该主题)并删除不必要的字段。



这样就结束了数据处理部分,并且只能指示在何处发送结构化消息。 在这种情况下,Elasticsearch将收集数据,您只需输入IP地址,端口和索引名称。 建议您输入带有可变日期字段的索引,以便每天创建一个新索引。



设置Elasticsearch


返回Elasticsearch。 首先,您需要确保服务器已启动并正在运行。 通过CLI中的Rest API与Elastic进行最有效的交互。 使用curl,您可以查看节点的状态(将localhost替换为主机docker ip地址):

 ❯❯ curl localhost:9200 

然后,您可以尝试在打开Kibana 本地主机 :5601。 无需在Kibana Web界面中进行任何配置(除非将主题更改为深色)。 我们有兴趣查看是否已创建索引,为此,请打开“ 管理”部分,然后选择左上方的Elasticsearch索引管理 。 在这里,您可以查看索引了多少文档,占用了多少磁盘空间,还可以从有用的信息中查看有关索引映射的信息。



此时,您需要注册正确的映射模板。 该信息对于Elastic而言是必需的,以便他了解哪些字段属于哪些数据类型。 例如,要基于IP地址进行特殊选择,对于src_ip字段,必须显式指定ip数据类型,并确定地理位置,您需要以特定格式定义geoip.location字段并注册geo_point类型。 不需要描述所有可能的字段,因为对于新字段,数据类型是根据动态模式自动确定的(数字的长号和字符串的关键字 )。

您可以使用curl或直接从Kibana控制台(“ 开发工具”部分)编写新模板。

 ❯❯ curl -X POST -H "Content-Type: application/json" -d @elasticsearch/logstash_mikrot-template.json http://192.168.88.130:9200/_template/logstash-mikrot 

更改映射后,您需要删除索引:

 ❯❯ curl -X DELETE http://192.168.88.130:9200/logstash-mikrot-2019.12.16 

当索引中至少有一条消息到达时,请检查映射:

 ❯❯ curl http://192.168.88.130:9200/logstash-mikrot-2019.12.16/_mapping 

为了在Kibana中进一步使用数据,您需要在Management> Kibana Index Pattern中创建一个模式 。 输入带有符号*的索引名称logstash-mikrot *),以便所有索引都匹配,选择“ 时间戳”字段作为具有日期和时间的字段。 在“ 自定义索引模式ID”字段中,您可以输入模式ID(例如, logstash-mikrot ),以后可以简化对对象的访问。

Kibana中的数据分析和可视化


创建索引模式后 ,就可以继续进行最有趣的部分-数据分析和可视化。 Kibana有很多功能和部分,但是到目前为止,我们仅对其中两个感兴趣。

发掘


您可以在此处查看索引中的文档,过滤,搜索和查看收到的信息。 重要的是不要忘记时间轴,它会在搜索条件中设置时间范围。



可视化


在本部分中,您可以基于收集的数据构建可视化。 最简单的方法是在地理地图上以点阵图或热图的形式显示扫描僵尸网络的来源。 还有很多方法可以构建图形,进行选择等。



将来,我计划更详细地介绍数据处理,可视化以及其他有趣的内容。 在学习过程中,我将尝试补充本教程。

故障排除


如果该索引未出现在Elasticsearch中,则应首先查看Logstash日志:

 ❯❯ docker logs logstash --tail 100 -f 

如果与Elasticsearch没有连接,则Logstash将不起作用,或者主要原因是管道配置中的错误,并且在仔细研究默认情况下写入json docker的日志后,它变得很明显。

如果日志中没有错误,则需要确保Logstash在其配置的套接字上捕获消息。 为了进行调试,可以将stdout用作输出

 stdout { codec => rubydebug } 

此后,当直接将消息接收到日志时,Logstash将写入解包信息。

检查Elasticsearch非常简单-只需使GET请求卷曲在服务器的IP地址和端口上,或在特定的API端点上即可。 例如,查看人类可读表中索引的状态:

 ❯❯ curl -s 'http://192.168.88.130:9200/_cat/indices?v' 



如果没有与Elasticsearch的连接,Kibana也不会启动,很容易从日志中看到。

如果未打开Web界面,则应确保在Linux中正确配置或禁用了防火墙(在Centos中, iptables和docker存在问题,已根据主题的建议解决了问题 )。 还值得考虑的是,在生产率不是很高的设备上,所有组件都可以加载几分钟。 由于内存不足,服务可能根本无法加载。 查看容器资源使用情况:

 ❯❯ docker stats 

如果突然有人不知道如何正确更改docker-compose.yml文件中的容器配置并重新启动容器,则可以通过编辑docker-compose.yml并使用具有相同参数的相同命令来重新启动:

 ❯❯ docker-compose --compatibility up -d 

同时,在更改的部分中,将根据配置删除旧对象(容器,网络,卷),并重新创建新对象。 服务的数据不会同时丢失,因为使用了命名卷(不会随容器一起删除),并且配置是从主机系统挂载的,因此Logstash甚至可以监视配置文件并在更改文件后重新启动管道配置。

您可以使用docker restart命令单独重启服务(不必使用docker-compose.yml放在目录中

 ❯❯ docker restart logstash 

您可以使用docker inspect命令查看docker对象的配置,将其与jq结合使用更方便。



结论


我想指出,没有报告此项目中的安全性,因为它是一个测试(dev)环境,并且不打算在路由器外部发布。 如果将其部署以更认真地使用,则需要遵循最佳实践,为HTTPS安装证书,进行备份,进行常规监视(不会在主系统旁边启动)。 顺便说一句,Traefik在我的服务器上的docker中运行,它是某些服务的反向代理,并且本身也会终止TLS并进行身份验证。 也就是说,由于配置了DNS和反向代理,可以使用未配置的HTTPS和密码从Internet访问Kibana Web界面(据我所知,在社区版本中,Kibana不支持Web界面的密码保护)。 我计划进一步描述我设置Traefik以便在Docker的家庭网络上使用的经验。

Source: https://habr.com/ru/post/zh-CN481596/


All Articles