Implement GDPR-compliant data handling with consent management, data subject rights, and privacy by design. Use when building systems that process EU personal data, implementing privacy controls, or conducting GDPR compliance reviews.
Inherits all available tools
Additional assets for this skill
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Practical implementation guide for GDPR-compliant data processing, consent management, and privacy controls.
| Category | Examples | Protection Level |
|---|---|---|
| Basic | Name, email, phone | Standard |
| Sensitive (Art. 9) | Health, religion, ethnicity | Explicit consent |
| Criminal (Art. 10) | Convictions, offenses | Official authority |
| Children's | Under 16 data | Parental consent |
Article 6 - Lawful Bases:
├── Consent: Freely given, specific, informed
├── Contract: Necessary for contract performance
├── Legal Obligation: Required by law
├── Vital Interests: Protecting someone's life
├── Public Interest: Official functions
└── Legitimate Interest: Balanced against rights
Right to Access (Art. 15) ─┐
Right to Rectification (Art. 16) │
Right to Erasure (Art. 17) │ Must respond
Right to Restrict (Art. 18) │ within 1 month
Right to Portability (Art. 20) │
Right to Object (Art. 21) ─┘
// Consent data model
const consentSchema = {
userId: String,
consents: [{
purpose: String, // 'marketing', 'analytics', etc.
granted: Boolean,
timestamp: Date,
source: String, // 'web_form', 'api', etc.
version: String, // Privacy policy version
ipAddress: String, // For proof
userAgent: String // For proof
}],
auditLog: [{
action: String, // 'granted', 'withdrawn', 'updated'
purpose: String,
timestamp: Date,
source: String
}]
};
// Consent service
class ConsentManager {
async recordConsent(userId, purpose, granted, metadata) {
const consent = {
purpose,
granted,
timestamp: new Date(),
source: metadata.source,
version: await this.getCurrentPolicyVersion(),
ipAddress: metadata.ipAddress,
userAgent: metadata.userAgent
};
// Store consent
await this.db.consents.updateOne(
{ userId },
{
$push: {
consents: consent,
auditLog: {
action: granted ? 'granted' : 'withdrawn',
purpose,
timestamp: consent.timestamp,
source: metadata.source
}
}
},
{ upsert: true }
);
// Emit event for downstream systems
await this.eventBus.emit('consent.changed', {
userId,
purpose,
granted,
timestamp: consent.timestamp
});
}
async hasConsent(userId, purpose) {
const record = await this.db.consents.findOne({ userId });
if (!record) return false;
const latestConsent = record.consents
.filter(c => c.purpose === purpose)
.sort((a, b) => b.timestamp - a.timestamp)[0];
return latestConsent?.granted === true;
}
async getConsentHistory(userId) {
const record = await this.db.consents.findOne({ userId });
return record?.auditLog || [];
}
}
<!-- GDPR-compliant consent UI -->
<div class="consent-banner" role="dialog" aria-labelledby="consent-title">
<h2 id="consent-title">Cookie Preferences</h2>
<p>We use cookies to improve your experience. Select your preferences below.</p>
<form id="consent-form">
<!-- Necessary - always on, no consent needed -->
<div class="consent-category">
<input type="checkbox" id="necessary" checked disabled>
<label for="necessary">
<strong>Necessary</strong>
<span>Required for the website to function. Cannot be disabled.</span>
</label>
</div>
<!-- Analytics - requires consent -->
<div class="consent-category">
<input type="checkbox" id="analytics" name="analytics">
<label for="analytics">
<strong>Analytics</strong>
<span>Help us understand how you use our site.</span>
</label>
</div>
<!-- Marketing - requires consent -->
<div class="consent-category">
<input type="checkbox" id="marketing" name="marketing">
<label for="marketing">
<strong>Marketing</strong>
<span>Personalized ads based on your interests.</span>
</label>
</div>
<div class="consent-actions">
<button type="button" id="accept-all">Accept All</button>
<button type="button" id="reject-all">Reject All</button>
<button type="submit">Save Preferences</button>
</div>
<p class="consent-links">
<a href="/privacy-policy">Privacy Policy</a> |
<a href="/cookie-policy">Cookie Policy</a>
</p>
</form>
</div>
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import json
class DSARHandler:
"""Handle Data Subject Access Requests."""
RESPONSE_DEADLINE_DAYS = 30
EXTENSION_ALLOWED_DAYS = 60 # For complex requests
def __init__(self, data_sources: List['DataSource']):
self.data_sources = data_sources
async def submit_request(
self,
request_type: str, # 'access', 'erasure', 'rectification', 'portability'
user_id: str,
verified: bool,
details: Optional[Dict] = None
) -> str:
"""Submit a new DSAR."""
request = {
'id': self.generate_request_id(),
'type': request_type,
'user_id': user_id,
'status': 'pending_verification' if not verified else 'processing',
'submitted_at': datetime.utcnow(),
'deadline': datetime.utcnow() + timedelta(days=self.RESPONSE_DEADLINE_DAYS),
'details': details or {},
'audit_log': [{
'action': 'submitted',
'timestamp': datetime.utcnow(),
'details': 'Request received'
}]
}
await self.db.dsar_requests.insert_one(request)
await self.notify_dpo(request)
return request['id']
async def process_access_request(self, request_id: str) -> Dict:
"""Process a data access request."""
request = await self.get_request(request_id)
if request['type'] != 'access':
raise ValueError("Not an access request")
# Collect data from all sources
user_data = {}
for source in self.data_sources:
try:
data = await source.get_user_data(request['user_id'])
user_data[source.name] = data
except Exception as e:
user_data[source.name] = {'error': str(e)}
# Format response
response = {
'request_id': request_id,
'generated_at': datetime.utcnow().isoformat(),
'data_categories': list(user_data.keys()),
'data': user_data,
'retention_info': await self.get_retention_info(),
'processing_purposes': await self.get_processing_purposes(),
'third_party_recipients': await self.get_recipients()
}
# Update request status
await self.update_request(request_id, 'completed', response)
return response
async def process_erasure_request(self, request_id: str) -> Dict:
"""Process a right to erasure request."""
request = await self.get_request(request_id)
if request['type'] != 'erasure':
raise ValueError("Not an erasure request")
results = {}
exceptions = []
for source in self.data_sources:
try:
# Check for legal exceptions
can_delete, reason = await source.can_delete(request['user_id'])
if can_delete:
await source.delete_user_data(request['user_id'])
results[source.name] = 'deleted'
else:
exceptions.append({
'source': source.name,
'reason': reason # e.g., 'legal retention requirement'
})
results[source.name] = f'retained: {reason}'
except Exception as e:
results[source.name] = f'error: {str(e)}'
response = {
'request_id': request_id,
'completed_at': datetime.utcnow().isoformat(),
'results': results,
'exceptions': exceptions
}
await self.update_request(request_id, 'completed', response)
return response
async def process_portability_request(self, request_id: str) -> bytes:
"""Generate portable data export."""
request = await self.get_request(request_id)
user_data = await self.process_access_request(request_id)
# Convert to machine-readable format (JSON)
portable_data = {
'export_date': datetime.utcnow().isoformat(),
'format_version': '1.0',
'data': user_data['data']
}
return json.dumps(portable_data, indent=2, default=str).encode()
from datetime import datetime, timedelta
from enum import Enum
class RetentionBasis(Enum):
CONSENT = "consent"
CONTRACT = "contract"
LEGAL_OBLIGATION = "legal_obligation"
LEGITIMATE_INTEREST = "legitimate_interest"
class DataRetentionPolicy:
"""Define and enforce data retention policies."""
POLICIES = {
'user_account': {
'retention_period_days': 365 * 3, # 3 years after last activity
'basis': RetentionBasis.CONTRACT,
'trigger': 'last_activity_date',
'archive_before_delete': True
},
'transaction_records': {
'retention_period_days': 365 * 7, # 7 years for tax
'basis': RetentionBasis.LEGAL_OBLIGATION,
'trigger': 'transaction_date',
'archive_before_delete': True,
'legal_reference': 'Tax regulations require 7 year retention'
},
'marketing_consent': {
'retention_period_days': 365 * 2, # 2 years
'basis': RetentionBasis.CONSENT,
'trigger': 'consent_date',
'archive_before_delete': False
},
'support_tickets': {
'retention_period_days': 365 * 2,
'basis': RetentionBasis.LEGITIMATE_INTEREST,
'trigger': 'ticket_closed_date',
'archive_before_delete': True
},
'analytics_data': {
'retention_period_days': 365, # 1 year
'basis': RetentionBasis.CONSENT,
'trigger': 'collection_date',
'archive_before_delete': False,
'anonymize_instead': True
}
}
async def apply_retention_policies(self):
"""Run retention policy enforcement."""
for data_type, policy in self.POLICIES.items():
cutoff_date = datetime.utcnow() - timedelta(
days=policy['retention_period_days']
)
if policy.get('anonymize_instead'):
await self.anonymize_old_data(data_type, cutoff_date)
else:
if policy.get('archive_before_delete'):
await self.archive_data(data_type, cutoff_date)
await self.delete_old_data(data_type, cutoff_date)
await self.log_retention_action(data_type, cutoff_date)
async def anonymize_old_data(self, data_type: str, before_date: datetime):
"""Anonymize data instead of deleting."""
# Example: Replace identifying fields with hashes
if data_type == 'analytics_data':
await self.db.analytics.update_many(
{'collection_date': {'$lt': before_date}},
{'$set': {
'user_id': None,
'ip_address': None,
'device_id': None,
'anonymized': True,
'anonymized_date': datetime.utcnow()
}}
)
class PrivacyFirstDataModel:
"""Example of privacy-by-design data model."""
# Separate PII from behavioral data
user_profile_schema = {
'user_id': str, # UUID, not sequential
'email_hash': str, # Hashed for lookups
'created_at': datetime,
# Minimal data collection
'preferences': {
'language': str,
'timezone': str
}
}
# Encrypted at rest
user_pii_schema = {
'user_id': str,
'email': str, # Encrypted
'name': str, # Encrypted
'phone': str, # Encrypted (optional)
'address': dict, # Encrypted (optional)
'encryption_key_id': str
}
# Pseudonymized behavioral data
analytics_schema = {
'session_id': str, # Not linked to user_id
'pseudonym_id': str, # Rotating pseudonym
'events': list,
'device_category': str, # Generalized, not specific
'country': str, # Not city-level
}
class DataMinimization:
"""Implement data minimization principles."""
@staticmethod
def collect_only_needed(form_data: dict, purpose: str) -> dict:
"""Filter form data to only fields needed for purpose."""
REQUIRED_FIELDS = {
'account_creation': ['email', 'password'],
'newsletter': ['email'],
'purchase': ['email', 'name', 'address', 'payment'],
'support': ['email', 'message']
}
allowed = REQUIRED_FIELDS.get(purpose, [])
return {k: v for k, v in form_data.items() if k in allowed}
@staticmethod
def generalize_location(ip_address: str) -> str:
"""Generalize IP to country level only."""
import geoip2.database
reader = geoip2.database.Reader('GeoLite2-Country.mmdb')
try:
response = reader.country(ip_address)
return response.country.iso_code
except:
return 'UNKNOWN'
from datetime import datetime
from enum import Enum
class BreachSeverity(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class BreachNotificationHandler:
"""Handle GDPR breach notification requirements."""
AUTHORITY_NOTIFICATION_HOURS = 72
AFFECTED_NOTIFICATION_REQUIRED_SEVERITY = BreachSeverity.HIGH
async def report_breach(
self,
description: str,
data_types: List[str],
affected_count: int,
severity: BreachSeverity
) -> dict:
"""Report and handle a data breach."""
breach = {
'id': self.generate_breach_id(),
'reported_at': datetime.utcnow(),
'description': description,
'data_types_affected': data_types,
'affected_individuals_count': affected_count,
'severity': severity.value,
'status': 'investigating',
'timeline': [{
'event': 'breach_reported',
'timestamp': datetime.utcnow(),
'details': description
}]
}
await self.db.breaches.insert_one(breach)
# Immediate notifications
await self.notify_dpo(breach)
await self.notify_security_team(breach)
# Authority notification required within 72 hours
if self.requires_authority_notification(severity, data_types):
breach['authority_notification_deadline'] = (
datetime.utcnow() + timedelta(hours=self.AUTHORITY_NOTIFICATION_HOURS)
)
await self.schedule_authority_notification(breach)
# Affected individuals notification
if severity.value in [BreachSeverity.HIGH.value, BreachSeverity.CRITICAL.value]:
await self.schedule_individual_notifications(breach)
return breach
def requires_authority_notification(
self,
severity: BreachSeverity,
data_types: List[str]
) -> bool:
"""Determine if supervisory authority must be notified."""
# Always notify for sensitive data
sensitive_types = ['health', 'financial', 'credentials', 'biometric']
if any(t in sensitive_types for t in data_types):
return True
# Notify for medium+ severity
return severity in [BreachSeverity.MEDIUM, BreachSeverity.HIGH, BreachSeverity.CRITICAL]
async def generate_authority_report(self, breach_id: str) -> dict:
"""Generate report for supervisory authority."""
breach = await self.get_breach(breach_id)
return {
'organization': {
'name': self.config.org_name,
'contact': self.config.dpo_contact,
'registration': self.config.registration_number
},
'breach': {
'nature': breach['description'],
'categories_affected': breach['data_types_affected'],
'approximate_number_affected': breach['affected_individuals_count'],
'likely_consequences': self.assess_consequences(breach),
'measures_taken': await self.get_remediation_measures(breach_id),
'measures_proposed': await self.get_proposed_measures(breach_id)
},
'timeline': breach['timeline'],
'submitted_at': datetime.utcnow().isoformat()
}
## GDPR Implementation Checklist
### Legal Basis
- [ ] Documented legal basis for each processing activity
- [ ] Consent mechanisms meet GDPR requirements
- [ ] Legitimate interest assessments completed
### Transparency
- [ ] Privacy policy is clear and accessible
- [ ] Processing purposes clearly stated
- [ ] Data retention periods documented
### Data Subject Rights
- [ ] Access request process implemented
- [ ] Erasure request process implemented
- [ ] Portability export available
- [ ] Rectification process available
- [ ] Response within 30-day deadline
### Security
- [ ] Encryption at rest implemented
- [ ] Encryption in transit (TLS)
- [ ] Access controls in place
- [ ] Audit logging enabled
### Breach Response
- [ ] Breach detection mechanisms
- [ ] 72-hour notification process
- [ ] Breach documentation system
### Documentation
- [ ] Records of processing activities (Art. 30)
- [ ] Data protection impact assessments
- [ ] Data processing agreements with vendors