import shlex
import sys

from django.core.management.base import BaseCommand, CommandError
from hyperkitty.models import Attachment, Email

# put command in /usr/lib/python3.10/site-packages/hyperkitty/management/commands

# MIME types whose attachment payload is treated as searchable text.
TEXT_MIMETYPES = {
    "text/plain",
    "text/html",
    "application/xhtml+xml",
}

# File-name suffixes used to recognise text attachments that carry no MIME type.
_TEXT_SUFFIXES = (".htm", ".html", ".txt", ".xhtml")


class Command(BaseCommand):
    """Search and replace literal strings in archived emails and attachments.

    Replacements are plain substring substitutions (no regular expressions)
    and are applied one after another in the order they appear in the
    replacements file, so a later rule can match text produced by an
    earlier one.
    """

    help = "Search and replace sensitive data in HyperKitty emails and attachments."

    def add_arguments(self, parser):
        """Register the command-line options of this command."""
        parser.add_argument(
            "--list",
            required=True,
            help="Mailing list name, e.g. bikeboard@lists.bikelover.org",
        )
        parser.add_argument(
            "--simulate",
            action="store_true",
            help="Show what would be changed without saving.",
        )
        parser.add_argument(
            "--replacements-file",
            required=True,
            help="Path to a text file containing replacements, one per line.",
        )
        parser.add_argument(
            "--only-emails",
            action="store_true",
            help="Process only email bodies, not attachments.",
        )
        parser.add_argument(
            "--only-attachments",
            action="store_true",
            help="Process only attachments, not email bodies.",
        )

    def load_replacements(self, filepath):
        """Parse the replacements file into an ``{old: new}`` mapping.

        Each non-blank line that does not start with ``#`` is split with
        ``shlex.split`` and must yield exactly two tokens: the text to
        search for and its replacement. Lines that cannot be parsed, do not
        have two tokens, or have an empty search string are reported as
        warnings and skipped. If the same search string appears twice, the
        last line wins.

        Args:
            filepath: Path of the UTF-8 encoded replacements file.

        Returns:
            dict mapping search string to replacement string, in file order.

        Raises:
            CommandError: If the file cannot be read or is not valid UTF-8.
        """
        try:
            with open(filepath, encoding="utf-8") as handle:
                lines = handle.readlines()
        except (OSError, UnicodeDecodeError) as err:
            raise CommandError(
                f"Cannot read replacements file {filepath}: {err}"
            ) from err

        replacements = {}
        for line in lines:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            try:
                parts = shlex.split(line)
            except ValueError:
                self.stdout.write(self.style.WARNING(f"Skipping malformed line: {line}"))
                continue
            # An empty search string matches everywhere: str.replace("", x)
            # would insert x between every character of the text.
            if len(parts) != 2 or not parts[0]:
                self.stdout.write(self.style.WARNING(f"Skipping invalid line: {line}"))
                continue
            old, new = parts
            replacements[old] = new
        return replacements

    def handle(self, *args, **options):
        """Run the search-and-replace over one mailing list.

        Raises:
            CommandError: If both ``--only-emails`` and ``--only-attachments``
                are given, or if the replacements file yields no usable rule.
        """
        mailing_list = options["list"]
        simulate = options["simulate"]
        only_emails = options["only_emails"]
        only_attachments = options["only_attachments"]

        # Raise instead of printing so the process exits with a non-zero status.
        if only_emails and only_attachments:
            raise CommandError("Cannot use --only-emails and --only-attachments together.")

        replacements = self.load_replacements(options["replacements_file"])
        if not replacements:
            raise CommandError("No valid replacements found.")
        self.stdout.write(f"Loaded {len(replacements)} replacements.")

        emails = Email.objects.filter(mailinglist__name=mailing_list)
        self.stdout.write(f"Scanning {emails.count()} messages…")

        # Every matching row is processed. Skipping rows that look like
        # duplicates would leave the sensitive text in the skipped copies.
        for msg in emails:
            body_changed = False
            attachments_changed = False
            if not only_attachments:
                body_changed = self._process_body(msg, replacements, simulate)
            if not only_emails:
                attachments_changed = self._process_attachments(
                    msg, replacements, simulate
                )
            if simulate and (body_changed or attachments_changed):
                self.stdout.write(" (simulate mode: no changes saved)")

        self.stdout.write("\nDone.")
        if not simulate:
            self.stdout.write("Run `./manage.py rebuild_index` to refresh search index.")

    def _replace_all(self, text, replacements, simulate, heading, seen, scope):
        """Apply every replacement to ``text`` and return the result.

        In simulate mode each rule that matches is announced under
        ``heading``, at most once per ``(scope, old, new)`` key recorded in
        the ``seen`` set.
        """
        for old, new in replacements.items():
            if old not in text:
                continue
            key = (scope, old, new)
            if simulate and key not in seen:
                seen.add(key)
                self.stdout.write(heading)
                self.stdout.write(f" - {old}")
                self.stdout.write(f" + {new}")
            text = text.replace(old, new)
        return text

    def _process_body(self, msg, replacements, simulate):
        """Rewrite the body of one email; return True if it would change."""
        original = msg.content
        if not original:
            return False
        updated = self._replace_all(
            original, replacements, simulate, " Change in email body:", set(), None
        )
        if updated == original:
            return False
        self.stdout.write(f"[Email] {msg.subject}")
        if not simulate:
            msg.content = updated
            msg.save()
        return True

    def _process_attachments(self, msg, replacements, simulate):
        """Rewrite all text attachments of one email; return True if any changed."""
        changed = False
        # Shared across this email's attachments so simulate output does not
        # repeat the same rule for attachments with the same file name.
        seen_changes = set()
        for att in Attachment.objects.filter(email=msg):
            filename = getattr(att, "name", None)
            if not self._is_text_attachment(att, filename):
                continue
            decoded = self._decode_attachment(att, filename)
            if decoded is None:
                continue
            original, codec = decoded
            updated = self._replace_all(
                original,
                replacements,
                simulate,
                f" Change in attachment {filename}:",
                seen_changes,
                filename,
            )
            if updated == original:
                continue
            changed = True
            self.stdout.write(f"[Attachment] {filename} in {msg.subject}")
            if simulate:
                continue
            try:
                # surrogateescape restores bytes that could not be decoded.
                encoded = updated.encode(codec, errors="surrogateescape")
            except (LookupError, UnicodeEncodeError) as err:
                self.stderr.write(
                    f"Attachment {filename} in {msg.subject} NOT saved: "
                    f"cannot encode as {codec} ({err})"
                )
                continue
            att.content = encoded
            att.size = len(encoded)
            att.save()
        return changed

    @staticmethod
    def _is_text_attachment(att, filename):
        """Tell whether an attachment should be searched as text.

        The MIME type decides when present (parameters and letter case are
        ignored); otherwise the file-name suffix is used.
        """
        mime = getattr(att, "content_type", None)
        if not mime:
            return bool(filename) and filename.lower().endswith(_TEXT_SUFFIXES)
        return mime.split(";", 1)[0].strip().lower() in TEXT_MIMETYPES

    def _decode_attachment(self, att, filename):
        """Return ``(text, codec)`` for an attachment, or None to skip it.

        The content may be ``bytes``, ``bytearray``, ``memoryview`` or
        already a ``str``. Undecodable bytes are kept as surrogate escapes
        so they can be written back unchanged rather than dropped.
        """
        raw = att.content
        if raw is None:
            return None
        codec = att.encoding or "utf-8"
        if isinstance(raw, str):
            return raw, codec
        try:
            return bytes(raw).decode(codec, errors="surrogateescape"), codec
        except (LookupError, UnicodeDecodeError) as err:
            self.stdout.write(
                self.style.WARNING(
                    f"Skipping attachment {filename}: cannot decode as {codec} ({err})"
                )
            )
            return None