Monitoring

This guide covers monitoring ML pipelines and infrastructure.

Overview

Monitoring covers three areas:

  1. Pipeline Health: Are pipelines running successfully?
  2. Model Performance: How are models performing?
  3. Infrastructure: Is the infrastructure healthy?

MLflow Monitoring

Experiment Dashboard

Open the MLflow UI to monitor experiments:

# Local
http://localhost:5000

# AWS
http://<instance-ip>:5000

Key Metrics to Track

Metric          Description       Alert Threshold
Accuracy        Model accuracy    < 0.8
Loss            Training loss     > 1.0
Training Time   Time per epoch    > 300s
Memory Usage    GPU memory        > 90%
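
The thresholds in the table can also be checked in code. The following is a minimal sketch, assuming metrics arrive as a plain dictionary. The key names (accuracy, loss, epoch_time_s, gpu_memory_pct) and the check_thresholds helper are illustrative assumptions and not part of MLflow.

```python
import operator

# Alert rules taken from the table above: (metric key, comparison, threshold).
# The metric key names are assumptions; adapt them to the names you log.
ALERT_RULES = [
    ("accuracy", operator.lt, 0.8),
    ("loss", operator.gt, 1.0),
    ("epoch_time_s", operator.gt, 300),
    ("gpu_memory_pct", operator.gt, 90),
]


def check_thresholds(metrics):
    """Return the names of metrics whose values breach an alert rule."""
    breached = []
    for name, compare, threshold in ALERT_RULES:
        value = metrics.get(name)
        # Metrics that were not reported are skipped rather than flagged.
        if value is not None and compare(value, threshold):
            breached.append(name)
    return breached
```

For example, check_thresholds({"accuracy": 0.75, "loss": 0.4}) flags only accuracy, because 0.75 is below the 0.8 threshold.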

Comparing Runs

  1. Open the MLflow UI
  2. Select an experiment
  3. Tick the checkboxes of the runs to compare
  4. Click “Compare”
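
Runs can also be compared outside the UI. The sketch below assumes a recent MLflow release in which mlflow.search_runs accepts experiment_names and returns a pandas DataFrame with one metrics.<name> column per logged metric. The load_runs and rank_runs helpers are hypothetical names used for illustration.

```python
def load_runs(experiment_name):
    """Fetch all runs of one experiment as a list of dicts (requires mlflow)."""
    import mlflow  # imported lazily so rank_runs works without MLflow installed

    runs = mlflow.search_runs(experiment_names=[experiment_name])
    return runs.to_dict("records")


def _has_value(value):
    # NaN is the only value not equal to itself; DataFrames use it for
    # metrics that a run never logged.
    return value is not None and value == value


def rank_runs(runs, metric, higher_is_better=True):
    """Sort runs by one metric, best first, skipping runs that lack it."""
    column = f"metrics.{metric}"
    scored = [run for run in runs if _has_value(run.get(column))]
    return sorted(scored, key=lambda run: run[column], reverse=higher_is_better)
```

Calling rank_runs(load_runs("my-experiment"), "accuracy") would list the runs of a hypothetical experiment named my-experiment from highest to lowest accuracy.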

CloudWatch Monitoring (AWS)

Log Groups

Logs are sent to these CloudWatch log groups:

/mlops/pipeline              # Pipeline logs
/mlops/mlflow-server         # MLflow server logs

View Logs

# Follow logs
aws logs tail /mlops/pipeline --follow

# Search logs
aws logs filter-log-events \
    --log-group-name /mlops/pipeline \
    --filter-pattern "ERROR"
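
The same search can be run from Python. This is a rough sketch using the boto3 CloudWatch Logs call filter_log_events; the find_log_events helper and its max_pages cap are assumptions made for illustration. The client is passed in, so the function can be exercised with a stub.

```python
def find_log_events(logs_client, log_group, pattern="ERROR", max_pages=10):
    """Collect messages matching a filter pattern, following pagination tokens."""
    messages = []
    kwargs = {"logGroupName": log_group, "filterPattern": pattern}
    for _ in range(max_pages):
        response = logs_client.filter_log_events(**kwargs)
        messages.extend(event["message"] for event in response.get("events", []))
        token = response.get("nextToken")
        if not token:
            break  # no further pages
        kwargs["nextToken"] = token
    return messages
```

With real credentials this would be called as find_log_events(boto3.client("logs"), "/mlops/pipeline").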

CloudWatch Alarms

Create alarms for critical metrics:

aws cloudwatch put-metric-alarm \
    --alarm-name "Pipeline-Failure" \
    --metric-name "Errors" \
    --namespace "MLOps" \
    --statistic Sum \
    --period 300 \
    --threshold 1 \
    --comparison-operator GreaterThanOrEqualToThreshold \
    --evaluation-periods 1 \
    --alarm-actions arn:aws:sns:us-east-1:064592191516:mlops-alerts

Dashboard

Create a CloudWatch dashboard:

{
    "widgets": [
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["MLOps", "PipelineRuns", "Status", "Success"],
                    ["MLOps", "PipelineRuns", "Status", "Failed"]
                ],
                "region": "us-east-1",
                "title": "Pipeline Runs"
            }
        },
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["AWS/EC2", "CPUUtilization", "InstanceId", "i-xxx"]
                ],
                "region": "us-east-1",
                "title": "EC2 CPU"
            }
        }
    ]
}
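
One way to upload a dashboard body like this is the boto3 put_dashboard call. The sketch below is illustrative only: the metric_widget and publish_dashboard helpers, the dashboard name, and the us-east-1 default (chosen to match the ARNs used elsewhere in this guide) are assumptions.

```python
import json


def metric_widget(title, metrics, region="us-east-1"):
    """Build one metric widget in the CloudWatch dashboard body format."""
    return {
        "type": "metric",
        "properties": {"metrics": metrics, "region": region, "title": title},
    }


def publish_dashboard(cloudwatch_client, name, widgets):
    """Upload the dashboard and return any validation messages from CloudWatch."""
    body = json.dumps({"widgets": widgets})
    response = cloudwatch_client.put_dashboard(DashboardName=name, DashboardBody=body)
    return response.get("DashboardValidationMessages", [])
```

A call such as publish_dashboard(boto3.client("cloudwatch"), "mlops-monitoring", widgets) returns an empty list when CloudWatch accepts the body without warnings.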

Infrastructure Monitoring

EC2 Instance Health

# Check instance status
aws ec2 describe-instance-status --instance-ids i-xxx

# Check system logs
aws ec2 get-console-output --instance-id i-xxx
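
The status check can be turned into a pass/fail result in Python. This sketch relies on the boto3 describe_instance_status response, which reports an instance state plus separate system and instance status checks. The unhealthy_instances helper is a hypothetical name.

```python
def unhealthy_instances(ec2_client, instance_ids):
    """Return a dict of instances that are not running or fail a status check."""
    response = ec2_client.describe_instance_status(
        InstanceIds=instance_ids,
        IncludeAllInstances=True,  # also report instances that are not running
    )
    problems = {}
    for status in response.get("InstanceStatuses", []):
        state = status["InstanceState"]["Name"]
        system = status["SystemStatus"]["Status"]
        instance = status["InstanceStatus"]["Status"]
        if state != "running" or system != "ok" or instance != "ok":
            problems[status["InstanceId"]] = {
                "state": state,
                "system": system,
                "instance": instance,
            }
    return problems
```

An empty result means every listed instance is running and passes both checks.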

S3 Bucket Metrics

Monitor S3 storage:

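# BucketSizeBytes is reported once per day and needs the StorageType dimension.
# The date commands use GNU date syntax.
aws cloudwatch get-metric-statistics \
    --namespace AWS/S3 \
    --metric-name BucketSizeBytes \
    --dimensions Name=BucketName,Value=064592191516-mlflow Name=StorageType,Value=StandardStorage \
    --start-time "$(date -u -d '7 days ago' +%Y-%m-%dT%H:%M:%SZ)" \
    --end-time "$(date -u +%Y-%m-%dT%H:%M:%SZ)" \
    --period 86400 \
    --statistics Average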
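
The datapoints come back in bytes and in no guaranteed order. The following sketch sorts them and converts to GiB so growth over the window is easy to read. It assumes the Datapoints list from a get_metric_statistics response with the Average statistic; both helper names are illustrative.

```python
def bucket_size_history(datapoints):
    """Return (timestamp, size in GiB) pairs sorted oldest first."""
    ordered = sorted(datapoints, key=lambda point: point["Timestamp"])
    return [(point["Timestamp"], point["Average"] / 1024**3) for point in ordered]


def growth_gib(datapoints):
    """Return the size change in GiB between the oldest and newest datapoint."""
    history = bucket_size_history(datapoints)
    if len(history) < 2:
        return 0.0  # not enough data to measure growth
    return history[-1][1] - history[0][1]
```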

Custom Metrics

Logging Custom Metrics

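import mlflow

# accuracy, f1, inference_time (seconds), and model_size (bytes) are
# assumed to have been computed by your evaluation code.
with mlflow.start_run():
    # Log standard metrics
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)

    # Log custom metrics, converted to the units named in the metric keys
    mlflow.log_metric("inference_time_ms", inference_time * 1000)
    mlflow.log_metric("model_size_mb", model_size / 1024 / 1024)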
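
The inference_time and model_size values have to be measured somewhere. One possible approach, sketched here with the standard library only, times repeated calls to a prediction function and reads the size of the serialized model file. The helper names, the predict callable, and the file path are assumptions.

```python
import os
import time


def time_inference(predict, batch, repeats=10):
    """Return the mean wall-clock seconds per call of predict(batch)."""
    start = time.perf_counter()
    for _ in range(repeats):
        predict(batch)
    return (time.perf_counter() - start) / repeats


def file_size_bytes(path):
    """Return the size in bytes of a serialized model file."""
    return os.path.getsize(path)
```

Averaging over several repeats smooths out one-off delays such as the first call warming caches.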

Sending to CloudWatch

import boto3

cloudwatch = boto3.client('cloudwatch')

cloudwatch.put_metric_data(
    Namespace='MLOps',
    MetricData=[
        {
            'MetricName': 'ModelAccuracy',
            'Value': 0.95,
            'Unit': 'None',
            'Dimensions': [
                {'Name': 'Model', 'Value': 'fraud_detection'},
                {'Name': 'Environment', 'Value': 'production'}
            ]
        }
    ]
)
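
When several metrics are published per run, the MetricData list can be built from a dictionary. This is a sketch under stated assumptions: the helper names are hypothetical, and the batch size of 20 is a conservative choice rather than the documented request limit, which should be checked for your account.

```python
def build_metric_data(metrics, model, environment):
    """Turn {name: value} into CloudWatch MetricData entries with shared dimensions."""
    dimensions = [
        {"Name": "Model", "Value": model},
        {"Name": "Environment", "Value": environment},
    ]
    return [
        {"MetricName": name, "Value": float(value), "Unit": "None", "Dimensions": dimensions}
        for name, value in metrics.items()
    ]


def publish_metrics(cloudwatch_client, metrics, model, environment,
                    namespace="MLOps", batch_size=20):
    """Send metrics in batches and return how many entries were published."""
    data = build_metric_data(metrics, model, environment)
    for start in range(0, len(data), batch_size):
        cloudwatch_client.put_metric_data(
            Namespace=namespace, MetricData=data[start:start + batch_size]
        )
    return len(data)
```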

Alerting

Email Alerts

Set up an SNS topic for alerts:

# Create topic
aws sns create-topic --name mlops-alerts

# Subscribe email (the recipient must confirm via the link AWS sends)
aws sns subscribe \
    --topic-arn arn:aws:sns:us-east-1:064592191516:mlops-alerts \
    --protocol email \
    --notification-endpoint admin@example.com

Slack Integration

Use an AWS Lambda function subscribed to the SNS topic to forward alerts to Slack:

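import json
import os
import urllib.request

# Read the webhook URL from the Lambda environment; set SLACK_WEBHOOK_URL
# in the function configuration.
SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']

def lambda_handler(event, context):
    # SNS delivers the alert text in the first record of the event
    message = event['Records'][0]['Sns']['Message']

    slack_message = {
        'text': f':warning: MLOps Alert: {message}'
    }

    # Supplying data makes urllib send a POST request
    req = urllib.request.Request(
        SLACK_WEBHOOK_URL,
        data=json.dumps(slack_message).encode(),
        headers={'Content-Type': 'application/json'}
    )
    with urllib.request.urlopen(req, timeout=10) as response:
        return {'statusCode': response.status}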
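
When the SNS message comes from a CloudWatch alarm, the Message field is a JSON document rather than plain text. The sketch below pulls out the alarm name, new state, and reason for a shorter Slack line, and falls back to the raw text otherwise. The format_alarm helper is a hypothetical addition.

```python
import json


def format_alarm(sns_message):
    """Build Slack text from an SNS message, using alarm fields when present."""
    fallback = f":warning: MLOps Alert: {sns_message}"
    try:
        alarm = json.loads(sns_message)
    except (TypeError, ValueError):
        return fallback  # plain-text message, not JSON
    if not isinstance(alarm, dict) or "AlarmName" not in alarm:
        return fallback  # JSON, but not a CloudWatch alarm notification
    state = alarm.get("NewStateValue", "UNKNOWN")
    reason = alarm.get("NewStateReason", "")
    return f":warning: {alarm['AlarmName']} is {state}: {reason}"
```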

Health Checks

Pipeline Health Check Script

#!/bin/bash
# health_check.sh

# Check MLflow server
if curl -sf http://localhost:5000/health > /dev/null; then
    echo "✓ MLflow server healthy"
else
    echo "✗ MLflow server down"
    exit 1
fi

# Check disk space
DISK_USAGE=$(df -P / | awk 'NR==2 {print $5}' | tr -d '%')
if [ "$DISK_USAGE" -lt 90 ]; then
    echo "✓ Disk usage: ${DISK_USAGE}%"
else
    echo "✗ Disk usage critical: ${DISK_USAGE}%"
    exit 1
fi

# Check memory
MEM_USAGE=$(free | awk 'NR==2 {printf "%.0f", $3/$2*100}')
if [ "$MEM_USAGE" -lt 90 ]; then
    echo "✓ Memory usage: ${MEM_USAGE}%"
else
    echo "✗ Memory usage critical: ${MEM_USAGE}%"
    exit 1
fi

echo "All checks passed"
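
If the checks need to run from inside a Python pipeline instead of a shell, a rough equivalent of the server and disk checks might look like the sketch below. It uses only the standard library; the function names are illustrative, and the URL simply mirrors the one used in the script.

```python
import shutil
import urllib.request


def disk_usage_percent(path="/"):
    """Return used disk space as a percentage of total capacity.

    This can differ slightly from the Use% column of df, which excludes
    blocks reserved for the superuser.
    """
    usage = shutil.disk_usage(path)
    return usage.used / usage.total * 100


def mlflow_is_healthy(url="http://localhost:5000/health", timeout=5):
    """Return True if the MLflow health endpoint answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except OSError:  # URLError, HTTPError, and timeouts are OSError subclasses
        return False
```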

Cron Job

Schedule regular health checks:

# Add to crontab
*/5 * * * * /opt/mlops/health_check.sh >> /var/log/health_check.log 2>&1

Model Drift Detection

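Monitor model performance over time by comparing current metrics against a baseline. A relative change above the threshold (10% by default) counts as drift: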

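import logging

logger = logging.getLogger(__name__)

def detect_drift(current_metrics, baseline_metrics, threshold=0.1):
    """Return True if any metric's relative change from its baseline exceeds threshold."""
    for metric, current_value in current_metrics.items():
        baseline_value = baseline_metrics.get(metric)
        # Skip metrics without a usable baseline (missing or zero)
        if not baseline_value:
            continue
        drift = abs(current_value - baseline_value) / abs(baseline_value)
        if drift > threshold:
            # Replace with your alerting hook, for example an SNS publish
            logger.warning("Drift detected in %s: %.2f%%", metric, drift * 100)
            return True
    return False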
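
To see the arithmetic behind the check, here is a small self-contained sketch that reports every drifted metric instead of stopping at the first. The relative_drift and drift_report names are illustrative, and the metric values are made up for the example.

```python
def relative_drift(current, baseline):
    """Return the absolute change relative to the size of the baseline."""
    return abs(current - baseline) / abs(baseline)


def drift_report(current_metrics, baseline_metrics, threshold=0.1):
    """Return {metric: drift} for every metric whose drift exceeds threshold."""
    report = {}
    for metric, current in current_metrics.items():
        baseline = baseline_metrics.get(metric)
        if baseline is None or baseline == 0:
            continue  # no usable baseline to compare against
        drift = relative_drift(current, baseline)
        if drift > threshold:
            report[metric] = drift
    return report


# Illustrative values: accuracy falls from 0.90 to 0.80, f1 from 0.85 to 0.84.
baseline = {"accuracy": 0.90, "f1_score": 0.85}
current = {"accuracy": 0.80, "f1_score": 0.84}
report = drift_report(current, baseline)
```

Accuracy changes by 0.10 / 0.90, about 11.1%, which exceeds the 10% threshold. The F1 score changes by 0.01 / 0.85, about 1.2%, so it is not reported.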