import math
import statistics
import time
from collections import deque

import psutil
class CPUBottleneckAnalyzer:
    """Advanced CPU performance analysis for identifying bottlenecks.

    Collects rolling system-wide CPU metrics via psutil and flags common
    bottleneck signatures: sustained saturation, load-average overload, and
    uneven per-core distribution.
    """

    def __init__(self, sampling_interval=1.0):
        # Seconds to sleep between samples in collect_cpu_metrics().
        self.sampling_interval = sampling_interval
        # Rolling window of the 100 most recent metric snapshots.
        self.cpu_metrics = deque(maxlen=100)
        # Reserved for per-process metrics (not populated in this class).
        self.process_metrics = {}

    def collect_cpu_metrics(self, duration=60):
        """Sample CPU metrics every ``sampling_interval`` seconds for ``duration`` seconds.

        Appends one snapshot dict per sample to ``self.cpu_metrics``.
        """
        # Prime the percent counters: the very first cpu_percent(interval=None)
        # call has no prior reference sample and returns a meaningless 0.0,
        # which would pollute the collected statistics.
        psutil.cpu_percent(interval=None)
        psutil.cpu_percent(percpu=True)
        start_time = time.time()
        while time.time() - start_time < duration:
            # Overall CPU metrics (percent since the previous call).
            cpu_percent = psutil.cpu_percent(interval=None)
            cpu_count = psutil.cpu_count()
            # getloadavg() is unavailable on some platforms; fall back to zeros.
            load_avg = psutil.getloadavg() if hasattr(psutil, 'getloadavg') else (0, 0, 0)
            # Per-core metrics.
            cpu_per_core = psutil.cpu_percent(percpu=True)
            # Context switches and interrupts (cumulative counters).
            cpu_stats = psutil.cpu_stats()
            self.cpu_metrics.append({
                'timestamp': time.time(),
                'cpu_percent': cpu_percent,
                'cpu_count': cpu_count,
                'load_avg_1m': load_avg[0],
                'load_avg_5m': load_avg[1],
                'load_avg_15m': load_avg[2],
                'cpu_per_core': cpu_per_core,
                'context_switches': cpu_stats.ctx_switches,
                'interrupts': cpu_stats.interrupts,
                'soft_interrupts': cpu_stats.soft_interrupts,
            })
            time.sleep(self.sampling_interval)

    def analyze_cpu_bottlenecks(self):
        """Analyze collected metrics and return summary stats plus bottleneck indicators.

        Returns a dict with 'cpu_utilization', 'load_average', and
        'bottleneck_indicators' keys, or {"error": ...} when no metrics exist.
        """
        if not self.cpu_metrics:
            return {"error": "No metrics collected"}
        cpu_utilizations = [m['cpu_percent'] for m in self.cpu_metrics]
        load_avgs = [m['load_avg_1m'] for m in self.cpu_metrics]
        # psutil.cpu_count() can return None; avoid a TypeError in the ratio.
        cpu_count = self.cpu_metrics[-1]['cpu_count'] or 1
        analysis = {
            'cpu_utilization': {
                'avg': statistics.mean(cpu_utilizations),
                'p95': self._percentile(cpu_utilizations, 95),
                'p99': self._percentile(cpu_utilizations, 99),
                'max': max(cpu_utilizations)
            },
            'load_average': {
                'avg': statistics.mean(load_avgs),
                'max': max(load_avgs),
                'ratio_to_cores': max(load_avgs) / cpu_count
            },
            'bottleneck_indicators': []
        }
        # Sustained saturation: P95 above 80% means the CPU is busy most of the time.
        if analysis['cpu_utilization']['p95'] > 80:
            analysis['bottleneck_indicators'].append({
                'type': 'high_cpu_utilization',
                'severity': 'critical' if analysis['cpu_utilization']['p95'] > 90 else 'warning',
                'description': f"CPU utilization P95 is {analysis['cpu_utilization']['p95']:.1f}%"
            })
        # Overload: 1-minute load average exceeding the core count means runnable
        # tasks are queueing for CPU time.
        if analysis['load_average']['ratio_to_cores'] > 1.0:
            analysis['bottleneck_indicators'].append({
                'type': 'cpu_overload',
                'severity': 'critical',
                'description': f"Load average {analysis['load_average']['max']:.2f} exceeds CPU count"
            })
        # Uneven core distribution in the latest sample suggests a single-threaded
        # hot spot or poor work partitioning.
        latest_cores = self.cpu_metrics[-1]['cpu_per_core']
        core_variance = statistics.variance(latest_cores) if len(latest_cores) > 1 else 0
        if core_variance > 100:  # High variance indicates uneven load
            analysis['bottleneck_indicators'].append({
                'type': 'uneven_cpu_distribution',
                'severity': 'warning',
                'description': f"High variance in per-core utilization: {core_variance:.1f}"
            })
        return analysis

    @staticmethod
    def _percentile(data, percentile):
        """Nearest-rank percentile of ``data`` (no interpolation).

        Uses the standard nearest-rank formula rank = ceil(p/100 * n), 1-indexed.
        The previous int() truncation overshot by one element at exact rank
        boundaries (e.g. p50 of 4 items picked the 3rd value instead of the 2nd).
        """
        sorted_data = sorted(data)
        n = len(sorted_data)
        rank = max(1, math.ceil(percentile / 100.0 * n))
        return sorted_data[min(rank, n) - 1]
