CSCobalto-Sec
Published on

De alertas a acción: respuesta automática a SSH brute force con SOAR (Shuffle + Wazuh)

Authors
  • avatar
    Name
    Cobalto-Sec
    Twitter

Resumen ejecutivo: En este tutorial construyo una respuesta automática a ataques SSH brute force que va desde la alerta en Wazuh hasta el bloqueo de IP con iptables orquestado por Shuffle SOAR. El objetivo es que, en menos de 15 segundos, el sistema pase de detectar a mitigar sin intervención humana. Comparto arquitectura, código completo, decisiones de diseño, pruebas end‑to‑end y el troubleshooting real que me encontré.

0. Contexto y alcance

Este post asume que ya tenés un laboratorio con Wazuh y Shuffle funcionando. Si te perdiste la primera parte, acá está el post de integración Wazuh-Shuffle donde cubrimos el setup base.

El caso de uso: SSH brute force contra un host Linux. Cuando se detectan múltiples intentos fallidos, Wazuh envía un webhook hacia Shuffle. Shuffle procesa el evento, valida que sea un ataque real y, si corresponde, invoca un API local (Flask) que ejecuta iptables para bloquear la IP atacante.

Stack técnico:

  • SIEM: Wazuh 4.9.2 (single-node en Docker)
  • SOAR: Shuffle (OpenSearch + Backend + Frontend)
  • Automation: Python 3.10 + Flask
  • Firewall: iptables + systemd service
  • Integration: Webhooks + JSON

Tip: Si es tu primera vez con Shuffle, empezá con un hello webhook simple (un flujo que solo recibe y registra el payload). Validás red, puertos y formato antes de meter lógica compleja.


1. El corazón del sistema: Firewall Blocker API

Antes de que Shuffle pueda bloquear IPs, necesitamos algo que efectivamente las bloquee en el host objetivo.

1.1 ¿Por qué un API?

  • Shuffle no puede (ni debe) ejecutar comandos arbitrarios en tus VMs
  • Un API nos da un punto de control con inputs validados y capacidad de auditoría
  • Podés evolucionar a otros motores (nftables, firewalld) sin cambiar el flujo SOAR
ShuffleHTTP POSTAPI → iptables → IP bloqueada

1.2 El código completo (Flask + iptables)

Ubicación: /home/wazuh/security-automation/firewall_blocker.py

from flask import Flask, request, jsonify
import subprocess
import re
import logging

app = Flask(__name__)

# Configurar logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def is_valid_ip(ip):
    """Validar formato IPv4"""
    pattern = r'^(\d{1,3}\.){3}\d{1,3}$'
    if not re.match(pattern, ip):
        return False
    octets = ip.split('.')
    return all(0 <= int(octet) <= 255 for octet in octets)

def execute_iptables(action, ip):
    """Ejecutar comando iptables de forma segura"""
    try:
        if action == "block":
            cmd = ["sudo", "/usr/sbin/iptables", "-I", "INPUT", "-s", ip, "-j", "DROP"]
        elif action == "unblock":
            cmd = ["sudo", "/usr/sbin/iptables", "-D", "INPUT", "-s", ip, "-j", "DROP"]
        else:
            return False, "Invalid action"

        result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)

        if result.returncode == 0:
            logger.info(f"{action.upper()} successful for IP: {ip}")
            return True, f"IP {ip} {action}ed successfully"
        else:
            logger.error(f"iptables error: {result.stderr}")
            return False, result.stderr

    except subprocess.TimeoutExpired:
        return False, "Command timeout"
    except Exception as e:
        logger.error(f"Exception: {str(e)}")
        return False, str(e)

@app.route('/health', methods=['GET'])
def health():
    """Health check endpoint"""
    return jsonify({
        "service": "firewall-blocker",
        "status": "healthy"
    }), 200

