Advanced · MLOps · pipeline · MLflow · concept drift · production

MLOps for Malware Detection: A Production Pipeline

A complete MLOps pipeline for malware detection in production: training pipeline, model versioning with MLflow, concept drift monitoring, automated retraining, and CI/CD for ML models in cybersecurity.

MalwareIntel Research · 19 min read
Series: AI/ML for Malware, Part 18

Why MLOps is not optional in malware detection

A malware detection model is not a finished product. It is a living component that degrades every day it goes without an update. Malware evolves fast enough that a static model can become obsolete within weeks.

The concrete problem: you train a model on Q1 2026 data. It reaches 99.2% AUC on the test set. You deploy it to production. In Q2 2026, new ransomware variants use packing techniques your model has never seen. The false negative rate climbs from 0.8% to 3.5%. Nobody notices because there is no monitoring. A real incident then occurs involving malware that your model classified as benign with 87% confidence.

This scenario is not hypothetical. It is the inevitable result of treating an ML model like traditional software that is deployed once and forgotten. MLOps turns malware detection from a static artifact into an adaptive system with continuous feedback.

MLOps pipeline architecture

The complete MLOps pipeline for malware detection has six stages that form a continuous cycle:

┌─────────────┐    ┌──────────────┐    ┌──────────────┐
│  Data       │───→│  Training    │───→│  Evaluation  │
│  Pipeline   │    │  Pipeline    │    │  & Registry  │
└─────────────┘    └──────────────┘    └──────┬───────┘
       ↑                                       │
       │                                       ▼
┌──────┴──────┐    ┌──────────────┐    ┌──────────────┐
│  Feedback   │←───│  Monitoring  │←───│  Serving     │
│  Loop       │    │  & Drift     │    │  Pipeline    │
└─────────────┘    └──────────────┘    └──────────────┘

Each stage has its own tools, metrics, and decision points.

Data pipeline: data ingestion and curation

The first component of the pipeline handles the continuous collection of labeled samples, along with their processing and storage.

Data sources for retraining

from dataclasses import dataclass
from datetime import datetime
from enum import Enum

class DataSourceType(Enum):
    MALWARE_BAZAAR = "malwarebazaar"
    VIRUSSHARE = "virusshare"
    SOREL_UPDATE = "sorel_update"
    INTERNAL_SANDBOX = "internal_sandbox"
    ANALYST_FEEDBACK = "analyst_feedback"  # Ground truth from the SOC

@dataclass
class TrainingDataSource:
    source_type: DataSourceType
    url: str
    refresh_interval_hours: int
    label_strategy: str  # "av_consensus" | "sandbox" | "analyst"
    min_confidence: float

TRAINING_SOURCES = [
    TrainingDataSource(
        source_type=DataSourceType.MALWARE_BAZAAR,
        url="https://bazaar.abuse.ch/export/csv/full/",
        refresh_interval_hours=24,
        label_strategy="av_consensus",
        min_confidence=0.8
    ),
    TrainingDataSource(
        source_type=DataSourceType.ANALYST_FEEDBACK,
        url="internal://soc/feedback",
        refresh_interval_hours=1,
        label_strategy="analyst",  # Most reliable ground truth
        min_confidence=1.0
    ),
]

Feature store

A feature store centralizes extracted features so they are not recomputed on every retraining run. For malware, PE features are deterministic (the same binary always yields the same features for a given extractor version), which makes caching especially effective.

import hashlib
from collections.abc import Callable
from pathlib import Path

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

class MalwareFeatureStore:
    """Feature store for PE binary features."""

    def __init__(self, db_url: str, cache_dir: Path):
        self.engine = create_engine(db_url)
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def get_or_compute(self, sample_hash: str,
                       compute_fn: Callable[[str], dict]) -> dict:
        """Return features from the cache, computing them on a miss."""
        # Look up the cache first
        cached = self._get_cached(sample_hash)
        if cached is not None:
            return cached

        # Compute features
        features = compute_fn(sample_hash)

        # Store them for the next call
        self._store(sample_hash, features)
        return features

    def get_training_dataset(self, start_date: str,
                             end_date: str,
                             min_confidence: float = 0.8) -> pd.DataFrame:
        """Build a training dataset from the feature store."""
        # text() is required for :named bind parameters in SQLAlchemy
        query = text("""
        SELECT f.*, l.label, l.confidence, l.source
        FROM pe_features f
        JOIN sample_labels l ON f.sample_hash = l.sample_hash
        WHERE l.label_date BETWEEN :start AND :end
          AND l.confidence >= :min_conf
        ORDER BY l.label_date
        """)
        with self.engine.connect() as conn:
            df = pd.read_sql(
                query, conn,
                params={
                    "start": start_date,
                    "end": end_date,
                    "min_conf": min_confidence
                }
            )
        return df

    def get_dataset_stats(self) -> dict:
        """Summary statistics for the feature store."""
        with Session(self.engine) as session:
            total = session.execute(
                text("SELECT COUNT(*) FROM pe_features")
            ).scalar()
            malicious = session.execute(
                text("SELECT COUNT(*) FROM sample_labels WHERE label = 1")
            ).scalar()
            benign = session.execute(
                text("SELECT COUNT(*) FROM sample_labels WHERE label = 0")
            ).scalar()
        return {
            "total_samples": total,
            "malicious": malicious,
            "benign": benign,
            "ratio": malicious / max(benign, 1)
        }

    def _get_cached(self, sample_hash: str) -> dict | None:
        # One parquet file per sample, named after its hash
        cache_path = self.cache_dir / f"{sample_hash}.parquet"
        if cache_path.exists():
            return pd.read_parquet(cache_path).iloc[0].to_dict()
        return None

    def _store(self, sample_hash: str, features: dict):
        df = pd.DataFrame([features])
        cache_path = self.cache_dir / f"{sample_hash}.parquet"
        df.to_parquet(cache_path)
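
The caching argument depends on keying features by the content hash of the binary. The following is a minimal sketch of that idea, assuming SHA-256 as the sample identifier and an in-memory dict standing in for the store. The names `FEATURE_CACHE`, `sha256_of`, `cached_features`, and the toy `extract` function are hypothetical and not part of the pipeline described here.

```python
import hashlib

# In-memory stand-in for the feature store (hypothetical; the class in
# this article persists one parquet file per sample instead)
FEATURE_CACHE: dict[str, dict] = {}

def sha256_of(data: bytes) -> str:
    """Content hash used as the cache key: same bytes, same key."""
    return hashlib.sha256(data).hexdigest()

def cached_features(data: bytes, extract) -> tuple[dict, bool]:
    """Return (features, cache_hit), extracting only on a cache miss."""
    key = sha256_of(data)
    if key in FEATURE_CACHE:
        return FEATURE_CACHE[key], True
    features = extract(data)
    FEATURE_CACHE[key] = features
    return features, False

def extract(data: bytes) -> dict:
    """Toy extractor for illustration only: size and MZ magic check."""
    return {"size": len(data), "has_mz_header": data[:2] == b"MZ"}

sample = b"MZ" + bytes(62)
first, hit_first = cached_features(sample, extract)
second, hit_second = cached_features(sample, extract)
```

The second call is served from the cache because the bytes, and therefore the key, are identical. If the extractor itself changes, cached entries go stale, so in practice the key would probably also need to include the feature schema version.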

Dataset versioning

Every training dataset must be versioned and reproducible. DVC (Data Version Control) is a common tool for this, but for malware a metadata-based approach is more practical because the binaries themselves are not versioned (only their features).

from pydantic import BaseModel
from datetime import datetime

class DatasetVersion(BaseModel):
    version: str  # "v2026.06.01"
    created_at: datetime
    sample_count: int
    malicious_count: int
    benign_count: int
    date_range_start: str
    date_range_end: str
    sources: list[str]
    feature_schema_version: str  # "pe_features_v3"
    label_strategy: str
    sha256_manifest: str  # Hash of the features file

    @property
    def balance_ratio(self) -> float:
        return self.malicious_count / max(self.benign_count, 1)
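
One possible way to derive a reproducible digest for a dataset version is sketched below. It assumes the manifest is the set of sample hashes plus the feature schema version, which is an alternative to hashing the features file itself; the helper name `manifest_digest` is hypothetical.

```python
import hashlib

def manifest_digest(sample_hashes: list[str], schema_version: str) -> str:
    """Order-independent SHA-256 over a dataset's sample hashes."""
    h = hashlib.sha256()
    h.update(schema_version.encode("utf-8"))
    # Sort and deduplicate so the digest does not depend on row order
    for sample_hash in sorted(set(sample_hashes)):
        h.update(b"\n")
        h.update(sample_hash.encode("utf-8"))
    return h.hexdigest()

a = manifest_digest(["bb", "aa", "cc"], "pe_features_v3")
b = manifest_digest(["cc", "bb", "aa"], "pe_features_v3")
c = manifest_digest(["aa", "bb"], "pe_features_v3")
```

Here `a` and `b` are equal because only the order differs, while `c` differs because a sample is missing. Two training runs that report the same digest were therefore built from the same samples and schema.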

Training pipeline with MLflow

MLflow manages the full model lifecycle: experimentation, metric tracking, model versioning, and deployment.

Training pipeline configuration

import mlflow
import mlflow.xgboost
import xgboost as xgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import (
    roc_auc_score, precision_score, recall_score,
    f1_score, confusion_matrix
)
import numpy as np
import json

# Configure MLflow tracking
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("malware-detection-xgb")

class MalwareTrainingPipeline:
    """Training pipeline with MLflow tracking."""

    def __init__(self, feature_store: MalwareFeatureStore):
        self.feature_store = feature_store
        self.model = None

    def train(self, dataset_version: str,
              hyperparams: dict | None = None) -> str:
        """Train a model and log it to MLflow.

        dataset_version is expected as "<start_date>_<end_date>".
        """

        default_params = {
            "n_estimators": 500,
            "max_depth": 8,
            "learning_rate": 0.05,
            "subsample": 0.8,
            "colsample_bytree": 0.8,
            "min_child_weight": 5,
            "scale_pos_weight": 1.0,  # Adjust if classes are imbalanced
            "eval_metric": "logloss",
            "random_state": 42,
            "n_jobs": -1,
        }

        params = {**default_params, **(hyperparams or {})}

        # Load the dataset
        df = self.feature_store.get_training_dataset(
            start_date=dataset_version.split("_")[0],
            end_date=dataset_version.split("_")[1]
        )

        feature_cols = [c for c in df.columns
                        if c not in ["sample_hash", "label",
                                     "confidence", "source"]]
        X = df[feature_cols]
        y = df["label"]

        with mlflow.start_run(run_name=f"train_{dataset_version}"):
            # Log parameters
            mlflow.log_params(params)
            mlflow.log_param("dataset_version", dataset_version)
            mlflow.log_param("n_samples", len(df))
            mlflow.log_param("n_features", len(feature_cols))
            mlflow.log_param("class_balance",
                             f"{y.mean():.3f} malicious")

            # Cross-validation for more robust metrics
            cv_metrics = self._cross_validate(X, y, params)

            # Log CV metrics
            for metric, values in cv_metrics.items():
                mlflow.log_metric(f"cv_{metric}_mean", np.mean(values))
                mlflow.log_metric(f"cv_{metric}_std", np.std(values))

            # Train the final model on all the data
            self.model = xgb.XGBClassifier(**params)
            self.model.fit(X, y)

            # Log and register the model
            mlflow.xgboost.log_model(
                self.model,
                "model",
                registered_model_name="malware-detector"
            )

            # Log feature importance; cast NumPy floats to Python floats
            # so the dict can be serialized as JSON
            importance = {
                name: float(score)
                for name, score in zip(feature_cols,
                                       self.model.feature_importances_)
            }
            mlflow.log_dict(importance, "feature_importance.json")

            # Log the feature schema
            mlflow.log_dict(
                {"features": feature_cols},
                "feature_schema.json"
            )

            run_id = mlflow.active_run().info.run_id
            mlflow.log_param("run_id", run_id)

        return run_id

    def _cross_validate(self, X, y, params, n_folds=5):
        """Stratified cross-validation with malware-specific metrics."""
        skf = StratifiedKFold(n_splits=n_folds, shuffle=True,
                              random_state=42)
        metrics = {
            "auc": [], "precision": [], "recall": [],
            "f1": [], "fpr_at_1pct_fnr": []
        }

        for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
            X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

            model = xgb.XGBClassifier(**params)
            model.fit(
                X_train, y_train,
                eval_set=[(X_val, y_val)],
                verbose=False
            )

            y_pred_proba = model.predict_proba(X_val)[:, 1]
            y_pred = (y_pred_proba > 0.5).astype(int)

            metrics["auc"].append(roc_auc_score(y_val, y_pred_proba))
            metrics["precision"].append(
                precision_score(y_val, y_pred, zero_division=0))
            metrics["recall"].append(
                recall_score(y_val, y_pred, zero_division=0))
            metrics["f1"].append(
                f1_score(y_val, y_pred, zero_division=0))

            # FPR at FNR = 1% (a critical metric for malware detection)
            fpr = self._fpr_at_fnr(y_val, y_pred_proba, target_fnr=0.01)
            metrics["fpr_at_1pct_fnr"].append(fpr)

        return metrics

    def _fpr_at_fnr(self, y_true, y_scores, target_fnr=0.01):
        """Return the FPR at the highest threshold that meets the FNR target."""
        # Walk thresholds from strictest to loosest: FNR only decreases,
        # so the first hit is the one with the lowest FPR
        thresholds = np.sort(y_scores)[::-1]
        for threshold in thresholds:
            y_pred = (y_scores >= threshold).astype(int)
            fn = np.sum((y_pred == 0) & (y_true == 1))
            fp = np.sum((y_pred == 1) & (y_true == 0))
            tp = np.sum((y_pred == 1) & (y_true == 1))
            tn = np.sum((y_pred == 0) & (y_true == 0))

            fnr = fn / max(fn + tp, 1)
            fpr = fp / max(fp + tn, 1)

            if fnr <= target_fnr:
                return fpr
        return 1.0
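
The threshold loop in `_fpr_at_fnr` rescans every sample for each candidate threshold, which is quadratic in the size of the validation fold. The sketch below computes the same quantity in one sorted pass with cumulative sums. It is an illustrative alternative under the same definition, not the method used in the pipeline, and the standalone function name `fpr_at_fnr` is hypothetical.

```python
import numpy as np

def fpr_at_fnr(y_true, y_scores, target_fnr=0.01):
    """Lowest FPR among thresholds whose FNR is at most target_fnr."""
    y_true = np.asarray(y_true)
    y_scores = np.asarray(y_scores)
    if y_true.size == 0:
        return 1.0

    # Sort by descending score; row i corresponds to threshold = score i
    order = np.argsort(-y_scores, kind="stable")
    y_sorted = y_true[order]
    s_sorted = y_scores[order]

    # Cumulative true/false positives when predicting the top-i as malicious
    tp = np.cumsum(y_sorted == 1)
    fp = np.cumsum(y_sorted == 0)

    # Tied scores are predicted together: keep the last row of each tie group
    last_of_group = np.append(s_sorted[1:] != s_sorted[:-1], True)
    tp, fp = tp[last_of_group], fp[last_of_group]

    n_pos, n_neg = int(tp[-1]), int(fp[-1])
    fnr = (n_pos - tp) / max(n_pos, 1)
    fpr = fp / max(n_neg, 1)

    hits = np.nonzero(fnr <= target_fnr)[0]
    return float(fpr[hits[0]]) if hits.size else 1.0

y = [1, 1, 1, 1, 0, 0, 0, 0]
s = [0.9, 0.8, 0.7, 0.3, 0.6, 0.4, 0.2, 0.1]
strict = fpr_at_fnr(y, s, target_fnr=0.0)    # must catch all 4 positives
relaxed = fpr_at_fnr(y, s, target_fnr=0.25)  # may miss 1 of 4 positives
```

In this toy example the positive scored 0.3 sits below two negatives, so catching every positive costs an FPR of 0.5, while tolerating one miss brings the FPR down to 0.0. That trade-off is what the metric is meant to expose.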

Model Registry

The MLflow Model Registry manages model versions and their stage transitions.

from mlflow.tracking import MlflowClient

class MalwareModelRegistry:
    """Model lifecycle management.

    Note: this uses the stage-based registry API, which recent MLflow
    releases (2.9+) deprecate in favor of model version aliases.
    """

    def __init__(self):
        self.client = MlflowClient()
        self.model_name = "malware-detector"

    def promote_to_staging(self, run_id: str,
                           min_auc: float = 0.99) -> bool:
        """Promote a model to Staging if it meets the thresholds."""
        run = self.client.get_run(run_id)
        auc = run.data.metrics.get("cv_auc_mean", 0)
        fpr = run.data.metrics.get("cv_fpr_at_1pct_fnr_mean", 1)

        if auc < min_auc:
            print(f"AUC {auc:.4f} is below the threshold {min_auc}")
            return False

        if fpr > 0.05:  # At most 5% FPR at FNR = 1%
            print(f"FPR@1%FNR {fpr:.4f} is too high")
            return False

        # Find the version registered by this run, rather than the newest
        # version overall, which could belong to a concurrent run
        versions = self.client.search_model_versions(
            f"name='{self.model_name}'"
        )
        matching = [v for v in versions if v.run_id == run_id]
        if not matching:
            print(f"No registered version found for run {run_id}")
            return False
        latest = max(matching, key=lambda v: int(v.version))

        # Transition to Staging
        self.client.transition_model_version_stage(
            name=self.model_name,
            version=latest.version,
            stage="Staging"
        )

        print(f"Model v{latest.version} promoted to Staging "
              f"(AUC={auc:.4f}, FPR@1%FNR={fpr:.4f})")
        return True

    def promote_to_production(self, version: str) -> bool:
        """Promote a Staging version to Production after validation."""
        # Archive the current production model
        prod_versions = self.client.get_latest_versions(
            self.model_name, stages=["Production"]
        )
        for v in prod_versions:
            self.client.transition_model_version_stage(
                name=self.model_name,
                version=v.version,
                stage="Archived"
            )

        # Promote the new model
        self.client.transition_model_version_stage(
            name=self.model_name,
            version=version,
            stage="Production"
        )
        return True

    def get_production_model(self):
        """Load the current production model."""
        model_uri = f"models:/{self.model_name}/Production"
        return mlflow.xgboost.load_model(model_uri)

    def compare_versions(self, version_a: str,
                         version_b: str) -> dict:
        """Compare metrics between two versions."""
        run_a = self._get_run_for_version(version_a)
        run_b = self._get_run_for_version(version_b)

        metrics_a = run_a.data.metrics
        metrics_b = run_b.data.metrics

        comparison = {}
        for key in metrics_a:
            if key in metrics_b:
                comparison[key] = {
                    "version_a": metrics_a[key],
                    "version_b": metrics_b[key],
                    "delta": metrics_b[key] - metrics_a[key]
                }
        return comparison

    def _get_run_for_version(self, version: str):
        mv = self.client.get_model_version(
            self.model_name, version)
        return self.client.get_run(mv.run_id)

Monitoring: concept drift and degradation

Monitoring is the most critical piece of the pipeline. Without it, you do not know when your model has stopped working.

Types of drift in malware detection

Data drift (covariate shift): the distribution of features of the binaries the model processes in production changes relative to the training set distribution. Example: a new version of a popular compiler ships and produces binaries with different PE header characteristics.

Concept drift: the relationship between features and label changes. Example: a packing technique that used to be exclusive to malware starts being used by legitimate software (false positives), or a benign technique starts being abused by malware (false negatives).

Label drift: the class proportions change. Example: a massive ransomware campaign increases the share of malicious binaries seen in production.
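
The difference between data drift and concept drift matters for monitoring, because a purely feature-based test can only see the first. The following synthetic sketch illustrates this with a single made-up feature and a threshold rule standing in for the model; all names and numbers here are illustrative assumptions, not measurements from a real detector.

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)
x_ref = rng.normal(0.0, 1.0, size=2000)  # one synthetic feature

def rule(x):
    """Frozen 'model': the decision rule learned at training time."""
    return (x > 0).astype(int)

# Data drift: the inputs shift, the true labeling rule stays x > 0
x_data = x_ref + 1.0
y_data = (x_data > 0).astype(int)
ks_data = stats.ks_2samp(x_ref, x_data)
acc_data = float((rule(x_data) == y_data).mean())

# Concept drift: identical inputs, but the true rule moves to x > 0.5
x_concept = x_ref.copy()
y_concept = (x_concept > 0.5).astype(int)
ks_concept = stats.ks_2samp(x_ref, x_concept)
acc_concept = float((rule(x_concept) == y_concept).mean())

print(f"data drift:    KS p={ks_data.pvalue:.2e}, accuracy={acc_data:.3f}")
print(f"concept drift: KS p={ks_concept.pvalue:.2e}, accuracy={acc_concept:.3f}")
```

In the first case the KS test fires even though the frozen rule is still correct; in the second the feature distribution is unchanged, so the KS test sees nothing while accuracy drops. Detecting the second case requires labels, which is why analyst feedback is treated as a data source in this pipeline.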

Implementing the drift monitor

from dataclasses import dataclass
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from scipy import stats
from sklearn.metrics import roc_auc_score

@dataclass
class DriftAlert:
    feature: str
    drift_type: str  # "data" | "concept" | "label"
    severity: str    # "low" | "medium" | "high" | "critical"
    statistic: float
    p_value: float
    timestamp: datetime
    message: str

class MalwareDriftMonitor:
    """Drift monitor for malware detection."""

    def __init__(self, reference_data: pd.DataFrame,
                 feature_names: list[str]):
        self.reference = reference_data
        self.feature_names = feature_names
        self.alerts: list[DriftAlert] = []

    def check_data_drift(self, production_data: pd.DataFrame,
                         significance: float = 0.01) -> list[DriftAlert]:
        """Detect per-feature data drift with the KS test."""
        alerts = []

        for feature in self.feature_names:
            ref_values = self.reference[feature].dropna()
            prod_values = production_data[feature].dropna()

            if len(prod_values) == 0:
                continue

            # Two-sample Kolmogorov-Smirnov test
            ks_stat, p_value = stats.ks_2samp(ref_values, prod_values)

            if p_value < significance:
                severity = self._classify_drift_severity(ks_stat)
                alert = DriftAlert(
                    feature=feature,
                    drift_type="data",
                    severity=severity,
                    statistic=ks_stat,
                    p_value=p_value,
                    timestamp=datetime.utcnow(),
                    message=f"Data drift detected in {feature}: "
                            f"KS={ks_stat:.4f}, p={p_value:.6f}"
                )
                alerts.append(alert)

        self.alerts.extend(alerts)
        return alerts

    def check_prediction_drift(self, recent_predictions: pd.DataFrame,
                               window_days: int = 7) -> list[DriftAlert]:
        """Detect drift in the distribution of prediction scores."""
        alerts = []

        # Compare score distributions
        empty = pd.Series(dtype=float)
        ref_scores = self.reference.get("prediction_score", empty)
        prod_scores = recent_predictions.get("prediction_score", empty)

        if len(ref_scores) == 0 or len(prod_scores) == 0:
            return alerts

        ks_stat, p_value = stats.ks_2samp(ref_scores, prod_scores)

        if p_value < 0.01:
            # Score drift is only a proxy for concept drift: it needs no
            # labels, but it cannot tell which kind of drift caused it
            alerts.append(DriftAlert(
                feature="prediction_score",
                drift_type="concept",
                severity=self._classify_drift_severity(ks_stat),
                statistic=ks_stat,
                p_value=p_value,
                timestamp=datetime.utcnow(),
                message=f"Prediction drift: score distribution "
                        f"differs from baseline (KS={ks_stat:.4f})"
            ))

        # Compare detection rate (label drift): the fraction of samples
        # flagged as malicious at the 0.5 threshold, not the mean score
        ref_positive_rate = (ref_scores > 0.5).mean()
        prod_positive_rate = (prod_scores > 0.5).mean()
        rate_change = abs(prod_positive_rate - ref_positive_rate)

        if rate_change > 0.05:  # Change of more than 5 percentage points
            alerts.append(DriftAlert(
                feature="detection_rate",
                drift_type="label",
                severity="high" if rate_change > 0.1 else "medium",
                statistic=rate_change,
                p_value=float("nan"),  # No hypothesis test was run
                timestamp=datetime.utcnow(),
                message=f"Label drift: detection rate changed from "
                        f"{ref_positive_rate:.3f} to {prod_positive_rate:.3f}"
            ))

        self.alerts.extend(alerts)
        return alerts

    def check_performance_drift(self, ground_truth: pd.DataFrame,
                                min_auc: float = 0.98) -> list[DriftAlert]:
        """Detect performance degradation using ground truth labels."""
        alerts = []

        if "true_label" not in ground_truth.columns:
            return alerts

        y_true = ground_truth["true_label"]
        y_score = ground_truth["prediction_score"]

        # AUC is undefined unless both classes are present
        if y_true.nunique() < 2:
            return alerts

        current_auc = roc_auc_score(y_true, y_score)

        if current_auc < min_auc:
            severity = "critical" if current_auc < 0.95 else "high"
            alerts.append(DriftAlert(
                feature="model_auc",
                drift_type="concept",
                severity=severity,
                statistic=current_auc,
                p_value=float("nan"),  # No hypothesis test was run
                timestamp=datetime.utcnow(),
                message=f"Performance degradation: AUC={current_auc:.4f} "
                        f"is below the threshold {min_auc}"
            ))

        self.alerts.extend(alerts)
        return alerts

    def _classify_drift_severity(self, ks_statistic: float) -> str:
        if ks_statistic > 0.3:
            return "critical"
        elif ks_statistic > 0.2:
            return "high"
        elif ks_statistic > 0.1:
            return "medium"
        return "low"

    def generate_report(self) -> dict:
        """Generate a drift report."""
        return {
            "timestamp": datetime.utcnow().isoformat(),
            "total_alerts": len(self.alerts),
            "critical": len([a for a in self.alerts
                             if a.severity == "critical"]),
            "high": len([a for a in self.alerts
                         if a.severity == "high"]),
            "alerts": [
                {
                    "feature": a.feature,
                    "drift_type": a.drift_type,
                    "severity": a.severity,
                    "message": a.message
                }
                for a in sorted(self.alerts,
                                key=lambda x: ["critical", "high",
                                               "medium", "low"]
                                .index(x.severity))
            ]
        }

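Population Stability Index (PSI)

PSI quantifies how much the distribution of a variable has shifted between two periods. Unlike the KS test, whose p-value flags even negligible shifts as significant once the samples are large, PSI measures the size of the shift against fixed thresholds, which makes it more stable for continuous drift monitoring.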

def calculate_psi(reference: np.ndarray, production: np.ndarray,
                  n_bins: int = 10) -> float:
    """Compute the Population Stability Index."""
    # Build bins from the percentiles of the reference distribution
    bins = np.percentile(reference,
                         np.linspace(0, 100, n_bins + 1))
    bins[0] = -np.inf
    bins[-1] = np.inf

    # Count samples per bin
    ref_counts = np.histogram(reference, bins=bins)[0]
    prod_counts = np.histogram(production, bins=bins)[0]

    # Add-one smoothing avoids division by zero and log(0) on empty bins
    ref_pct = (ref_counts + 1) / (len(reference) + n_bins)
    prod_pct = (prod_counts + 1) / (len(production) + n_bins)

    # PSI
    psi = np.sum((prod_pct - ref_pct) * np.log(prod_pct / ref_pct))
    return float(psi)

# Interpretation:
# PSI below 0.1    -> No significant drift
# PSI 0.1 - 0.2    -> Moderate drift, keep monitoring
# PSI above 0.2    -> Significant drift, investigate/retrain
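
For reference, the quantity computed by `calculate_psi` can be written out as follows. The symbols are introduced here only for illustration: $B$ is the number of bins, $n_i$ the sample count in bin $i$, $N$ the sample size, and $r_i$, $p_i$ the smoothed reference and production fractions.

```latex
\mathrm{PSI} = \sum_{i=1}^{B} \left(p_i - r_i\right)\,\ln\frac{p_i}{r_i},
\qquad
r_i = \frac{n^{\mathrm{ref}}_i + 1}{N^{\mathrm{ref}} + B},
\quad
p_i = \frac{n^{\mathrm{prod}}_i + 1}{N^{\mathrm{prod}} + B}
```

Each term is non-negative because $p_i - r_i$ and $\ln(p_i / r_i)$ always share the same sign, so PSI is zero only when the binned distributions match and grows as they diverge.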

Automated retraining

Retraining should be triggered by drift metrics, not only by a fixed schedule.

Retraining trigger

from datetime import datetime, timedelta
from enum import Enum

from mlflow.tracking import MlflowClient

class RetrainingTrigger(Enum):
    SCHEDULED = "scheduled"        # Calendar (weekly/monthly)
    DRIFT_DETECTED = "drift"       # Drift monitor
    PERFORMANCE_DROP = "perf_drop" # Ground truth
    MANUAL = "manual"              # Requested by a SOC analyst

class RetrainingOrchestrator:
    """Orchestrates automated retraining."""

    def __init__(self, pipeline: MalwareTrainingPipeline,
                 registry: MalwareModelRegistry,
                 monitor: MalwareDriftMonitor):
        self.pipeline = pipeline
        self.registry = registry
        self.monitor = monitor

    def evaluate_retraining_need(self) -> tuple[bool, str]:
        """Decide whether retraining is needed."""
        report = self.monitor.generate_report()

        # Immediate trigger: any critical alert
        if report["critical"] > 0:
            return True, RetrainingTrigger.PERFORMANCE_DROP.value

        # Accumulation trigger: several high-severity alerts
        if report["high"] >= 3:
            return True, RetrainingTrigger.DRIFT_DETECTED.value

        return False, ""

    def run_retraining(self, trigger: str,
                       dataset_config: dict | None = None) -> dict:
        """Run the full retraining pipeline."""
        result = {
            "trigger": trigger,
            "timestamp": datetime.utcnow().isoformat(),
            "status": "started"
        }

        try:
            # 1. Prepare a dataset with recent data
            dataset_version = self._prepare_dataset(dataset_config)
            result["dataset_version"] = dataset_version

            # 2. Train a new model
            run_id = self.pipeline.train(dataset_version)
            result["run_id"] = run_id

            # 3. Evaluate against the current production model
            comparison = self._compare_with_production(run_id)
            result["comparison"] = comparison

            # 4. Promote if it improves
            if comparison["improves"]:
                promoted = self.registry.promote_to_staging(run_id)
                result["promoted_to_staging"] = promoted

                if promoted:
                    # Shadow testing before production
                    result["shadow_test"] = "initiated"
                    result["status"] = "pending_shadow_validation"
                else:
                    result["status"] = "rejected_metrics_below_threshold"
            else:
                result["status"] = "rejected_no_improvement"

        except Exception as e:
            result["status"] = "failed"
            result["error"] = str(e)

        return result

    def _prepare_dataset(self, config: dict | None) -> str:
        """Prepare the dataset window for retraining."""
        # config is accepted but not used yet: the window is always
        # the last 90 days, encoded as "<start_date>_<end_date>"
        end_date = datetime.utcnow().strftime("%Y-%m-%d")
        start_date = (datetime.utcnow() - timedelta(days=90)
                      ).strftime("%Y-%m-%d")
        return f"{start_date}_{end_date}"

    def _compare_with_production(self, run_id: str) -> dict:
        """Compare the new model against production."""
        # Metrics of the new model
        client = MlflowClient()
        new_run = client.get_run(run_id)
        new_auc = new_run.data.metrics.get("cv_auc_mean", 0)

        # Metrics of the model currently in production
        prod_versions = client.get_latest_versions(
            "malware-detector", stages=["Production"]
        )

        if not prod_versions:
            return {"improves": True, "reason": "no_production_model"}

        prod_run = client.get_run(prod_versions[0].run_id)
        prod_auc = prod_run.data.metrics.get("cv_auc_mean", 0)

        # Caveat: each CV AUC was computed on its own training window,
        # so this is a coarse comparison; shadow testing is the real check
        return {
            "new_auc": new_auc,
            "production_auc": prod_auc,
            "delta": new_auc - prod_auc,
            "improves": new_auc > prod_auc
        }

CI/CD for ML models

ML models need their own CI/CD pipeline, separate from the application's CI/CD.

GitHub Actions for model CI

# .github/workflows/model-ci.yml
name: Malware Model CI

on:
  push:
    paths:
      - 'ml/**'
      - 'features/**'
  schedule:
    - cron: '0 3 * * 1'  # Weekly retraining, Mondays 03:00 UTC

jobs:
  train-and-evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install dependencies
        run: pip install -r ml/requirements.txt

      - name: Run feature extraction tests
        run: pytest ml/tests/test_features.py -v

      - name: Train model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        run: python ml/train.py --dataset-version latest

      - name: Evaluate model
        run: python ml/evaluate.py --min-auc 0.99 --max-fpr 0.05

      - name: Model security scan
        run: python ml/security_check.py --check-adversarial

      - name: Promote to staging
        if: success()
        run: python ml/promote.py --stage staging

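  shadow-test:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    steps:
      # Each job runs on a fresh runner: check out and install again
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install -r ml/requirements.txt

      # GitHub-hosted runners cap a job at 6 hours; a 24h shadow window
      # needs a self-hosted runner or an external scheduler
      - name: Run shadow predictions
        run: python ml/shadow_test.py --duration 24h --threshold 0.95

      - name: Compare with production
        run: python ml/compare.py --report

  deploy:
    needs: shadow-test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install -r ml/requirements.txt

      - name: Promote to production
        run: python ml/promote.py --stage production

      - name: Update serving endpoint
        run: python ml/deploy.py --target production

      - name: Verify deployment
        run: python ml/verify_deployment.py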

Tests specific to malware models

# ml/tests/test_model_quality.py

import pytest
import numpy as np

class TestModelQuality:
    """Tests de calidad del modelo antes de deploy."""

    def test_auc_above_threshold(self, trained_model, test_data):
        """AUC debe superar 0.99."""
        X_test, y_test = test_data
        y_pred = trained_model.predict_proba(X_test)[:, 1]
        auc = roc_auc_score(y_test, y_pred)
        assert auc >= 0.99, f"AUC {auc:.4f} por debajo del umbral 0.99"

    def test_fpr_at_1pct_fnr(self, trained_model, test_data):
        """FPR debe ser menor a 5% cuando FNR = 1%."""
        X_test, y_test = test_data
        y_pred = trained_model.predict_proba(X_test)[:, 1]
        # Calcular FPR al threshold donde FNR = 1%
        fpr = calculate_fpr_at_fnr(y_test, y_pred, target_fnr=0.01)
        assert fpr <= 0.05, f"FPR@1%FNR = {fpr:.4f}, maximo permitido 0.05"

    def test_no_feature_dominance(self, trained_model):
        """Ninguna feature debe dominar mas del 30% de importancia."""
        importances = trained_model.feature_importances_
        max_importance = np.max(importances)
        assert max_importance <= 0.30, (
            f"Feature con {max_importance:.2%} de importancia. "
            f"Riesgo de manipulacion adversarial."
        )

    def test_prediction_latency(self, trained_model, test_data):
        """Latencia de prediccion menor a 10ms por muestra."""
        import time
        X_test = test_data[0]
        sample = X_test.iloc[:1]

        start = time.perf_counter()
        for _ in range(100):
            trained_model.predict_proba(sample)
        elapsed = (time.perf_counter() - start) / 100

        assert elapsed < 0.01, (
            f"Latencia {elapsed*1000:.1f}ms por encima del umbral 10ms"
        )

    def test_model_size(self, trained_model):
        """Modelo no debe exceder 500MB serializado."""
        import tempfile
        import os
        with tempfile.NamedTemporaryFile(suffix=".json") as f:
            trained_model.save_model(f.name)
            size_mb = os.path.getsize(f.name) / (1024 * 1024)
        assert size_mb <= 500, f"Modelo {size_mb:.0f}MB excede 500MB"

    def test_temporal_generalization(self, trained_model,
                                     temporal_test_data):
        """The model must generalize to more recent data."""
        X_future, y_future = temporal_test_data  # Data from 30 days later
        y_pred = trained_model.predict_proba(X_future)[:, 1]
        auc = roc_auc_score(y_future, y_pred)
        assert auc >= 0.97, (
            f"AUC on future data is {auc:.4f}. "
            f"Possible concept drift since training."
        )
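
The FPR test above calls `calculate_fpr_at_fnr`, which is not defined in this section. One possible implementation is sketched below, assuming labels use 1 for malicious and 0 for benign and that a sample is flagged when its score is at or above the threshold. The function body is an illustration, not necessarily the helper the test suite actually uses.

```python
import math


def calculate_fpr_at_fnr(y_true, y_score, target_fnr=0.01):
    """Return the FPR at the highest threshold that keeps FNR <= target_fnr.

    Assumes label 1 = malicious, 0 = benign, and that a sample is flagged
    as malicious when its score is >= the chosen threshold.
    """
    if not 0.0 <= target_fnr < 1.0:
        raise ValueError("target_fnr must be in [0, 1)")

    positives = sorted(s for y, s in zip(y_true, y_score) if y == 1)
    negatives = [s for y, s in zip(y_true, y_score) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one malicious and one benign sample")

    # Number of malicious samples we are allowed to miss.
    allowed_misses = math.floor(target_fnr * len(positives))
    # Lowest-scoring malicious sample that must still be detected.
    threshold = positives[allowed_misses]

    false_positives = sum(1 for s in negatives if s >= threshold)
    return false_positives / len(negatives)
```

Because the threshold is taken from the sorted malicious scores, the resulting FNR never exceeds the target; on small test sets it can be lower than the target, which makes the reported FPR slightly pessimistic.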

Serving: the model in production

Serving architecture

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow
import mlflow.xgboost
from mlflow.tracking import MlflowClient
import numpy as np
import time

app = FastAPI(title="Malware Detection API")

class PredictionRequest(BaseModel):
    features: dict[str, float]
    sample_hash: str | None = None

class PredictionResponse(BaseModel):
    prediction: str  # "malicious" | "benign"
    confidence: float
    model_version: str
    latency_ms: float
    feature_contributions: dict[str, float] | None = None

class MalwareServingPipeline:
    """Serving pipeline with an optional shadow model next to production."""

    def __init__(self):
        self.production_model = None
        self.shadow_model = None
        self.model_version = "unknown"
        self.feature_names = []
        self._load_models()

    def _load_models(self):
        """Load the production model and, if present, the shadow model."""
        try:
            self.production_model = mlflow.xgboost.load_model(
                "models:/malware-detector/Production"
            )
            mv = MlflowClient().get_latest_versions(
                "malware-detector", stages=["Production"]
            )[0]
            self.model_version = mv.version
        except Exception as exc:
            raise RuntimeError("No production model available") from exc

        # Assumption: the model was trained on named columns, so the booster
        # stores the feature order that predict() must reproduce.
        self.feature_names = (
            self.production_model.get_booster().feature_names or []
        )
        if not self.feature_names:
            raise RuntimeError("Production model has no stored feature names")

        try:
            self.shadow_model = mlflow.xgboost.load_model(
                "models:/malware-detector/Staging"
            )
        except Exception:
            self.shadow_model = None  # The shadow model is optional

    def predict(self, features: dict) -> PredictionResponse:
        """Predict with production and log the shadow model's answer."""
        start = time.perf_counter()

        # Convert the feature dict to the column order the model expects;
        # missing features default to 0
        feature_array = np.array(
            [[features.get(f, 0) for f in self.feature_names]]
        )

        # Main prediction
        proba = self.production_model.predict_proba(feature_array)[0]
        label = "malicious" if proba[1] > 0.5 else "benign"

        latency = (time.perf_counter() - start) * 1000

        # Shadow prediction: runs after the latency measurement, so it is
        # excluded from latency_ms. The call itself is synchronous; move it
        # to a background worker to keep it off the request path.
        if self.shadow_model is not None:
            self._shadow_predict(feature_array, proba)

        return PredictionResponse(
            prediction=label,
            confidence=float(max(proba)),
            model_version=self.model_version,
            latency_ms=latency
        )

    def _shadow_predict(self, features, prod_proba):
        """Compare the shadow model's prediction with production's."""
        try:
            shadow_proba = self.shadow_model.predict_proba(features)[0]
            agreement = (
                (prod_proba[1] > 0.5) == (shadow_proba[1] > 0.5)
            )
            # Log for later analysis
            self._log_shadow_comparison(
                prod_proba, shadow_proba, agreement
            )
        except Exception:
            pass  # The shadow model must never affect production

    def _log_shadow_comparison(self, prod, shadow, agreement):
        """Record the comparison used to evaluate the shadow model."""
        # Send to a metrics system (Prometheus, CloudWatch, etc.)
        pass
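
The shadow comparison only stays off the request path if it runs somewhere other than the request thread. A minimal sketch of one way to do that with a standard-library thread pool follows. `ShadowRunner` and `ConstantModel` are hypothetical names introduced for illustration, and the in-memory `comparisons` list stands in for a real metrics backend.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class ShadowRunner:
    """Hypothetical helper: scores the shadow model on a worker thread."""

    def __init__(self, shadow_model, max_workers: int = 2):
        self.shadow_model = shadow_model
        self.comparisons: list[dict] = []  # stand-in for a metrics backend
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, features, prod_proba, threshold: float = 0.5) -> Future:
        """Queue a comparison and return immediately."""
        return self._pool.submit(
            self._compare, features, prod_proba, threshold
        )

    def _compare(self, features, prod_proba, threshold):
        try:
            shadow_proba = self.shadow_model.predict_proba(features)[0]
            agreement = (
                (prod_proba[1] > threshold) == (shadow_proba[1] > threshold)
            )
            self.comparisons.append({
                "prod_score": float(prod_proba[1]),
                "shadow_score": float(shadow_proba[1]),
                "agreement": agreement,
            })
            return agreement
        except Exception:
            return None  # a failing shadow model must never reach the caller

    def shutdown(self) -> None:
        """Wait for queued comparisons to finish and release the workers."""
        self._pool.shutdown(wait=True)


class ConstantModel:
    """Test double that always returns the same class probabilities."""

    def __init__(self, proba):
        self._proba = proba

    def predict_proba(self, features):
        return [self._proba for _ in features]


runner = ShadowRunner(ConstantModel([0.2, 0.8]))
future = runner.submit([[0.0, 1.0]], prod_proba=[0.1, 0.9])
runner.shutdown()
```

In a serving process the caller would ignore the returned future and read the aggregated agreement rate from the metrics system instead. A thread pool suits this only if the shadow model releases the GIL during inference or the extra load is small; otherwise a separate worker process or queue is the safer choice.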

Canary deployment

import random

class CanaryDeployer:
    """Canary deployment for malware models."""

    def __init__(self, registry: MalwareModelRegistry,
                 production_model=None):
        self.registry = registry
        self.canary_traffic_pct = 0.0
        self.canary_model = None
        # Must be set to the current production model before predict() runs
        self.production_model = production_model

    def start_canary(self, new_version: str,
                     initial_traffic: float = 0.05):
        """Start the canary with 5% of traffic by default."""
        self.canary_model = self.registry.get_model(new_version)
        self.canary_traffic_pct = initial_traffic

    def predict(self, features) -> tuple[str, str]:
        """Route the prediction according to the canary traffic share."""
        use_canary = (
            self.canary_model is not None
            and random.random() < self.canary_traffic_pct
        )
        if use_canary:
            model, source = self.canary_model, "canary"
        else:
            model, source = self.production_model, "production"

        proba = model.predict_proba(features)
        label = "malicious" if proba[0][1] > 0.5 else "benign"
        return label, source

    def evaluate_canary(self, metrics: dict) -> str:
        """Decide whether to promote, scale up, or roll back the canary."""
        canary_auc = metrics.get("canary_auc", 0)
        prod_auc = metrics.get("production_auc", 0)
        canary_fpr = metrics.get("canary_fpr", 1)

        if canary_auc < prod_auc - 0.005:
            return "rollback"  # Canary is worse than production
        elif canary_fpr > 0.05:
            return "rollback"  # FPR too high
        elif self.canary_traffic_pct < 0.5:
            return "scale_up"  # Increase canary traffic
        else:
            return "promote"   # Promote to production
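
Random routing means the same file can be scored by different models on consecutive requests, which complicates incident analysis. A possible alternative, sketched here under the assumption that each request carries a stable identifier such as the `sample_hash` field of `PredictionRequest`, is to derive the routing decision from a hash of that identifier. The function name `routes_to_canary` is hypothetical.

```python
import hashlib


def routes_to_canary(sample_hash: str, canary_traffic_pct: float) -> bool:
    """Deterministically decide whether a sample goes to the canary model.

    The first 8 bytes of SHA-256(sample_hash) are read as an integer in
    [0, 2**64). The sample is routed to the canary when that integer falls
    in the lowest `canary_traffic_pct` share of the range.
    """
    digest = hashlib.sha256(sample_hash.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big")
    return bucket < canary_traffic_pct * 2**64


# The same sample always gets the same answer for a given traffic share.
decision = routes_to_canary("d41d8cd98f00b204e9800998ecf8427e", 0.05)
```

A side effect of this scheme is that raising the traffic share only adds samples to the canary group: anything routed to the canary at 5% is still routed there at 50%, so comparisons stay consistent across ramp-up steps.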

Pipeline observability

Key metrics to monitor

# Model metrics
MODEL_METRICS = {
    "prediction_latency_ms": "Histogram of per-prediction latency",
    "predictions_per_second": "Service throughput",
    "malicious_ratio": "Share of predictions flagged as malicious",
    "confidence_distribution": "Distribution of confidence scores",
    "model_version_active": "Model version currently in production",
}

# Drift metrics
DRIFT_METRICS = {
    "psi_per_feature": "PSI per feature (updated daily)",
    "ks_statistic": "KS statistic per feature",
    "prediction_drift_psi": "PSI of the prediction distribution",
    "detection_rate_7d": "Rolling 7-day detection rate",
}

# Pipeline metrics
PIPELINE_METRICS = {
    "training_duration_minutes": "Duration of the last training run",
    "dataset_size": "Size of the training dataset",
    "feature_store_size": "Samples in the feature store",
    "retraining_trigger_count": "Retrainings by trigger type",
    "model_versions_total": "Total versions in the registry",
}

Alerts

ALERT_RULES = [
    {
        "name": "model_performance_critical",
        "condition": "cv_auc_mean < 0.95",
        "severity": "critical",
        "action": "trigger_retraining + page_oncall"
    },
    {
        "name": "high_drift_detected",
        "condition": "psi_any_feature > 0.25",
        "severity": "high",
        "action": "trigger_investigation + notify_ml_team"
    },
    {
        "name": "prediction_latency_high",
        "condition": "p99_latency_ms > 50",
        "severity": "medium",
        "action": "notify_infra_team"
    },
    {
        "name": "retraining_failed",
        "condition": "last_retraining_status == 'failed'",
        "severity": "high",
        "action": "notify_ml_team + create_incident"
    },
]
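
The rules above store each condition as a string, so something has to evaluate them against current metric values. The sketch below shows one way to do that without `eval`, assuming every condition has the form `<metric> <operator> <literal>` as in the rules listed. `evaluate_alerts` and `SAMPLE_RULES` are hypothetical names, and the sample rules are copied from the list above.

```python
import ast
import operator

# Comparison operators allowed in a rule condition.
_OPERATORS = {
    "<": operator.lt, "<=": operator.le,
    ">": operator.gt, ">=": operator.ge,
    "==": operator.eq, "!=": operator.ne,
}

SAMPLE_RULES = [
    {
        "name": "high_drift_detected",
        "condition": "psi_any_feature > 0.25",
        "severity": "high",
        "action": "trigger_investigation + notify_ml_team",
    },
    {
        "name": "retraining_failed",
        "condition": "last_retraining_status == 'failed'",
        "severity": "high",
        "action": "notify_ml_team + create_incident",
    },
]


def evaluate_alerts(rules, metrics):
    """Return the rules whose condition holds for the given metric values."""
    fired = []
    for rule in rules:
        metric_name, op_symbol, raw_value = rule["condition"].split(maxsplit=2)
        if metric_name not in metrics:
            continue  # metric not reported yet; skip rather than guess
        threshold = ast.literal_eval(raw_value)  # parses numbers and quoted strings
        if _OPERATORS[op_symbol](metrics[metric_name], threshold):
            fired.append({
                "name": rule["name"],
                "severity": rule["severity"],
                "actions": rule["action"].split(" + "),
            })
    return fired


alerts = evaluate_alerts(
    SAMPLE_RULES,
    {"psi_any_feature": 0.31, "last_retraining_status": "ok"},
)
```

In practice these conditions would more likely be expressed in the alerting system's own rule language (for example Prometheus alerting rules); the point of the sketch is only that the rule format is simple enough to evaluate safely.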

MLOps checklist for malware detection

Before considering your pipeline production-ready, verify:

Data pipeline: Centralized feature store. Datasets versioned by hash. Documented data sources. Labeling pipeline with ground truth from the SOC. Handling of imbalanced data.

Training: Hyperparameters versioned in MLflow. Stratified cross-validation. Malware-specific metrics (FPR@1%FNR). Automated model quality tests. Reproducible training (fixed seeds).

Registry: Model versioned in the MLflow Model Registry. Documented stage transitions (Staging, Production, Archived). Automatic comparison between versions. One-command rollback.

Monitoring: Active drift monitoring (PSI, KS test). Alerts configured by severity. Model metrics dashboard. Ground truth feedback loop with the SOC.

Serving: Prediction latency under 10 ms. Canary deployment for new versions. Shadow testing before promotion. Fallback to the previous version.

CI/CD: Automated training, evaluation, and deployment pipeline. Model quality tests in CI. Security checks (adversarial robustness). Documentation for each release.

The MLOps pipeline is not a one-off project: it is living infrastructure that evolves with the model. What matters is not how sophisticated each component is, but that all the pieces are connected and the feedback loop works. A simple but complete pipeline (data, training, monitoring, retraining) beats a sophisticated one with disconnected pieces.

This content is intended exclusively for education and research in defensive cybersecurity. No malicious binaries or executable payloads are provided. Misuse of this information is the sole responsibility of the user.