下午好
我想告诉您,为Mikrotik路由器配置网络流量元数据收集服务器非常容易和自然。
目的:目标是将“咀嚼的”防火墙日志存储在数据库中以进行进一步分析。
意思是:任何具有rsyslogd v8和更高版本的新Linux发行版都适合实施,也许建议的语法也可以在v7上运行。 我们还需要一个DBMS,我选择了mariadb。 数据库的增长将取决于所记录规则的数量,因为驱动器的大小由您决定,在我的情况下,将记录30-40条规则,每天大约1200万行。 在使用数据库(包括索引)的月份中,它增长到3.8 GB。
机制:路由器通过UDP将日志发送到远程服务器。 rsyslog服务器使用正则表达式清除不必要的信息字符串,生成SQL插入并将其发送到DBMS。 DBMS在插入之前使用触发器,对rsyslog中无法解析的字段执行额外的清理和分隔。
配置RSYSLOG
编辑文件/etc/rsyslog.conf
在其中添加以下行:
module(load="ommysql") module(load="imudp") input(type="imudp" port="514")
因此,我们加载必要的模块并打开514 UDP端口。
Mikrotik的日志行如下所示:
20180927155341 BLOCKSMKNETS forward: in:ether6 - LocalTORF out:VLAN55 - RT_INET, src-mac 00:15:17:31:b8:d7, proto TCP (SYN), 192.168.0.234:2457->192.168.6.14:65535, len 60
如您所见,要在数据库中存储很多额外的内容,并且很难进行选择。
从理论上讲,我需要添加以下数据:
20180927155341 ether6 VLAN5 192.168.0.234 2457 192.168.6.14 65535 00:15:17:31:b8:d7 TCP SYN forward BLOCKSMKNETS 60
我仅使用一个rsyslog就无法获得这样的一行。 Rsyslog常规使用POSIX ERE / BRE,因此无法应用先行或后备功能。
有一个工具可让您调试常规程序,进行尝试,也许您可以将端口与地址以及接口名称从in:和out:分开。 请记住,缺少某些运动和运动协议。
通常,我的输出如下:
20180927155341 in:ether6 out:VLAN5 192.168.0.234:2457 192.168.6.14:65535 00:15:17:31:b8:d7 TCP (SYN) forward BLOCKSMKNETS 60
有关如何烹饪rsyslog正则文件的文档。
在最终形式中,用于从Mikrotik /etc/rsyslog.d/20-remote.conf接收日志的配置文件如下所示:
$template tpl_traflog,"insert into traflog.traffic (datetime, inif, outif, src, dst, smac, proto, flags, chain, logpref, len) values ('%timereported:::date-mysql%', '%msg:R,ERE,0,DFLT,0:in:[a-zA-Z]+[0-9]+|in:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,BLANK,0:out:[a-zA-Z]+[0-9]+|out:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,DFLT,0:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,DFLT,1:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,BLANK:([0-f]+:){5}[0-f]+--end%', '%msg:R,ERE,0,BLANK:\b[AX]{3,4}\b--end%', '%msg:R,ERE,0,BLANK:\([AZ]+\)|\(([AZ]+\,){1,3}[AZ]+\)--end%', '%msg:R,ERE,0,DFLT:[ax]+--end%', '%msg:F,32:2%', '%msg:R,ERE,0,DFLT:[0-9]+$--end%' )",SQL if ($fromhost-ip == '192.168.0.230') and ($syslogtag contains "firewall") then {action(type="ommysql" server="localhost" serverport="3306" db="traflog" uid="rsyslogger" pwd="rsyslogger" template="tpl_traflog") stop}
在第一行中,模板(模板)的描述是一行用于将其传输到DBMS的SQL代码。
第二行是执行操作的条件,即DBMS中的记录。
条件如下所示:如果日志源= 192.168.0.230(
if ($fromhost-ip == '192.168.0.230')
)并且msg行包含“防火墙”(并且($ syslogtag包含“防火墙”)),则使用模块带有连接参数的ommysql(
then {action(type="ommysql" server="localhost" serverport="3306" db="traflog" uid="rsyslogger" pwd="..."
)我们将模板称为tpl_traflog(
template="tpl_traflog")
),然后我们停止对该行(
stop}
)的进一步处理。
在您的情况下,可能会出错,这可能是由于接口名称或日志前缀引起的,也可能是其他原因。 为了进行调试,我们将执行以下操作,在第二行中添加注释,添加新模板和两个新条件:
$template tpl_traflog_test,"%timereported:::date-mysql% %msg:R,ERE,0,DFLT,0:in:[a-zA-Z]+[0-9]+|in:<[a-zA-Z]+-[a-zA-Z]+>--end% %msg:R,ERE,0,BLANK,0:out:[a-zA-Z]+[0-9]+|out:<[a-zA-Z]+-[a-zA-Z]+>--end% %msg:R,ERE,0,DFLT,0:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end% %msg:R,ERE,0,DFLT,1:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end% %msg:R,ERE,0,BLANK:([0-f]+:){5}[0-f]+--end% %msg:R,ERE,0,BLANK:\b[AX]{3,4}\b--end% %msg:R,ERE,0,BLANK:\([AZ]+\)|\(([AZ]+\,){1,3}[AZ]+\)--end% %msg:R,ERE,0,DFLT:[ax]+--end% %msg:F,32:2% %msg:R,ERE,0,DFLT:[0-9]+$--end%\n" if ($fromhost-ip == '192.168.0.230') then {action(type="omfile" file="/var/log/remote/192.168.0.230.log" )} if ($fromhost-ip == '192.168.0.230') then {action(type="omfile" file="/var/log/remote/192.168.0.230.log" template="tpl_traflog_test" ) stop}
重新启动记录器。
tpl_traflog_test模板类似于tpl_traflog,但是没有SQL INSERT。
第一个条件将未处理的行%msg%添加到文件/var/log/remote/192.168.0.230.log,因为未指定模板。
第二个条件将已处理的行添加到同一文件。
这样比较起来会更方便。
接下来,准备数据库。
我们准备一个数据库
我们将降低DBMS设置,这是此处的所有标准设置。
我们启动mysql控制台并执行以下代码:
该表已准备就绪,用户已准备就绪。
现在添加一个触发器,它将执行记录器无法将地址与端口分离,清除接口名称,从标志中删除括号的操作:
REGEXP_REPLACE在小数点后(常规季节)搜索第二个参数,并将其替换为第三个参数,在我们这种情况下,引号中没有任何内容,因此,它只会删除找到的内容。
让我们进行测试插入,类似于记录器将执行的操作:
让我们看看发生了什么:
select * from tarffic;
如果一切正确,请继续。 如果不是,请查找错误。
添加至少一个索引。 我不是创建索引的专家,但据我所知,在mysql中,对不同查询使用具有不同联接字段的索引更为正确,因为一个查询只能使用一个索引(或者我错了吗?)。 如果您了解,请自行决定。
我经常必须使用特定的前缀进行请求,因此添加了以下索引:
做完了
现在,您需要开始在路由器上发送,向其添加远程日志服务器设置和操作,向其中一个防火墙规则添加log选项,并添加不超过24个字符的前缀。
在Mikrotik控制台中,它看起来像这样:
/system logging action set 3 remote=192.168.0.94 src-address=192.168.0.230 add name=remote2 remote=192.168.0.19 syslog-facility=local6 target=remote /system logging add action=remote topics=error,account,critical,event,info add action=remote2 topics=firewall /ip firewall filter ... add action=drop chain=input comment="drop ssh brute forcers" dst-port=22,8291 log=yes log-prefix=DROP_SSH_BRUTE protocol=tcp src-address-list=ssh_blacklist ...
其中192.168.0.230是路由器的地址,192.168.0.19是用于防火墙日志的日志服务器的地址,而192.168.0.94是另一个日志服务器,我的Mikrotik系统日志位于此处,我们现在不需要它。 我们的设置是remote2。
接下来,查看文件内容:
tail -f /var/log/remote/192.168.0.230.log
除非您经常触发规则,否则应该将来自路由器的行散布到文件中。
如果缺少某些字段,即不遵循日期时间,inif,outif,src,dst,smac,proto,flags,flag,chain,logpref,len的顺序,则可以尝试在记录器的调试模板中更改参数,将DLFT替换为BLANK。 然后,将显示一些字母,而不是任何字段的空白,我不记得已经有哪个字母。 如果发生这种情况,则常规计划有问题,您需要对其进行修复。
如果一切正常,请关闭测试条件和模板。
您还需要在下面的/etc/rsyslog.d/中运行默认配置,我将其重命名为50-default.conf,这样就不会将远程日志倒入系统日志/ var / log / message中
重新启动记录器。
让我们稍等片刻,直到数据库已满。 然后我们可以开始选择。
一些查询示例:要查看数据库的大小和行数: MariaDB [traflog]> select table_schema as "database", round(sum(data_length + index_length)/1024/1024,2) as "size Mb", TABLE_ROWS as "count rows" from information_schema.tables group by table_schema; +
一个月内增长了将近4GB,但这取决于已记录的防火墙规则的数量和属性
记录的前缀数记录的前缀数量不等于规则数量,某些规则使用一个前缀,但仍然有多少个前缀? 以及为他们制定了多少规则?:
MariaDB [traflog]> select logpref,count(logpref) from traffic group by logpref order by count(logpref) desc; +
ACCEPT_TORF_INET是领导者,通过此前缀,您可以找到从我们的本地网络访问Internet的每个人,记录了协议和端口,时间到了,访问将被关闭。 有参考资料,供以后在错误中使用。
SMTP矛长让我们看看今天谁试图进入smtp服务器:
MariaDB [traflog]> select src,count(dport) from traffic where logpref='SMTP_DNAT' and datetime > '2018101600000000' group by src order by count(dport) desc limit 10; +
显然,节点191.96.249.92是今天的赢家。 让我们看看他仍然出现在哪些记录规则中:
MariaDB [traflog]> select src,dport,count(dport),logpref from traffic where src='191.96.249.92' group by logpref order by count(dport) desc; +
这个只专注于smtp,大约1%的点击是因为尝试猜测密码或尝试发送一些垃圾,其余的则去了澡堂。
该请求花了10分钟,这很多,当前索引不适合它,或者您可以重新格式化该请求,但是现在我们不再谈论它。
将来,计划用标准查询和表单来固定Web界面。
给出了向量,希望本文对您有所帮助。
谢谢大家!
参考文献:Rsyslog文档MySQL文档Mikrotik日志记录文档感谢LOR社区的
提示。UPD.1将标志字段添加到数据库,现在您可以通过捕获SYN,FIN来跟踪连接持续时间。
修复了rsyslog常规中的一些错误以及mysql触发器。
奇怪的是,默认规则defconf:drop invalid会丢弃所有TCP连接的最终数据包,结果,所有试图关闭科学连接的节点均会失败,并发送多个FIN。 是这样吗
我添加了一条规则,允许使用ACK和FIN标志遍历TCP。
在SQL破坏器下,该过程显示最近五分钟的TCP连接时间
connections_list() DROP PROCEDURE IF EXISTS connections_list; DELIMITER // CREATE PROCEDURE connections_list() BEGIN DECLARE logid BIGINT UNSIGNED; DECLARE done INT DEFAULT FALSE; DECLARE datefin DATETIME; DECLARE datesyn DATETIME; DECLARE conntime TIME; DECLARE connsport INT; DECLARE conndport INT; DECLARE connsrc VARCHAR(21); DECLARE conndst VARCHAR(21); DECLARE cur CURSOR FOR SELECT id,datetime,src,sport,dst,dport FROM conn_syn_fin WHERE flags='SYN'; DECLARE CONTINUE HANDLER FOR NOT FOUND SET done=TRUE; DROP TABLE IF EXISTS conn_syn_fin; DROP TABLE IF EXISTS connless; CREATE temporary TABLE connless(datestart DATETIME,dateend DATETIME,duration TIME,src VARCHAR(21),sport INT,dst VARCHAR(21),dport INT); CREATE temporary TABLE conn_syn_fin (SELECT * from traffic WHERE datetime > now() - interval 5 minute and src in (select src from traffic where datetime > now() - interval 5 minute and logpref='TCP_FIN' and flags like '%FIN%') and (flags like '%SYN%' or flags like '%FIN%') order by id); OPEN cur; read_loop: LOOP FETCH cur INTO logid,datesyn,connsrc,connsport,conndst,conndport; IF done THEN LEAVE read_loop; END IF; set datefin=(SELECT datetime FROM conn_syn_fin WHERE id>logid and src=connsrc and sport=connsport and flags like '%FIN%' and dst=conndst and dport=conndport limit 1); set conntime=(SELECT timediff(datefin,datesyn)); INSERT INTO connless (datestart,dateend,duration,src,sport,dst,dport) value (datesyn,datefin,conntime,connsrc,connsport,conndst,conndport); END LOOP; CLOSE cur; select * from connless; END; // DELIMITER ;
作为该过程的结果,将创建两个临时表。
conn_syn_fin表包含带有SYN和FIN标志的日志条目,然后使用此表中的光标进行搜索。
connless表包含一个连接列表,分别为打开和完成,完成有打开持续时间的分别。
请注意采样时间减去当前时间的五分钟。 我的要求很慢。 它缓慢地通过游标搜索,每秒处理大约10条记录,试图以各种方式加快它的速度,但是执行时间始终是相同的。
另请注意,此过程仅用于演示目的。 如果您需要为特定的src / sport / dst / dport进行选择,最好进行与此类似的单独过程。 如果您是sql管理员,则可以更好地编写查询。
调用connections_list(); MariaDB [traflog]> call connections_list(); +
该过程完成后,临时表
conn_syn_fin和
connless会保留,如果您发现可疑或不可靠的内容,可以更详细地查看它们。 启动该过程后,将删除旧表,并显示新表。 如果发现错误,请写信。