Machine Learning – Flow Anomaly Detection
Runtime Inference: model_engine.py | Used by: ryu_project.py (after MUD pre-check) | Retraining: retrain_from_logs.py
1) Overview
The ML engine classifies flows as benign or malicious in real time. It is invoked only for flows that are not outright denied by the MUD baseline. Decisions are fused with MUD verdicts and a PageRank-based trust score (see Architecture) before programming OpenFlow rules.
- Model family: Random Forest (sklearn), persisted as rf_model.pkl
- Latency budget: < 3 ms per inference (typical dev laptop)
- Outputs: label ∈ {benign, malicious} + score ∈ [0, 1] (malicious probability)
- Serving path: ryu_project.py → feature extraction → model_engine.classify_flow(features)
2) Data & Labeling
Training data comes from controller logs and the test harness:
- Benign: normal IoT behaviour (DNS/NTP/HTTPS to vendor endpoints, LAN chatter).
- Malicious: scripted scans, port sweeps, UDP/ICMP floods, policy-drift destinations, blocklisted IPs.
- Ground truth: generated by the traffic harness and controller decisions; reviewed to reduce label noise.
2.1 Log schema (per row)
# flows_log.csv (union of multiple files allowed)
timestamp, device_id, src_ip, dst_ip, proto, src_port, dst_port, bytes, pkts, duration,
inter_arrival_mean, inter_arrival_std, conn_attempts_window, port_rarity,
mud_verdict, ml_label, ml_score, trust_score, final_decision, ground_truth_label
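For retraining, the logs can be merged and filtered with pandas. A minimal loading sketch; the logs/flows_log*.csv glob is an assumption, the columns follow the schema above:

# Sketch: merge multiple flow logs and keep only labelled rows.
import glob
import pandas as pd

# skipinitialspace tolerates the "col, col" header spacing shown above
frames = [pd.read_csv(p, skipinitialspace=True) for p in glob.glob("logs/flows_log*.csv")]
df = pd.concat(frames, ignore_index=True)

# Keep rows with a valid ground-truth label (see section 5.2).
df = df[df["ground_truth_label"].isin(["benign", "malicious"])]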
3) Features
Computed in ryu_project.py before inference:
- Identity / Header: proto (one-hot/ordinal), src_port, dst_port (binned/rare), device_id (optional embedding/one-hot).
- Size / Rate: pkt_len, bytes, pkts, duration, bpp = bytes/pkt, pps = pkts/s.
- Temporal: inter_arrival_mean, inter_arrival_std, burstiness = CV (coefficient of variation) of inter-arrival times.
- Heuristics: port_rarity (device/profile-aware), conn_attempts_window (N attempts / Δt).
Preprocessing: type casts, missing-value imputation, scaling where relevant; categorical encodings kept in the model pipeline.
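As a concrete example of the derived features, a small sketch of how bpp, pps, and burstiness could be computed from the raw counters (the helper name and zero-division guards are assumptions):

# Sketch: derived Size/Rate and Temporal features from raw flow counters.
def derive_features(bytes_, pkts, duration, ia_mean, ia_std):
    return {
        "bpp": bytes_ / pkts if pkts else 0.0,        # bytes per packet
        "pps": pkts / duration if duration else 0.0,  # packets per second
        # burstiness as the coefficient of variation (CV) of inter-arrivals
        "burstiness": ia_std / ia_mean if ia_mean else 0.0,
    }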
4) Online Inference API (model_engine.py)
from model_engine import classify_flow

features = {
    "proto": 6, "src_port": 51524, "dst_port": 443,
    "pkt_len": 1180, "bytes": 9216, "pkts": 8, "duration": 1.2,
    "bpp": 1152.0, "pps": 6.7,
    "inter_arrival_mean": 0.18, "inter_arrival_std": 0.05,
    "port_rarity": 0.02, "conn_attempts_window": 1,
}
label, score = classify_flow(features)  # e.g., ("benign", 0.08)
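Internally, classify_flow can be little more than a predict_proba call on the persisted pipeline. A minimal sketch, assuming the pipeline accepts named columns and that "malicious" is one of its fitted classes (not the exact implementation):

# model_engine.py -- minimal sketch, not the exact implementation.
import os
import joblib
import pandas as pd

_model = joblib.load(os.environ.get("ML_MODEL_PATH", "rf_model.pkl"))

def classify_flow(features: dict, threshold: float = 0.7):
    """Return (label, malicious probability) for a single flow."""
    X = pd.DataFrame([features])
    idx = list(_model.classes_).index("malicious")   # column of the malicious class
    score = float(_model.predict_proba(X)[0, idx])
    label = "malicious" if score >= threshold else "benign"
    return label, score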
4.1 Controller call site (ryu_project.py, conceptual)
if mud_verdict == "DENY":
    decision = "DROP"
else:
    label, score = classify_flow(features)
    # Fuse with trust score and MUD result
    decision = fuse(mud_verdict, label, score, trust_score)
program_switch(decision, flow_spec)
5) Training & Retraining (retrain_from_logs.py)
The model can be retrained from accumulated CSV logs (single file or glob). The script handles loading, feature engineering, train/val/test split, class imbalance, cross-validation, and persistence.
5.1 Usage
# Train from multiple logs and export model + report
python retrain_from_logs.py
5.2 What it does
- Merges CSV logs; filters rows with a valid ground_truth_label ∈ {benign, malicious}.
- Applies the same feature engineering as runtime (kept inside the sklearn pipeline).
- Splits data (70/15/15) stratified by label and optionally by device_id to reduce leakage.
- Handles class imbalance via class_weight='balanced' (or sampling).
- Tunes key RF hyperparameters (e.g., n_estimators, max_depth) via cross-validation.
- Outputs: rf_model.pkl (joblib), an evaluation JSON, and visuals (confusion matrix, ROC if enabled).
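A compressed sketch of that training path (the feature lists, hyperparameter grid, and file paths are assumptions; the real script may differ):

# Sketch of retrain_from_logs.py's core: pipeline + CV + persistence.
import joblib
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

NUMERIC = ["pkt_len", "bytes", "pkts", "duration", "bpp", "pps",
           "inter_arrival_mean", "inter_arrival_std",
           "port_rarity", "conn_attempts_window"]
CATEGORICAL = ["proto"]

df = pd.read_csv("logs/flows_log.csv", skipinitialspace=True)
df = df[df["ground_truth_label"].isin(["benign", "malicious"])]
X, y = df[NUMERIC + CATEGORICAL], df["ground_truth_label"]

# 70/15/15 split, stratified by label.
X_train, X_tmp, y_train, y_tmp = train_test_split(
    X, y, test_size=0.30, stratify=y, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(
    X_tmp, y_tmp, test_size=0.50, stratify=y_tmp, random_state=42)

pipe = Pipeline([
    ("enc", ColumnTransformer(
        [("proto", OneHotEncoder(handle_unknown="ignore"), CATEGORICAL)],
        remainder="passthrough")),
    ("rf", RandomForestClassifier(class_weight="balanced", random_state=42)),
])
grid = GridSearchCV(pipe, {"rf__n_estimators": [100, 300],
                           "rf__max_depth": [None, 10, 20]},
                    cv=5, scoring="f1_macro")
grid.fit(X_train, y_train)
joblib.dump(grid.best_estimator_, "rf_model.pkl")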
5.3 Hot-swap in production
# point the runtime to the new model (no controller restart if you reload safely)
export ML_MODEL_PATH=models/rf_model.pkl
# or set in config JSON and trigger a reload endpoint (if exposed)
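One way to make the reload safe without a controller restart is to load the new model fully before swapping it in. A sketch; the function and variable names are assumptions:

# Sketch: atomic hot-swap when the model file changes on disk.
import os
import joblib

_model, _model_mtime = None, 0.0

def maybe_reload():
    """Reload the model if the file on disk is newer than the loaded one."""
    global _model, _model_mtime
    path = os.environ.get("ML_MODEL_PATH", "rf_model.pkl")
    mtime = os.path.getmtime(path)
    if _model is None or mtime > _model_mtime:
        new_model = joblib.load(path)        # load fully before swapping
        _model, _model_mtime = new_model, mtime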
6) Evaluation & Thresholds
- Metrics: Accuracy, Precision, Recall, F1, ROC-AUC; report both macro and weighted averages.
- Confusion matrix: saved to artifacts/confusion_matrix.png.
- Operating point: default malicious threshold τ = 0.7 (tune per risk appetite).
Example metrics report (artifacts/metrics_report.json):
{
  "accuracy": 0.964,
  "precision": {"benign": 0.97, "malicious": 0.95},
  "recall": {"benign": 0.96, "malicious": 0.97},
  "f1": {"benign": 0.96, "malicious": 0.96},
  "auc": 0.987,
  "threshold": 0.70
}
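Continuing the retraining sketch from section 5.2, these values map directly onto sklearn.metrics (paths and rounding are assumptions):

# Sketch: producing the evaluation JSON on the held-out test split.
import json
import os
from sklearn.metrics import (accuracy_score,
                             precision_recall_fscore_support, roc_auc_score)

best = grid.best_estimator_                 # from the section 5.2 sketch
y_pred = best.predict(X_test)
idx = list(best.classes_).index("malicious")
y_score = best.predict_proba(X_test)[:, idx]

prec, rec, f1, _ = precision_recall_fscore_support(
    y_test, y_pred, labels=["benign", "malicious"])
report = {
    "accuracy": float(accuracy_score(y_test, y_pred)),
    "precision": {c: round(float(v), 3) for c, v in zip(["benign", "malicious"], prec)},
    "recall": {c: round(float(v), 3) for c, v in zip(["benign", "malicious"], rec)},
    "f1": {c: round(float(v), 3) for c, v in zip(["benign", "malicious"], f1)},
    "auc": float(roc_auc_score((y_test == "malicious").astype(int), y_score)),
    "threshold": 0.70,
}
os.makedirs("artifacts", exist_ok=True)
with open("artifacts/metrics_report.json", "w") as f:
    json.dump(report, f, indent=2)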
7) Drift Detection & Retrain Policy
- Data drift signals: rise in OOD (out-of-distribution) features, increase in quarantine rate, shift in port distributions.
- Label drift signals: drop in precision/recall on a rolling validation set.
- Policy: if the FP or FN rate exceeds 5% over a 7-day window, trigger retrain_from_logs.py with the latest logs.
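A sketch of that trigger, assuming decisions and ground truth are joined in the flow log and FP/FN rates are computed per true class:

# Sketch: 7-day rolling FP/FN check (column names follow the log schema).
import subprocess
import pandas as pd

df = pd.read_csv("logs/flows_log.csv", skipinitialspace=True,
                 parse_dates=["timestamp"])
week = df[df["timestamp"] >= df["timestamp"].max() - pd.Timedelta(days=7)]

benign = week[week["ground_truth_label"] == "benign"]
malicious = week[week["ground_truth_label"] == "malicious"]
fp_rate = (benign["ml_label"] == "malicious").mean()    # false positive rate
fn_rate = (malicious["ml_label"] == "benign").mean()    # false negative rate

if fp_rate > 0.05 or fn_rate > 0.05:
    subprocess.run(["python", "retrain_from_logs.py"], check=True)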
8) Decision Fusion (MUD ∧ ML ∧ Trust)
# Conceptual implementation of fuse() from section 4.1.
def fuse(mud_verdict, ml_label, ml_score, trust_score):
    if mud_verdict == "DENY":
        return "DROP"
    if ml_score >= 0.9:
        return "DROP"
    if ml_score >= 0.7 and trust_score < 0.2:
        return "QUARANTINE"
    if mud_verdict == "ALLOW" and ml_label == "benign" and trust_score > 0.4:
        return "ALLOW"
    return "RATE_LIMIT"
9) Reproducibility
- Model artifacts are stored under models/ with semantic versions (e.g., rf_model_v1.2.0.pkl).
- Each version is accompanied by artifacts/metrics_report.json, a confusion matrix, and a training-args JSON.
- Runtime config includes the active model path and threshold τ; exposed via /api/metrics.
10) Performance & Safety
- Run inference on a background worker where needed, so it does not block the controller’s I/O loop.
- Graceful fallback: if the model fails to load, default to MUD allowlist + rate-limit + alert.
- Validate input features: clamp outliers and enforce the schema to resist poisoning (see the sketch below).
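A sketch of that guard; the expected ranges are illustrative assumptions, not the project's actual schema:

# Sketch: validate and clamp features before inference to resist poisoning.
EXPECTED = {  # feature -> (min, max); ranges are illustrative assumptions
    "proto": (0, 255), "src_port": (0, 65535), "dst_port": (0, 65535),
    "bytes": (0, 1e9), "pkts": (0, 1e6), "duration": (0.0, 3600.0),
    "port_rarity": (0.0, 1.0), "conn_attempts_window": (0, 1e4),
}

def sanitize(features: dict) -> dict:
    clean = {}
    for name, (lo, hi) in EXPECTED.items():
        if name not in features:
            raise ValueError(f"missing feature: {name}")
        clean[name] = min(max(float(features[name]), lo), hi)  # clamp outliers
    return clean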
11) Quick Commands
# 1) Retrain
python retrain_from_logs.py
# 2) Run controller (uses new model if configured)
ryu-manager ryu_project.py --observe-links
# 3) Generate mixed traffic in the Mininet topology, then inspect the UI pages:
# - Demo: live events
# - Results & Evaluation: metrics, confusion matrix, FP/FN trend