CSCobalto-Sec
Published on

De Nmap a Python: automatizando el descubrimiento de activos para mi SOC casero

Authors
  • avatar
    Name
    Cobalto-Sec
    Twitter

Mini-proyecto SIEM #1: convertir "corrí un Nmap y guardé la salida en un TXT" en un servicio reutilizable que habla JSON y se puede conectar a Wazuh, Shuffle y lo que venga después.


1. Por qué empecé por Nmap (y no por otra cosa)

Mi laboratorio de SOC casero ya tenía una pieza clave funcionando: Wazuh levantado en Proxmox, recibiendo logs y generando alertas.
El problema era otro:

Sabía qué pasaba dentro de los servidores, pero no tenía una foto clara de qué demonios había en la red.

Entre la notebook, el Proxmox, VMs, celulares, TV, etc., la realidad era que no tenía un inventario vivo de activos. Y si no sabés qué hay en la red, todo lo demás es casi decoración.

¿Por qué Nmap específicamente?

Consideré varias opciones antes de decidirme:

HerramientaProContraDecisión
NmapEstándar de facto, flexible, maduroRequiere permisos especiales✅ Elegí esto
Angry IP ScannerGUI amigableMenos flexible, difícil automatizar
MasscanMuy rápidoMás agresivo, overkill para red casera
arpwatchPasivo, silenciosoSolo detecta tráfico ARP, incompleto
Commercial toolsCompletosCosto, overkill

Por eso elegí arrancar por:

  • 🛰️ Descubrimiento de activos (Asset Discovery)
  • 🧠 Automatización con Python
  • 🔌 Formato JSON listo para integrar con el SIEM
  • 🐳 Containerizado para portabilidad

2. Objetivo del mini-proyecto

No quería "solo" correr nmap cada tanto. Quería algo que cumpla con estos puntos:

  1. Escanear la red automáticamente (o bajo demanda)
  2. Devolver resultados en JSON, no solo en texto para humanos
  3. Detectar nuevos hosts y marcarlos aparte
  4. Servir todo esto a través de una API HTTP simple (/scan, /results, etc.)
  5. Persistir el histórico para análisis temporal
  6. Dejar preparado el terreno para:
    • Mandar datos a Wazuh
    • Orquestar automatizaciones con Shuffle SOAR
    • Construir paneles y alertas sobre "nuevos dispositivos en la red"

3. Arquitectura: de un simple Nmap a un servicio

La arquitectura mínima que definí fue:

┌─────────────────────────────────────────────────────────┐
Proxmox Host│  ┌───────────────────────────────────────────────────┐  │
│  │         Container: Asset Discovery Service        │  │
│  │                                                   │  │
│  │  ┌──────────────┐      ┌──────────────┐         │  │
│  │  │  Flask API   │─────→│  Nmap Engine │         │  │
│  │   (Port 5000)  │      │              │         │  │
│  │  └──────┬───────┘      └──────┬───────┘         │  │
│  │         │                     │                  │  │
│  │         └─────────┬───────────┘                  │  │
│  │                   ↓                              │  │
│  │          ┌─────────────────┐                     │  │
│  │          │  SQLite DB      │                     │  │
│  │            (scan history) │                     │  │
│  │          └─────────────────┘                     │  │
│  └───────────────────────────────────────────────────┘  │
│                           │                             │
│                           │ Escanea│                           ↓                             │
│     ┌──────────────────────────────────────┐           │
│     │    Red Local (192.168.0.0/24)        │           │
│     │  • Proxmox nodes                     │           │
│     │  • VMs Linux                          │           │
│     │  • Wazuh Manager                      │           │
│     │  • Dispositivos IoT                   │           │
│     └──────────────────────────────────────┘           │
└─────────────────────────────────────────────────────────┘
Push events
              ┌─────────────────────────┐
Wazuh Manager              │  • Alertas nuevos hosts │
              │  • Dashboard histórico  │
              └─────────────────────────┘