# Amazon EC2 CPU optimization
class EC2CPUOptimizer:
    """Optimize EC2 instances for CPU performance.

    Matches a measured workload profile against a small catalog of
    compute-optimized (c5) and burstable (t3) instance types.
    """

    # Approximate on-demand hourly prices (USD).  Class-level constant so the
    # table is built once rather than on every cost calculation, and lives in
    # one place next to the catalog whose keys it must mirror.  In a real
    # implementation this would come from the AWS pricing API.
    BASE_COSTS = {
        'c5.large': 0.085,
        'c5.xlarge': 0.17,
        'c5.2xlarge': 0.34,
        'c5.4xlarge': 0.68,
        't3.medium': 0.0416,
        't3.large': 0.0832,
        't3.xlarge': 0.1664,
    }

    def __init__(self):
        # Catalog: vCPU count, baseline CPU % the type can sustain, and
        # whether it is a burstable (CPU-credit) family.
        self.instance_types = {
            # CPU optimized instances
            'c5.large': {'vcpus': 2, 'baseline_performance': 100, 'burst': False},
            'c5.xlarge': {'vcpus': 4, 'baseline_performance': 100, 'burst': False},
            'c5.2xlarge': {'vcpus': 8, 'baseline_performance': 100, 'burst': False},
            'c5.4xlarge': {'vcpus': 16, 'baseline_performance': 100, 'burst': False},
            # Burstable instances
            't3.medium': {'vcpus': 2, 'baseline_performance': 20, 'burst': True},
            't3.large': {'vcpus': 2, 'baseline_performance': 30, 'burst': True},
            't3.xlarge': {'vcpus': 4, 'baseline_performance': 40, 'burst': True},
        }

    def recommend_instance_type(self, workload_profile):
        """Recommend up to 3 EC2 instance types for the workload, best first.

        workload_profile keys (all optional): 'avg_cpu_percent',
        'p95_cpu_percent', 'sustained_load' (bool, default True),
        'parallel_threads'.
        """
        avg_cpu = workload_profile.get('avg_cpu_percent', 0)
        p95_cpu = workload_profile.get('p95_cpu_percent', 0)
        sustained_load = workload_profile.get('sustained_load', True)
        parallel_threads = workload_profile.get('parallel_threads', 1)
        recommendations = []
        for instance_type, specs in self.instance_types.items():
            if specs['burst'] and not sustained_load:
                # Burstable family suits variable load: the average must fit
                # under the credit baseline; peaks may burst up to 100%.
                if avg_cpu <= specs['baseline_performance'] and p95_cpu <= 100:
                    recommendations.append(self._make_recommendation(
                        instance_type, specs, workload_profile,
                        'Good for variable workloads with burst capability'))
            elif not specs['burst']:
                # Fixed-performance family suits sustained load; require vCPU
                # headroom (rough 2-threads-per-vCPU estimate) and a P95 under
                # 80% so the instance is not already saturated.
                required_vcpus = max(1, parallel_threads // 2)
                if specs['vcpus'] >= required_vcpus and p95_cpu <= 80:
                    recommendations.append(self._make_recommendation(
                        instance_type, specs, workload_profile,
                        'Good for sustained high-performance workloads'))
        # Best match first; cost effectiveness breaks score ties.
        recommendations.sort(key=lambda x: (x['match_score'], x['cost_effectiveness']), reverse=True)
        return recommendations[:3]  # Top 3 recommendations

    def _make_recommendation(self, instance_type, specs, workload_profile, reasoning):
        """Build one recommendation record (shared by both branches above)."""
        return {
            'instance_type': instance_type,
            'match_score': self._calculate_match_score(specs, workload_profile),
            'cost_effectiveness': self._calculate_cost_effectiveness(instance_type, workload_profile),
            'reasoning': reasoning,
        }

    def _calculate_match_score(self, instance_specs, workload_profile):
        """Score (0-100) how well an instance matches the workload profile."""
        score = 0
        # CPU capacity match: up to 50 points, best when capacity just covers
        # the P95 demand (oversized instances score lower).
        required_capacity = workload_profile.get('p95_cpu_percent', 50) / 100.0
        instance_capacity = 1.0 if not instance_specs['burst'] else instance_specs['baseline_performance'] / 100.0
        if instance_capacity >= required_capacity:
            score += 50 * (1 - abs(instance_capacity - required_capacity))
        # Thread match: 30 points when there is a vCPU per requested thread.
        parallel_threads = workload_profile.get('parallel_threads', 1)
        if instance_specs['vcpus'] >= parallel_threads:
            score += 30
        # Burst capability match: 20 points when sustained workloads pair with
        # fixed-performance instances and variable ones with burstable.
        if workload_profile.get('sustained_load', True) == (not instance_specs['burst']):
            score += 20
        return min(100, score)

    def _calculate_cost_effectiveness(self, instance_type, workload_profile):
        """Return vCPUs per dollar-hour (higher is better); simplified pricing.

        ``workload_profile`` is currently unused but kept for interface
        stability with callers and future workload-aware pricing.
        """
        cost_per_hour = self.BASE_COSTS.get(instance_type, 0.1)
        performance_ratio = self.instance_types[instance_type]['vcpus']
        return performance_ratio / cost_per_hour  # Performance per dollar