
En el último
artículo, nos reunimos con
la pila ELK de los productos de software en los que consiste. Y la primera tarea a la que se enfrenta un ingeniero cuando trabaja con la pila ELK es enviar registros para su almacenamiento en Elasticsearch para su posterior análisis. Sin embargo, esto es solo en palabras, Elasticsearch almacena los registros en forma de documentos con ciertos campos y valores, lo que significa que el ingeniero debe usar varias herramientas para analizar el mensaje que se envía desde los sistemas finales. Esto se puede hacer de varias maneras: escribiendo el programa usted mismo, que agregará documentos a la base de datos utilizando la API o usar soluciones preparadas. En este curso, veremos la solución
Logstash , que es parte de la pila ELK. Veremos cómo puede enviar registros desde los sistemas finales a Logstash, y luego configuraremos el archivo de configuración para analizar y redirigir a la base de datos de Elasticsearch. Para hacer esto, tome los registros del firewall de Check Point como un sistema entrante.
Como parte del curso, no se considera la instalación de la pila ELK, ya que hay una gran cantidad de artículos sobre este tema, consideraremos el componente de configuración.
Hagamos un plan de acción para configurar Logstash:
- Verificación de que elasticsearch aceptará registros (verificando el estado del puerto y la apertura).
- Consideramos cómo podemos enviar eventos a Logstash, elegir un método e implementarlo.
- Configure Input en el archivo de configuración Logstash.
- Configuramos Salida en el archivo de configuración Logstash en modo de depuración para comprender cómo se ve el mensaje de registro.
- Configurar filtro.
- Configure la salida correcta en ElasticSearch.
- Logstash se lanza.
- Revisa los registros en Kibana.
Consideremos cada elemento con más detalle:
Comprobando que elasticsearch aceptará registros
Para hacer esto, puede usar el comando curl para verificar el acceso a Elasticsearch desde el sistema en el que se implementa Logstash. Si ha configurado la autenticación, también pase el usuario / contraseña a través de curl, especifique el puerto 9200 si no lo cambió. Si aparece una respuesta como la siguiente, entonces todo está en orden.
[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200" { "name" : "elastic-1", "cluster_name" : "project", "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A", "version" : { "number" : "7.4.1", "build_flavor" : "default", "build_type" : "rpm", "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e", "build_date" : "2019-10-22T17:16:35.176724Z", "build_snapshot" : false, "lucene_version" : "8.2.0", "minimum_wire_compatibility_version" : "6.8.0", "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" } [elastic@elasticsearch ~]$
Si la respuesta no llega, puede haber varios tipos de errores: el proceso de Elasticsearch no funciona, se especifica el puerto incorrecto o el puerto está bloqueado por un firewall en el servidor donde está instalado Elasticsearch.
Considere cómo puede enviar registros a Logstash desde un firewall de punto de control
Con el servidor de administración de Check Point, puede enviar registros a Logstash a través de syslog utilizando la utilidad log_exporter, puede familiarizarse con él en este
artículo con más detalle, aquí solo dejaremos el comando que crea la secuencia:
cp_log_export add name check_point_syslog target-server <<ip_address_logstash>> target-port 5555 protocolo tcp formato genérico modo de lectura semi-unificado
<<ip_address_logstash>> - la dirección del servidor donde se está ejecutando Logstash, puerto de destino 5555 - el puerto al que enviaremos registros, enviando registros a través de tcp puede cargar el servidor, por lo que en algunos casos es más correcto configurar udp.
Configure INPUT en el archivo de configuración de Logstash

De manera predeterminada, el archivo de configuración se encuentra en el directorio /etc/logstash/conf.d/. El archivo de configuración consta de 3 partes significativas: ENTRADA, FILTRO, SALIDA. En
INPUT indicamos de dónde tomará el sistema los registros, en
FILTER analizamos el registro - configuramos cómo dividir el mensaje en campos y valores, en
OUTPUT configuramos el flujo de salida - donde se enviarán los registros analizados.
Primero, configure INPUT, considere algunos tipos que pueden ser: file, tcp y exe.
Tcp: input { tcp { port => 5555 host => “10.10.1.205” type => "checkpoint" mode => "server" } }
modo => "servidor"
Dice que Logstash acepta conexiones.
puerto => 5555
host => "10.10.1.205"
Aceptamos conexiones a la dirección IP 10.10.1.205 (Logstash), puerto 5555: la política de firewall debe permitir el puerto.
tipo => "punto de control"
Marcamos el documento, es muy conveniente si tiene varias conexiones entrantes. A continuación, para cada conexión, puede escribir su propio filtro utilizando la construcción if logic.
Archivo: input { file { path => "/var/log/openvas_report/*" type => "openvas" start_position => "beginning" } }
Descripción de la configuración:
ruta => "/ var / log / openvas_report / *"
Especifique el directorio en el que se deben leer los archivos.
tipo => "openvas"
Tipo de evento.
start_position => "principio"
Al cambiar el archivo, lee todo el archivo, si establece "fin", el sistema espera a que aparezcan nuevas entradas al final del archivo.
Ejecutivo: input { exec { command => "ls -alh" interval => 30 } }
En esta entrada, (¡solo!) Se inicia un comando de shell y su salida se envuelve en un mensaje de registro.
comando => "ls -alh"
El equipo cuyo resultado nos interesa.
intervalo => 30
Intervalo de invocación de comando en segundos.
Para recibir registros del firewall, prescribimos un filtro
tcp o
udp , dependiendo de cómo se envíen los registros a Logstash.
Configuramos la salida en el archivo de configuración Logstash en modo de depuración para comprender cómo se ve el mensaje de registro
Después de configurar INPUT, debemos entender cómo se verá el mensaje de registro, qué métodos se deben usar para configurar el filtro (analizador) de los registros.
Para hacer esto, utilizaremos un filtro que muestre el resultado en stdout para ver el mensaje original, el archivo de configuración completo para el momento actual se verá así:
input { tcp { port => 5555 type => "checkpoint" mode => "server" host => “10.10.1.205” } } output { if [type] == "checkpoint" { stdout { codec=> json } } }
Ejecuta el comando para verificar:
sudo / usr / share / logstash / bin // logstash -f /etc/logstash/conf.d/checkpoint.conf
Vemos el resultado, se puede hacer clic en la imagen:

Si lo copia, se verá así:
action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}\" origin=\"10.10.10.254\" originsicname=\"CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb\" sequencenum=\"8\" time=\"1576766483\" version=\"5\" context_num=\"1\" dst=\"10.10.10.10\" dst_machine_name=\"ts-spb-dc-01@tssolution.local\" layer_name=\"TSS-Standard Security\" layer_name=\"TSS-Standard Application\" layer_uuid=\"dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0\" layer_uuid=\"dbee3718-cf2f-4de0-8681-529cb75be9a6\" match_id=\"8\" match_id=\"33554431\" parent_rule=\"0\" parent_rule=\"0\" rule_action=\"Accept\" rule_action=\"Accept\" rule_name=\"Implicit Cleanup\" rule_uid=\"6dc2396f-9644-4546-8f32-95d98a3344e6\" product=\"VPN-1 & FireWall-1\" proto=\"17\" s_port=\"37317\" service=\"53\" service_id=\"domain-udp\" src=\"10.10.1.180\" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time=\"1576766483\" action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13,
Al observar estos mensajes, entendemos que los registros tienen la forma: campo = valor o clave = valor, lo que significa que un filtro llamado kv es adecuado. Para elegir el filtro correcto para cada caso específico, sería bueno familiarizarse con ellos en la documentación técnica o preguntarle a un amigo.
Personalizar filtro
En la última etapa, eligieron kv, luego se presenta la configuración de este filtro:
filter { if [type] == "checkpoint"{ kv { value_split => "=" allow_duplicate_values => false } } }
Seleccionamos el símbolo por el cual dividiremos el campo y el valor - "=". Si tenemos las mismas entradas en el registro, guardamos solo una instancia en la base de datos, de lo contrario, obtendrá una matriz de los mismos valores, es decir, si tenemos el mensaje "foo = some foo = some", escriba solo foo = some.
Configure la salida correcta en ElasticSearch
Después de configurar el filtro, puede cargar los registros en la base de datos de
elasticsearch :
output { if [type] == "checkpoint" { elasticsearch { hosts => ["10.10.1.200:9200"] index => "checkpoint-%{+YYYY.MM.dd}" user => "tssolution" password => "cool" } } }
Si el documento está firmado con el tipo de punto de control, guarde el evento en la base de datos de Elasticsearch, que acepta conexiones en 10.10.1.200 al puerto 9200 de forma predeterminada. Cada documento se guarda en un índice específico, en este caso guardamos en el índice "punto de control" + la fecha actual. Cada índice puede tener un conjunto específico de campos, o se crea automáticamente cuando aparece un nuevo campo en el mensaje, la configuración del campo y su tipo se pueden ver en las asignaciones.
Si ha configurado la autenticación (lo consideraremos más adelante), se deben especificar los créditos para escribir en un índice específico; en este ejemplo, es "solución" con la contraseña "cool". Puede distinguir entre los derechos de usuario para escribir registros solo en un índice específico y no más.
Inicie Logstash.
Archivo de configuración de Logstash:
input { tcp { port => 5555 type => "checkpoint" mode => "server" host => “10.10.1.205” } } filter { if [type] == "checkpoint"{ kv { value_split => "=" allow_duplicate_values => false } } } output { if [type] == "checkpoint" { elasticsearch { hosts => ["10.10.1.200:9200"] index => "checkpoint-%{+YYYY.MM.dd}" user => "tssolution" password => "cool" } } }
Verifique el archivo de configuración para la compilación correcta:
/ usr / share / logstash / bin // logstash -f checkpoint.conf

Comenzamos el proceso Logstash:
sudo systemctl iniciar logstash
Comprueba que el proceso ha comenzado:
sudo systemctl status logstash

Compruebe si el zócalo ha subido:
netstat -nat | grep 5555

Revisa los registros en Kibana.
Después de que todo haya comenzado, vaya a Kibana - Descubrir, asegúrese de que todo esté configurado correctamente, ¡se puede hacer clic en la imagen!

¡Todos los registros están en su lugar y podemos ver todos los campos y sus valores!
Conclusión
Examinamos cómo escribir un archivo de configuración de Logstash, como resultado obtuvimos un analizador de todos los campos y valores. Ahora podemos trabajar con la búsqueda y los gráficos de ciertos campos. Además, en el curso consideraremos la visualización en Kibana, crearemos un tablero simple. Vale la pena mencionar que el archivo de configuración de Logstash debe agregarse constantemente en ciertas situaciones, por ejemplo, cuando queremos reemplazar el valor del campo de un dígito a una palabra. En artículos posteriores, haremos esto todo el tiempo.
Así que estad atentos (
Telegram ,
Facebook ,
VK ,
TS Solution Blog ),
Yandex.Zen .