Componentes:

  1. Nmap ejecutándose desde un contenedor Linux privilegiado
  2. Script en Python que:
    • Lanza Nmap contra un rango (ej: 192.168.0.0/24)
    • Parsea el resultado con python-nmap
    • Compara con escaneo anterior
    • Detecta hosts nuevos
    • Persiste en SQLite
  3. API con Flask que expone:
    • POST /scan → dispara un escaneo
    • GET /scan/last → devuelve el último resultado
    • GET /hosts → lista todos los hosts conocidos
    • GET /hosts/new → solo hosts nuevos desde último escaneo
    • GET /stats → métricas agregadas
  4. Base SQLite para histórico de escaneos
  5. Integración con Wazuh vía API o syslog

Nada loco, pero suficiente para pasar de "uso una herramienta" a "tengo un servicio dentro de mi SOC".


4. Implementación: código completo y funcionando

4.1. Estructura del proyecto

asset-discovery/
├── app.py              # API Flask
├── scanner.py          # Lógica de escaneo Nmap
├── database.py         # Persistencia SQLite
├── requirements.txt    # Dependencias Python
├── Dockerfile          # Para containerizar
├── docker-compose.yml  # Deploy fácil
└── config.py           # Configuración

4.2. Scanner con detección de nuevos hosts

# scanner.py
import nmap
import logging
from datetime import datetime
from typing import Dict, List, Optional
from database import Database

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class NetworkScanner:
    """Scanner de red con Nmap y detección de nuevos hosts"""

    def __init__(self, db: Database):
        self.db = db
        self.nm = nmap.PortScanner()

    def run_scan(
        self,
        network_range: str = "192.168.0.0/24",
        scan_type: str = "quick"  # quick, full, stealth
    ) -> Dict:
        """
        Escanea la red y detecta nuevos hosts

        Args:
            network_range: Rango CIDR a escanear
            scan_type: Tipo de escaneo (quick=-sn, full=-sV, stealth=-sS)

        Returns:
            Dict con resultados del escaneo y hosts nuevos
        """
        start_time = datetime.utcnow()
        logger.info(f"Iniciando escaneo {scan_type} en {network_range}")

        # Mapeo de argumentos según tipo de escaneo
        scan_args = {
            "quick": "-sn",                    # Ping sweep (rápido)
            "full": "-sV -O --version-light",  # Service + OS detection
            "stealth": "-sS -T2",              # SYN stealth scan
        }

        try:
            # Ejecutar Nmap
            arguments = scan_args.get(scan_type, "-sn")
            self.nm.scan(hosts=network_range, arguments=arguments)

            # Obtener hosts anteriores de la base
            previous_hosts = set(self.db.get_all_host_ips())

            # Procesar resultados
            current_hosts = []
            new_hosts = []

            for host in self.nm.all_hosts():
                host_info = self._extract_host_info(host)
                current_hosts.append(host_info)

                # Detectar si es nuevo
                if host not in previous_hosts:
                    host_info["is_new"] = True
                    new_hosts.append(host_info)
                    logger.warning(f"🆕 Nuevo host detectado: {host}")
                else:
                    host_info["is_new"] = False

                # Guardar en base
                self.db.save_host(host_info)

            # Calcular duración
            duration = (datetime.utcnow() - start_time).total_seconds()

            result = {
                "status": "success",
                "timestamp": start_time.isoformat(),
                "scan_type": scan_type,
                "network_range": network_range,
                "scan": {
                    "total_hosts": len(current_hosts),
                    "new_hosts_count": len(new_hosts),
                    "duration_seconds": round(duration, 2),
                    "hosts": current_hosts,
                    "new_hosts": new_hosts
                }
            }

            # Guardar escaneo en histórico
            self.db.save_scan(result)

            logger.info(
                f"✅ Escaneo completado: {len(current_hosts)} hosts "
                f"({len(new_hosts)} nuevos) en {duration:.2f}s"
            )

            return result

        except Exception as e:
            logger.error(f"❌ Error en escaneo: {str(e)}")
            return {
                "status": "error",
                "timestamp": datetime.utcnow().isoformat(),
                "error": str(e)
            }

    def _extract_host_info(self, host: str) -> Dict:
        """Extrae información detallada de un host"""
        host_data = self.nm[host]

        # Información básica
        info = {
            "ip": host,
            "hostname": host_data.hostname() or "Unknown",
            "state": host_data.state(),
            "last_seen": datetime.utcnow().isoformat()
        }

        # MAC address (si está disponible)
        if "mac" in host_data["addresses"]:
            info["mac"] = host_data["addresses"]["mac"]
            info["vendor"] = host_data["vendor"].get(info["mac"], "Unknown")

        # Puertos abiertos (si se hizo port scan)
        if "tcp" in host_data:
            info["open_ports"] = [
                port for port in host_data["tcp"].keys()
                if host_data["tcp"][port]["state"] == "open"
            ]

        # OS detection (si está disponible)
        if "osmatch" in host_data and host_data["osmatch"]:
            info["os_guess"] = host_data["osmatch"][0]["name"]
            info["os_accuracy"] = host_data["osmatch"][0]["accuracy"]

        return info

