# SanitizeMailMan3/sanitize_mail.py
# 2026-01-17 13:46:20 -05:00
#
# 193 lines
# 6.6 KiB
# Python

import sys
import shlex
from django.core.management.base import BaseCommand
from hyperkitty.models import Email, Attachment
# Install this file as a management command by placing it in
# /usr/lib/python3.10/site-packages/hyperkitty/management/commands
# MIME types whose attachment payload is searched and rewritten as text.
TEXT_MIMETYPES = {"text/plain", "text/html", "application/xhtml+xml"}
class Command(BaseCommand):
    """Rewrite sensitive strings in the archive of one mailing list.

    Replacement pairs are read from a text file and applied, as plain
    substring replacements, to the body of every email of the selected list
    and to the content of its text attachments. With ``--simulate`` the
    matches are reported but nothing is saved.
    """

    help = "Search and replace sensitive data in HyperKitty emails and attachments."

    # File suffixes that mark an attachment as text when it has no MIME type.
    _TEXT_SUFFIXES = (".htm", ".html", ".txt", ".xhtml")

    def add_arguments(self, parser):
        """Register the command-line options on *parser*."""
        parser.add_argument(
            "--list",
            required=True,
            help="Mailing list name, e.g. bikeboard@lists.bikelover.org",
        )
        parser.add_argument(
            "--simulate",
            action="store_true",
            help="Show what would be changed without saving.",
        )
        parser.add_argument(
            "--replacements-file",
            required=True,
            help="Path to a text file containing replacements, one per line.",
        )
        parser.add_argument(
            "--only-emails",
            action="store_true",
            help="Process only email bodies, not attachments.",
        )
        parser.add_argument(
            "--only-attachments",
            action="store_true",
            help="Process only attachments, not email bodies.",
        )

    def load_replacements(self, filepath):
        """Parse the replacements file into an ordered ``{old: new}`` dict.

        Blank lines and lines starting with ``#`` are ignored. Every other
        line is split with shell-style quoting rules and must yield exactly
        two tokens: the text to search for and its replacement. Lines that
        cannot be parsed, do not have two tokens, or have an empty search
        text are skipped with a warning. If the same search text appears
        more than once, the last line wins.

        Args:
            filepath: Path of the UTF-8 encoded replacements file.

        Returns:
            dict mapping each search string to its replacement, in file order.

        Raises:
            OSError: If the file cannot be opened.
        """
        replacements = {}
        with open(filepath, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                try:
                    parts = shlex.split(line)
                except ValueError:
                    self.stdout.write(self.style.WARNING(f"Skipping malformed line: {line}"))
                    continue
                if len(parts) != 2:
                    self.stdout.write(self.style.WARNING(f"Skipping invalid line: {line}"))
                    continue
                old, new = parts
                if not old:
                    # str.replace("", new) would insert `new` between every
                    # character of the text, so an empty search is rejected.
                    self.stdout.write(
                        self.style.WARNING(f"Skipping line with empty search text: {line}")
                    )
                    continue
                replacements[old] = new
        return replacements

    def handle(self, *args, **options):
        """Run the search-and-replace over the selected mailing list."""
        mailing_list = options["list"]
        simulate = options["simulate"]
        only_emails = options["only_emails"]
        only_attachments = options["only_attachments"]

        # Validate flags
        if only_emails and only_attachments:
            self.stdout.write(
                self.style.ERROR("Cannot use --only-emails and --only-attachments together.")
            )
            return

        replacements = self.load_replacements(options["replacements_file"])
        if not replacements:
            self.stdout.write(self.style.ERROR("No valid replacements found."))
            return
        self.stdout.write(f"Loaded {len(replacements)} replacements.")

        emails = Email.objects.filter(mailinglist__name=mailing_list)
        self.stdout.write(f"Scanning {emails.count()} messages…")

        # Every matching row is processed: skipping a row as a "duplicate"
        # would leave its copy of the sensitive text in the archive.
        for msg in emails:
            changed = False
            if not only_attachments:
                changed = self._sanitize_email(msg, replacements, simulate)
            if not only_emails:
                if self._sanitize_attachments(msg, replacements, simulate):
                    changed = True
            if changed and simulate:
                self.stdout.write(" (simulate mode: no changes saved)")

        self.stdout.write("\nDone.")
        if not simulate:
            self.stdout.write("Run `./manage.py rebuild_index` to refresh search index.")

    @staticmethod
    def _apply_replacements(text, replacements):
        """Apply all replacements to *text* in order.

        Returns:
            ``(updated_text, hits)`` where ``hits`` lists the ``(old, new)``
            pairs whose search text was present when their turn came.
        """
        hits = []
        for old, new in replacements.items():
            if old in text:
                hits.append((old, new))
                text = text.replace(old, new)
        return text, hits

    @classmethod
    def _is_text_attachment(cls, mime, filename):
        """Return True if the attachment should be treated as text.

        A present MIME type decides on its own (parameters and letter case
        are ignored); the file suffix is consulted only when it is missing.
        """
        if mime:
            return mime.split(";", 1)[0].strip().lower() in TEXT_MIMETYPES
        return bool(filename) and filename.lower().endswith(cls._TEXT_SUFFIXES)

    def _decode_attachment(self, att, filename):
        """Return ``(text, codec)`` for *att*, or None if it cannot be read.

        The content may be ``str``, ``bytes`` or another bytes-like object
        (binary database columns can yield ``memoryview``), or missing.
        """
        raw = att.content
        codec = att.encoding or "utf-8"
        if raw is None:
            self.stdout.write(
                self.style.WARNING(f"Skipping attachment {filename}: no content to scan.")
            )
            return None
        if isinstance(raw, str):
            return raw, codec
        try:
            # NOTE: undecodable bytes are dropped, so they are also absent
            # from the data written back when the attachment is changed.
            return bytes(raw).decode(codec, errors="ignore"), codec
        except LookupError:
            self.stdout.write(
                self.style.WARNING(f"Skipping attachment {filename}: unknown encoding {codec}.")
            )
            return None

    def _sanitize_email(self, msg, replacements, simulate):
        """Rewrite the body of *msg*; return True if it needed a change."""
        original = msg.content
        if not original:
            return False
        updated, hits = self._apply_replacements(original, replacements)
        if simulate:
            for old, new in hits:
                self.stdout.write(" Change in email body:")
                self.stdout.write(f" - {old}")
                self.stdout.write(f" + {new}")
        if updated == original:
            return False
        self.stdout.write(f"[Email] {msg.subject}")
        if not simulate:
            msg.content = updated
            msg.save()
        return True

    def _sanitize_attachments(self, msg, replacements, simulate):
        """Rewrite the text attachments of *msg*; return True if any changed."""
        changed = False
        # Simulate output is printed once per (file name, old, new) per email.
        reported = set()
        for att in Attachment.objects.filter(email=msg):
            filename = getattr(att, "name", None)
            mime = getattr(att, "content_type", None)
            if not self._is_text_attachment(mime, filename):
                continue
            decoded = self._decode_attachment(att, filename)
            if decoded is None:
                continue
            original, codec = decoded
            updated, hits = self._apply_replacements(original, replacements)
            if simulate:
                for old, new in hits:
                    key = (filename, old, new)
                    if key in reported:
                        continue
                    reported.add(key)
                    self.stdout.write(f" Change in attachment {filename}:")
                    self.stdout.write(f" - {old}")
                    self.stdout.write(f" + {new}")
            if updated == original:
                continue
            changed = True
            self.stdout.write(f"[Attachment] {filename} in {msg.subject}")
            if simulate:
                continue
            try:
                encoded = updated.encode(codec)
            except (LookupError, UnicodeEncodeError) as exc:
                # Report and move on rather than abort a half-finished run.
                self.stdout.write(
                    self.style.ERROR(f"Cannot encode attachment {filename} as {codec}: {exc}")
                )
                continue
            att.content = encoded
            att.size = len(encoded)
            att.save()
        return changed