@app.route('/block', methods=['POST'])
def block_ip():
    """Bloquear una IP"""
    data = request.get_json()

    if not data or 'ip' not in data:
        return jsonify({
            "success": False,
            "message": "Missing 'ip' field in request body"
        }), 400

    ip = data['ip']

    if not is_valid_ip(ip):
        return jsonify({
            "success": False,
            "message": f"Invalid IP address format: {ip}"
        }), 400

    # Evitar bloquear localhost
    if ip in ['127.0.0.1', '::1', 'localhost']:
        return jsonify({
            "success": False,
            "message": "Cannot block localhost"
        }), 400

    success, message = execute_iptables("block", ip)

    if success:
        return jsonify({
            "action": "blocked",
            "ip": ip,
            "message": message,
            "success": True
        }), 200
    else:
        return jsonify({
            "action": "blocked",
            "ip": ip,
            "message": message,
            "success": False
        }), 500

@app.route('/unblock', methods=['POST'])
def unblock_ip():
    """Desbloquear una IP"""
    data = request.get_json()

    if not data or 'ip' not in data:
        return jsonify({
            "success": False,
            "message": "Missing 'ip' field in request body"
        }), 400

    ip = data['ip']

    if not is_valid_ip(ip):
        return jsonify({
            "success": False,
            "message": f"Invalid IP address format: {ip}"
        }), 400

    success, message = execute_iptables("unblock", ip)

    if success:
        return jsonify({
            "action": "unblocked",
            "ip": ip,
            "message": message,
            "success": True
        }), 200
    else:
        return jsonify({
            "action": "unblocked",
            "ip": ip,
            "message": message,
            "success": False
        }), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5555, debug=False)

Características clave:

  • ✅ Validación de formato IPv4
  • ✅ Protección contra bloqueo de localhost
  • ✅ Logging de todas las acciones
  • ✅ Endpoints /health, /block, /unblock
  • ✅ Timeouts para evitar comandos colgados

1.3 Configuración del host

Sudoers para iptables sin password:

echo "wazuh ALL=(ALL) NOPASSWD: /usr/sbin/iptables" | sudo tee /etc/sudoers.d/wazuh-iptables
sudo chmod 0440 /etc/sudoers.d/wazuh-iptables

Warning: Validá la ruta de iptables en tu distro (which iptables). Si cambia, actualizá la entrada en sudoers.

Servicio systemd:

# /etc/systemd/system/firewall_blocker.service
[Unit]
Description=Firewall Blocker API (Flask)
After=network.target

[Service]
User=wazuh
Group=wazuh
WorkingDirectory=/home/wazuh/security-automation
Environment="PYTHONUNBUFFERED=1"
ExecStart=/usr/bin/python3 /home/wazuh/security-automation/firewall_blocker.py
Restart=always
RestartSec=3

[Install]
WantedBy=multi-user.target

Habilitar y arrancar el servicio:

sudo systemctl daemon-reload
sudo systemctl enable firewall_blocker
sudo systemctl start firewall_blocker
sudo systemctl status firewall_blocker

1.4 Testing del API

Tests del Firewall Blocker API Caption: Health check, bloqueo y desbloqueo exitosos
# Test básico
curl http://localhost:5555/health

# Bloquear IP de prueba
curl -X POST http://localhost:5555/block \
  -H "Content-Type: application/json" \
  -d '{"ip": "1.2.3.4"}'

# Verificar en iptables
sudo iptables -L INPUT -n | grep 1.2.3.4

# Desbloquear
curl -X POST http://localhost:5555/unblock \
  -H "Content-Type: application/json" \
  -d '{"ip": "1.2.3.4"}'

Resultado esperado: IP bloqueada y desbloqueada en segundos.

Lesson Learned: Aislá el componente de bloqueo detrás de un API mínimo. Te permite testearlo sin Shuffle ni Wazuh, y descubrir problemas de permisos/paths desde el minuto cero.


2. Construyendo el workflow de respuesta en Shuffle

2.1 Arquitectura simplificada del flujo

Workflow final simplificado Caption: Workflow de 3 nodos - simple y robusto
[Webhook Wazuh]
[Process & Block]Parsea + verifica + bloquea
[Log Result]Registra la acción

¿Por qué tan simple? Menos nodos = menos puntos de fallo, menos estado a depurar y respuesta más rápida.

2.2 El nodo crítico: Process_and_Block

Código completo del nodo Execute Python:

import json
import re
import requests

rule_id = "$exec.rule_id"
text = "$exec.text"
title = "$exec.title"

src_ip = "unknown"

