diff options
Diffstat (limited to 'dmarc_analyzer/database.py')
| -rw-r--r-- | dmarc_analyzer/database.py | 382 |
1 file changed, 382 insertions, 0 deletions
"""SQLite database operations for DMARC reports"""

import sqlite3
import json
from contextlib import closing
from pathlib import Path


class Database:
    """Storage and reporting layer for parsed DMARC aggregate reports.

    Every public method opens its own short-lived connection to the SQLite
    file at ``db_path`` and is guaranteed to close it (even on error), so an
    instance holds no connection state between calls.
    """

    def __init__(self, db_path):
        # Filesystem path to the SQLite database file (str or Path).
        self.db_path = db_path

    def _connect(self):
        """Open a fresh connection to the configured database file."""
        return sqlite3.connect(self.db_path)

    def init_db(self):
        """Initialize the database schema (idempotent: uses IF NOT EXISTS)."""
        with closing(self._connect()) as conn:
            cursor = conn.cursor()

            # One row per aggregate report: reporter metadata + published policy.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS reports (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    org_name TEXT,
                    email TEXT,
                    report_id TEXT UNIQUE,
                    date_begin DATETIME,
                    date_end DATETIME,
                    domain TEXT,
                    policy_p TEXT,
                    policy_sp TEXT,
                    policy_pct INTEGER,
                    policy_adkim TEXT,
                    policy_aspf TEXT,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # One row per <record> element inside a report.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS records (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    report_id INTEGER,
                    source_ip TEXT,
                    count INTEGER,
                    disposition TEXT,
                    dkim_result TEXT,
                    spf_result TEXT,
                    header_from TEXT,
                    dkim_auth TEXT, -- JSON array
                    spf_auth TEXT, -- JSON array
                    FOREIGN KEY (report_id) REFERENCES reports (id)
                )
            ''')

            # Indexes on the common filter/group columns.
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_reports_domain ON reports (domain)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_records_source_ip ON records (source_ip)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_records_disposition ON records (disposition)')

            conn.commit()

    def store_report(self, report_data):
        """Store parsed DMARC report data.

        Args:
            report_data: dict with 'metadata', 'policy_published' and
                'records' keys as produced by the report parser.

        Returns:
            bool: True if the report was stored, False if it was a duplicate
            (same ``report_id`` already present).
        """
        metadata = report_data.get('metadata', {})
        policy = report_data.get('policy_published', {})
        records = report_data.get('records', [])

        conn = self._connect()
        try:
            cursor = conn.cursor()

            # Fast-path duplicate check; report_id is also UNIQUE in the
            # schema, so a concurrent insert is caught below as well.
            cursor.execute('SELECT id FROM reports WHERE report_id = ?',
                           (metadata.get('report_id'),))
            if cursor.fetchone():
                return False  # Duplicate found

            # Insert report metadata
            cursor.execute('''
                INSERT INTO reports (
                    org_name, email, report_id, date_begin, date_end,
                    domain, policy_p, policy_sp, policy_pct, policy_adkim, policy_aspf
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                metadata.get('org_name'),
                metadata.get('email'),
                metadata.get('report_id'),
                metadata.get('date_begin'),
                metadata.get('date_end'),
                policy.get('domain'),
                policy.get('p'),
                policy.get('sp'),
                policy.get('pct'),
                policy.get('adkim'),
                policy.get('aspf')
            ))

            report_id = cursor.lastrowid

            # Insert the per-source records, serializing auth details as JSON.
            for record in records:
                cursor.execute('''
                    INSERT INTO records (
                        report_id, source_ip, count, disposition, dkim_result, spf_result,
                        header_from, dkim_auth, spf_auth
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    report_id,
                    record.get('source_ip'),
                    record.get('count'),
                    record.get('disposition'),
                    record.get('dkim_result'),
                    record.get('spf_result'),
                    record.get('header_from'),
                    json.dumps(record.get('dkim_auth', [])),
                    json.dumps(record.get('spf_auth', []))
                ))

            conn.commit()
            return True  # Successfully stored
        except sqlite3.IntegrityError:
            # UNIQUE(report_id) violated by a concurrent writer between the
            # SELECT above and our INSERT: treat as a duplicate, not an error.
            conn.rollback()
            return False
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def get_summary_stats(self, date_filter=None):
        """Get summary statistics, optionally restricted to a date range.

        Args:
            date_filter: optional dict with 'date_from'/'date_to' date objects
                (see ``_build_date_filter``).

        Returns:
            dict with 'total_reports', 'total_messages' and breakdown lists
            ('by_disposition', 'by_domain', 'by_source_ip', 'dkim_results',
            'spf_results') of cursor tuples.
        """
        date_where, date_params = self._build_date_filter(date_filter)
        reports_where = f"WHERE {date_where}" if date_where else ""

        # Record-level queries always join back to reports, because the
        # date columns used by the filter live on the reports table.
        join_clause = "JOIN reports r ON r.id = rec.report_id"
        join_where = f"{join_clause} WHERE {date_where}" if date_where else join_clause

        # Add the NOT NULL filter after the date filter; open a WHERE clause
        # ourselves when the date filter did not already start one.
        glue = "AND" if date_where else "WHERE"
        dkim_where = f"{join_where} {glue} rec.dkim_result IS NOT NULL"
        spf_where = f"{join_where} {glue} rec.spf_result IS NOT NULL"

        stats = {}
        with closing(self._connect()) as conn:
            cursor = conn.cursor()

            # Total reports
            cursor.execute(f'SELECT COUNT(*) FROM reports {reports_where}', date_params)
            stats['total_reports'] = cursor.fetchone()[0]

            # Total messages (SUM returns NULL when no rows match).
            cursor.execute(f'SELECT SUM(rec.count) FROM records rec {join_where}', date_params)
            result = cursor.fetchone()[0]
            stats['total_messages'] = result if result else 0

            # Messages by disposition
            cursor.execute(f'''
                SELECT disposition, SUM(rec.count) as total
                FROM records rec
                {join_where}
                GROUP BY disposition
                ORDER BY total DESC
            ''', date_params)
            stats['by_disposition'] = cursor.fetchall()

            # Messages by domain (top 10)
            cursor.execute(f'''
                SELECT r.domain, SUM(rec.count) as total
                FROM reports r
                JOIN records rec ON r.id = rec.report_id
                {"WHERE " + date_where if date_where else ""}
                GROUP BY r.domain
                ORDER BY total DESC
                LIMIT 10
            ''', date_params)
            stats['by_domain'] = cursor.fetchall()

            # Messages by source IP (top 10)
            cursor.execute(f'''
                SELECT source_ip, SUM(rec.count) as total
                FROM records rec
                {join_where}
                GROUP BY source_ip
                ORDER BY total DESC
                LIMIT 10
            ''', date_params)
            stats['by_source_ip'] = cursor.fetchall()

            # DKIM result distribution (record counts, not message counts).
            cursor.execute(f'''
                SELECT dkim_result, COUNT(*) as count
                FROM records rec
                {dkim_where}
                GROUP BY dkim_result
            ''', date_params)
            stats['dkim_results'] = cursor.fetchall()

            # SPF result distribution.
            cursor.execute(f'''
                SELECT spf_result, COUNT(*) as count
                FROM records rec
                {spf_where}
                GROUP BY spf_result
            ''', date_params)
            stats['spf_results'] = cursor.fetchall()

        return stats

    def get_detailed_records(self, limit=100):
        """Get up to ``limit`` records joined with their report metadata,
        ordered by message count descending."""
        with closing(self._connect()) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT
                    r.domain,
                    r.org_name,
                    rec.source_ip,
                    rec.count,
                    rec.disposition,
                    rec.dkim_result,
                    rec.spf_result,
                    rec.header_from,
                    r.date_begin,
                    r.date_end
                FROM reports r
                JOIN records rec ON r.id = rec.report_id
                ORDER BY rec.count DESC
                LIMIT ?
            ''', (limit,))
            return cursor.fetchall()

    def get_failure_analysis(self):
        """Get detailed failure analysis.

        Returns:
            dict of breakdown lists keyed by analysis name; "failure" means
            a record whose dkim_result or spf_result equals 'fail'.
        """
        analysis = {}
        with closing(self._connect()) as conn:
            cursor = conn.cursor()

            # Authentication failure breakdown. NOTE(review): rows with NULL
            # results fall into the 'Both Passed' bucket (ELSE branch).
            cursor.execute('''
                SELECT
                    CASE
                        WHEN dkim_result = 'fail' AND spf_result = 'fail' THEN 'Both DKIM & SPF Failed'
                        WHEN dkim_result = 'fail' AND spf_result != 'fail' THEN 'DKIM Failed Only'
                        WHEN dkim_result != 'fail' AND spf_result = 'fail' THEN 'SPF Failed Only'
                        ELSE 'Both Passed'
                    END as failure_type,
                    SUM(count) as total_count,
                    COUNT(*) as record_count
                FROM records
                GROUP BY failure_type
                ORDER BY total_count DESC
            ''')
            analysis['auth_failure_breakdown'] = cursor.fetchall()

            # Failures by source IP (top 10)
            cursor.execute('''
                SELECT source_ip, SUM(count) as total_count
                FROM records
                WHERE dkim_result = 'fail' OR spf_result = 'fail'
                GROUP BY source_ip
                ORDER BY total_count DESC
                LIMIT 10
            ''')
            analysis['failures_by_ip'] = cursor.fetchall()

            # Failures by domain
            cursor.execute('''
                SELECT r.domain, SUM(rec.count) as total_count
                FROM reports r
                JOIN records rec ON r.id = rec.report_id
                WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
                GROUP BY r.domain
                ORDER BY total_count DESC
            ''')
            analysis['failures_by_domain'] = cursor.fetchall()

            # Failures by reporting provider
            cursor.execute('''
                SELECT r.org_name, SUM(rec.count) as total_count
                FROM reports r
                JOIN records rec ON r.id = rec.report_id
                WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
                GROUP BY r.org_name
                ORDER BY total_count DESC
            ''')
            analysis['failures_by_provider'] = cursor.fetchall()

            # Per-provider, per-day failure timeline
            cursor.execute('''
                SELECT
                    r.org_name,
                    DATE(r.date_begin) as report_date,
                    SUM(rec.count) as failed_count,
                    COUNT(DISTINCT rec.source_ip) as unique_ips
                FROM reports r
                JOIN records rec ON r.id = rec.report_id
                WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
                GROUP BY r.org_name, DATE(r.date_begin)
                ORDER BY report_date DESC, failed_count DESC
            ''')
            analysis['provider_timeline'] = cursor.fetchall()

            # Policy actions taken on failing messages
            cursor.execute('''
                SELECT disposition, SUM(count) as total_count
                FROM records
                WHERE dkim_result = 'fail' OR spf_result = 'fail'
                GROUP BY disposition
                ORDER BY total_count DESC
            ''')
            analysis['failure_dispositions'] = cursor.fetchall()

            # Top 50 individual failure records with reporter details
            cursor.execute('''
                SELECT
                    r.domain,
                    rec.source_ip,
                    rec.count,
                    rec.disposition,
                    rec.dkim_result,
                    rec.spf_result,
                    rec.header_from,
                    r.date_begin,
                    r.org_name,
                    r.email as reporter_email
                FROM reports r
                JOIN records rec ON r.id = rec.report_id
                WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
                ORDER BY rec.count DESC
                LIMIT 50
            ''')
            analysis['detailed_failures'] = cursor.fetchall()

        return analysis

    def _build_date_filter(self, date_filter):
        """Build a WHERE-clause fragment and parameters for date filtering.

        Args:
            date_filter: None, or dict with optional 'date_from'/'date_to'
                values exposing ``strftime`` (e.g. datetime.date). Both
                bounds are inclusive and compare against DATE(date_begin).

        Returns:
            (clause, params): SQL fragment without the WHERE keyword
            (empty string when no filter), and its parameter list.
        """
        if not date_filter:
            return "", []

        conditions = []
        params = []

        if 'date_from' in date_filter:
            conditions.append("DATE(date_begin) >= ?")
            params.append(date_filter['date_from'].strftime('%Y-%m-%d'))

        if 'date_to' in date_filter:
            conditions.append("DATE(date_begin) <= ?")
            params.append(date_filter['date_to'].strftime('%Y-%m-%d'))

        return " AND ".join(conditions), params

    def get_timeline_stats(self, date_filter=None):
        """Get per-day totals: (date, total messages, failed messages,
        distinct reporting orgs), ordered by date ascending."""
        date_where, date_params = self._build_date_filter(date_filter)
        where_clause = f"WHERE {date_where}" if date_where else ""

        with closing(self._connect()) as conn:
            cursor = conn.cursor()
            cursor.execute(f'''
                SELECT
                    DATE(r.date_begin) as report_date,
                    SUM(rec.count) as total_messages,
                    SUM(CASE WHEN rec.dkim_result = 'fail' OR rec.spf_result = 'fail' THEN rec.count ELSE 0 END) as failed_messages,
                    COUNT(DISTINCT r.org_name) as reporters
                FROM reports r
                JOIN records rec ON r.id = rec.report_id
                {where_clause}
                GROUP BY DATE(r.date_begin)
                ORDER BY report_date
            ''', date_params)
            return cursor.fetchall()
\ No newline at end of file |