- Published on
De alertas a acción: respuesta automática a SSH brute force con SOAR (Shuffle + Wazuh)
- Authors

- Name
- Cobalto-Sec
Resumen ejecutivo: En este tutorial construyo una respuesta automática a ataques SSH brute force que va desde la alerta en Wazuh hasta el bloqueo de IP con iptables orquestado por Shuffle SOAR. El objetivo es que, en menos de 15 segundos, el sistema pase de detectar a mitigar sin intervención humana. Comparto arquitectura, código completo, decisiones de diseño, pruebas end‑to‑end y el troubleshooting real que me encontré.
0. Contexto y alcance
Este post asume que ya tenés un laboratorio con Wazuh y Shuffle funcionando. Si te perdiste la primera parte, acá está el post de integración Wazuh-Shuffle donde cubrimos el setup base.
El caso de uso: SSH brute force contra un host Linux. Cuando se detectan múltiples intentos fallidos, Wazuh envía un webhook hacia Shuffle. Shuffle procesa el evento, valida que sea un ataque real y, si corresponde, invoca un API local (Flask) que ejecuta iptables para bloquear la IP atacante.
Stack técnico:
- SIEM: Wazuh 4.9.2 (single-node en Docker)
- SOAR: Shuffle (OpenSearch + Backend + Frontend)
- Automation: Python 3.10 + Flask
- Firewall: iptables + systemd service
- Integration: Webhooks + JSON
Tip: Si es tu primera vez con Shuffle, empezá con un hello webhook simple (un flujo que solo recibe y registra el payload). Validás red, puertos y formato antes de meter lógica compleja.
1. El corazón del sistema: Firewall Blocker API
Antes de que Shuffle pueda bloquear IPs, necesitamos algo que efectivamente las bloquee en el host objetivo.
1.1 ¿Por qué un API?
- Shuffle no puede (ni debe) ejecutar comandos arbitrarios en tus VMs
- Un API nos da un punto de control con inputs validados y capacidad de auditoría
- Podés evolucionar a otros motores (nftables, firewalld) sin cambiar el flujo SOAR
Shuffle → HTTP POST → API → iptables → IP bloqueada
1.2 El código completo (Flask + iptables)
Ubicación: /home/wazuh/security-automation/firewall_blocker.py
from flask import Flask, request, jsonify
import subprocess
import re
import logging
app = Flask(__name__)
# Configurar logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def is_valid_ip(ip):
"""Validar formato IPv4"""
pattern = r'^(\d{1,3}\.){3}\d{1,3}$'
if not re.match(pattern, ip):
return False
octets = ip.split('.')
return all(0 <= int(octet) <= 255 for octet in octets)
def execute_iptables(action, ip):
"""Ejecutar comando iptables de forma segura"""
try:
if action == "block":
cmd = ["sudo", "/usr/sbin/iptables", "-I", "INPUT", "-s", ip, "-j", "DROP"]
elif action == "unblock":
cmd = ["sudo", "/usr/sbin/iptables", "-D", "INPUT", "-s", ip, "-j", "DROP"]
else:
return False, "Invalid action"
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if result.returncode == 0:
logger.info(f"{action.upper()} successful for IP: {ip}")
return True, f"IP {ip} {action}ed successfully"
else:
logger.error(f"iptables error: {result.stderr}")
return False, result.stderr
except subprocess.TimeoutExpired:
return False, "Command timeout"
except Exception as e:
logger.error(f"Exception: {str(e)}")
return False, str(e)
@app.route('/health', methods=['GET'])
def health():
"""Health check endpoint"""
return jsonify({
"service": "firewall-blocker",
"status": "healthy"
}), 200
@app.route('/block', methods=['POST'])
def block_ip():
"""Bloquear una IP"""
data = request.get_json()
if not data or 'ip' not in data:
return jsonify({
"success": False,
"message": "Missing 'ip' field in request body"
}), 400
ip = data['ip']
if not is_valid_ip(ip):
return jsonify({
"success": False,
"message": f"Invalid IP address format: {ip}"
}), 400
# Evitar bloquear localhost
if ip in ['127.0.0.1', '::1', 'localhost']:
return jsonify({
"success": False,
"message": "Cannot block localhost"
}), 400
success, message = execute_iptables("block", ip)
if success:
return jsonify({
"action": "blocked",
"ip": ip,
"message": message,
"success": True
}), 200
else:
return jsonify({
"action": "blocked",
"ip": ip,
"message": message,
"success": False
}), 500
@app.route('/unblock', methods=['POST'])
def unblock_ip():
"""Desbloquear una IP"""
data = request.get_json()
if not data or 'ip' not in data:
return jsonify({
"success": False,
"message": "Missing 'ip' field in request body"
}), 400
ip = data['ip']
if not is_valid_ip(ip):
return jsonify({
"success": False,
"message": f"Invalid IP address format: {ip}"
}), 400
success, message = execute_iptables("unblock", ip)
if success:
return jsonify({
"action": "unblocked",
"ip": ip,
"message": message,
"success": True
}), 200
else:
return jsonify({
"action": "unblocked",
"ip": ip,
"message": message,
"success": False
}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5555, debug=False)
Características clave:
- ✅ Validación de formato IPv4
- ✅ Protección contra bloqueo de localhost
- ✅ Logging de todas las acciones
- ✅ Endpoints
/health,/block,/unblock - ✅ Timeouts para evitar comandos colgados
1.3 Configuración del host
Sudoers para iptables sin password:
echo "wazuh ALL=(ALL) NOPASSWD: /usr/sbin/iptables" | sudo tee /etc/sudoers.d/wazuh-iptables
sudo chmod 0440 /etc/sudoers.d/wazuh-iptables
Warning: Validá la ruta de
iptablesen tu distro (which iptables). Si cambia, actualizá la entrada ensudoers.
Servicio systemd:
# /etc/systemd/system/firewall_blocker.service
[Unit]
Description=Firewall Blocker API (Flask)
After=network.target
[Service]
User=wazuh
Group=wazuh
WorkingDirectory=/home/wazuh/security-automation
Environment="PYTHONUNBUFFERED=1"
ExecStart=/usr/bin/python3 /home/wazuh/security-automation/firewall_blocker.py
Restart=always
RestartSec=3
[Install]
WantedBy=multi-user.target
Habilitar y arrancar el servicio:
sudo systemctl daemon-reload
sudo systemctl enable firewall_blocker
sudo systemctl start firewall_blocker
sudo systemctl status firewall_blocker
1.4 Testing del API
Caption: Health check, bloqueo y desbloqueo exitosos# Test básico
curl http://localhost:5555/health
# Bloquear IP de prueba
curl -X POST http://localhost:5555/block \
-H "Content-Type: application/json" \
-d '{"ip": "1.2.3.4"}'
# Verificar en iptables
sudo iptables -L INPUT -n | grep 1.2.3.4
# Desbloquear
curl -X POST http://localhost:5555/unblock \
-H "Content-Type: application/json" \
-d '{"ip": "1.2.3.4"}'
Resultado esperado: IP bloqueada y desbloqueada en segundos.
Lesson Learned: Aislá el componente de bloqueo detrás de un API mínimo. Te permite testearlo sin Shuffle ni Wazuh, y descubrir problemas de permisos/paths desde el minuto cero.
2. Construyendo el workflow de respuesta en Shuffle
2.1 Arquitectura simplificada del flujo
Caption: Workflow de 3 nodos - simple y robusto[Webhook Wazuh]
↓
[Process & Block] ← Parsea + verifica + bloquea
↓
[Log Result] ← Registra la acción
¿Por qué tan simple? Menos nodos = menos puntos de fallo, menos estado a depurar y respuesta más rápida.
2.2 El nodo crítico: Process_and_Block
Código completo del nodo Execute Python:
import json
import re
import requests
rule_id = "$exec.rule_id"
text = "$exec.text"
title = "$exec.title"
src_ip = "unknown"
# Buscar IP en formato "from X.X.X.X"
from_match = re.search(r'from\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', text)
if from_match:
src_ip = from_match.group(1)
# Buscar IP en formato "rhost=X.X.X.X"
if src_ip == "unknown":
rhost_match = re.search(r'rhost=(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', text)
if rhost_match:
src_ip = rhost_match.group(1)
# Buscar cualquier IP en el texto
if src_ip == "unknown":
ip_pattern = r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
ip_matches = re.findall(ip_pattern, text)
if ip_matches:
src_ip = ip_matches[0]
# Lista de rule_ids de SSH brute force
ssh_rules = ["100001", "5710", "5712", "2502", "5503", "5551"]
result = {
"rule_id": rule_id,
"src_ip": src_ip,
"action_taken": "none",
"message": ""
}
# IPs a ignorar (localhost, etc.)
skip_ips = ["127.0.0.1", "::1", "localhost"]
if rule_id in ssh_rules and src_ip not in skip_ips and src_ip != "unknown":
try:
resp = requests.post(
"http://192.168.0.121:5555/block",
json={"ip": src_ip},
timeout=5
)
if resp.status_code == 200:
result["action_taken"] = "blocked"
result["message"] = "Blocked " + src_ip
else:
result["action_taken"] = "error"
result["message"] = "HTTP " + str(resp.status_code)
except Exception as e:
result["action_taken"] = "error"
result["message"] = str(e)[:100]
else:
result["action_taken"] = "skipped"
result["message"] = "Rule " + rule_id + " IP " + src_ip
print(json.dumps(result))
Lo que hace:
- Extrae
rule_iddel payload Wazuh - Busca la IP atacante en 3 formatos diferentes (regex flexible)
- Verifica si es un rule_id de SSH brute force
- Si corresponde, llama al API de bloqueo
- Registra el resultado
2.3 La batalla contra los booleanos (troubleshooting real)
El problema: Shuffle inyecta JSON con true/false (lowercase) que Python no interpreta correctamente.
La solución: Acceder a campos individuales en vez de evaluar el JSON completo.
# ❌ NO FUNCIONA
data = $exec # Inserta JSON crudo con true/false
# ✅ FUNCIONA
rule_id = "$exec.rule_id" # Acceso directo al campo
src_ip = "$exec.src_ip" # Shuffle reemplaza antes de ejecutar Python
Lesson Learned: Me costó ~3 horas debuggear esto. Cuando veas
name 'true' is not defined, no es Python rompiéndose - es Shuffle insertando JSON que Python no puede evaluar directamente.
2.4 Testing del workflow
Caption: IP extraída correctamente, acción tomadaResultado típico esperado:
{
"rule_id": "100001",
"src_ip": "192.168.0.121",
"action_taken": "blocked",
"message": "Blocked 192.168.0.121"
}
3. Wazuh → Shuffle: la integración por webhook
3.1 Configuración en ossec.conf
Caption: Integración configurada con level 10 - solo alertas importantes<!-- Integration with Shuffle SOAR -->
<integration>
<name>shuffle</name>
<hook_url>http://192.168.0.123:3001/api/v1/hooks/webhook_17702e08-066f-4c9a-ae3a-d61f1622b9af</hook_url>
<level>10</level>
<alert_format>json</alert_format>
</integration>
Ubicación: Dentro del archivo /var/ossec/etc/ossec.conf en el Wazuh Manager
Clave: <level>10</level> → solo alertas importantes (SSH brute force y eventos críticos).
3.2 El problema del level
En los primeros tests dejé level = 3 y recibí una avalancha de eventos normales: sudo exitoso, sesiones PAM, logins normales. El sistema era inusable.
Solución: Subir a level = 10 filtró todo excepto los eventos realmente críticos.
Caption: Solo alertas SSH brute force (nivel 10) - sistema silencioso hasta que importaNiveles de alerta en Wazuh:
- 0-3: Info/Debug (sesiones normales, sudo exitoso)
- 4-7: Warning (algunos fallos de auth)
- 8-12: Alertas importantes (SSH brute force, múltiples fallos)
- 13-15: Crítico (rootkits, malware)
3.3 Verificación de logs de integración
Para confirmar que Wazuh envía al webhook correcto:
docker exec -it single-node-wazuh.manager-1 bash
tail -f /var/ossec/logs/integrations.log
Caption: Alertas enviadas exitosamente al webhook correcto (17702e08...)Qué buscar:
- Webhook correcto (
...17702e08...) - Sin errores HTTP
- Timestamps recientes
3.4 Troubleshooting: XML comentado incorrectamente
El bug más tonto: Comenté mal la integración durante tests:
<!-- <integration>
...
</integration> --> -->
El doble cierre --> --> rompía el XML y integratord no arrancaba.
Síntoma: El servicio wazuh-integratord aparecía como not running...
Fix:
# Verificar sintaxis
grep -A 7 "Integration with Shuffle" /var/ossec/etc/ossec.conf
# Reiniciar servicios
/var/ossec/bin/wazuh-control restart
/var/ossec/bin/wazuh-control status | grep integratord
Lesson Learned: Cuando nada llega al webhook, validá lo obvio primero (XML, servicios corriendo, red).
4. El momento de la verdad: prueba end‑to‑end
4.1 Setup de observabilidad (3 terminales simultáneas)
Terminal 1 - Wazuh integration logs:
docker exec -it single-node-wazuh.manager-1 bash
tail -f /var/ossec/logs/integrations.log
Terminal 2 - iptables watch:
watch -n 2 'sudo iptables -L INPUT -n -v | grep DROP'
Terminal 3 - Shuffle UI:
Navegador con pestaña Runs abierta
4.2 Ejecutando el ataque de prueba
Caption: 6 intentos SSH fallidos - dispara la regla 100001for i in {1..6}; do
echo "Intento $i"
ssh fake_$(date +%N)@192.168.0.121
sleep 3
done
4.3 La cascada automática (≈15 segundos end-to-end)
- T+0s: falla el intento SSH #6
- T+2s: Wazuh genera alerta
rule_id = 100001(SSH brute force) - T+3s:
integratordenvía webhook a Shuffle - T+5s: Shuffle ejecuta workflow
Process_and_Block - T+8s: API Flask aplica
iptables -I INPUT -s <IP> -j DROP - T+10s: Nodo
Log_Resultregistra "blocked"
sudo iptables -L INPUT -n | grep 192.168.0.121
# OUTPUT: DROP all -- 192.168.0.121 0.0.0.0/0
4.4 Evidencia en Wazuh Dashboard
Caption: Alerta SSH brute force detectada (rule 2502, nivel 10)
Caption: Lista de ejecuciones en Shuffle
Caption: Payload Wazuh con rule_id, timestamp y detalles del ataqueLo que confirmamos:
- ✅
rule_id: "100001" (SSH brute force from ::1) - ✅
src_ipextraída: "192.168.0.121" - ✅
action_taken: "blocked" - ✅ Response time: menos de 15 segundos
4.5 Métricas finales del laboratorio
| Métrica | Valor |
|---|---|
| Response time | ~10–15 segundos |
| False positives | 0 (con level 10) |
| Block rate | 100% |
| Eventos procesados | ~500/día |
| Detection rate | 100% SSH brute force |
Note: Métricas de laboratorio con 1 agente. En producción variarán según volumen y latencia de red.
5. Lecciones aprendidas y próximos pasos
5.1 Errores críticos que encontré
1. Shuffle vs. booleanos Python
- Síntoma:
name 'true' is not defineden el nodo - Causa: Evaluar JSON crudo como objeto Python
- Solución: Acceso directo a campos (
"$exec.rule_id") - Tiempo perdido: ~3 horas 😅
2. XML mal comentado en ossec.conf
- Síntoma:
integratord not running... - Causa:
--> -->doble cierre - Solución: Limpiar comentario y reiniciar Wazuh
3. Level demasiado bajo (3)
- Síntoma: Avalancha de eventos (sudo, PAM sessions)
- Solución:
level = 10para filtrar a eventos críticos
5.2 Lo que funcionó sorprendentemente bien
✅ Arquitectura minimalista (3 nodos): Menos frágil que 6 nodos con condiciones
✅ API Flask: Rápido de desarrollar y debuggear
✅ systemd service: Robusto, se reinicia solo ante fallos
✅ Regex flexible: Maneja múltiples formatos de IP en logs
5.3 Lo que viene: completando el Bloque 2
Este post cubrió: SSH brute force response (el workflow más crítico)
Lo que falta del Bloque 2 (próximo post):
🔔 Notificaciones automáticas
- Slack/Discord/Email cuando se bloquea una IP
- Alertas para eventos críticos
- Dashboard de notificaciones
📁 File Integrity Monitoring (FIM) Response
- Detectar cambios en
/etc/passwd,/etc/shadow - Snapshot automático de archivos críticos
- Alertas elevadas para modificaciones no autorizadas
- Detectar cambios en
🆕 New Host Detection
- Workflow cuando nuevo agente se conecta
- Verificación automática de compliance
- Notificación al equipo SOC
🔍 Enriquecimiento básico con AbuseIPDB
- Verificar reputación de IP antes de bloquear
- Agregar "reputation_score" al resultado
- Evitar falsos positivos
Duración estimada del próximo post: 2-3 horas de implementación + testing
Spoiler: Con estos 4 componentes adicionales tenemos un SOAR completo que cubre los casos de uso más comunes de un SOC pequeño/mediano.
5.4 Mejoras futuras (Bloques 3+)
- Threat Intelligence (Bloque 3): MISP + feeds + APIs externas
- ML Anomaly Detection (Bloque 4): Isolation Forest para patrones no-rule-based
- Asset Discovery (Bloque 5): Nmap automation engine
- Dashboard Grafana (Bloque 7): Métricas SOC en tiempo real
5.5 Cómo reproducir este setup
Requisitos:
- Wazuh SIEM operativo (ver post anterior)
- Shuffle instalado y accesible
- VM Ubuntu 22.04+ con Python 3.10+
- Acceso sudo para iptables
Tiempo estimado: 2–3 horas (con troubleshooting)
Repositorio GitHub: (próximamente - estoy organizando el código)
Conclusión: de reactivo a proactivo
Antes: Las alertas llegaban. Yo las leía. Yo bloqueaba IPs. A mano. Uno por uno.
Ahora: El sistema detecta → verifica → bloquea → registra. Solo. Automáticamente. En menos de 15 segundos.
La diferencia es escalabilidad. Puedo estar durmiendo y el SOC sigue respondiendo a ataques SSH brute force. Puedo escalar a 100 hosts sin cambiar el workflow.
Próximo paso: Completar el Bloque 2 con notificaciones, FIM response y enriquecimiento de contexto. Después, agregar inteligencia (threat intel + ML) para decidir mejor a quién bloquear, no solo más rápido.
Por ahora, tengo un sistema que funciona. Detecta SSH brute force y lo bloquea automáticamente. Y eso... ya es bastante poderoso. 🔥
Apéndice: Comandos de referencia
Verificar integración Wazuh:
docker exec -it single-node-wazuh.manager-1 bash
grep -A 7 "Integration with Shuffle" /var/ossec/etc/ossec.conf
tail -f /var/ossec/logs/integrations.log
Watch de iptables:
watch -n 2 'sudo iptables -L INPUT -n -v | grep DROP'
Ataque controlado:
for i in {1..6}; do
echo "Intento $i"
ssh fake_$(date +%N)@192.168.0.121
sleep 3
done
Verificar bloqueo:
sudo iptables -L INPUT -n | grep 192.168.0.121
Test del API:
# Health check
curl http://localhost:5555/health
# Bloquear
curl -X POST http://localhost:5555/block \
-H "Content-Type: application/json" \
-d '{"ip": "1.2.3.4"}'
# Desbloquear
curl -X POST http://localhost:5555/unblock \
-H "Content-Type: application/json" \
-d '{"ip": "1.2.3.4"}'
Posts relacionados:
- De Cero a SOAR: Integrando Wazuh con Shuffle
- [Próximamente: Completando el Bloque 2 - Notificaciones y FIM Response]