# Monitoring

This guide covers monitoring ML pipelines and infrastructure.
## Overview

Monitoring covers three areas:

- Pipeline Health: Are pipelines running successfully?
- Model Performance: How are models performing?
- Infrastructure: Is the infrastructure healthy?

## MLflow Monitoring

### Experiment Dashboard

Access the MLflow UI to monitor experiments:

```text
# Local
http://localhost:5000

# AWS
http://<instance-ip>:5000
```

### Key Metrics to Track

| Metric | Description | Alert Threshold |
|---|---|---|
| Accuracy | Model accuracy | < 0.8 |
| Loss | Training loss | > 1.0 |
| Training Time | Time per epoch | > 300s |
| Memory Usage | GPU memory | > 90% |
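
These thresholds can also be enforced in code after each run. A minimal sketch (the threshold values mirror the table above; the metric names and the `metrics` dict are illustrative assumptions):

```python
# Illustrative thresholds mirroring the table above
THRESHOLDS = {
    "accuracy": ("min", 0.8),
    "loss": ("max", 1.0),
    "epoch_time_s": ("max", 300),
    "gpu_memory_pct": ("max", 90),
}

def check_thresholds(metrics):
    """Return (metric, value) pairs that breach their threshold."""
    breaches = []
    for name, (kind, limit) in THRESHOLDS.items():
        value = metrics.get(name)
        if value is None:
            continue
        if (kind == "min" and value < limit) or (kind == "max" and value > limit):
            breaches.append((name, value))
    return breaches
```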

### Comparing Runs

1. Open the MLflow UI.
2. Select the experiment.
3. Check the runs you want to compare.
4. Click "Compare".
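
Runs can also be compared programmatically with the MLflow client, which is handy for automated checks. A minimal sketch (the experiment name and metric keys are illustrative):

```python
import mlflow

# Returns a pandas DataFrame, one row per run
runs = mlflow.search_runs(
    experiment_names=["fraud_detection"],
    order_by=["metrics.accuracy DESC"],
    max_results=5,
)
print(runs[["run_id", "metrics.accuracy", "metrics.f1_score"]])
```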

## CloudWatch Monitoring (AWS)

### Log Groups

Logs are sent to CloudWatch:

```text
/mlops/pipeline         # Pipeline logs
/mlops/mlflow-server    # MLflow server logs
```
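
To confirm the log groups exist:

```bash
aws logs describe-log-groups --log-group-name-prefix /mlops
```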

### View Logs

```bash
# Follow logs
aws logs tail /mlops/pipeline --follow

# Search logs for errors
aws logs filter-log-events \
  --log-group-name /mlops/pipeline \
  --filter-pattern "ERROR"
```

### CloudWatch Alarms

Create alarms for critical metrics:

```bash
aws cloudwatch put-metric-alarm \
  --alarm-name "Pipeline-Failure" \
  --metric-name "Errors" \
  --namespace "MLOps" \
  --statistic Sum \
  --period 300 \
  --threshold 1 \
  --comparison-operator GreaterThanOrEqualToThreshold \
  --evaluation-periods 1 \
  --alarm-actions arn:aws:sns:us-east-1:064592191516:alerts
```
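
This alarm watches a custom `Errors` metric in the `MLOps` namespace, so the pipeline has to publish that metric when a step fails. A minimal sketch:

```bash
# Emit one error data point from a pipeline failure handler
aws cloudwatch put-metric-data \
  --namespace "MLOps" \
  --metric-name "Errors" \
  --value 1
```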

### Dashboard

Create a CloudWatch dashboard:

```json
{
  "widgets": [
    {
      "type": "metric",
      "properties": {
        "metrics": [
          ["MLOps", "PipelineRuns", "Status", "Success"],
          ["MLOps", "PipelineRuns", "Status", "Failed"]
        ],
        "title": "Pipeline Runs"
      }
    },
    {
      "type": "metric",
      "properties": {
        "metrics": [
          ["AWS/EC2", "CPUUtilization", "InstanceId", "i-xxx"]
        ],
        "title": "EC2 CPU"
      }
    }
  ]
}
```
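
Assuming the JSON above is saved as `dashboard.json` (the filename is illustrative), the dashboard can be created from the CLI:

```bash
aws cloudwatch put-dashboard \
  --dashboard-name MLOps \
  --dashboard-body file://dashboard.json
```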

## Infrastructure Monitoring

### EC2 Instance Health

```bash
# Check instance status
aws ec2 describe-instance-status --instance-ids i-xxx

# Check system console output
aws ec2 get-console-output --instance-id i-xxx
```
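
Status check failures can also feed the same alerting topic. A sketch (the alarm name is illustrative):

```bash
aws cloudwatch put-metric-alarm \
  --alarm-name "EC2-StatusCheckFailed" \
  --namespace AWS/EC2 \
  --metric-name StatusCheckFailed \
  --dimensions Name=InstanceId,Value=i-xxx \
  --statistic Maximum \
  --period 60 \
  --threshold 1 \
  --comparison-operator GreaterThanOrEqualToThreshold \
  --evaluation-periods 2 \
  --alarm-actions arn:aws:sns:us-east-1:064592191516:alerts
```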

### S3 Bucket Metrics

Monitor S3 storage (note that `BucketSizeBytes` is reported per storage class, so the `StorageType` dimension is required):

```bash
aws cloudwatch get-metric-statistics \
  --namespace AWS/S3 \
  --metric-name BucketSizeBytes \
  --dimensions Name=BucketName,Value=064592191516-mlflow Name=StorageType,Value=StandardStorage \
  --start-time $(date -d '7 days ago' --iso-8601) \
  --end-time $(date --iso-8601) \
  --period 86400 \
  --statistics Average
```

## Custom Metrics

### Logging Custom Metrics

```python
import mlflow

# accuracy, f1, inference_time, and model_size are computed earlier in the pipeline
with mlflow.start_run():
    # Log standard metrics
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)

    # Log custom metrics
    mlflow.log_metric("inference_time_ms", inference_time * 1000)
    mlflow.log_metric("model_size_mb", model_size / 1024 / 1024)
```

### Sending to CloudWatch

```python
import boto3

cloudwatch = boto3.client('cloudwatch')

cloudwatch.put_metric_data(
    Namespace='MLOps',
    MetricData=[
        {
            'MetricName': 'ModelAccuracy',
            'Value': 0.95,
            'Unit': 'None',
            'Dimensions': [
                {'Name': 'Model', 'Value': 'fraud_detection'},
                {'Name': 'Environment', 'Value': 'production'}
            ]
        }
    ]
)
```

## Alerting

### Email Alerts

Set up an SNS topic for alerts:

```bash
# Create topic
aws sns create-topic --name mlops-alerts

# Subscribe an email address
aws sns subscribe \
  --topic-arn arn:aws:sns:us-east-1:064592191516:mlops-alerts \
  --protocol email \
  --notification-endpoint admin@example.com
```

### Slack Integration

Use an AWS Lambda function (subscribed to the SNS topic) to forward alerts to Slack:

```python
import json
import os
import urllib.request

# Incoming webhook URL is provided via the Lambda environment
SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']

def lambda_handler(event, context):
    message = event['Records'][0]['Sns']['Message']

    slack_message = {
        'text': f':warning: MLOps Alert: {message}'
    }

    req = urllib.request.Request(
        SLACK_WEBHOOK_URL,
        data=json.dumps(slack_message).encode(),
        headers={'Content-Type': 'application/json'}
    )
    urllib.request.urlopen(req)
```
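
For the Lambda to receive these alerts, it has to be subscribed to the SNS topic and SNS must be allowed to invoke it. A sketch, assuming the function is named `mlops-slack-alerts`:

```bash
# Allow SNS to invoke the function
aws lambda add-permission \
  --function-name mlops-slack-alerts \
  --statement-id sns-invoke \
  --action lambda:InvokeFunction \
  --principal sns.amazonaws.com \
  --source-arn arn:aws:sns:us-east-1:064592191516:mlops-alerts

# Subscribe the function to the topic
aws sns subscribe \
  --topic-arn arn:aws:sns:us-east-1:064592191516:mlops-alerts \
  --protocol lambda \
  --notification-endpoint arn:aws:lambda:us-east-1:064592191516:function:mlops-slack-alerts
```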

## Health Checks

### Pipeline Health Check Script

```bash
#!/bin/bash
# health_check.sh

# Check MLflow server (-f makes curl fail on HTTP error responses)
if curl -sf http://localhost:5000/health > /dev/null; then
    echo "✓ MLflow server healthy"
else
    echo "✗ MLflow server down"
    exit 1
fi

# Check disk space
DISK_USAGE=$(df -h / | awk 'NR==2 {print $5}' | tr -d '%')
if [ "$DISK_USAGE" -lt 90 ]; then
    echo "✓ Disk usage: ${DISK_USAGE}%"
else
    echo "✗ Disk usage critical: ${DISK_USAGE}%"
    exit 1
fi

# Check memory
MEM_USAGE=$(free | awk 'NR==2 {printf "%.0f", $3/$2*100}')
if [ "$MEM_USAGE" -lt 90 ]; then
    echo "✓ Memory usage: ${MEM_USAGE}%"
else
    echo "✗ Memory usage critical: ${MEM_USAGE}%"
    exit 1
fi

echo "All checks passed"
```

### Cron Job

Schedule regular health checks:

```bash
# Add to crontab: run the health check every 5 minutes
*/5 * * * * /opt/mlops/health_check.sh >> /var/log/health_check.log 2>&1
```

## Model Drift Detection

Monitor model performance over time and compare it against a recorded baseline:

```python
def detect_drift(current_metrics, baseline_metrics, threshold=0.1):
    """Return True if any metric has drifted more than `threshold` from its baseline."""
    drifted = False
    for metric, current_value in current_metrics.items():
        baseline_value = baseline_metrics.get(metric)
        if baseline_value:  # skip metrics with no (or zero) baseline
            drift = abs(current_value - baseline_value) / baseline_value
            if drift > threshold:
                # alert() is the notification hook configured in the Alerting section
                alert(f"Drift detected in {metric}: {drift:.2%}")
                drifted = True
    return drifted
```
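
A minimal usage sketch, assuming baseline and current metrics are read from two MLflow runs (the run IDs are placeholders):

```python
import mlflow

baseline_metrics = mlflow.get_run("<baseline-run-id>").data.metrics
current_metrics = mlflow.get_run("<latest-run-id>").data.metrics

if detect_drift(current_metrics, baseline_metrics, threshold=0.1):
    print("Model drift detected; consider retraining.")
```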