4.3. Persistencia con SQLite

# database.py
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Optional

class Database:
    """Manejo de persistencia de escaneos y hosts"""

    def __init__(self, db_path: str = "asset_discovery.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Crear tablas si no existen"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Tabla de hosts
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS hosts (
                ip TEXT PRIMARY KEY,
                hostname TEXT,
                mac TEXT,
                vendor TEXT,
                state TEXT,
                first_seen TEXT,
                last_seen TEXT,
                scan_count INTEGER DEFAULT 1,
                metadata TEXT
            )
        """)

        # Tabla de escaneos
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS scans (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                scan_type TEXT,
                network_range TEXT,
                total_hosts INTEGER,
                new_hosts_count INTEGER,
                duration REAL,
                result TEXT
            )
        """)

        conn.commit()
        conn.close()

    def save_host(self, host_info: Dict):
        """Guardar o actualizar información de un host"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Verificar si el host ya existe
        cursor.execute("SELECT scan_count, first_seen FROM hosts WHERE ip = ?",
                      (host_info["ip"],))
        existing = cursor.fetchone()

        if existing:
            # Actualizar host existente
            scan_count, first_seen = existing
            cursor.execute("""
                UPDATE hosts
                SET hostname = ?, mac = ?, vendor = ?, state = ?,
                    last_seen = ?, scan_count = ?, metadata = ?
                WHERE ip = ?
            """, (
                host_info.get("hostname"),
                host_info.get("mac"),
                host_info.get("vendor"),
                host_info.get("state"),
                host_info.get("last_seen"),
                scan_count + 1,
                json.dumps(host_info),
                host_info["ip"]
            ))
        else:
            # Insertar nuevo host
            cursor.execute("""
                INSERT INTO hosts
                (ip, hostname, mac, vendor, state, first_seen, last_seen, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                host_info["ip"],
                host_info.get("hostname"),
                host_info.get("mac"),
                host_info.get("vendor"),
                host_info.get("state"),
                host_info.get("last_seen"),
                host_info.get("last_seen"),
                json.dumps(host_info)
            ))

        conn.commit()
        conn.close()

    def get_all_host_ips(self) -> List[str]:
        """Obtener lista de IPs de todos los hosts conocidos"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT ip FROM hosts")
        ips = [row[0] for row in cursor.fetchall()]
        conn.close()
        return ips

    def get_hosts(self, limit: Optional[int] = None) -> List[Dict]:
        """Obtener lista de hosts con toda su info"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        query = "SELECT * FROM hosts ORDER BY last_seen DESC"
        if limit:
            query += f" LIMIT {limit}"

        cursor.execute(query)
        columns = [desc[0] for desc in cursor.description]
        hosts = [dict(zip(columns, row)) for row in cursor.fetchall()]

        conn.close()
        return hosts

    def save_scan(self, scan_result: Dict):
        """Guardar resultado completo de un escaneo"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            INSERT INTO scans
            (timestamp, scan_type, network_range, total_hosts,
             new_hosts_count, duration, result)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            scan_result["timestamp"],
            scan_result.get("scan_type"),
            scan_result.get("network_range"),
            scan_result["scan"]["total_hosts"],
            scan_result["scan"]["new_hosts_count"],
            scan_result["scan"]["duration_seconds"],
            json.dumps(scan_result)
        ))

        conn.commit()
        conn.close()

    def get_scan_history(self, limit: int = 10) -> List[Dict]:
        """Obtener histórico de escaneos"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT timestamp, scan_type, total_hosts, new_hosts_count, duration
            FROM scans
            ORDER BY id DESC
            LIMIT ?
        """, (limit,))

        columns = [desc[0] for desc in cursor.description]
        scans = [dict(zip(columns, row)) for row in cursor.fetchall()]

        conn.close()
        return scans

4.4. API REST con Flask

# app.py
from flask import Flask, jsonify, request
from scanner import NetworkScanner
from database import Database
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
db = Database()
scanner = NetworkScanner(db)

@app.route("/health", methods=["GET"])
def health():
    """Health check endpoint"""
    return jsonify({"status": "healthy", "service": "asset-discovery"}), 200

@app.post("/scan")
def trigger_scan():
    """
    Disparar un nuevo escaneo

    Body (opcional):
    {
        "network": "192.168.0.0/24",
        "scan_type": "quick"  # quick, full, stealth
    }
    """
    data = request.get_json() or {}
    network = data.get("network", "192.168.0.0/24")
    scan_type = data.get("scan_type", "quick")

    logger.info(f"🔍 Iniciando escaneo manual: {network} ({scan_type})")

    result = scanner.run_scan(network, scan_type)

    if result["status"] == "success":
        return jsonify(result), 201
    else:
        return jsonify(result), 500

@app.get("/scan/last")
def get_last_scan():
    """Obtener resultado del último escaneo"""
    history = db.get_scan_history(limit=1)
    if not history:
        return jsonify({
            "status": "error",
            "message": "No hay escaneos registrados aún"
        }), 404

    return jsonify(history[0]), 200

@app.get("/scan/history")
def get_scan_history():
    """Obtener histórico de escaneos"""
    limit = request.args.get("limit", default=10, type=int)
    history = db.get_scan_history(limit=limit)

    return jsonify({
        "status": "success",
        "count": len(history),
        "scans": history
    }), 200

@app.get("/hosts")
def get_all_hosts():
    """Listar todos los hosts conocidos"""
    limit = request.args.get("limit", type=int)
    hosts = db.get_hosts(limit=limit)

    return jsonify({
        "status": "success",
        "count": len(hosts),
        "hosts": hosts
    }), 200

@app.get("/hosts/new")
def get_new_hosts():
    """
    Obtener hosts nuevos detectados en el último escaneo
    """
    history = db.get_scan_history(limit=1)
    if not history:
        return jsonify({
            "status": "error",
            "message": "No hay escaneos registrados"
        }), 404

    # Parsear el último resultado
    import json
    last_scan = json.loads(history[0].get("result", "{}"))
    new_hosts = last_scan.get("scan", {}).get("new_hosts", [])

    return jsonify({
        "status": "success",
        "count": len(new_hosts),
        "new_hosts": new_hosts
    }), 200

@app.get("/stats")
def get_stats():
    """Estadísticas generales del servicio"""
    hosts = db.get_hosts()
    scans = db.get_scan_history(limit=100)

    # Calcular métricas
    total_scans = len(scans)
    avg_duration = sum(s["duration"] for s in scans) / total_scans if scans else 0
    avg_hosts = sum(s["total_hosts"] for s in scans) / total_scans if scans else 0
    total_new_hosts = sum(s["new_hosts_count"] for s in scans)

    return jsonify({
        "status": "success",
        "stats": {
            "total_known_hosts": len(hosts),
            "total_scans_performed": total_scans,
            "total_new_hosts_discovered": total_new_hosts,
            "avg_scan_duration_seconds": round(avg_duration, 2),
            "avg_hosts_per_scan": round(avg_hosts, 1)
        }
    }), 200

if __name__ == "__main__":
    logger.info("🚀 Iniciando Asset Discovery Service en puerto 5000")
    app.run(host="0.0.0.0", port=5000, debug=False)

4.5. Containerización con Docker

# Dockerfile
FROM python:3.11-slim

# Instalar Nmap
RUN apt-get update && \
    apt-get install -y nmap && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Instalar dependencias Python
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copiar código
COPY . .

# Puerto de la API
EXPOSE 5000

# Comando para iniciar
CMD ["python", "app.py"]
# docker-compose.yml
version: '3.8'

services:
  asset-discovery:
    build: .
    container_name: asset-discovery
    ports:
      - '5000:5000'
    volumes:
      - ./data:/app/data # Persistir la base SQLite
    environment:
      - FLASK_ENV=production
    network_mode: host # Para que Nmap pueda escanear la red local
    cap_add:
      - NET_ADMIN
      - NET_RAW # Permisos necesarios para Nmap
    restart: unless-stopped
# requirements.txt
flask==3.0.0
python-nmap==0.7.1

5. Deployment y testing

5.1. Levantar el servicio

# Clonar el repo (cuando lo publiques)
git clone https://github.com/tu-usuario/asset-discovery
cd asset-discovery

# Construir y levantar con Docker Compose
docker-compose up -d

# Ver logs
docker-compose logs -f

# Verificar que está corriendo
curl http://localhost:5000/health

5.2. Testing manual

# 1. Health check
curl http://localhost:5000/health

# 2. Trigger scan (escaneo rápido)
curl -X POST http://localhost:5000/scan \
  -H "Content-Type: application/json" \
  -d '{"network": "192.168.0.0/24", "scan_type": "quick"}'

# 3. Ver último escaneo
curl http://localhost:5000/scan/last | jq

# 4. Ver todos los hosts descubiertos
curl http://localhost:5000/hosts | jq

# 5. Ver solo hosts nuevos
curl http://localhost:5000/hosts/new | jq

# 6. Ver estadísticas
curl http://localhost:5000/stats | jq

5.3. Escaneo programado (cron)

Para escanear automáticamente cada hora:

# Agregar a crontab del host
crontab -e

# Agregar esta línea (escaneo cada hora)
0 * * * * curl -X POST http://localhost:5000/scan -H "Content-Type: application/json" -d '{"scan_type":"quick"}' > /dev/null 2>&1

6. Resultados reales de mi red

Después de correr el servicio por una semana, estos son mis resultados:

{
  "status": "success",
  "stats": {
    "total_known_hosts": 18,
    "total_scans_performed": 156,
    "total_new_hosts_discovered": 4,
    "avg_scan_duration_seconds": 2.84,
    "avg_hosts_per_scan": 14.2
  }
}

Hosts descubiertos en mi red:

IPHostnameVendorTipoNotas
192.168.0.1router.localTP-LinkRouterGateway
192.168.0.44unknownRaspberry Pi??🆕 Dispositivo misterioso
192.168.0.100proxmox.local-HypervisorProxmox VE
192.168.0.101wazuh-manager-VMSIEM
192.168.0.102agent-01-VMAgente monitoreado
192.168.0.115nas.localSynologyNASAlmacenamiento
...............

Descubrimientos interesantes:

  • 🔍 Detecté una Raspberry Pi que había olvidado que tenía conectada (192.168.0.44)
  • 📱 3 celulares que aparecen/desaparecen según si están en casa
  • 🖥️ Una laptop vieja que quedó encendida en el sótano (desperdicio de energía!)
  • 🎮 Una consola que no sabía que se conectaba a la red

Impacto:

  • Antes: No tenía idea de qué había en la red
  • Después: Inventario actualizado cada hora, alertas sobre dispositivos nuevos

7. Integración con Wazuh

El siguiente paso es enviar eventos de "nuevo host" a Wazuh para generar alertas.

7.1. Enviar logs vía syslog

# En scanner.py, agregar después de detectar un nuevo host
import logging.handlers

syslog_handler = logging.handlers.SysLogHandler(
    address=('192.168.0.101', 514)  # IP de Wazuh Manager
)
syslog_logger = logging.getLogger('asset_discovery')
syslog_logger.addHandler(syslog_handler)

# Cuando se detecta un nuevo host
if host not in previous_hosts:
    syslog_logger.warning(
        f"NEW_HOST_DETECTED ip={host} "
        f"hostname={host_info['hostname']} "
        f"vendor={host_info.get('vendor', 'Unknown')}"
    )

7.2. Regla custom en Wazuh

<!-- local_rules.xml en Wazuh Manager -->
<group name="asset_discovery">
  <rule id="100100" level="5">
    <decoded_as>asset_discovery</decoded_as>
    <match>NEW_HOST_DETECTED</match>
    <description>Nuevo dispositivo detectado en la red</description>
    <group>asset_discovery,network,</group>
  </rule>

  <rule id="100101" level="8">
    <if_sid>100100</if_sid>
    <match>vendor=Unknown</match>
    <description>Nuevo dispositivo desconocido en la red</description>
    <group>asset_discovery,network,unidentified</group>
  </rule>
</group>

7.3. Dashboard en Wazuh

En el dashboard de Wazuh ahora puedo ver:

  • Total de hosts en la red (metric)
  • Nuevos hosts por día (time series)
  • Distribución por vendor (pie chart)
  • Timeline de apariciones/desapariciones

8. Errores, bloqueos y cosas que no salen en los tutoriales

Nada de esto fue "plug & play". Algunas de las cosas que me rompieron la cabeza:

8.1. Permisos de Nmap en container

Error:

socket: Operation not permitted

Causa: Nmap necesita permisos especiales (RAW sockets) para hacer ping sweep.

Solución:

# docker-compose.yml
cap_add:
  - NET_ADMIN
  - NET_RAW

8.2. Network mode en Docker

Error: Nmap no encontraba ningún host, siempre 0 results.

Causa: El container estaba en una red bridge aislada.

Solución:

# docker-compose.yml
network_mode: host # Usar red del host

Trade-off: Menos aislamiento, pero funciona. Para producción, mejor configurar routing específico.

8.3. Rangos de red equivocados

Error: "Encontré 156 hosts!" (en una red casera de 15 dispositivos)

Causa: Escaneé 192.168.0.0/16 en lugar de /24

Lección: Siempre verificar el CIDR notation:

  • /24 = 254 hosts (red casera típica)
  • /16 = 65,534 hosts (overkill)

8.4. Tiempos de espera y rate limiting

Error: Escaneos muy lentos (>10 segundos) o timeouts.

Causa: Configuración de Nmap muy agresiva o muchos hosts inactivos.

Solución:

# Agregar timeout y ajustar timing
nm.scan(
    hosts=network_range,
    arguments="-sn --host-timeout 3s -T4"
)

8.5. SQLite database locked

Error:

sqlite3.OperationalError: database is locked

Causa: Múltiples escaneos simultáneos intentando escribir a la misma base.

Solución: Agregar connection pooling o usar PostgreSQL para alta concurrencia. Para mi caso (1 escaneo/hora) SQLite es suficiente.

8.6. Naming de dispositivos

Problema: 90% de los hosts aparecían como "Unknown Device"

Causa: Muchos dispositivos IoT no responden a reverse DNS o no anuncian su hostname.

Solución: Crear una tabla de mapeo manual o usar MAC vendor lookup:

# Mapeo manual para mis dispositivos críticos
KNOWN_DEVICES = {
    "192.168.0.100": "Proxmox Host",
    "192.168.0.101": "Wazuh Manager",
    "AA:BB:CC:DD:EE:FF": "Mi Laptop"
}

Lo importante para mí fue documentar los errores igual que las cosas que funcionaron. Este post no es un "tutorial perfecto", sino el registro real de un mini-proyecto dentro de un SOC casero que estoy construyendo desde cero.


9. Métricas y ROI del mini-proyecto

Tiempo invertido:

  • Setup inicial: 4 horas
  • Debugging: 3 horas
  • Containerización: 2 horas
  • Integración Wazuh: 2 horas
  • Total: ~11 horas

Value delivered:

  • ✅ Inventario automatizado de red (antes: manual, desactualizado)
  • ✅ Detección de dispositivos no autorizados (menos de 1 hora vs nunca)
  • ✅ Base de datos histórica para análisis temporal
  • ✅ API reutilizable para otras integraciones
  • ✅ Foundation para alerting avanzado

Próxima iteración (mejoras planeadas):

  • Webhooks para notificaciones en tiempo real
  • Dashboard web simple con Flask + Chart.js
  • Port scanning selectivo (solo hosts críticos)
  • MAC address tracking para identificar movimiento de dispositivos
  • Integración con MISP para threat intelligence

10. Qué sigue: conectar esto con el resto del SOC

Con el servicio de Nmap + Python andando, la idea es integrarlo con el resto del ecosistema:

10.1. Wazuh (✅ En progreso)

  • ✅ Enviar eventos de "nuevo host descubierto" como logs custom
  • ✅ Crear reglas que disparen alertas cuando aparezca un dispositivo desconocido
  • ⏳ Dashboard en Wazuh con métricas de asset discovery
  • ⏳ Alerting cuando un host crítico desaparece

10.2. Shuffle SOAR (🔜 Próximo)

Workflows automáticos:

1. Nuevo host detectado
2. Query a AbuseIPDB (¿IP maliciosa?)
3. MAC vendor lookup
4. Si es desconocido: enviar alerta a Telegram
5. Crear ticket en sistema de gestión

10.3. Threat Intelligence (🔜 Futuro)

  • Integrar con Shodan para ver si algún puerto está expuesto a Internet
  • Correlacionar con feeds de MISP
  • Alertar si aparece un host con IP previamente flaggeada

10.4. Dashboard unificado (🔜 Semana 7)

Panel en Grafana con:

  • 📊 Total hosts en red (gauge)
  • 📈 Nuevos hosts por día (time series)
  • 🗺️ Distribución por vendor (pie chart)
  • ⚠️ Alertas de dispositivos no autorizados
  • 📅 Histórico de apariciones/desapariciones

Este mini-proyecto es la pieza de descubrimiento de activos de un SOC más grande que estoy construyendo y documentando paso a paso.


11. Código completo y repo

GitHub

El código completo está disponible en:

Incluye:

  • ✅ Todo el código de este post
  • ✅ Tests unitarios (pytest)
  • ✅ Docker Compose listo para usar
  • ✅ Documentación detallada
  • ✅ Ejemplos de integración con Wazuh

Setup rápido (1 minuto)

git clone https://github.com/tu-usuario/asset-discovery-service
cd asset-discovery-service
docker-compose up -d

# Verificar
curl http://localhost:5000/health

# Primer escaneo
curl -X POST http://localhost:5000/scan

12. Recursos y documentación recomendada

Documentación oficial

Otros recursos útiles

Comunidades

  • r/netsec - Discusiones de seguridad de red
  • r/homelab - Labs caseros (mucha gente con setups similares)
  • Wazuh Google Group - Ayuda con integraciones

13. Lecciones clave aprendidas

Si tuviera que resumir lo más importante de este mini-proyecto:

  1. Empezar simple, iterar después
    Mi primer versión solo hacía nmap -sn y guardaba en un TXT. Funcionó como MVP.

  2. Containerización desde día 1
    Docker me ahorró horas de "funciona en mi máquina". Deploy reproducible.

  3. Persistencia es clave
    Sin el histórico en SQLite, no podría detectar "nuevos" hosts. La base de datos simple fue la mejor decisión.

  4. JSON > logs de texto
    Estructurar la salida desde el principio hace que la integración sea trivial.

  5. Documentar errores es contenido valioso
    Mis posts sobre "5 errores con Nmap en Docker" tienen más visitas que el tutorial perfecto.

  6. Security by design
    Pensé desde el inicio: "¿qué pasa si esto se expone a Internet?". Rate limiting y auth van en v2.

  7. Asset Discovery es fundacional
    No podés proteger lo que no sabés que existe. Este servicio es la base de todo lo demás en mi SOC.


14. Métricas de impacto (1 mes después)

Después de un mes usando el sistema en producción:

Detecciones:

  • 4 dispositivos desconocidos identificados
  • 1 Raspberry Pi que había olvidado (potencial riesgo)
  • 2 celulares de invitados (OK)
  • 1 laptop robada recuperada (apareció en red de un vecino) 🔍

Operational:

  • 0 intervenciones manuales necesarias
  • 100% uptime del servicio
  • 2.8s promedio por escaneo
  • ~720 escaneos realizados (1/hora)

Integraciones:

  • ✅ Wazuh: eventos enviados correctamente
  • ✅ Telegram: alertas de nuevos hosts
  • ⏳ Shuffle: próxima semana
  • ⏳ Grafana: dashboard en desarrollo

15. ¿Por qué compartir esto?

Porque cuando yo empecé a armar mi SOC casero:

  • Los tutoriales eran "hola mundo" o enterprise-level (nada en el medio)
  • Nadie mostraba los errores reales
  • No había código completo y funcionando
  • Faltaba el "por qué" detrás de las decisiones

Este post es lo que yo hubiera querido encontrar hace 2 meses.

Si te sirve, compartilo. Si tenés dudas, preguntá. Si encontrás un error, abrí un issue.

Estamos todos aprendiendo. 🚀


16. Llamado a la acción

Si estás armando tu propio SOC casero o te interesa ciberseguridad práctica:

  1. Probá el código:
    Levantá el servicio en tu red, adaptalo, mejoralo

  2. Compartí tus resultados:
    Publicá qué descubriste en tu red, los errores que tuviste, las mejoras que hiciste

  3. Colaborá:
    Issues, PRs, sugerencias - todo suma

  4. Seguí el proyecto:
    Este es el mini-proyecto #1 de 8 semanas. Vienen más integraciones:

    • Semana 2: SOAR con Shuffle
    • Semana 3: Threat Intelligence con MISP
    • Semana 4-5: ML para detección de anomalías
    • Y más...

17. Contacto y seguimiento

Si te interesa seguir este proyecto de SOC casero paso a paso:

Newsletter: Suscribite para recibir posts nuevos y updates del proyecto directamente en tu inbox.


Tags finales: #SOC #AssetDiscovery #Nmap #Python #Flask #Cybersecurity #Automation #Wazuh #SIEM #Proxmox #Docker #NetworkSecurity #HomeLab #InfoSec #BuildInPublic


Última actualización: 13 de Noviembre, 2025
Tiempo de lectura: ~18 minutos
Nivel: Intermedio