# Buscar IP en formato "from X.X.X.X"
from_match = re.search(r'from\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', text)
if from_match:
    src_ip = from_match.group(1)

# Buscar IP en formato "rhost=X.X.X.X"
if src_ip == "unknown":
    rhost_match = re.search(r'rhost=(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', text)
    if rhost_match:
        src_ip = rhost_match.group(1)

# Buscar cualquier IP en el texto
if src_ip == "unknown":
    ip_pattern = r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
    ip_matches = re.findall(ip_pattern, text)
    if ip_matches:
        src_ip = ip_matches[0]

# Lista de rule_ids de SSH brute force
ssh_rules = ["100001", "5710", "5712", "2502", "5503", "5551"]

result = {
    "rule_id": rule_id,
    "src_ip": src_ip,
    "action_taken": "none",
    "message": ""
}

# IPs a ignorar (localhost, etc.)
skip_ips = ["127.0.0.1", "::1", "localhost"]

if rule_id in ssh_rules and src_ip not in skip_ips and src_ip != "unknown":
    try:
        resp = requests.post(
            "http://192.168.0.121:5555/block",
            json={"ip": src_ip},
            timeout=5
        )

        if resp.status_code == 200:
            result["action_taken"] = "blocked"
            result["message"] = "Blocked " + src_ip
        else:
            result["action_taken"] = "error"
            result["message"] = "HTTP " + str(resp.status_code)

    except Exception as e:
        result["action_taken"] = "error"
        result["message"] = str(e)[:100]
else:
    result["action_taken"] = "skipped"
    result["message"] = "Rule " + rule_id + " IP " + src_ip

print(json.dumps(result))

Lo que hace:

  1. Extrae rule_id del payload Wazuh
  2. Busca la IP atacante en 3 formatos diferentes (regex flexible)
  3. Verifica si es un rule_id de SSH brute force
  4. Si corresponde, llama al API de bloqueo
  5. Registra el resultado

2.3 La batalla contra los booleanos (troubleshooting real)

El problema: Shuffle inyecta JSON con true/false (lowercase) que Python no interpreta correctamente.

La solución: Acceder a campos individuales en vez de evaluar el JSON completo.

# ❌ NO FUNCIONA
data = $exec  # Inserta JSON crudo con true/false

# ✅ FUNCIONA
rule_id = "$exec.rule_id"   # Acceso directo al campo
src_ip  = "$exec.src_ip"    # Shuffle reemplaza antes de ejecutar Python

Lesson Learned: Me costó ~3 horas debuggear esto. Cuando veas name 'true' is not defined, no es Python rompiéndose - es Shuffle insertando JSON que Python no puede evaluar directamente.

2.4 Testing del workflow

Ejecución exitosa en Shuffle Caption: IP extraída correctamente, acción tomada

Resultado típico esperado:

{
  "rule_id": "100001",
  "src_ip": "192.168.0.121",
  "action_taken": "blocked",
  "message": "Blocked 192.168.0.121"
}

3. Wazuh → Shuffle: la integración por webhook

3.1 Configuración en ossec.conf

Configuración integración Wazuh-Shuffle Caption: Integración configurada con level 10 - solo alertas importantes
<!-- Integration with Shuffle SOAR -->
<integration>
  <name>shuffle</name>
  <hook_url>http://192.168.0.123:3001/api/v1/hooks/webhook_17702e08-066f-4c9a-ae3a-d61f1622b9af</hook_url>
  <level>10</level>
  <alert_format>json</alert_format>
</integration>

Ubicación: Dentro del archivo /var/ossec/etc/ossec.conf en el Wazuh Manager

Clave: <level>10</level> → solo alertas importantes (SSH brute force y eventos críticos).

3.2 El problema del level

En los primeros tests dejé level = 3 y recibí una avalancha de eventos normales: sudo exitoso, sesiones PAM, logins normales. El sistema era inusable.

Solución: Subir a level = 10 filtró todo excepto los eventos realmente críticos.

Alertas SSH nivel 10 Caption: Solo alertas SSH brute force (nivel 10) - sistema silencioso hasta que importa

Niveles de alerta en Wazuh:

  • 0-3: Info/Debug (sesiones normales, sudo exitoso)
  • 4-7: Warning (algunos fallos de auth)
  • 8-12: Alertas importantes (SSH brute force, múltiples fallos)
  • 13-15: Crítico (rootkits, malware)

