MLOps for Malware Detection: Production Pipeline
A complete MLOps pipeline for malware detection in production: training pipeline, model versioning with MLflow, concept drift monitoring, automated retraining, and CI/CD for ML models in cybersecurity.
Why MLOps is not optional in malware detection
A malware detection model is not a finished product. It is a living component that degrades every day it goes without an update. Malware evolves fast enough that a static model can become obsolete within weeks.
The concrete problem: you train a model on Q1 2026 data. It reaches an AUC of 0.992 on the test set. You deploy it to production. In Q2 2026, new ransomware variants use packing techniques your model has never seen. The false negative rate rises from 0.8% to 3.5%. Nobody notices because there is no monitoring. A real incident then occurs involving malware that your model classified as benign with 87% confidence.
This scenario is not hypothetical. It is the predictable result of treating an ML model like traditional software that is deployed once and forgotten. MLOps turns malware detection from a static artifact into an adaptive system with continuous feedback.
Architecture of the MLOps pipeline
The complete MLOps pipeline for malware detection has six stages that form a continuous loop:
┌─────────────┐    ┌──────────────┐    ┌──────────────┐
│    Data     │───→│   Training   │───→│  Evaluation  │
│  Pipeline   │    │   Pipeline   │    │  & Registry  │
└─────────────┘    └──────────────┘    └──────┬───────┘
       ↑                                      │
       │                                      ▼
┌──────┴──────┐    ┌──────────────┐    ┌──────────────┐
│  Feedback   │←───│  Monitoring  │←───│   Serving    │
│    Loop     │    │   & Drift    │    │   Pipeline   │
└─────────────┘    └──────────────┘    └──────────────┘
Each stage has its own tools, metrics, and decision points.
Data pipeline: data ingestion and curation
The first component of the pipeline handles the continuous collection of labeled samples, their processing, and their storage.
Data sources for retraining
from dataclasses import dataclass
from enum import Enum


class DataSourceType(Enum):
    MALWARE_BAZAAR = "malwarebazaar"
    VIRUSSHARE = "virusshare"
    SOREL_UPDATE = "sorel_update"
    INTERNAL_SANDBOX = "internal_sandbox"
    ANALYST_FEEDBACK = "analyst_feedback"  # Ground truth from the SOC


@dataclass
class TrainingDataSource:
    source_type: DataSourceType
    url: str
    refresh_interval_hours: int
    label_strategy: str  # "av_consensus" | "sandbox" | "analyst"
    min_confidence: float


TRAINING_SOURCES = [
    TrainingDataSource(
        source_type=DataSourceType.MALWARE_BAZAAR,
        url="https://bazaar.abuse.ch/export/csv/full/",
        refresh_interval_hours=24,
        label_strategy="av_consensus",
        min_confidence=0.8,
    ),
    TrainingDataSource(
        source_type=DataSourceType.ANALYST_FEEDBACK,
        url="internal://soc/feedback",
        refresh_interval_hours=1,
        label_strategy="analyst",  # Most reliable ground truth
        min_confidence=1.0,
    ),
]
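The `av_consensus` strategy and the `min_confidence` field leave open how a label and its confidence are derived. One possible reading, sketched here under assumptions that are not in the original (the `av_consensus_label` helper, the verdict format, and a simple majority-fraction confidence), is to take the share of antivirus engines that agree with the majority verdict and drop samples that fall below the source's threshold.

```python
from typing import Optional


def av_consensus_label(verdicts: dict[str, bool],
                       min_confidence: float = 0.8) -> Optional[tuple[int, float]]:
    """Return (label, confidence), or None if the engines disagree too much.

    `verdicts` maps a (hypothetical) engine name to True when that engine
    flags the sample as malicious.
    """
    if not verdicts:
        return None
    malicious_votes = sum(verdicts.values())
    malicious_fraction = malicious_votes / len(verdicts)
    label = 1 if malicious_fraction >= 0.5 else 0
    # Confidence = share of engines that agree with the majority label.
    confidence = malicious_fraction if label == 1 else 1.0 - malicious_fraction
    if confidence < min_confidence:
        return None  # Too ambiguous to use as training data
    return label, confidence
```

Samples that return `None` would simply be skipped until more verdicts or an analyst label arrive.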
Feature store
A feature store centralizes the extracted features and avoids recomputing them on every retraining run. For malware, PE features are deterministic (the same binary always yields the same features for a given extractor version), which makes caching especially effective.
from pathlib import Path
from typing import Callable

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session


class MalwareFeatureStore:
    """Feature store for PE binary features."""

    def __init__(self, db_url: str, cache_dir: Path):
        self.engine = create_engine(db_url)
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def get_or_compute(self, sample_hash: str,
                       compute_fn: Callable[[str], dict]) -> dict:
        """Return features from the cache, computing them on a miss."""
        # Look in the cache first
        cached = self._get_cached(sample_hash)
        if cached is not None:
            return cached
        # Compute the features
        features = compute_fn(sample_hash)
        # Store them for the next call
        self._store(sample_hash, features)
        return features

    def get_training_dataset(self, start_date: str,
                             end_date: str,
                             min_confidence: float = 0.8) -> pd.DataFrame:
        """Build a training dataset from the feature store."""
        # text() is needed for raw SQL with named parameters in SQLAlchemy 2.x
        query = text("""
            SELECT f.*, l.label, l.confidence, l.source
            FROM pe_features f
            JOIN sample_labels l ON f.sample_hash = l.sample_hash
            WHERE l.label_date BETWEEN :start AND :end
              AND l.confidence >= :min_conf
            ORDER BY l.label_date
        """)
        with self.engine.connect() as conn:
            return pd.read_sql(
                query, conn,
                params={
                    "start": start_date,
                    "end": end_date,
                    "min_conf": min_confidence,
                },
            )

    def get_dataset_stats(self) -> dict:
        """Feature store statistics."""
        with Session(self.engine) as session:
            total = session.execute(
                text("SELECT COUNT(*) FROM pe_features")
            ).scalar()
            malicious = session.execute(
                text("SELECT COUNT(*) FROM sample_labels WHERE label = 1")
            ).scalar()
            benign = session.execute(
                text("SELECT COUNT(*) FROM sample_labels WHERE label = 0")
            ).scalar()
        return {
            "total_samples": total,
            "malicious": malicious,
            "benign": benign,
            "ratio": malicious / max(benign, 1),
        }

    def _get_cached(self, sample_hash: str) -> dict | None:
        cache_path = self.cache_dir / f"{sample_hash}.parquet"
        if cache_path.exists():
            return pd.read_parquet(cache_path).iloc[0].to_dict()
        return None

    def _store(self, sample_hash: str, features: dict):
        df = pd.DataFrame([features])
        cache_path = self.cache_dir / f"{sample_hash}.parquet"
        df.to_parquet(cache_path)
Dataset versioning
Every training dataset must be versioned and reproducible. DVC (Data Version Control) is a widely used tool for this, but for malware a metadata-based approach is more practical, because the binaries themselves are not versioned (only their features).
from datetime import datetime

from pydantic import BaseModel


class DatasetVersion(BaseModel):
    version: str  # "v2026.06.01"
    created_at: datetime
    sample_count: int
    malicious_count: int
    benign_count: int
    date_range_start: str
    date_range_end: str
    sources: list[str]
    feature_schema_version: str  # "pe_features_v3"
    label_strategy: str
    sha256_manifest: str  # Hash of the features file

    @property
    def balance_ratio(self) -> float:
        return self.malicious_count / max(self.benign_count, 1)
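The `sha256_manifest` field is described only as a hash of the features file. A minimal sketch of one way to compute it, assuming the features live in a single file on disk (the `sha256_of_file` name and the chunk size are illustrative choices, not from the original):

```python
import hashlib
from pathlib import Path


def sha256_of_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large feature files need not fit in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        # iter() with a sentinel keeps reading until read() returns b"".
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Storing this digest next to the date range and feature schema version lets a later run confirm it is training on byte-identical features.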
Training pipeline with MLflow
MLflow manages the full model lifecycle: experimentation, metric tracking, model versioning, and deployment.
Configuring the training pipeline
import mlflow
import mlflow.xgboost
import numpy as np
import xgboost as xgb
from sklearn.metrics import (
    roc_auc_score, precision_score, recall_score, f1_score
)
from sklearn.model_selection import StratifiedKFold

# Configure MLflow tracking
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("malware-detection-xgb")
class MalwareTrainingPipeline:
    """Training pipeline with MLflow tracking."""

    def __init__(self, feature_store: MalwareFeatureStore):
        self.feature_store = feature_store
        self.model = None

    def train(self, dataset_version: str,
              hyperparams: dict | None = None) -> str:
        """Train a model and register it in MLflow.

        `dataset_version` is expected as "<start_date>_<end_date>",
        for example "2026-03-01_2026-06-01".
        """
        default_params = {
            "n_estimators": 500,
            "max_depth": 8,
            "learning_rate": 0.05,
            "subsample": 0.8,
            "colsample_bytree": 0.8,
            "min_child_weight": 5,
            "scale_pos_weight": 1.0,  # Adjust if the classes are imbalanced
            "eval_metric": "logloss",
            "random_state": 42,
            "n_jobs": -1,
        }
        params = {**default_params, **(hyperparams or {})}

        # Load the dataset
        start_date, end_date = dataset_version.split("_")
        df = self.feature_store.get_training_dataset(
            start_date=start_date,
            end_date=end_date,
        )
        feature_cols = [c for c in df.columns
                        if c not in ["sample_hash", "label",
                                     "confidence", "source"]]
        X = df[feature_cols]
        y = df["label"]

        with mlflow.start_run(run_name=f"train_{dataset_version}"):
            # Log parameters
            mlflow.log_params(params)
            mlflow.log_param("dataset_version", dataset_version)
            mlflow.log_param("n_samples", len(df))
            mlflow.log_param("n_features", len(feature_cols))
            mlflow.log_param("class_balance",
                             f"{y.mean():.3f} malicious")

            # Cross-validation for robust metrics
            cv_metrics = self._cross_validate(X, y, params)

            # Log CV metrics
            for metric, values in cv_metrics.items():
                mlflow.log_metric(f"cv_{metric}_mean", np.mean(values))
                mlflow.log_metric(f"cv_{metric}_std", np.std(values))

            # Train the final model on all the data
            self.model = xgb.XGBClassifier(**params)
            self.model.fit(X, y)

            # Log and register the model
            mlflow.xgboost.log_model(
                self.model,
                "model",
                registered_model_name="malware-detector"
            )

            # Log feature importance (cast NumPy floats so they serialize to JSON)
            importance = {
                name: float(score)
                for name, score in zip(feature_cols,
                                       self.model.feature_importances_)
            }
            mlflow.log_dict(importance, "feature_importance.json")

            # Log the feature schema
            mlflow.log_dict(
                {"features": feature_cols},
                "feature_schema.json"
            )

            run_id = mlflow.active_run().info.run_id
            mlflow.log_param("run_id", run_id)
        return run_id
    def _cross_validate(self, X, y, params, n_folds=5):
        """Stratified cross-validation with malware-specific metrics."""
        skf = StratifiedKFold(n_splits=n_folds, shuffle=True,
                              random_state=42)
        metrics = {
            "auc": [], "precision": [], "recall": [],
            "f1": [], "fpr_at_1pct_fnr": []
        }
        for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
            X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

            model = xgb.XGBClassifier(**params)
            model.fit(
                X_train, y_train,
                eval_set=[(X_val, y_val)],
                verbose=False
            )

            y_pred_proba = model.predict_proba(X_val)[:, 1]
            y_pred = (y_pred_proba > 0.5).astype(int)

            metrics["auc"].append(roc_auc_score(y_val, y_pred_proba))
            metrics["precision"].append(
                precision_score(y_val, y_pred, zero_division=0))
            metrics["recall"].append(
                recall_score(y_val, y_pred, zero_division=0))
            metrics["f1"].append(
                f1_score(y_val, y_pred, zero_division=0))

            # FPR when FNR = 1% (a critical metric for malware)
            fpr = self._fpr_at_fnr(y_val, y_pred_proba, target_fnr=0.01)
            metrics["fpr_at_1pct_fnr"].append(fpr)
        return metrics

    def _fpr_at_fnr(self, y_true, y_scores, target_fnr=0.01):
        """Return the FPR at the highest threshold whose FNR meets the target."""
        y_true = np.asarray(y_true)
        y_scores = np.asarray(y_scores)
        # Scan thresholds from strictest to loosest; FNR falls as we go
        thresholds = np.sort(y_scores)[::-1]
        for threshold in thresholds:
            y_pred = (y_scores >= threshold).astype(int)
            fn = np.sum((y_pred == 0) & (y_true == 1))
            fp = np.sum((y_pred == 1) & (y_true == 0))
            tp = np.sum((y_pred == 1) & (y_true == 1))
            tn = np.sum((y_pred == 0) & (y_true == 0))
            fnr = fn / max(fn + tp, 1)
            fpr = fp / max(fp + tn, 1)
            if fnr <= target_fnr:
                return fpr
        return 1.0
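The threshold scan in `_fpr_at_fnr` recomputes the confusion matrix for every candidate threshold, which is quadratic in the number of validation samples. The same quantity can be obtained from one sorted pass. The following is a sketch in plain Python (the function name `fpr_at_fnr_sorted` is hypothetical), assuming binary labels and evaluating only after each group of tied scores has been consumed:

```python
def fpr_at_fnr_sorted(y_true, y_scores, target_fnr=0.01):
    """FPR at the highest threshold whose FNR is <= target_fnr."""
    pairs = sorted(zip(y_scores, y_true), key=lambda p: p[0], reverse=True)
    positives = sum(1 for _, label in pairs if label == 1)
    negatives = len(pairs) - positives
    tp = fp = 0
    i = 0
    while i < len(pairs):
        threshold = pairs[i][0]
        # Consume every sample tied at this score before evaluating,
        # which matches predicting positive when score >= threshold.
        while i < len(pairs) and pairs[i][0] == threshold:
            if pairs[i][1] == 1:
                tp += 1
            else:
                fp += 1
            i += 1
        fnr = (positives - tp) / max(positives, 1)
        fpr = fp / max(negatives, 1)
        if fnr <= target_fnr:
            return fpr
    return 1.0
```

The cost is dominated by the sort, so it scales as O(n log n) instead of O(n²) on large validation folds.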
Model Registry
The MLflow Model Registry manages model versions and their stage transitions.
import mlflow.xgboost
from mlflow.tracking import MlflowClient


class MalwareModelRegistry:
    """Model lifecycle management.

    Note: model stages are deprecated in recent MLflow releases in favor
    of aliases; the stage-based calls below follow the original design.
    """

    def __init__(self):
        self.client = MlflowClient()
        self.model_name = "malware-detector"

    def promote_to_staging(self, run_id: str,
                           min_auc: float = 0.99) -> bool:
        """Promote a model to staging if it meets the thresholds."""
        run = self.client.get_run(run_id)
        auc = run.data.metrics.get("cv_auc_mean", 0)
        fpr = run.data.metrics.get("cv_fpr_at_1pct_fnr_mean", 1)

        if auc < min_auc:
            print(f"AUC {auc:.4f} below threshold {min_auc}")
            return False
        if fpr > 0.05:  # At most 5% FPR when FNR = 1%
            print(f"FPR@1%FNR {fpr:.4f} too high")
            return False

        # Find the version registered by this run, rather than assuming
        # it is the newest version in the registry
        versions = self.client.search_model_versions(
            f"name='{self.model_name}'"
        )
        candidates = [v for v in versions if v.run_id == run_id]
        if not candidates:
            print(f"No registered version found for run {run_id}")
            return False
        target = max(candidates, key=lambda v: int(v.version))

        # Transition to staging
        self.client.transition_model_version_stage(
            name=self.model_name,
            version=target.version,
            stage="Staging"
        )
        print(f"Model v{target.version} promoted to Staging "
              f"(AUC={auc:.4f}, FPR@1%FNR={fpr:.4f})")
        return True

    def promote_to_production(self, version: str) -> bool:
        """Promote a staging version to production after validation."""
        # Archive the current production model
        prod_versions = self.client.get_latest_versions(
            self.model_name, stages=["Production"]
        )
        for v in prod_versions:
            self.client.transition_model_version_stage(
                name=self.model_name,
                version=v.version,
                stage="Archived"
            )
        # Promote the new model
        self.client.transition_model_version_stage(
            name=self.model_name,
            version=version,
            stage="Production"
        )
        return True

    def get_production_model(self):
        """Load the current production model."""
        model_uri = f"models:/{self.model_name}/Production"
        return mlflow.xgboost.load_model(model_uri)

    def compare_versions(self, version_a: str,
                         version_b: str) -> dict:
        """Compare metrics between two versions."""
        run_a = self._get_run_for_version(version_a)
        run_b = self._get_run_for_version(version_b)
        metrics_a = run_a.data.metrics
        metrics_b = run_b.data.metrics
        comparison = {}
        for key in metrics_a:
            if key in metrics_b:
                comparison[key] = {
                    "version_a": metrics_a[key],
                    "version_b": metrics_b[key],
                    "delta": metrics_b[key] - metrics_a[key]
                }
        return comparison

    def _get_run_for_version(self, version: str):
        mv = self.client.get_model_version(
            self.model_name, version)
        return self.client.get_run(mv.run_id)
Monitoring: concept drift and degradation
Monitoring is the most critical piece of the pipeline. Without it, you do not know when your model has stopped working.
Types of drift in malware detection
Data drift (covariate shift): the distribution of features of the binaries the model processes in production changes relative to the training set distribution. Example: a new version of a popular compiler produces binaries with different PE header characteristics.
Concept drift: the relationship between features and label changes. Example: a packing technique that used to be exclusive to malware starts being used by legitimate software (false positives), or a technique previously seen only in benign software starts being exploited by malware (false negatives).
Label drift: the class proportions change. Example: a massive ransomware campaign increases the share of malicious binaries in production.
Implementing the drift monitor
from dataclasses import dataclass
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from scipy import stats
from sklearn.metrics import roc_auc_score


@dataclass
class DriftAlert:
    feature: str
    drift_type: str  # "data" | "concept" | "label"
    severity: str  # "low" | "medium" | "high" | "critical"
    statistic: float
    p_value: float
    timestamp: datetime
    message: str
class MalwareDriftMonitor:
    """Concept drift monitor for malware detection."""

    def __init__(self, reference_data: pd.DataFrame,
                 feature_names: list[str]):
        self.reference = reference_data
        self.feature_names = feature_names
        self.alerts: list[DriftAlert] = []

    def check_data_drift(self, production_data: pd.DataFrame,
                         significance: float = 0.01) -> list[DriftAlert]:
        """Detect per-feature data drift with the KS test."""
        alerts = []
        for feature in self.feature_names:
            ref_values = self.reference[feature].dropna()
            prod_values = production_data[feature].dropna()
            if len(prod_values) == 0:
                continue

            # Two-sample Kolmogorov-Smirnov test
            ks_stat, p_value = stats.ks_2samp(ref_values, prod_values)
            if p_value < significance:
                severity = self._classify_drift_severity(ks_stat)
                alert = DriftAlert(
                    feature=feature,
                    drift_type="data",
                    severity=severity,
                    statistic=ks_stat,
                    p_value=p_value,
                    timestamp=datetime.utcnow(),
                    message=f"Data drift detected in {feature}: "
                            f"KS={ks_stat:.4f}, p={p_value:.6f}"
                )
                alerts.append(alert)
        self.alerts.extend(alerts)
        return alerts
    def check_prediction_drift(self, recent_predictions: pd.DataFrame,
                               window_days: int = 7) -> list[DriftAlert]:
        """Detect drift in the distribution of predictions.

        `window_days` is informational only: no filtering happens here, so
        pass predictions already restricted to the desired window.
        """
        alerts = []

        # Compare the score distributions
        empty = pd.Series(dtype=float)
        ref_scores = self.reference.get("prediction_score", empty)
        prod_scores = recent_predictions.get("prediction_score", empty)
        if len(ref_scores) == 0 or len(prod_scores) == 0:
            return alerts

        ks_stat, p_value = stats.ks_2samp(ref_scores, prod_scores)
        if p_value < 0.01:
            # A shift in scores is only a proxy for concept drift; confirm
            # it with ground truth in check_performance_drift
            alerts.append(DriftAlert(
                feature="prediction_score",
                drift_type="concept",
                severity=self._classify_drift_severity(ks_stat),
                statistic=ks_stat,
                p_value=p_value,
                timestamp=datetime.utcnow(),
                message=f"Prediction drift: score distribution "
                        f"differs from the baseline (KS={ks_stat:.4f})"
            ))

        # Compare the detection rate (label drift): share of samples
        # scored above the 0.5 decision threshold, not the mean score
        ref_positive_rate = (ref_scores > 0.5).mean()
        prod_positive_rate = (prod_scores > 0.5).mean()
        rate_change = abs(prod_positive_rate - ref_positive_rate)
        if rate_change > 0.05:  # Change of more than 5 percentage points
            alerts.append(DriftAlert(
                feature="detection_rate",
                drift_type="label",
                severity="high" if rate_change > 0.1 else "medium",
                statistic=rate_change,
                p_value=float("nan"),  # No hypothesis test was run
                timestamp=datetime.utcnow(),
                message=f"Label drift: detection rate changed from "
                        f"{ref_positive_rate:.3f} to {prod_positive_rate:.3f}"
            ))
        self.alerts.extend(alerts)
        return alerts
    def check_performance_drift(self, ground_truth: pd.DataFrame,
                                min_auc: float = 0.98) -> list[DriftAlert]:
        """Detect performance degradation using ground truth."""
        alerts = []
        if "true_label" not in ground_truth.columns:
            return alerts

        y_true = ground_truth["true_label"]
        y_score = ground_truth["prediction_score"]
        # AUC is undefined unless both classes are present
        if y_true.nunique() < 2:
            return alerts

        current_auc = roc_auc_score(y_true, y_score)
        if current_auc < min_auc:
            severity = "critical" if current_auc < 0.95 else "high"
            alerts.append(DriftAlert(
                feature="model_auc",
                drift_type="concept",
                severity=severity,
                statistic=current_auc,
                p_value=float("nan"),  # No hypothesis test was run
                timestamp=datetime.utcnow(),
                message=f"Performance degradation: AUC={current_auc:.4f} "
                        f"below threshold {min_auc}"
            ))
        self.alerts.extend(alerts)
        return alerts
    def _classify_drift_severity(self, ks_statistic: float) -> str:
        if ks_statistic > 0.3:
            return "critical"
        elif ks_statistic > 0.2:
            return "high"
        elif ks_statistic > 0.1:
            return "medium"
        return "low"

    def generate_report(self) -> dict:
        """Generate a drift report, most severe alerts first."""
        severity_order = ["critical", "high", "medium", "low"]
        return {
            "timestamp": datetime.utcnow().isoformat(),
            "total_alerts": len(self.alerts),
            "critical": len([a for a in self.alerts
                             if a.severity == "critical"]),
            "high": len([a for a in self.alerts
                         if a.severity == "high"]),
            "alerts": [
                {
                    "feature": a.feature,
                    "drift_type": a.drift_type,
                    "severity": a.severity,
                    "message": a.message
                }
                for a in sorted(
                    self.alerts,
                    key=lambda x: severity_order.index(x.severity))
            ]
        }
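The severity bands in `_classify_drift_severity` are driven by the KS statistic rather than the p-value. The statistic is the largest vertical gap between the two empirical CDFs, so it stays interpretable as an effect size even when production samples are large enough to make almost any p-value significant. A minimal pure-Python sketch of that statistic follows (the `ks_statistic` helper is illustrative; `scipy.stats.ks_2samp` remains the practical choice):

```python
from bisect import bisect_right


def ks_statistic(reference, production):
    """Two-sample KS statistic: max |ECDF_ref(x) - ECDF_prod(x)|."""
    ref = sorted(reference)
    prod = sorted(production)
    if not ref or not prod:
        raise ValueError("both samples must be non-empty")
    max_gap = 0.0
    # The gap can only change at observed values, so checking those suffices.
    for x in set(ref) | set(prod):
        cdf_ref = bisect_right(ref, x) / len(ref)
        cdf_prod = bisect_right(prod, x) / len(prod)
        max_gap = max(max_gap, abs(cdf_ref - cdf_prod))
    return max_gap
```

Identical samples give 0 and fully separated samples give 1, which is why fixed cut-offs such as 0.1, 0.2, and 0.3 can serve as severity bands.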
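Population Stability Index (PSI)
PSI quantifies how much the distribution of a variable has shifted between two periods. For continuous drift monitoring it is often more practical than the KS test: with large production samples the KS p-value flags even negligible shifts as significant, whereas PSI reports the magnitude of the shift.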
import numpy as np


def calculate_psi(reference: np.ndarray, production: np.ndarray,
                  n_bins: int = 10) -> float:
    """Compute the Population Stability Index."""
    # Build bins from the percentiles of the reference distribution
    bins = np.percentile(reference,
                         np.linspace(0, 100, n_bins + 1))
    bins[0] = -np.inf
    bins[-1] = np.inf

    # Count samples in each bin
    ref_counts = np.histogram(reference, bins=bins)[0]
    prod_counts = np.histogram(production, bins=bins)[0]

    # Add-one smoothing avoids division by zero and log(0)
    ref_pct = (ref_counts + 1) / (len(reference) + n_bins)
    prod_pct = (prod_counts + 1) / (len(production) + n_bins)

    # PSI
    psi = np.sum((prod_pct - ref_pct) * np.log(prod_pct / ref_pct))
    return float(psi)

# Interpretation:
#   PSI below 0.1   -> No significant drift
#   PSI 0.1 - 0.2   -> Moderate drift, keep monitoring
#   PSI above 0.2   -> Significant drift, investigate/retrain
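To make the PSI thresholds concrete, here is a small worked example in plain Python. It applies the same formula to bin proportions directly; the proportions are invented for illustration and the helper name `psi_from_proportions` is hypothetical.

```python
import math


def psi_from_proportions(ref_pct, prod_pct):
    """PSI = sum((prod - ref) * ln(prod / ref)) over bins."""
    if len(ref_pct) != len(prod_pct):
        raise ValueError("both distributions need the same number of bins")
    return sum((p - r) * math.log(p / r) for r, p in zip(ref_pct, prod_pct))


# Four equal-mass reference bins (quartiles of the training distribution).
reference = [0.25, 0.25, 0.25, 0.25]
# Production traffic with mass shifted toward the top bin.
mild_shift = [0.22, 0.24, 0.26, 0.28]
strong_shift = [0.10, 0.15, 0.25, 0.50]

psi_mild = psi_from_proportions(reference, mild_shift)
psi_strong = psi_from_proportions(reference, strong_shift)
```

With these numbers the mild shift stays below 0.1 (roughly 0.008), while the strong shift lands above 0.2 (roughly 0.36), so only the second would call for investigation or retraining under the bands listed above.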
Automated retraining
Retraining should be triggered by drift metrics, not only by a fixed calendar.
Retraining trigger
from datetime import datetime, timedelta
from enum import Enum

from mlflow.tracking import MlflowClient


class RetrainingTrigger(Enum):
    SCHEDULED = "scheduled"         # Calendar (weekly/monthly)
    DRIFT_DETECTED = "drift"        # Drift monitor
    PERFORMANCE_DROP = "perf_drop"  # Ground truth
    MANUAL = "manual"               # Requested by a SOC analyst


class RetrainingOrchestrator:
    """Orchestrates automated retraining."""

    def __init__(self, pipeline: MalwareTrainingPipeline,
                 registry: MalwareModelRegistry,
                 monitor: MalwareDriftMonitor):
        self.pipeline = pipeline
        self.registry = registry
        self.monitor = monitor

    def evaluate_retraining_need(self) -> tuple[bool, str]:
        """Decide whether retraining is needed."""
        report = self.monitor.generate_report()
        # Immediate trigger: critical alerts
        if report["critical"] > 0:
            return True, RetrainingTrigger.PERFORMANCE_DROP.value
        # Accumulation trigger: several high alerts
        if report["high"] >= 3:
            return True, RetrainingTrigger.DRIFT_DETECTED.value
        return False, ""
    def run_retraining(self, trigger: str,
                       dataset_config: dict | None = None) -> dict:
        """Run the full retraining pipeline."""
        result = {
            "trigger": trigger,
            "timestamp": datetime.utcnow().isoformat(),
            "status": "started"
        }
        try:
            # 1. Prepare a dataset with recent data
            dataset_version = self._prepare_dataset(dataset_config)
            result["dataset_version"] = dataset_version

            # 2. Train a new model
            run_id = self.pipeline.train(dataset_version)
            result["run_id"] = run_id

            # 3. Evaluate against the current production model
            comparison = self._compare_with_production(run_id)
            result["comparison"] = comparison

            # 4. Promote if it improves
            if comparison["improves"]:
                promoted = self.registry.promote_to_staging(run_id)
                result["promoted_to_staging"] = promoted
                if promoted:
                    # Shadow testing before production
                    result["shadow_test"] = "initiated"
                    result["status"] = "pending_shadow_validation"
                else:
                    result["status"] = "rejected_metrics_below_threshold"
            else:
                result["status"] = "rejected_no_improvement"
        except Exception as e:
            result["status"] = "failed"
            result["error"] = str(e)
        return result

    def _prepare_dataset(self, config: dict | None) -> str:
        """Prepare the retraining dataset as "<start_date>_<end_date>"."""
        # "window_days" is an assumed config key; the default is 90 days
        window_days = (config or {}).get("window_days", 90)
        now = datetime.utcnow()
        end_date = now.strftime("%Y-%m-%d")
        start_date = (now - timedelta(days=window_days)).strftime("%Y-%m-%d")
        return f"{start_date}_{end_date}"

    def _compare_with_production(self, run_id: str) -> dict:
        """Compare the new model against production.

        Both AUCs are cross-validation metrics from their own training
        runs, so they were measured on different datasets.
        """
        # Metrics of the new model
        client = MlflowClient()
        new_run = client.get_run(run_id)
        new_auc = new_run.data.metrics.get("cv_auc_mean", 0)

        # Metrics of the production model
        prod_versions = client.get_latest_versions(
            "malware-detector", stages=["Production"]
        )
        if not prod_versions:
            return {"improves": True, "reason": "no_production_model"}
        prod_run = client.get_run(prod_versions[0].run_id)
        prod_auc = prod_run.data.metrics.get("cv_auc_mean", 0)
        return {
            "new_auc": new_auc,
            "production_auc": prod_auc,
            "delta": new_auc - prod_auc,
            "improves": new_auc > prod_auc
        }
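One practical detail the orchestrator leaves open is that `MalwareDriftMonitor.alerts` only grows, so once a critical alert is recorded `evaluate_retraining_need` keeps returning `True`. A hedged sketch of one way to bound this, assuming alerts carry a timestamp and that a lookback window plus a cooldown are acceptable policy (the `should_retrain` helper and its 24-hour defaults are assumptions, not from the original):

```python
from datetime import datetime, timedelta


def should_retrain(alerts, now, last_retrain_at=None,
                   window=timedelta(hours=24),
                   cooldown=timedelta(hours=24)):
    """Decide on retraining from recent alerts only.

    `alerts` is an iterable of (timestamp, severity) pairs.
    Returns (retrain, reason).
    """
    # Skip while the previous retraining is still within the cooldown.
    if last_retrain_at is not None and now - last_retrain_at < cooldown:
        return False, "cooldown"
    # Only alerts raised inside the lookback window count.
    recent = [sev for ts, sev in alerts if now - ts <= window]
    if "critical" in recent:
        return True, "critical_alert"
    if recent.count("high") >= 3:
        return True, "accumulated_high_alerts"
    return False, ""
```

The thresholds mirror the orchestrator (any critical alert, or three high alerts); only the time bounds are new.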
CI/CD for ML models
ML models need their own CI/CD pipeline, separate from the application's CI/CD.
GitHub Actions for model CI
# .github/workflows/model-ci.yml
name: Malware Model CI

on:
  push:
    paths:
      - 'ml/**'
      - 'features/**'
  schedule:
    - cron: '0 3 * * 1'  # Weekly retraining, Mondays 03:00 UTC

# Shared by every step that talks to MLflow
env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}

jobs:
  train-and-evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - name: Install dependencies
        run: pip install -r ml/requirements.txt
      - name: Run feature extraction tests
        run: pytest ml/tests/test_features.py -v
      - name: Train model
        run: python ml/train.py --dataset-version latest
      - name: Evaluate model
        run: python ml/evaluate.py --min-auc 0.99 --max-fpr 0.05
      - name: Model security scan
        run: python ml/security_check.py --check-adversarial
      - name: Promote to staging
        if: success()
        run: python ml/promote.py --stage staging

  shadow-test:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    steps:
      # Each job starts on a fresh runner, so the repo and deps are needed again
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - name: Install dependencies
        run: pip install -r ml/requirements.txt
      # GitHub-hosted runners limit a job to 6 hours, so a 24h shadow window
      # needs a self-hosted runner or a script that only polls an external run
      - name: Run shadow predictions
        run: python ml/shadow_test.py --duration 24h --threshold 0.95
      - name: Compare with production
        run: python ml/compare.py --report

  deploy:
    needs: shadow-test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - name: Install dependencies
        run: pip install -r ml/requirements.txt
      - name: Promote to production
        run: python ml/promote.py --stage production
      - name: Update serving endpoint
        run: python ml/deploy.py --target production
      - name: Verify deployment
        run: python ml/verify_deployment.py
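The workflow relies on `ml/evaluate.py` failing the job when metrics miss their thresholds, but the script itself is not shown. The sketch below illustrates the gating logic only: the `--min-auc` and `--max-fpr` flags come from the workflow, while the `--metrics-file` flag, the JSON layout, and the reuse of the `cv_*` metric keys are assumptions made for this example.

```python
import argparse
import json
import sys


def gate(metrics: dict, min_auc: float, max_fpr: float) -> list[str]:
    """Return a list of human-readable failures (empty means pass)."""
    failures = []
    auc = metrics.get("cv_auc_mean", 0.0)
    fpr = metrics.get("cv_fpr_at_1pct_fnr_mean", 1.0)
    if auc < min_auc:
        failures.append(f"AUC {auc:.4f} below threshold {min_auc}")
    if fpr > max_fpr:
        failures.append(f"FPR@1%FNR {fpr:.4f} above threshold {max_fpr}")
    return failures


def main(argv=None) -> int:
    parser = argparse.ArgumentParser(description="Model quality gate")
    parser.add_argument("--min-auc", type=float, default=0.99)
    parser.add_argument("--max-fpr", type=float, default=0.05)
    parser.add_argument("--metrics-file", default="metrics.json")  # assumed flag
    args = parser.parse_args(argv)
    with open(args.metrics_file) as fh:
        metrics = json.load(fh)
    failures = gate(metrics, args.min_auc, args.max_fpr)
    for failure in failures:
        print(failure, file=sys.stderr)
    return 1 if failures else 0  # A non-zero exit code fails the CI step
```

A script would end with `sys.exit(main())`, so that any failed threshold stops the job before the promotion step runs. Missing metrics default to failing values, which keeps the gate closed when a training run logged nothing.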
Tests specific to malware models
# ml/tests/test_model_quality.py
# The trained_model, test_data, and temporal_test_data fixtures are assumed
# to be defined in conftest.py.
import os
import tempfile
import time

import numpy as np
import pytest
from sklearn.metrics import roc_auc_score

# Hypothetical module: a standalone version of the _fpr_at_fnr logic
# from the training pipeline.
from ml.metrics import calculate_fpr_at_fnr


class TestModelQuality:
    """Model quality tests to run before deployment."""

    def test_auc_above_threshold(self, trained_model, test_data):
        """AUC must be at least 0.99."""
        X_test, y_test = test_data
        y_pred = trained_model.predict_proba(X_test)[:, 1]
        auc = roc_auc_score(y_test, y_pred)
        assert auc >= 0.99, f"AUC {auc:.4f} below threshold 0.99"

    def test_fpr_at_1pct_fnr(self, trained_model, test_data):
        """FPR must be at most 5% when FNR = 1%."""
        X_test, y_test = test_data
        y_pred = trained_model.predict_proba(X_test)[:, 1]
        # FPR at the threshold where FNR = 1%
        fpr = calculate_fpr_at_fnr(y_test, y_pred, target_fnr=0.01)
        assert fpr <= 0.05, f"FPR@1%FNR = {fpr:.4f}, maximum allowed 0.05"

    def test_no_feature_dominance(self, trained_model):
        """No single feature may account for more than 30% of importance."""
        importances = trained_model.feature_importances_
        max_importance = np.max(importances)
        assert max_importance <= 0.30, (
            f"Feature with {max_importance:.2%} importance. "
            f"Risk of adversarial manipulation."
        )

    def test_prediction_latency(self, trained_model, test_data):
        """Prediction latency must be below 10 ms per sample."""
        X_test = test_data[0]
        sample = X_test.iloc[:1]
        start = time.perf_counter()
        for _ in range(100):
            trained_model.predict_proba(sample)
        elapsed = (time.perf_counter() - start) / 100
        assert elapsed < 0.01, (
            f"Latency {elapsed*1000:.1f}ms above the 10ms threshold"
        )

    def test_model_size(self, trained_model):
        """The serialized model must not exceed 500 MB."""
        with tempfile.NamedTemporaryFile(suffix=".json") as f:
            trained_model.save_model(f.name)
            size_mb = os.path.getsize(f.name) / (1024 * 1024)
        assert size_mb <= 500, f"Model {size_mb:.0f}MB exceeds 500MB"

    def test_temporal_generalization(self, trained_model,
                                     temporal_test_data):
        """The model must generalize to more recent data."""
        X_future, y_future = temporal_test_data  # Data from 30 days later
        y_pred = trained_model.predict_proba(X_future)[:, 1]
        auc = roc_auc_score(y_future, y_pred)
        assert auc >= 0.97, (
            f"AUC on future data {auc:.4f}. "
            f"Possible concept drift relative to the training data."
        )
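`test_temporal_generalization` depends on a `temporal_test_data` fixture holding samples newer than anything seen in training. A minimal sketch of such a split in plain Python, assuming each record carries a first-seen date (the `temporal_split` helper and the record layout are hypothetical):

```python
from datetime import date, timedelta


def temporal_split(records, cutoff: date, horizon_days: int = 30):
    """Split (seen_date, payload) records into train and future-test sets.

    Train: strictly before `cutoff`. Test: from `cutoff` up to, but not
    including, `cutoff + horizon_days`. Later records are left out.
    """
    horizon_end = cutoff + timedelta(days=horizon_days)
    train = [r for r in records if r[0] < cutoff]
    test = [r for r in records if cutoff <= r[0] < horizon_end]
    return train, test
```

Unlike a shuffled split, this keeps every test sample later than every training sample, which is the situation the model faces in production.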
Serving: the model in production
Serving architecture
import time

import mlflow
import mlflow.artifacts
import mlflow.xgboost
import numpy as np
from fastapi import FastAPI, HTTPException
from mlflow.tracking import MlflowClient
from pydantic import BaseModel

app = FastAPI(title="Malware Detection API")


class PredictionRequest(BaseModel):
    features: dict[str, float]
    sample_hash: str | None = None


class PredictionResponse(BaseModel):
    prediction: str  # "malicious" | "benign"
    confidence: float
    model_version: str
    latency_ms: float
    feature_contributions: dict[str, float] | None = None


class MalwareServingPipeline:
    """Serving pipeline with a production model and an optional shadow model."""

    def __init__(self):
        self.production_model = None
        self.shadow_model = None
        self.model_version = "unknown"
        self.feature_names: list[str] = []
        self._load_models()

    def _load_models(self):
        """Load the production and shadow models."""
        try:
            self.production_model = mlflow.xgboost.load_model(
                "models:/malware-detector/Production"
            )
            mv = MlflowClient().get_latest_versions(
                "malware-detector", stages=["Production"]
            )[0]
            self.model_version = mv.version
            # Feature order comes from the schema logged by the training run
            schema = mlflow.artifacts.load_dict(
                f"runs:/{mv.run_id}/feature_schema.json"
            )
            self.feature_names = schema["features"]
        except Exception as exc:
            raise RuntimeError("No production model available") from exc

        try:
            self.shadow_model = mlflow.xgboost.load_model(
                "models:/malware-detector/Staging"
            )
        except Exception:
            self.shadow_model = None  # The shadow model is optional

    def predict(self, features: dict) -> PredictionResponse:
        """Predict with production and log the shadow model's output."""
        start = time.perf_counter()

        # Convert features to the model's input layout.
        # Missing features silently default to 0.
        feature_array = np.array(
            [[features.get(f, 0) for f in self.feature_names]]
        )

        # Main prediction
        proba = self.production_model.predict_proba(feature_array)[0]
        label = "malicious" if proba[1] > 0.5 else "benign"
        latency = (time.perf_counter() - start) * 1000

        # Shadow prediction. It is excluded from latency_ms but still runs
        # inline here; move it to a background worker to keep it off the
        # request path.
        if self.shadow_model is not None:
            self._shadow_predict(feature_array, proba)

        return PredictionResponse(
            prediction=label,
            confidence=float(max(proba)),
            model_version=self.model_version,
            latency_ms=latency
        )

    def _shadow_predict(self, features, prod_proba):
        """Compare the shadow model with production."""
        try:
            shadow_proba = self.shadow_model.predict_proba(features)[0]
            agreement = (
                (prod_proba[1] > 0.5) == (shadow_proba[1] > 0.5)
            )
            # Log for later analysis
            self._log_shadow_comparison(
                prod_proba, shadow_proba, agreement
            )
        except Exception:
            pass  # The shadow model must never affect production

    def _log_shadow_comparison(self, prod, shadow, agreement):
        """Record the comparison to evaluate the shadow model."""
        # Send to a metrics system (Prometheus, CloudWatch, etc.)
        pass
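In the serving class the shadow prediction runs inline, so it still adds to the time the caller waits. One way to take it off the request path, sketched with the standard library only, is to submit it to a small thread pool and ignore its failures. The `ShadowRunner` class and its callback are hypothetical names introduced for this example.

```python
from concurrent.futures import ThreadPoolExecutor


class ShadowRunner:
    """Runs shadow predictions in the background and records comparisons."""

    def __init__(self, shadow_predict, on_result, max_workers: int = 2):
        self._shadow_predict = shadow_predict  # callable(features) -> score
        self._on_result = on_result            # callable(prod, shadow, agree)
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, features, prod_score: float, threshold: float = 0.5):
        """Schedule the shadow comparison and return without waiting."""
        return self._pool.submit(self._compare, features, prod_score, threshold)

    def _compare(self, features, prod_score, threshold):
        try:
            shadow_score = self._shadow_predict(features)
            agree = (prod_score > threshold) == (shadow_score > threshold)
            self._on_result(prod_score, shadow_score, agree)
        except Exception:
            pass  # A failing shadow model must never affect production

    def close(self):
        """Wait for pending comparisons and release the worker threads."""
        self._pool.shutdown(wait=True)
```

The executor's queue is unbounded, so under sustained load a real deployment would likely sample requests or drop comparisons when the backlog grows.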
Canary deployment
import random

import mlflow.xgboost


class CanaryDeployer:
    """Canary deployment for malware models."""

    def __init__(self, registry: MalwareModelRegistry):
        self.registry = registry
        self.canary_traffic_pct = 0.0
        self.canary_model = None
        # Load the current production model so predict() always has one
        self.production_model = registry.get_production_model()

    def start_canary(self, new_version: str,
                     initial_traffic: float = 0.05):
        """Start the canary on a fraction of traffic (5% by default)."""
        # MalwareModelRegistry has no loader by version, so load the model
        # directly through the "models:/<name>/<version>" URI
        self.canary_model = mlflow.xgboost.load_model(
            f"models:/{self.registry.model_name}/{new_version}"
        )
        self.canary_traffic_pct = initial_traffic

    def predict(self, features) -> tuple[str, str]:
        """Route the prediction according to the canary traffic share."""
        use_canary = (self.canary_model is not None
                      and random.random() < self.canary_traffic_pct)
        if use_canary:
            model = self.canary_model
            source = "canary"
        else:
            model = self.production_model
            source = "production"
        proba = model.predict_proba(features)
        label = "malicious" if proba[0][1] > 0.5 else "benign"
        return label, source

    def evaluate_canary(self, metrics: dict) -> str:
        """Decide whether to promote, scale up, or roll back the canary."""
        canary_auc = metrics.get("canary_auc", 0)
        prod_auc = metrics.get("production_auc", 0)
        canary_fpr = metrics.get("canary_fpr", 1)
        if canary_auc < prod_auc - 0.005:
            return "rollback"  # Canary is worse than production
        elif canary_fpr > 0.05:
            return "rollback"  # FPR too high
        elif self.canary_traffic_pct < 0.5:
            return "scale_up"  # Increase canary traffic
        else:
            return "promote"  # Promote to production
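`random.random()` routes each request independently, so the same binary can be scored by different models on successive submissions. If sticky routing is preferred, a hash of the sample identifier can pick the bucket instead. This is a sketch of that alternative, not the original design; `route_by_hash` is a hypothetical helper.

```python
import hashlib


def route_by_hash(sample_hash: str, canary_traffic_pct: float) -> str:
    """Map a sample to 'canary' or 'production' deterministically."""
    digest = hashlib.sha256(sample_hash.encode("utf-8")).digest()
    # Interpret the first 8 bytes as an integer and scale it to [0, 1).
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return "canary" if bucket < canary_traffic_pct else "production"
```

Because the bucket depends only on the identifier, raising `canary_traffic_pct` keeps every sample that was already on the canary there and only moves additional samples over.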
Pipeline observability
Key metrics to monitor
# Model metrics
MODEL_METRICS = {
    "prediction_latency_ms": "Histogram of per-prediction latency",
    "predictions_per_second": "Service throughput",
    "malicious_ratio": "Share of malicious detections",
    "confidence_distribution": "Distribution of confidence scores",
    "model_version_active": "Model version in production",
}

# Drift metrics
DRIFT_METRICS = {
    "psi_per_feature": "PSI per feature (updated daily)",
    "ks_statistic": "KS statistic per feature",
    "prediction_drift_psi": "PSI of the prediction distribution",
    "detection_rate_7d": "Rolling 7-day detection rate",
}

# Pipeline metrics
PIPELINE_METRICS = {
    "training_duration_minutes": "Duration of the last training run",
    "dataset_size": "Size of the training dataset",
    "feature_store_size": "Samples in the feature store",
    "retraining_trigger_count": "Retraining runs by trigger type",
    "model_versions_total": "Total versions in the registry",
}
Alerts
ALERT_RULES = [
    {
        "name": "model_performance_critical",
        "condition": "cv_auc_mean < 0.95",
        "severity": "critical",
        "action": "trigger_retraining + page_oncall"
    },
    {
        "name": "high_drift_detected",
        "condition": "psi_any_feature > 0.25",
        "severity": "high",
        "action": "trigger_investigation + notify_ml_team"
    },
    {
        "name": "prediction_latency_high",
        "condition": "p99_latency_ms > 50",
        "severity": "medium",
        "action": "notify_infra_team"
    },
    {
        "name": "retraining_failed",
        "condition": "last_retraining_status == 'failed'",
        "severity": "high",
        "action": "notify_ml_team + create_incident"
    },
]
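The rules express their conditions as strings, which need an evaluator. A hedged sketch of one option is to pair each rule with a predicate over a metrics dictionary, which avoids calling `eval` on configuration strings. The `Rule` type, the metric keys, and the three rules shown are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Callable

_SEVERITY_ORDER = ["critical", "high", "medium", "low"]


@dataclass(frozen=True)
class Rule:
    name: str
    severity: str
    predicate: Callable[[dict], bool]  # True when the alert should fire


RULES = [
    Rule("model_performance_critical", "critical",
         lambda m: m.get("auc", 1.0) < 0.95),
    Rule("high_drift_detected", "high",
         lambda m: max(m.get("psi_per_feature", {}).values(), default=0.0) > 0.25),
    Rule("prediction_latency_high", "medium",
         lambda m: m.get("p99_latency_ms", 0.0) > 50),
]


def firing_rules(metrics: dict, rules=RULES) -> list[str]:
    """Names of the rules that fire, most severe first."""
    fired = [r for r in rules if r.predicate(metrics)]
    fired.sort(key=lambda r: _SEVERITY_ORDER.index(r.severity))
    return [r.name for r in fired]
```

Missing metrics default to non-firing values here; a stricter setup might treat a missing metric as an alert in its own right.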
MLOps checklist for malware detection
Before considering your pipeline production-ready, verify:
Data pipeline: Centralized feature store. Datasets versioned with a hash. Documented data sources. Labeling pipeline with ground truth from the SOC. Handling of imbalanced data.
Training: Hyperparameters versioned in MLflow. Stratified cross-validation. Malware-specific metrics (FPR@1%FNR). Automated model quality tests. Reproducible training (fixed seeds).
Registry: Model versioned in the MLflow Model Registry. Documented stage transitions (Staging, Production, Archived). Automatic comparison between versions. One-command rollback.
Monitoring: Active drift monitoring (PSI, KS test). Alerts configured by severity. Model metrics dashboard. Ground-truth feedback loop with the SOC.
Serving: Prediction latency below 10 ms. Canary deployment for new versions. Shadow testing before promotion. Fallback to the previous version.
CI/CD: Automated pipeline for training, evaluation, and deployment. Model quality tests in CI. Security checks (adversarial robustness). Documentation for each release.
An MLOps pipeline is not a one-off project: it is living infrastructure that evolves with the model. What matters is not the sophistication of each component but that all the pieces are connected and the feedback loop works. A simple but complete pipeline (data, training, monitoring, retraining) beats a sophisticated pipeline with disconnected pieces.
This content is intended exclusively for educational purposes and for research in defensive cybersecurity. No malicious binaries or executable payloads are provided. Misuse of this information is the sole responsibility of the user.