Skip to content

Security Architecture for Amazon-Scale Systems

Introduction to Security Architecture at Scale

Security at Amazon scale requires comprehensive threat modeling, zero-trust architecture, advanced encryption, and automated security monitoring. This module covers enterprise-grade security patterns essential for L6/L7 system design interviews, focusing on practical implementations used in production systems.


STRIDE Threat Modeling Methodology

STRIDE Framework Implementation

STRIDE (Spoofing, Tampering, Repudiation, Information Disclosure, Denial of Service, Elevation of Privilege) provides systematic threat analysis.

Comprehensive Threat Modeling Engine

Python
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Set, Optional
import json
import hashlib
import time

class ThreatCategory(Enum):
    SPOOFING = "spoofing"
    TAMPERING = "tampering"
    REPUDIATION = "repudiation"
    INFORMATION_DISCLOSURE = "information_disclosure"
    DENIAL_OF_SERVICE = "denial_of_service"
    ELEVATION_OF_PRIVILEGE = "elevation_of_privilege"

class RiskLevel(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

@dataclass
class Asset:
    """Represents a system asset to be protected"""
    name: str
    asset_type: str  # data, process, interactor, data_store
    description: str
    value: RiskLevel
    data_classification: str  # public, internal, confidential, restricted

@dataclass
class DataFlow:
    """Represents data flow between system components"""
    source: str
    destination: str
    data_type: str
    protocol: str
    authentication_required: bool
    encryption_in_transit: bool

@dataclass
class TrustBoundary:
    """Represents trust boundary in the system"""
    name: str
    internal_assets: Set[str]
    external_assets: Set[str]
    boundary_type: str  # network, process, physical

@dataclass
class Threat:
    """Represents an identified threat"""
    threat_id: str
    category: ThreatCategory
    target_asset: str
    description: str
    impact: RiskLevel
    likelihood: RiskLevel
    risk_score: int
    mitigation_controls: List[str]
    residual_risk: RiskLevel

class STRIDEThreatModeling:
    """Comprehensive STRIDE threat modeling implementation"""

    def __init__(self):
        self.assets = {}
        self.data_flows = []
        self.trust_boundaries = []
        self.identified_threats = []
        self.threat_templates = self._load_threat_templates()

    def _load_threat_templates(self) -> Dict[ThreatCategory, List[str]]:
        """Load threat templates for each STRIDE category"""
        return {
            ThreatCategory.SPOOFING: [
                "Attacker impersonates legitimate user",
                "Service impersonation attack",
                "DNS spoofing attack",
                "IP address spoofing",
                "Certificate spoofing"
            ],
            ThreatCategory.TAMPERING: [
                "Data modification in transit",
                "Database tampering",
                "Code injection attacks",
                "Configuration tampering",
                "Log file manipulation"
            ],
            ThreatCategory.REPUDIATION: [
                "User denies performing action",
                "Insufficient audit logging",
                "Log tampering to hide evidence",
                "Weak digital signatures",
                "Missing transaction records"
            ],
            ThreatCategory.INFORMATION_DISCLOSURE: [
                "Sensitive data exposure",
                "SQL injection revealing data",
                "Directory traversal attacks",
                "Memory dumps containing secrets",
                "Side-channel attacks"
            ],
            ThreatCategory.DENIAL_OF_SERVICE: [
                "DDoS attacks",
                "Resource exhaustion",
                "Algorithmic complexity attacks",
                "Database connection exhaustion",
                "Memory exhaustion attacks"
            ],
            ThreatCategory.ELEVATION_OF_PRIVILEGE: [
                "Buffer overflow exploitation",
                "SQL injection with admin access",
                "Privilege escalation vulnerabilities",
                "Cross-site scripting (XSS)",
                "Insecure direct object references"
            ]
        }

    def add_asset(self, asset: Asset):
        """Add an asset to the threat model"""
        self.assets[asset.name] = asset

    def add_data_flow(self, data_flow: DataFlow):
        """Add a data flow to the threat model"""
        self.data_flows.append(data_flow)

    def add_trust_boundary(self, trust_boundary: TrustBoundary):
        """Add a trust boundary to the threat model"""
        self.trust_boundaries.append(trust_boundary)

    def analyze_threats(self) -> List[Threat]:
        """Perform comprehensive threat analysis"""
        self.identified_threats.clear()

        # Analyze each asset against STRIDE categories
        for asset_name, asset in self.assets.items():
            self._analyze_asset_threats(asset)

        # Analyze data flows
        for data_flow in self.data_flows:
            self._analyze_data_flow_threats(data_flow)

        # Analyze trust boundary crossings
        for boundary in self.trust_boundaries:
            self._analyze_trust_boundary_threats(boundary)

        # Sort threats by risk score
        self.identified_threats.sort(key=lambda t: t.risk_score, reverse=True)

        return self.identified_threats

    def _analyze_asset_threats(self, asset: Asset):
        """Analyze threats specific to an asset"""
        # Spoofing threats for interactors
        if asset.asset_type == "interactor":
            self._create_threat(
                ThreatCategory.SPOOFING,
                asset.name,
                f"Attacker could impersonate {asset.name}",
                asset.value,
                RiskLevel.MEDIUM
            )

        # Tampering threats for data stores and processes
        if asset.asset_type in ["data_store", "process"]:
            self._create_threat(
                ThreatCategory.TAMPERING,
                asset.name,
                f"Unauthorized modification of {asset.name}",
                asset.value,
                RiskLevel.MEDIUM
            )

        # Information disclosure for all assets with confidential data
        if asset.data_classification in ["confidential", "restricted"]:
            self._create_threat(
                ThreatCategory.INFORMATION_DISCLOSURE,
                asset.name,
                f"Unauthorized access to sensitive data in {asset.name}",
                asset.value,
                RiskLevel.HIGH
            )

        # Denial of service for critical processes
        if asset.asset_type == "process" and asset.value == RiskLevel.CRITICAL:
            self._create_threat(
                ThreatCategory.DENIAL_OF_SERVICE,
                asset.name,
                f"Service disruption of critical process {asset.name}",
                asset.value,
                RiskLevel.HIGH
            )

    def _analyze_data_flow_threats(self, data_flow: DataFlow):
        """Analyze threats in data flows"""
        # Tampering threats for unencrypted data in transit
        if not data_flow.encryption_in_transit:
            self._create_threat(
                ThreatCategory.TAMPERING,
                f"{data_flow.source}->{data_flow.destination}",
                f"Data tampering in transit between {data_flow.source} and {data_flow.destination}",
                RiskLevel.HIGH,
                RiskLevel.MEDIUM
            )

        # Information disclosure for unencrypted sensitive data
        if not data_flow.encryption_in_transit and data_flow.data_type in ["personal", "financial"]:
            self._create_threat(
                ThreatCategory.INFORMATION_DISCLOSURE,
                f"{data_flow.source}->{data_flow.destination}",
                f"Sensitive data exposure in transit",
                RiskLevel.HIGH,
                RiskLevel.MEDIUM
            )

        # Spoofing threats for unauthenticated flows
        if not data_flow.authentication_required:
            self._create_threat(
                ThreatCategory.SPOOFING,
                f"{data_flow.source}->{data_flow.destination}",
                f"Source spoofing in data flow from {data_flow.source}",
                RiskLevel.MEDIUM,
                RiskLevel.HIGH
            )

    def _analyze_trust_boundary_threats(self, boundary: TrustBoundary):
        """Analyze threats at trust boundaries"""
        # Elevation of privilege at trust boundaries
        self._create_threat(
            ThreatCategory.ELEVATION_OF_PRIVILEGE,
            boundary.name,
            f"Privilege escalation across {boundary.name} trust boundary",
            RiskLevel.HIGH,
            RiskLevel.MEDIUM
        )

        # Information disclosure across boundaries
        self._create_threat(
            ThreatCategory.INFORMATION_DISCLOSURE,
            boundary.name,
            f"Unauthorized information flow across {boundary.name}",
            RiskLevel.MEDIUM,
            RiskLevel.MEDIUM
        )

    def _create_threat(self, category: ThreatCategory, target: str, 
                      description: str, impact: RiskLevel, likelihood: RiskLevel):
        """Create a threat with risk calculation"""
        risk_score = impact.value * likelihood.value

        # Determine residual risk based on typical controls
        residual_risk = self._calculate_residual_risk(category, impact, likelihood)

        # Get typical mitigation controls
        controls = self._get_mitigation_controls(category)

        threat = Threat(
            threat_id=hashlib.md5(f"{category.value}{target}{description}".encode()).hexdigest()[:8],
            category=category,
            target_asset=target,
            description=description,
            impact=impact,
            likelihood=likelihood,
            risk_score=risk_score,
            mitigation_controls=controls,
            residual_risk=residual_risk
        )

        self.identified_threats.append(threat)

    def _calculate_residual_risk(self, category: ThreatCategory, 
                                impact: RiskLevel, likelihood: RiskLevel) -> RiskLevel:
        """Calculate residual risk after typical controls"""
        # Simplified residual risk calculation
        initial_risk = impact.value * likelihood.value

        # Reduction factors based on typical controls
        reduction_factors = {
            ThreatCategory.SPOOFING: 0.7,  # Strong authentication reduces risk
            ThreatCategory.TAMPERING: 0.6,  # Integrity controls
            ThreatCategory.REPUDIATION: 0.5,  # Audit logging
            ThreatCategory.INFORMATION_DISCLOSURE: 0.6,  # Encryption and access controls
            ThreatCategory.DENIAL_OF_SERVICE: 0.8,  # Harder to fully mitigate
            ThreatCategory.ELEVATION_OF_PRIVILEGE: 0.7  # Access controls help
        }

        reduced_risk = initial_risk * reduction_factors.get(category, 0.8)

        if reduced_risk <= 2:
            return RiskLevel.LOW
        elif reduced_risk <= 4:
            return RiskLevel.MEDIUM
        elif reduced_risk <= 8:
            return RiskLevel.HIGH
        else:
            return RiskLevel.CRITICAL

    def _get_mitigation_controls(self, category: ThreatCategory) -> List[str]:
        """Get typical mitigation controls for threat category"""
        control_mapping = {
            ThreatCategory.SPOOFING: [
                "Multi-factor authentication",
                "Strong credential policies",
                "Certificate-based authentication",
                "Digital signatures"
            ],
            ThreatCategory.TAMPERING: [
                "Input validation",
                "Data integrity checks",
                "Code signing",
                "Database transaction controls",
                "Checksums and hashing"
            ],
            ThreatCategory.REPUDIATION: [
                "Comprehensive audit logging",
                "Digital signatures",
                "Timestamping services",
                "Log integrity protection"
            ],
            ThreatCategory.INFORMATION_DISCLOSURE: [
                "Encryption at rest and in transit",
                "Access controls and authorization",
                "Data loss prevention (DLP)",
                "Network segmentation",
                "Least privilege principle"
            ],
            ThreatCategory.DENIAL_OF_SERVICE: [
                "Rate limiting and throttling",
                "Load balancing and scaling",
                "DDoS protection services",
                "Resource monitoring and alerting",
                "Circuit breakers"
            ],
            ThreatCategory.ELEVATION_OF_PRIVILEGE: [
                "Input validation and sanitization",
                "Least privilege access controls",
                "Security code reviews",
                "Runtime protection (ASLR, DEP)",
                "Regular security patching"
            ]
        }

        return control_mapping.get(category, [])

    def generate_threat_report(self) -> Dict:
        """Generate comprehensive threat assessment report"""
        if not self.identified_threats:
            self.analyze_threats()

        # Categorize threats by risk level
        risk_distribution = {level: 0 for level in RiskLevel}
        category_distribution = {cat: 0 for cat in ThreatCategory}

        for threat in self.identified_threats:
            risk_distribution[threat.residual_risk] += 1
            category_distribution[threat.category] += 1

        # Calculate risk metrics
        total_threats = len(self.identified_threats)
        high_risk_threats = sum(1 for t in self.identified_threats 
                               if t.residual_risk in [RiskLevel.HIGH, RiskLevel.CRITICAL])

        report = {
            "threat_model_summary": {
                "total_assets": len(self.assets),
                "total_data_flows": len(self.data_flows),
                "total_trust_boundaries": len(self.trust_boundaries),
                "total_threats_identified": total_threats,
                "high_risk_threats": high_risk_threats,
                "risk_score": sum(t.risk_score for t in self.identified_threats)
            },
            "risk_distribution": {level.name: count for level, count in risk_distribution.items()},
            "category_distribution": {cat.name: count for cat, count in category_distribution.items()},
            "top_threats": [
                {
                    "threat_id": t.threat_id,
                    "category": t.category.name,
                    "description": t.description,
                    "target": t.target_asset,
                    "risk_score": t.risk_score,
                    "residual_risk": t.residual_risk.name,
                    "controls": t.mitigation_controls
                }
                for t in self.identified_threats[:10]
            ],
            "recommended_actions": self._generate_recommendations()
        }

        return report

    def _generate_recommendations(self) -> List[str]:
        """Generate security recommendations based on threat analysis"""
        recommendations = []

        # High-level recommendations based on threat patterns
        high_risk_count = sum(1 for t in self.identified_threats 
                             if t.residual_risk == RiskLevel.HIGH)
        critical_risk_count = sum(1 for t in self.identified_threats 
                                 if t.residual_risk == RiskLevel.CRITICAL)

        if critical_risk_count > 0:
            recommendations.append(f"URGENT: Address {critical_risk_count} critical risk threats immediately")

        if high_risk_count > 5:
            recommendations.append(f"Prioritize mitigation of {high_risk_count} high-risk threats")

        # Category-specific recommendations
        category_counts = {}
        for threat in self.identified_threats:
            if threat.residual_risk in [RiskLevel.HIGH, RiskLevel.CRITICAL]:
                category_counts[threat.category] = category_counts.get(threat.category, 0) + 1

        for category, count in category_counts.items():
            if count >= 3:
                recommendations.append(f"Focus on {category.name.replace('_', ' ')} controls - {count} threats identified")

        # Generic recommendations
        recommendations.extend([
            "Implement comprehensive security monitoring and alerting",
            "Conduct regular penetration testing and vulnerability assessments",
            "Establish incident response procedures",
            "Implement security training for development teams"
        ])

        return recommendations

# Amazon-specific threat modeling for AWS services
class AWSSecurityThreatModeling(STRIDEThreatModeling):
    """Specialized threat modeling for AWS environments"""

    def __init__(self):
        super().__init__()
        self.aws_service_threats = self._load_aws_service_threats()

    def _load_aws_service_threats(self) -> Dict[str, List[Dict]]:
        """Load AWS service-specific threat patterns"""
        return {
            "s3": [
                {
                    "category": ThreatCategory.INFORMATION_DISCLOSURE,
                    "description": "Misconfigured S3 bucket permissions allowing public access",
                    "impact": RiskLevel.HIGH,
                    "likelihood": RiskLevel.HIGH,
                    "controls": ["Bucket policies", "ACLs", "Block public access settings"]
                },
                {
                    "category": ThreatCategory.TAMPERING,
                    "description": "Unauthorized S3 object modification",
                    "impact": RiskLevel.MEDIUM,
                    "likelihood": RiskLevel.MEDIUM,
                    "controls": ["Object versioning", "MFA delete", "IAM policies"]
                }
            ],
            "dynamodb": [
                {
                    "category": ThreatCategory.INFORMATION_DISCLOSURE,
                    "description": "Overprivileged IAM roles accessing DynamoDB data",
                    "impact": RiskLevel.HIGH,
                    "likelihood": RiskLevel.MEDIUM,
                    "controls": ["Fine-grained IAM policies", "VPC endpoints", "Encryption"]
                }
            ],
            "lambda": [
                {
                    "category": ThreatCategory.ELEVATION_OF_PRIVILEGE,
                    "description": "Lambda function with excessive IAM permissions",
                    "impact": RiskLevel.HIGH,
                    "likelihood": RiskLevel.MEDIUM,
                    "controls": ["Least privilege IAM roles", "Resource-based policies"]
                },
                {
                    "category": ThreatCategory.INFORMATION_DISCLOSURE,
                    "description": "Sensitive data in Lambda environment variables",
                    "impact": RiskLevel.MEDIUM,
                    "likelihood": RiskLevel.HIGH,
                    "controls": ["AWS Secrets Manager", "Parameter Store", "Encryption"]
                }
            ],
            "api_gateway": [
                {
                    "category": ThreatCategory.DENIAL_OF_SERVICE,
                    "description": "API Gateway without throttling limits",
                    "impact": RiskLevel.MEDIUM,
                    "likelihood": RiskLevel.HIGH,
                    "controls": ["Throttling limits", "Usage plans", "WAF integration"]
                }
            ]
        }

    def analyze_aws_service_threats(self, service_name: str):
        """Analyze threats specific to AWS services"""
        if service_name not in self.aws_service_threats:
            return

        for threat_pattern in self.aws_service_threats[service_name]:
            self._create_threat(
                threat_pattern["category"],
                f"aws_{service_name}",
                threat_pattern["description"],
                threat_pattern["impact"],
                threat_pattern["likelihood"]
            )

Zero-Trust Architecture Implementation

Zero-Trust Network Security

Python
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
import jwt
import time
import hashlib
import hmac
from typing import Dict, List, Optional, Set
from dataclasses import dataclass
from enum import Enum

class AccessDecision(Enum):
    ALLOW = "allow"
    DENY = "deny"
    CONDITIONAL = "conditional"

class RiskLevel(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

@dataclass
class Principal:
    """Represents a user, service, or device requesting access"""
    id: str
    type: str  # user, service, device
    attributes: Dict[str, str]
    authentication_method: str
    authentication_time: float
    source_ip: str
    device_id: Optional[str] = None

@dataclass
class Resource:
    """Represents a resource being accessed"""
    id: str
    type: str
    classification: str  # public, internal, confidential, restricted
    owner: str
    attributes: Dict[str, str]

@dataclass
class AccessRequest:
    """Represents an access request"""
    principal: Principal
    resource: Resource
    action: str
    context: Dict[str, str]
    timestamp: float

@dataclass
class PolicyRule:
    """Represents a zero-trust policy rule"""
    rule_id: str
    principal_conditions: Dict[str, str]
    resource_conditions: Dict[str, str]
    action_conditions: List[str]
    decision: AccessDecision
    additional_controls: List[str]
    priority: int

class ZeroTrustPolicyEngine:
    """Zero-trust policy evaluation engine"""

    def __init__(self):
        self.policies = []
        self.risk_engine = RiskAssessmentEngine()
        self.device_trust_store = DeviceTrustStore()
        self.behavioral_analytics = BehavioralAnalytics()

    def add_policy(self, policy: PolicyRule):
        """Add a policy rule"""
        self.policies.append(policy)
        # Sort by priority
        self.policies.sort(key=lambda p: p.priority)

    def evaluate_access(self, request: AccessRequest) -> Dict:
        """Evaluate access request against zero-trust policies"""
        # Step 1: Risk assessment
        risk_score = self.risk_engine.assess_risk(request)

        # Step 2: Device trust evaluation
        device_trust = self.device_trust_store.get_device_trust(request.principal.device_id)

        # Step 3: Behavioral analysis
        behavioral_risk = self.behavioral_analytics.analyze_behavior(request.principal, request)

        # Step 4: Policy evaluation
        policy_decision = self._evaluate_policies(request)

        # Step 5: Combine all factors for final decision
        final_decision = self._make_final_decision(
            policy_decision, risk_score, device_trust, behavioral_risk
        )

        # Step 6: Generate access token if allowed
        access_token = None
        if final_decision["decision"] == AccessDecision.ALLOW:
            access_token = self._generate_access_token(request, final_decision)

        return {
            "decision": final_decision["decision"],
            "risk_score": risk_score,
            "device_trust_score": device_trust,
            "behavioral_risk_score": behavioral_risk,
            "policy_match": policy_decision,
            "additional_controls": final_decision.get("controls", []),
            "access_token": access_token,
            "expires_at": time.time() + final_decision.get("duration", 3600)
        }

    def _evaluate_policies(self, request: AccessRequest) -> Dict:
        """Evaluate request against policy rules"""
        for policy in self.policies:
            if self._matches_policy(request, policy):
                return {
                    "policy_id": policy.rule_id,
                    "decision": policy.decision,
                    "additional_controls": policy.additional_controls
                }

        # Default deny if no policy matches
        return {
            "policy_id": "default_deny",
            "decision": AccessDecision.DENY,
            "additional_controls": []
        }

    def _matches_policy(self, request: AccessRequest, policy: PolicyRule) -> bool:
        """Check if request matches policy conditions"""
        # Check principal conditions
        for attr, value in policy.principal_conditions.items():
            if attr not in request.principal.attributes:
                return False
            if request.principal.attributes[attr] != value:
                return False

        # Check resource conditions
        for attr, value in policy.resource_conditions.items():
            if attr not in request.resource.attributes:
                return False
            if request.resource.attributes[attr] != value:
                return False

        # Check action conditions
        if policy.action_conditions and request.action not in policy.action_conditions:
            return False

        return True

    def _make_final_decision(self, policy_decision: Dict, risk_score: float,
                           device_trust: float, behavioral_risk: float) -> Dict:
        """Make final access decision combining all factors"""
        base_decision = policy_decision["decision"]

        # If policy denies, always deny
        if base_decision == AccessDecision.DENY:
            return {"decision": AccessDecision.DENY}

        # Calculate combined risk
        combined_risk = (risk_score + behavioral_risk + (1 - device_trust)) / 3

        additional_controls = policy_decision["additional_controls"].copy()

        # Add risk-based controls
        if combined_risk > 0.7:
            return {"decision": AccessDecision.DENY}
        elif combined_risk > 0.5:
            additional_controls.extend([
                "step_up_authentication",
                "session_monitoring",
                "reduced_session_duration"
            ])
            return {
                "decision": AccessDecision.CONDITIONAL,
                "controls": additional_controls,
                "duration": 900  # 15 minutes
            }
        elif combined_risk > 0.3:
            additional_controls.extend([
                "session_monitoring",
                "periodic_reauth"
            ])
            return {
                "decision": AccessDecision.ALLOW,
                "controls": additional_controls,
                "duration": 1800  # 30 minutes
            }
        else:
            return {
                "decision": AccessDecision.ALLOW,
                "controls": additional_controls,
                "duration": 3600  # 1 hour
            }

    def _generate_access_token(self, request: AccessRequest, decision: Dict) -> str:
        """Generate JWT access token"""
        payload = {
            "sub": request.principal.id,
            "resource": request.resource.id,
            "action": request.action,
            "iat": int(time.time()),
            "exp": int(time.time() + decision.get("duration", 3600)),
            "additional_controls": decision.get("controls", []),
            "device_id": request.principal.device_id,
            "source_ip": request.principal.source_ip
        }

        # In production, use proper key management
        secret_key = "zero-trust-key"  # Should be from secure key store

        return jwt.encode(payload, secret_key, algorithm='HS256')

class RiskAssessmentEngine:
    """Risk assessment for zero-trust decisions"""

    def __init__(self):
        self.risk_factors = {
            "location": self._assess_location_risk,
            "time": self._assess_time_risk,
            "authentication": self._assess_auth_risk,
            "resource_sensitivity": self._assess_resource_risk,
            "network": self._assess_network_risk
        }

    def assess_risk(self, request: AccessRequest) -> float:
        """Assess overall risk score (0.0 to 1.0)"""
        risk_scores = {}

        for factor_name, assessment_func in self.risk_factors.items():
            risk_scores[factor_name] = assessment_func(request)

        # Weighted average of risk factors
        weights = {
            "location": 0.2,
            "time": 0.1,
            "authentication": 0.3,
            "resource_sensitivity": 0.3,
            "network": 0.1
        }

        total_risk = sum(risk_scores[factor] * weights[factor] 
                        for factor in risk_scores)

        return min(1.0, max(0.0, total_risk))

    def _assess_location_risk(self, request: AccessRequest) -> float:
        """Assess risk based on access location"""
        # Simplified location risk assessment
        source_ip = request.principal.source_ip

        # Check if IP is from known safe locations
        safe_ip_ranges = [
            "10.0.0.0/8",    # Internal network
            "172.16.0.0/12", # Internal network
            "192.168.0.0/16" # Internal network
        ]

        # In production, use GeoIP and threat intelligence
        if self._ip_in_ranges(source_ip, safe_ip_ranges):
            return 0.1  # Low risk for internal IPs
        else:
            return 0.5  # Medium risk for external IPs

    def _assess_time_risk(self, request: AccessRequest) -> float:
        """Assess risk based on time of access"""
        import datetime

        access_time = datetime.datetime.fromtimestamp(request.timestamp)
        hour = access_time.hour

        # Higher risk for access outside business hours
        if 9 <= hour <= 17:  # Business hours
            return 0.1
        elif 7 <= hour <= 21:  # Extended hours
            return 0.3
        else:  # Night/early morning
            return 0.7

    def _assess_auth_risk(self, request: AccessRequest) -> float:
        """Assess risk based on authentication method"""
        auth_method = request.principal.authentication_method

        # Risk scores by authentication method
        auth_risks = {
            "mfa_hardware_token": 0.1,
            "mfa_mobile_app": 0.2,
            "mfa_sms": 0.4,
            "password_only": 0.9,
            "certificate": 0.15,
            "saml_sso": 0.3
        }

        return auth_risks.get(auth_method, 0.8)

    def _assess_resource_risk(self, request: AccessRequest) -> float:
        """Assess risk based on resource sensitivity"""
        classification = request.resource.classification

        # Risk multiplier based on data classification
        classification_risks = {
            "public": 0.1,
            "internal": 0.3,
            "confidential": 0.7,
            "restricted": 0.9
        }

        return classification_risks.get(classification, 0.5)

    def _assess_network_risk(self, request: AccessRequest) -> float:
        """Assess risk based on network characteristics"""
        # Simplified network risk assessment
        # In production, integrate with threat intelligence

        context = request.context

        # Check for VPN usage
        if context.get("vpn_used") == "true":
            return 0.2  # Lower risk with VPN

        # Check for known malicious IPs (simplified)
        if context.get("threat_intel_match") == "true":
            return 1.0  # Maximum risk

        return 0.4  # Default medium risk

    def _ip_in_ranges(self, ip: str, ranges: List[str]) -> bool:
        """Check if IP is in any of the given ranges"""
        import ipaddress

        try:
            ip_addr = ipaddress.ip_address(ip)
            for range_str in ranges:
                if ip_addr in ipaddress.ip_network(range_str):
                    return True
        except ValueError:
            pass

        return False

class DeviceTrustStore:
    """Device trust assessment and storage"""

    def __init__(self):
        self.device_trust_scores = {}  # device_id -> trust_score
        self.device_attributes = {}    # device_id -> attributes

    def get_device_trust(self, device_id: Optional[str]) -> float:
        """Get device trust score (0.0 to 1.0)"""
        if not device_id:
            return 0.3  # Unknown device has low trust

        if device_id in self.device_trust_scores:
            return self.device_trust_scores[device_id]

        # New device - assess initial trust
        return self._assess_new_device_trust(device_id)

    def _assess_new_device_trust(self, device_id: str) -> float:
        """Assess trust for new device"""
        # In production, integrate with device management systems
        # Check device attributes, compliance status, etc.

        # Simplified assessment
        if device_id.startswith("corp_"):  # Corporate device
            initial_trust = 0.7
        elif device_id.startswith("byod_"):  # BYOD device
            initial_trust = 0.4
        else:  # Unknown device
            initial_trust = 0.2

        self.device_trust_scores[device_id] = initial_trust
        return initial_trust

    def update_device_trust(self, device_id: str, trust_adjustment: float):
        """Update device trust based on behavior"""
        current_trust = self.get_device_trust(device_id)
        new_trust = max(0.0, min(1.0, current_trust + trust_adjustment))
        self.device_trust_scores[device_id] = new_trust

class BehavioralAnalytics:
    """Behavioral analytics for anomaly detection"""

    def __init__(self):
        self.user_baselines = {}  # user_id -> behavioral baseline
        self.access_history = {}  # user_id -> list of access patterns

    def analyze_behavior(self, principal: Principal, request: AccessRequest) -> float:
        """Analyze behavioral risk (0.0 to 1.0)"""
        user_id = principal.id

        # Get user's baseline behavior
        baseline = self.user_baselines.get(user_id, {})

        # Analyze various behavioral factors
        risk_factors = []

        # Time-based analysis
        risk_factors.append(self._analyze_time_pattern(user_id, request.timestamp))

        # Resource access pattern analysis
        risk_factors.append(self._analyze_resource_pattern(user_id, request.resource))

        # Source IP analysis
        risk_factors.append(self._analyze_source_ip_pattern(user_id, principal.source_ip))

        # Return average risk
        return sum(risk_factors) / len(risk_factors) if risk_factors else 0.5

    def _analyze_time_pattern(self, user_id: str, timestamp: float) -> float:
        """Analyze if access time is unusual for user"""
        import datetime

        access_hour = datetime.datetime.fromtimestamp(timestamp).hour

        # Get user's typical access hours from baseline
        baseline = self.user_baselines.get(user_id, {})
        typical_hours = baseline.get("typical_hours", set(range(9, 18)))  # Default business hours

        if access_hour in typical_hours:
            return 0.1  # Low risk
        else:
            return 0.6  # Higher risk for unusual hours

    def _analyze_resource_pattern(self, user_id: str, resource: Resource) -> float:
        """Analyze if resource access is unusual for user"""
        baseline = self.user_baselines.get(user_id, {})
        typical_resources = baseline.get("typical_resources", set())

        if resource.id in typical_resources:
            return 0.1  # Low risk for familiar resource
        elif resource.type in baseline.get("typical_resource_types", set()):
            return 0.3  # Medium risk for new resource of familiar type
        else:
            return 0.7  # High risk for completely new resource type

    def _analyze_source_ip_pattern(self, user_id: str, source_ip: str) -> float:
        """Analyze if source IP is unusual for user"""
        baseline = self.user_baselines.get(user_id, {})
        known_ips = baseline.get("known_source_ips", set())

        if source_ip in known_ips:
            return 0.1  # Low risk for known IP
        else:
            return 0.5  # Medium risk for new IP

    def update_baseline(self, principal: Principal, request: AccessRequest):
        """Update user behavioral baseline"""
        user_id = principal.id

        if user_id not in self.user_baselines:
            self.user_baselines[user_id] = {
                "typical_hours": set(),
                "typical_resources": set(),
                "typical_resource_types": set(),
                "known_source_ips": set()
            }

        baseline = self.user_baselines[user_id]

        # Update baseline with current access pattern
        import datetime
        access_hour = datetime.datetime.fromtimestamp(request.timestamp).hour
        baseline["typical_hours"].add(access_hour)
        baseline["typical_resources"].add(request.resource.id)
        baseline["typical_resource_types"].add(request.resource.type)
        baseline["known_source_ips"].add(principal.source_ip)

        # Keep baselines manageable (limit set sizes)
        for key in ["typical_resources", "known_source_ips"]:
            if len(baseline[key]) > 100:
                # Keep most recent entries
                baseline[key] = set(list(baseline[key])[-50:])

Advanced Key Management at Scale

Hierarchical Key Management System

Python
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
import os
import time
import hmac
import hashlib
import secrets
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from enum import Enum
import json
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

class KeyType(Enum):
    ROOT = "root"
    MASTER = "master" 
    DATA_ENCRYPTION = "data_encryption"
    KEY_ENCRYPTION = "key_encryption"
    SIGNING = "signing"

class KeyStatus(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"
    COMPROMISED = "compromised"
    SCHEDULED_DELETION = "scheduled_deletion"

@dataclass
class KeyMetadata:
    """Metadata for cryptographic keys"""
    key_id: str
    key_type: KeyType
    algorithm: str
    key_length: int
    created_at: float
    expires_at: Optional[float]
    status: KeyStatus
    usage_count: int
    max_usage: Optional[int]
    authorized_principals: List[str]
    tags: Dict[str, str]

class HierarchicalKeyManager:
    """Enterprise-grade hierarchical key management system"""

    def __init__(self, hsm_integration=False):
        self.keys = {}  # key_id -> encrypted key material
        self.metadata = {}  # key_id -> KeyMetadata
        self.key_hierarchy = {}  # child_key_id -> parent_key_id
        self.access_logs = []
        self.hsm_integration = hsm_integration
        self.key_derivation_cache = {}  # For performance optimization

        # Initialize root key
        self._initialize_root_key()

    def _initialize_root_key(self):
        """Initialize the root key of trust"""
        if self.hsm_integration:
            # In production, root key would be stored in HSM
            root_key_id = "root_key_001"
        else:
            # For development/testing
            root_key_id = "root_key_dev"

        # Generate or load root key
        root_key = self._generate_key_material(256)  # 256-bit root key

        self.keys[root_key_id] = root_key  # In HSM, this would be a reference
        self.metadata[root_key_id] = KeyMetadata(
            key_id=root_key_id,
            key_type=KeyType.ROOT,
            algorithm="AES",
            key_length=256,
            created_at=time.time(),
            expires_at=None,  # Root key doesn't expire
            status=KeyStatus.ACTIVE,
            usage_count=0,
            max_usage=None,
            authorized_principals=["system"],
            tags={"purpose": "root_key", "criticality": "high"}
        )

    def create_master_key(self, key_id: str, authorized_principals: List[str],
                         expires_in_days: Optional[int] = None) -> str:
        """Create a master key derived from root key"""
        root_key_id = self._get_root_key_id()

        # Derive master key from root key
        master_key = self._derive_key(root_key_id, f"master_{key_id}", 256)

        expires_at = None
        if expires_in_days:
            expires_at = time.time() + (expires_in_days * 24 * 3600)

        self.keys[key_id] = master_key
        self.metadata[key_id] = KeyMetadata(
            key_id=key_id,
            key_type=KeyType.MASTER,
            algorithm="AES",
            key_length=256,
            created_at=time.time(),
            expires_at=expires_at,
            status=KeyStatus.ACTIVE,
            usage_count=0,
            max_usage=None,
            authorized_principals=authorized_principals,
            tags={}
        )

        self.key_hierarchy[key_id] = root_key_id

        self._log_key_operation("CREATE_MASTER_KEY", key_id, "system")

        return key_id

    def create_data_encryption_key(self, master_key_id: str, purpose: str,
                                  key_length: int = 256) -> Dict[str, Any]:
        """Create a data encryption key (DEK) encrypted by master key"""
        if master_key_id not in self.metadata:
            raise ValueError(f"Master key {master_key_id} not found")

        if self.metadata[master_key_id].key_type != KeyType.MASTER:
            raise ValueError(f"Key {master_key_id} is not a master key")

        # Generate DEK
        dek = self._generate_key_material(key_length)
        dek_id = f"dek_{secrets.token_hex(8)}"

        # Encrypt DEK with master key
        encrypted_dek = self._encrypt_with_key(master_key_id, dek)

        # Store encrypted DEK
        self.keys[dek_id] = encrypted_dek
        self.metadata[dek_id] = KeyMetadata(
            key_id=dek_id,
            key_type=KeyType.DATA_ENCRYPTION,
            algorithm="AES",
            key_length=key_length,
            created_at=time.time(),
            expires_at=time.time() + (30 * 24 * 3600),  # 30 days default
            status=KeyStatus.ACTIVE,
            usage_count=0,
            max_usage=100000,  # Limit DEK usage
            authorized_principals=self.metadata[master_key_id].authorized_principals,
            tags={"purpose": purpose, "master_key": master_key_id}
        )

        self.key_hierarchy[dek_id] = master_key_id

        self._log_key_operation("CREATE_DEK", dek_id, "system")

        return {
            "key_id": dek_id,
            "encrypted_key": encrypted_dek,
            "plaintext_key": dek,  # Return plaintext for immediate use
            "expires_at": self.metadata[dek_id].expires_at
        }

    def decrypt_data_key(self, encrypted_dek_id: str, principal: str) -> bytes:
        """Decrypt a data encryption key"""
        if encrypted_dek_id not in self.metadata:
            raise ValueError(f"DEK {encrypted_dek_id} not found")

        # Check authorization
        if not self._is_authorized(encrypted_dek_id, principal):
            raise PermissionError(f"Principal {principal} not authorized for key {encrypted_dek_id}")

        # Check key status and expiration
        metadata = self.metadata[encrypted_dek_id]
        if metadata.status != KeyStatus.ACTIVE:
            raise ValueError(f"Key {encrypted_dek_id} is not active")

        if metadata.expires_at and time.time() > metadata.expires_at:
            raise ValueError(f"Key {encrypted_dek_id} has expired")

        # Get master key for decryption
        master_key_id = self.key_hierarchy[encrypted_dek_id]

        # Decrypt DEK
        encrypted_dek = self.keys[encrypted_dek_id]
        plaintext_dek = self._decrypt_with_key(master_key_id, encrypted_dek)

        # Update usage count
        metadata.usage_count += 1

        # Check usage limits
        if metadata.max_usage and metadata.usage_count > metadata.max_usage:
            metadata.status = KeyStatus.INACTIVE
            raise ValueError(f"Key {encrypted_dek_id} has exceeded usage limit")

        self._log_key_operation("DECRYPT_DEK", encrypted_dek_id, principal)

        return plaintext_dek

    def rotate_master_key(self, master_key_id: str) -> str:
        """Rotate a master key and re-encrypt all child keys"""
        if master_key_id not in self.metadata:
            raise ValueError(f"Master key {master_key_id} not found")

        old_metadata = self.metadata[master_key_id]
        if old_metadata.key_type != KeyType.MASTER:
            raise ValueError(f"Key {master_key_id} is not a master key")

        # Create new master key
        new_master_key_id = f"{master_key_id}_rotated_{int(time.time())}"
        self.create_master_key(
            new_master_key_id,
            old_metadata.authorized_principals,
            expires_in_days=365
        )

        # Find all child keys
        child_keys = [kid for kid, parent in self.key_hierarchy.items() 
                     if parent == master_key_id]

        # Re-encrypt child keys with new master key
        for child_key_id in child_keys:
            # Decrypt with old master key
            encrypted_child = self.keys[child_key_id]
            plaintext_child = self._decrypt_with_key(master_key_id, encrypted_child)

            # Re-encrypt with new master key
            new_encrypted_child = self._encrypt_with_key(new_master_key_id, plaintext_child)

            # Update storage
            self.keys[child_key_id] = new_encrypted_child
            self.key_hierarchy[child_key_id] = new_master_key_id

        # Mark old master key as inactive
        old_metadata.status = KeyStatus.INACTIVE

        self._log_key_operation("ROTATE_MASTER_KEY", master_key_id, "system")

        return new_master_key_id

    def revoke_key(self, key_id: str, reason: str = "revoked"):
        """Revoke a key and all its children"""
        if key_id not in self.metadata:
            raise ValueError(f"Key {key_id} not found")

        # Mark key as compromised
        self.metadata[key_id].status = KeyStatus.COMPROMISED

        # Find and revoke all child keys
        child_keys = [kid for kid, parent in self.key_hierarchy.items() 
                     if parent == key_id]

        for child_key_id in child_keys:
            self.revoke_key(child_key_id, f"parent_revoked: {reason}")

        self._log_key_operation("REVOKE_KEY", key_id, "system", {"reason": reason})

    def get_key_lineage(self, key_id: str) -> List[str]:
        """Get the complete lineage of a key"""
        lineage = []
        current_key = key_id

        while current_key in self.key_hierarchy:
            parent_key = self.key_hierarchy[current_key]
            lineage.append(parent_key)
            current_key = parent_key

        return lineage

    def audit_key_usage(self, key_id: Optional[str] = None,
                       start_time: Optional[float] = None,
                       end_time: Optional[float] = None) -> List[Dict]:
        """Audit key usage with optional filters"""
        filtered_logs = []

        for log_entry in self.access_logs:
            # Filter by key_id
            if key_id and log_entry.get("key_id") != key_id:
                continue

            # Filter by time range
            if start_time and log_entry.get("timestamp", 0) < start_time:
                continue

            if end_time and log_entry.get("timestamp", 0) > end_time:
                continue

            filtered_logs.append(log_entry)

        return filtered_logs

    def _generate_key_material(self, length_bits: int) -> bytes:
        """Generate cryptographically secure key material"""
        return secrets.token_bytes(length_bits // 8)

    def _derive_key(self, parent_key_id: str, context: str, length_bits: int) -> bytes:
        """Derive a key from parent key using HKDF"""
        # Check cache first
        cache_key = f"{parent_key_id}:{context}:{length_bits}"
        if cache_key in self.key_derivation_cache:
            return self.key_derivation_cache[cache_key]

        parent_key = self.keys[parent_key_id]

        # Use HMAC-based key derivation
        derived_key = hmac.new(
            parent_key,
            context.encode('utf-8'),
            hashlib.sha256
        ).digest()[:length_bits // 8]

        # Cache derived key
        self.key_derivation_cache[cache_key] = derived_key

        return derived_key

    def _encrypt_with_key(self, key_id: str, plaintext: bytes) -> bytes:
        """Encrypt data with specified key"""
        key_material = self.keys[key_id]

        # Generate random IV
        iv = secrets.token_bytes(16)

        # Create cipher
        cipher = Cipher(algorithms.AES(key_material), modes.CBC(iv))
        encryptor = cipher.encryptor()

        # Pad plaintext to block size
        padding_length = 16 - (len(plaintext) % 16)
        padded_plaintext = plaintext + bytes([padding_length] * padding_length)

        # Encrypt
        ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()

        # Return IV + ciphertext
        return iv + ciphertext

    def _decrypt_with_key(self, key_id: str, ciphertext: bytes) -> bytes:
        """Decrypt data with specified key"""
        key_material = self.keys[key_id]

        # Extract IV and ciphertext
        iv = ciphertext[:16]
        encrypted_data = ciphertext[16:]

        # Create cipher
        cipher = Cipher(algorithms.AES(key_material), modes.CBC(iv))
        decryptor = cipher.decryptor()

        # Decrypt
        padded_plaintext = decryptor.update(encrypted_data) + decryptor.finalize()

        # Remove padding
        padding_length = padded_plaintext[-1]
        plaintext = padded_plaintext[:-padding_length]

        return plaintext

    def _is_authorized(self, key_id: str, principal: str) -> bool:
        """Check if principal is authorized to use key"""
        metadata = self.metadata[key_id]
        return (principal in metadata.authorized_principals or 
                "system" in metadata.authorized_principals)

    def _get_root_key_id(self) -> str:
        """Get the root key ID"""
        for key_id, metadata in self.metadata.items():
            if metadata.key_type == KeyType.ROOT:
                return key_id
        raise ValueError("No root key found")

    def _log_key_operation(self, operation: str, key_id: str, principal: str,
                          additional_data: Optional[Dict] = None):
        """Log key operations for audit"""
        log_entry = {
            "timestamp": time.time(),
            "operation": operation,
            "key_id": key_id,
            "principal": principal,
            "success": True
        }

        if additional_data:
            log_entry.update(additional_data)

        self.access_logs.append(log_entry)

        # In production, send to centralized logging system
        # self.send_to_audit_log(log_entry)

# AWS KMS Integration Layer
class AWSKMSIntegration:
    """Integration with AWS Key Management Service"""

    def __init__(self, region='us-east-1'):
        try:
            import boto3
            self.kms_client = boto3.client('kms', region_name=region)
        except ImportError:
            raise ImportError("boto3 required for AWS KMS integration")

    def create_customer_master_key(self, description: str, key_usage: str = 'ENCRYPT_DECRYPT') -> str:
        """Create a Customer Master Key in AWS KMS"""
        response = self.kms_client.create_key(
            Description=description,
            KeyUsage=key_usage,
            KeySpec='SYMMETRIC_DEFAULT'
        )

        return response['KeyMetadata']['KeyId']

    def generate_data_key(self, cmk_id: str, key_spec: str = 'AES_256') -> Dict[str, Any]:
        """Generate a data encryption key"""
        response = self.kms_client.generate_data_key(
            KeyId=cmk_id,
            KeySpec=key_spec
        )

        return {
            'key_id': response['KeyId'],
            'plaintext_key': response['Plaintext'],
            'encrypted_key': response['CiphertextBlob']
        }

    def decrypt_data_key(self, encrypted_key: bytes) -> bytes:
        """Decrypt a data encryption key"""
        response = self.kms_client.decrypt(CiphertextBlob=encrypted_key)
        return response['Plaintext']

    def encrypt_data(self, cmk_id: str, plaintext: bytes) -> bytes:
        """Encrypt data directly with CMK (for small data)"""
        response = self.kms_client.encrypt(
            KeyId=cmk_id,
            Plaintext=plaintext
        )

        return response['CiphertextBlob']

    def rotate_key(self, cmk_id: str):
        """Enable automatic key rotation"""
        self.kms_client.enable_key_rotation(KeyId=cmk_id)

This comprehensive security architecture module provides production-ready implementations of STRIDE threat modeling, zero-trust architecture, and hierarchical key management systems. The code includes AWS integrations and follows security best practices used in Amazon-scale systems.