3.3 Verificación de logs de integración

Para confirmar que Wazuh envía al webhook correcto:

docker exec -it single-node-wazuh.manager-1 bash
tail -f /var/ossec/logs/integrations.log
Logs de integración Wazuh Caption: Alertas enviadas exitosamente al webhook correcto (17702e08...)

Qué buscar:

  • Webhook correcto (...17702e08...)
  • Sin errores HTTP
  • Timestamps recientes

3.4 Troubleshooting: XML comentado incorrectamente

El bug más tonto: Comenté mal la integración durante tests:

<!-- <integration>
  ...
</integration> --> -->

El doble cierre --> --> rompía el XML y integratord no arrancaba.

Síntoma: El servicio wazuh-integratord aparecía como not running...

Fix:

# Verificar sintaxis
grep -A 7 "Integration with Shuffle" /var/ossec/etc/ossec.conf

# Reiniciar servicios
/var/ossec/bin/wazuh-control restart
/var/ossec/bin/wazuh-control status | grep integratord

Lesson Learned: Cuando nada llega al webhook, validá lo obvio primero (XML, servicios corriendo, red).


4. El momento de la verdad: prueba end‑to‑end

4.1 Setup de observabilidad (3 terminales simultáneas)

Terminal 1 - Wazuh integration logs:

docker exec -it single-node-wazuh.manager-1 bash
tail -f /var/ossec/logs/integrations.log

Terminal 2 - iptables watch:

watch -n 2 'sudo iptables -L INPUT -n -v | grep DROP'

Terminal 3 - Shuffle UI:
Navegador con pestaña Runs abierta

4.2 Ejecutando el ataque de prueba

Test SSH brute force Caption: 6 intentos SSH fallidos - dispara la regla 100001
for i in {1..6}; do
  echo "Intento $i"
  ssh fake_$(date +%N)@192.168.0.121
  sleep 3
done

4.3 La cascada automática (≈15 segundos end-to-end)

  1. T+0s: falla el intento SSH #6
  2. T+2s: Wazuh genera alerta rule_id = 100001 (SSH brute force)
  3. T+3s: integratord envía webhook a Shuffle
  4. T+5s: Shuffle ejecuta workflow Process_and_Block
  5. T+8s: API Flask aplica iptables -I INPUT -s <IP> -j DROP
  6. T+10s: Nodo Log_Result registra "blocked"
IP bloqueada en iptables Caption: IP 192.168.0.121 bloqueada automáticamente
sudo iptables -L INPUT -n | grep 192.168.0.121
# OUTPUT: DROP  all  --  192.168.0.121  0.0.0.0/0

4.4 Evidencia en Wazuh Dashboard

Alerta SSH en Dashboard Caption: Alerta SSH brute force detectada (rule 2502, nivel 10)
Ejecución en Shuffle Caption: Lista de ejecuciones en Shuffle
Payload completo Caption: Payload Wazuh con rule_id, timestamp y detalles del ataque

Lo que confirmamos:

  • rule_id: "100001" (SSH brute force from ::1)
  • src_ip extraída: "192.168.0.121"
  • action_taken: "blocked"
  • ✅ Response time: menos de 15 segundos

4.5 Métricas finales del laboratorio

MétricaValor
Response time~10–15 segundos
False positives0 (con level 10)
Block rate100%
Eventos procesados~500/día
Detection rate100% SSH brute force

Note: Métricas de laboratorio con 1 agente. En producción variarán según volumen y latencia de red.


5. Lecciones aprendidas y próximos pasos

5.1 Errores críticos que encontré

1. Shuffle vs. booleanos Python

  • Síntoma: name 'true' is not defined en el nodo
  • Causa: Evaluar JSON crudo como objeto Python
  • Solución: Acceso directo a campos ("$exec.rule_id")
  • Tiempo perdido: ~3 horas 😅

2. XML mal comentado en ossec.conf

  • Síntoma: integratord not running...
  • Causa: --> --> doble cierre
  • Solución: Limpiar comentario y reiniciar Wazuh

