"""SQLite database operations for DMARC reports""" import sqlite3 import json from pathlib import Path class Database: def __init__(self, db_path): self.db_path = db_path def init_db(self): """Initialize the database schema""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Create reports table cursor.execute(''' CREATE TABLE IF NOT EXISTS reports ( id INTEGER PRIMARY KEY AUTOINCREMENT, org_name TEXT, email TEXT, report_id TEXT UNIQUE, date_begin DATETIME, date_end DATETIME, domain TEXT, policy_p TEXT, policy_sp TEXT, policy_pct INTEGER, policy_adkim TEXT, policy_aspf TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') # Create records table cursor.execute(''' CREATE TABLE IF NOT EXISTS records ( id INTEGER PRIMARY KEY AUTOINCREMENT, report_id INTEGER, source_ip TEXT, count INTEGER, disposition TEXT, dkim_result TEXT, spf_result TEXT, header_from TEXT, dkim_auth TEXT, -- JSON array spf_auth TEXT, -- JSON array FOREIGN KEY (report_id) REFERENCES reports (id) ) ''') # Create indexes for better performance cursor.execute('CREATE INDEX IF NOT EXISTS idx_reports_domain ON reports (domain)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_records_source_ip ON records (source_ip)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_records_disposition ON records (disposition)') conn.commit() conn.close() def store_report(self, report_data): """Store parsed DMARC report data Returns: bool: True if report was stored, False if it was a duplicate """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: metadata = report_data.get('metadata', {}) policy = report_data.get('policy_published', {}) records = report_data.get('records', []) # Check if report already exists cursor.execute('SELECT id FROM reports WHERE report_id = ?', (metadata.get('report_id'),)) if cursor.fetchone(): conn.close() return False # Duplicate found # Insert report metadata cursor.execute(''' INSERT INTO reports ( org_name, email, report_id, date_begin, date_end, domain, policy_p, policy_sp, policy_pct, policy_adkim, policy_aspf ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( metadata.get('org_name'), metadata.get('email'), metadata.get('report_id'), metadata.get('date_begin'), metadata.get('date_end'), policy.get('domain'), policy.get('p'), policy.get('sp'), policy.get('pct'), policy.get('adkim'), policy.get('aspf') )) report_id = cursor.lastrowid # Insert records for record in records: cursor.execute(''' INSERT INTO records ( report_id, source_ip, count, disposition, dkim_result, spf_result, header_from, dkim_auth, spf_auth ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
                ''', (
                    report_id,
                    record.get('source_ip'),
                    record.get('count'),
                    record.get('disposition'),
                    record.get('dkim_result'),
                    record.get('spf_result'),
                    record.get('header_from'),
                    json.dumps(record.get('dkim_auth', [])),
                    json.dumps(record.get('spf_auth', []))
                ))

            conn.commit()
            return True  # Successfully stored
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def get_summary_stats(self, date_filter=None):
        """Get summary statistics"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        stats = {}

        # Build date filter conditions
        date_where, date_params = self._build_date_filter(date_filter)
        reports_where = f"WHERE {date_where}" if date_where else ""
        join_where = (f"JOIN reports r ON r.id = rec.report_id WHERE {date_where}"
                      if date_where else "JOIN reports r ON r.id = rec.report_id")

        # Total reports
        cursor.execute(f'SELECT COUNT(*) FROM reports {reports_where}', date_params)
        stats['total_reports'] = cursor.fetchone()[0]

        # Total messages
        cursor.execute(f'SELECT SUM(rec.count) FROM records rec {join_where}', date_params)
        result = cursor.fetchone()[0]
        stats['total_messages'] = result if result else 0

        # Messages by disposition
        cursor.execute(f'''
            SELECT disposition, SUM(rec.count) as total
            FROM records rec
            {join_where}
            GROUP BY disposition
            ORDER BY total DESC
        ''', date_params)
        stats['by_disposition'] = cursor.fetchall()

        # Messages by domain
        cursor.execute(f'''
            SELECT r.domain, SUM(rec.count) as total
            FROM reports r
            JOIN records rec ON r.id = rec.report_id
            {"WHERE " + date_where if date_where else ""}
            GROUP BY r.domain
            ORDER BY total DESC
            LIMIT 10
        ''', date_params)
        stats['by_domain'] = cursor.fetchall()

        # Messages by source IP (top 10)
        cursor.execute(f'''
            SELECT source_ip, SUM(rec.count) as total
            FROM records rec
            {join_where}
            GROUP BY source_ip
            ORDER BY total DESC
            LIMIT 10
        ''', date_params)
        stats['by_source_ip'] = cursor.fetchall()

        # DKIM/SPF results: join_where already contains a WHERE when a date
        # filter is active, so only append AND in that case.
        dkim_where = f"{join_where} {'AND' if date_where else 'WHERE'} rec.dkim_result IS NOT NULL"
        cursor.execute(f'''
            SELECT dkim_result, COUNT(*) as count
            FROM records rec
            {dkim_where}
            GROUP BY dkim_result
        ''', date_params)
        stats['dkim_results'] = cursor.fetchall()

        spf_where = f"{join_where} {'AND' if date_where else 'WHERE'} rec.spf_result IS NOT NULL"
        cursor.execute(f'''
            SELECT spf_result, COUNT(*) as count
            FROM records rec
            {spf_where}
            GROUP BY spf_result
        ''', date_params)
        stats['spf_results'] = cursor.fetchall()

        conn.close()
        return stats

    def get_detailed_records(self, limit=100):
        """Get detailed record view"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            SELECT r.domain, r.org_name, rec.source_ip, rec.count, rec.disposition,
                   rec.dkim_result, rec.spf_result, rec.header_from,
                   r.date_begin, r.date_end
            FROM reports r
            JOIN records rec ON r.id = rec.report_id
            ORDER BY rec.count DESC
            LIMIT ?
        ''', (limit,))

        records = cursor.fetchall()
        conn.close()
        return records

    def get_failure_analysis(self):
        """Get detailed failure analysis"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        analysis = {}

        # Authentication failure breakdown
        cursor.execute('''
            SELECT
                CASE
                    WHEN dkim_result = 'fail' AND spf_result = 'fail' THEN 'Both DKIM & SPF Failed'
                    WHEN dkim_result = 'fail' AND spf_result != 'fail' THEN 'DKIM Failed Only'
                    WHEN dkim_result != 'fail' AND spf_result = 'fail' THEN 'SPF Failed Only'
                    ELSE 'Both Passed'
                END as failure_type,
                SUM(count) as total_count,
                COUNT(*) as record_count
            FROM records
            GROUP BY failure_type
            ORDER BY total_count DESC
        ''')
        analysis['auth_failure_breakdown'] = cursor.fetchall()

        # Failures by source IP
        cursor.execute('''
            SELECT source_ip, SUM(count) as total_count
            FROM records
            WHERE dkim_result = 'fail' OR spf_result = 'fail'
            GROUP BY source_ip
            ORDER BY total_count DESC
            LIMIT 10
        ''')
        analysis['failures_by_ip'] = cursor.fetchall()

        # Failures by domain
        cursor.execute('''
            SELECT r.domain, SUM(rec.count) as total_count
            FROM reports r
            JOIN records rec ON r.id = rec.report_id
            WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
            GROUP BY r.domain
            ORDER BY total_count DESC
        ''')
        analysis['failures_by_domain'] = cursor.fetchall()

        # Failures by reporting provider
        cursor.execute('''
            SELECT r.org_name, SUM(rec.count) as total_count
            FROM reports r
            JOIN records rec ON r.id = rec.report_id
            WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
            GROUP BY r.org_name
            ORDER BY total_count DESC
        ''')
        analysis['failures_by_provider'] = cursor.fetchall()

        # Detailed provider breakdown with dates
        cursor.execute('''
            SELECT r.org_name, DATE(r.date_begin) as report_date,
                   SUM(rec.count) as failed_count,
                   COUNT(DISTINCT rec.source_ip) as unique_ips
            FROM reports r
            JOIN records rec ON r.id = rec.report_id
            WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
            GROUP BY r.org_name, DATE(r.date_begin)
            ORDER BY report_date DESC, failed_count DESC
        ''')
        analysis['provider_timeline'] = cursor.fetchall()

        # Policy actions on failures
        cursor.execute('''
            SELECT disposition, SUM(count) as total_count
            FROM records
            WHERE dkim_result = 'fail' OR spf_result = 'fail'
            GROUP BY disposition
            ORDER BY total_count DESC
        ''')
        analysis['failure_dispositions'] = cursor.fetchall()

        # Detailed failure records
        cursor.execute('''
            SELECT r.domain, rec.source_ip, rec.count, rec.disposition,
                   rec.dkim_result, rec.spf_result, rec.header_from,
                   r.date_begin, r.org_name, r.email as reporter_email
            FROM reports r
            JOIN records rec ON r.id = rec.report_id
            WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
            ORDER BY rec.count DESC
            LIMIT 50
        ''')
        analysis['detailed_failures'] = cursor.fetchall()

        conn.close()
        return analysis

    def _build_date_filter(self, date_filter):
        """Build WHERE clause and parameters for date filtering"""
        if not date_filter:
            return "", []

        conditions = []
        params = []

        if 'date_from' in date_filter:
            conditions.append("DATE(date_begin) >= ?")
            params.append(date_filter['date_from'].strftime('%Y-%m-%d'))

        if 'date_to' in date_filter:
            conditions.append("DATE(date_begin) <= ?")
            params.append(date_filter['date_to'].strftime('%Y-%m-%d'))

        where_clause = " AND ".join(conditions) if conditions else ""
        return where_clause, params

    def get_timeline_stats(self, date_filter=None):
        """Get daily breakdown statistics"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        date_where, date_params = self._build_date_filter(date_filter)
        where_clause = f"WHERE {date_where}" if date_where else ""

        cursor.execute(f'''
            SELECT DATE(r.date_begin) as report_date,
                   SUM(rec.count) as total_messages,
                   SUM(CASE WHEN rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
                            THEN rec.count ELSE 0 END) as failed_messages,
                   COUNT(DISTINCT r.org_name) as reporters
            FROM reports r
            JOIN records rec ON r.id = rec.report_id
            {where_clause}
            GROUP BY DATE(r.date_begin)
            ORDER BY report_date
        ''', date_params)

        timeline = cursor.fetchall()
        conn.close()
        return timeline
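

# ---------------------------------------------------------------------------
# Minimal usage sketch, not part of the module above: it assumes an upstream
# parser that produces a dict shaped like `sample_report` below, with
# 'metadata', 'policy_published' and 'records' keys matching what
# store_report() reads via .get(). The file name, report_id and all field
# values here are hypothetical and for illustration only.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from datetime import date

    db = Database("dmarc_reports.db")  # assumed local database path
    db.init_db()

    sample_report = {
        'metadata': {
            'org_name': 'example.net',
            'email': 'noreply-dmarc@example.net',
            'report_id': 'example-report-0001',
            'date_begin': '2024-01-01 00:00:00',
            'date_end': '2024-01-01 23:59:59',
        },
        'policy_published': {
            'domain': 'example.org',
            'p': 'quarantine',
            'sp': 'none',
            'pct': 100,
            'adkim': 'r',
            'aspf': 'r',
        },
        'records': [
            {
                'source_ip': '203.0.113.10',
                'count': 3,
                'disposition': 'none',
                'dkim_result': 'pass',
                'spf_result': 'fail',
                'header_from': 'example.org',
                'dkim_auth': [{'domain': 'example.org', 'result': 'pass'}],
                'spf_auth': [{'domain': 'mail.example.org', 'result': 'fail'}],
            },
        ],
    }

    stored = db.store_report(sample_report)  # False if report_id was already stored
    stats = db.get_summary_stats({'date_from': date(2024, 1, 1)})
    print(stored, stats['total_reports'], stats['total_messages'])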