3. Level demasiado bajo (3)

  • Síntoma: Avalancha de eventos (sudo, PAM sessions)
  • Solución: level = 10 para filtrar a eventos críticos

5.2 Lo que funcionó sorprendentemente bien

Arquitectura minimalista (3 nodos): Menos frágil que 6 nodos con condiciones
API Flask: Rápido de desarrollar y debuggear
systemd service: Robusto, se reinicia solo ante fallos
Regex flexible: Maneja múltiples formatos de IP en logs

5.3 Lo que viene: completando el Bloque 2

Este post cubrió: SSH brute force response (el workflow más crítico)

Lo que falta del Bloque 2 (próximo post):

  1. 🔔 Notificaciones automáticas

    • Slack/Discord/Email cuando se bloquea una IP
    • Alertas para eventos críticos
    • Dashboard de notificaciones
  2. 📁 File Integrity Monitoring (FIM) Response

    • Detectar cambios en /etc/passwd, /etc/shadow
    • Snapshot automático de archivos críticos
    • Alertas elevadas para modificaciones no autorizadas
  3. 🆕 New Host Detection

    • Workflow cuando nuevo agente se conecta
    • Verificación automática de compliance
    • Notificación al equipo SOC
  4. 🔍 Enriquecimiento básico con AbuseIPDB

    • Verificar reputación de IP antes de bloquear
    • Agregar "reputation_score" al resultado
    • Evitar falsos positivos

Duración estimada del próximo post: 2-3 horas de implementación + testing

Spoiler: Con estos 4 componentes adicionales tenemos un SOAR completo que cubre los casos de uso más comunes de un SOC pequeño/mediano.

5.4 Mejoras futuras (Bloques 3+)

  1. Threat Intelligence (Bloque 3): MISP + feeds + APIs externas
  2. ML Anomaly Detection (Bloque 4): Isolation Forest para patrones no-rule-based
  3. Asset Discovery (Bloque 5): Nmap automation engine
  4. Dashboard Grafana (Bloque 7): Métricas SOC en tiempo real

5.5 Cómo reproducir este setup

Requisitos:

  • Wazuh SIEM operativo (ver post anterior)
  • Shuffle instalado y accesible
  • VM Ubuntu 22.04+ con Python 3.10+
  • Acceso sudo para iptables

Tiempo estimado: 2–3 horas (con troubleshooting)

Repositorio GitHub: (próximamente - estoy organizando el código)


Conclusión: de reactivo a proactivo

Antes: Las alertas llegaban. Yo las leía. Yo bloqueaba IPs. A mano. Uno por uno.

Ahora: El sistema detecta → verifica → bloquea → registra. Solo. Automáticamente. En menos de 15 segundos.

La diferencia es escalabilidad. Puedo estar durmiendo y el SOC sigue respondiendo a ataques SSH brute force. Puedo escalar a 100 hosts sin cambiar el workflow.

Próximo paso: Completar el Bloque 2 con notificaciones, FIM response y enriquecimiento de contexto. Después, agregar inteligencia (threat intel + ML) para decidir mejor a quién bloquear, no solo más rápido.

Por ahora, tengo un sistema que funciona. Detecta SSH brute force y lo bloquea automáticamente. Y eso... ya es bastante poderoso. 🔥


Apéndice: Comandos de referencia

Verificar integración Wazuh:

docker exec -it single-node-wazuh.manager-1 bash
grep -A 7 "Integration with Shuffle" /var/ossec/etc/ossec.conf
tail -f /var/ossec/logs/integrations.log

Watch de iptables:

watch -n 2 'sudo iptables -L INPUT -n -v | grep DROP'

Ataque controlado:

for i in {1..6}; do
  echo "Intento $i"
  ssh fake_$(date +%N)@192.168.0.121
  sleep 3
done

Verificar bloqueo:

sudo iptables -L INPUT -n | grep 192.168.0.121

Test del API:

# Health check
curl http://localhost:5555/health

# Bloquear
curl -X POST http://localhost:5555/block \
  -H "Content-Type: application/json" \
  -d '{"ip": "1.2.3.4"}'

# Desbloquear
curl -X POST http://localhost:5555/unblock \
  -H "Content-Type: application/json" \
  -d '{"ip": "1.2.3.4"}'

Posts relacionados: