summaryrefslogtreecommitdiff
path: root/dmarc_analyzer/database.py
diff options
context:
space:
mode:
Diffstat (limited to 'dmarc_analyzer/database.py')
-rw-r--r--dmarc_analyzer/database.py382
1 file changed, 382 insertions, 0 deletions
diff --git a/dmarc_analyzer/database.py b/dmarc_analyzer/database.py
new file mode 100644
index 0000000..3ca33f2
--- /dev/null
+++ b/dmarc_analyzer/database.py
@@ -0,0 +1,382 @@
+"""SQLite database operations for DMARC reports"""
+
+import sqlite3
+import json
+from pathlib import Path
+
+
+class Database:
+ def __init__(self, db_path):
+ self.db_path = db_path
+
+ def init_db(self):
+ """Initialize the database schema"""
+ conn = sqlite3.connect(self.db_path)
+ cursor = conn.cursor()
+
+ # Create reports table
+ cursor.execute('''
+ CREATE TABLE IF NOT EXISTS reports (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ org_name TEXT,
+ email TEXT,
+ report_id TEXT UNIQUE,
+ date_begin DATETIME,
+ date_end DATETIME,
+ domain TEXT,
+ policy_p TEXT,
+ policy_sp TEXT,
+ policy_pct INTEGER,
+ policy_adkim TEXT,
+ policy_aspf TEXT,
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP
+ )
+ ''')
+
+ # Create records table
+ cursor.execute('''
+ CREATE TABLE IF NOT EXISTS records (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ report_id INTEGER,
+ source_ip TEXT,
+ count INTEGER,
+ disposition TEXT,
+ dkim_result TEXT,
+ spf_result TEXT,
+ header_from TEXT,
+ dkim_auth TEXT, -- JSON array
+ spf_auth TEXT, -- JSON array
+ FOREIGN KEY (report_id) REFERENCES reports (id)
+ )
+ ''')
+
+ # Create indexes for better performance
+ cursor.execute('CREATE INDEX IF NOT EXISTS idx_reports_domain ON reports (domain)')
+ cursor.execute('CREATE INDEX IF NOT EXISTS idx_records_source_ip ON records (source_ip)')
+ cursor.execute('CREATE INDEX IF NOT EXISTS idx_records_disposition ON records (disposition)')
+
+ conn.commit()
+ conn.close()
+
+ def store_report(self, report_data):
+ """Store parsed DMARC report data
+
+ Returns:
+ bool: True if report was stored, False if it was a duplicate
+ """
+ conn = sqlite3.connect(self.db_path)
+ cursor = conn.cursor()
+
+ try:
+ metadata = report_data.get('metadata', {})
+ policy = report_data.get('policy_published', {})
+ records = report_data.get('records', [])
+
+ # Check if report already exists
+ cursor.execute('SELECT id FROM reports WHERE report_id = ?', (metadata.get('report_id'),))
+ if cursor.fetchone():
+ conn.close()
+ return False # Duplicate found
+
+ # Insert report metadata
+ cursor.execute('''
+ INSERT INTO reports (
+ org_name, email, report_id, date_begin, date_end,
+ domain, policy_p, policy_sp, policy_pct, policy_adkim, policy_aspf
+ ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ ''', (
+ metadata.get('org_name'),
+ metadata.get('email'),
+ metadata.get('report_id'),
+ metadata.get('date_begin'),
+ metadata.get('date_end'),
+ policy.get('domain'),
+ policy.get('p'),
+ policy.get('sp'),
+ policy.get('pct'),
+ policy.get('adkim'),
+ policy.get('aspf')
+ ))
+
+ report_id = cursor.lastrowid
+
+ # Insert records
+ for record in records:
+ cursor.execute('''
+ INSERT INTO records (
+ report_id, source_ip, count, disposition, dkim_result, spf_result,
+ header_from, dkim_auth, spf_auth
+ ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+ ''', (
+ report_id,
+ record.get('source_ip'),
+ record.get('count'),
+ record.get('disposition'),
+ record.get('dkim_result'),
+ record.get('spf_result'),
+ record.get('header_from'),
+ json.dumps(record.get('dkim_auth', [])),
+ json.dumps(record.get('spf_auth', []))
+ ))
+
+ conn.commit()
+ return True # Successfully stored
+
+ except Exception as e:
+ conn.rollback()
+ raise
+ finally:
+ conn.close()
+
+ def get_summary_stats(self, date_filter=None):
+ """Get summary statistics"""
+ conn = sqlite3.connect(self.db_path)
+ cursor = conn.cursor()
+
+ stats = {}
+
+ # Build date filter conditions
+ date_where, date_params = self._build_date_filter(date_filter)
+ reports_where = f"WHERE {date_where}" if date_where else ""
+ join_where = f"JOIN reports r ON r.id = rec.report_id WHERE {date_where}" if date_where else "JOIN reports r ON r.id = rec.report_id"
+
+ # Total reports
+ cursor.execute(f'SELECT COUNT(*) FROM reports {reports_where}', date_params)
+ stats['total_reports'] = cursor.fetchone()[0]
+
+ # Total messages
+ cursor.execute(f'SELECT SUM(rec.count) FROM records rec {join_where}', date_params)
+ result = cursor.fetchone()[0]
+ stats['total_messages'] = result if result else 0
+
+ # Messages by disposition
+ cursor.execute(f'''
+ SELECT disposition, SUM(rec.count) as total
+ FROM records rec
+ {join_where}
+ GROUP BY disposition
+ ORDER BY total DESC
+ ''', date_params)
+ stats['by_disposition'] = cursor.fetchall()
+
+ # Messages by domain
+ cursor.execute(f'''
+ SELECT r.domain, SUM(rec.count) as total
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ {"WHERE " + date_where if date_where else ""}
+ GROUP BY r.domain
+ ORDER BY total DESC
+ LIMIT 10
+ ''', date_params)
+ stats['by_domain'] = cursor.fetchall()
+
+ # Messages by source IP (top 10)
+ cursor.execute(f'''
+ SELECT source_ip, SUM(rec.count) as total
+ FROM records rec
+ {join_where}
+ GROUP BY source_ip
+ ORDER BY total DESC
+ LIMIT 10
+ ''', date_params)
+ stats['by_source_ip'] = cursor.fetchall()
+
+ # DKIM/SPF results
+ dkim_where = f"{join_where} AND rec.dkim_result IS NOT NULL" if join_where else "JOIN reports r ON r.id = rec.report_id WHERE rec.dkim_result IS NOT NULL"
+ cursor.execute(f'''
+ SELECT dkim_result, COUNT(*) as count
+ FROM records rec
+ {dkim_where}
+ GROUP BY dkim_result
+ ''', date_params)
+ stats['dkim_results'] = cursor.fetchall()
+
+ spf_where = f"{join_where} AND rec.spf_result IS NOT NULL" if join_where else "JOIN reports r ON r.id = rec.report_id WHERE rec.spf_result IS NOT NULL"
+ cursor.execute(f'''
+ SELECT spf_result, COUNT(*) as count
+ FROM records rec
+ {spf_where}
+ GROUP BY spf_result
+ ''', date_params)
+ stats['spf_results'] = cursor.fetchall()
+
+ conn.close()
+ return stats
+
+ def get_detailed_records(self, limit=100):
+ """Get detailed record view"""
+ conn = sqlite3.connect(self.db_path)
+ cursor = conn.cursor()
+
+ cursor.execute('''
+ SELECT
+ r.domain,
+ r.org_name,
+ rec.source_ip,
+ rec.count,
+ rec.disposition,
+ rec.dkim_result,
+ rec.spf_result,
+ rec.header_from,
+ r.date_begin,
+ r.date_end
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ ORDER BY rec.count DESC
+ LIMIT ?
+ ''', (limit,))
+
+ records = cursor.fetchall()
+ conn.close()
+ return records
+
+ def get_failure_analysis(self):
+ """Get detailed failure analysis"""
+ conn = sqlite3.connect(self.db_path)
+ cursor = conn.cursor()
+
+ analysis = {}
+
+ # Authentication failure breakdown
+ cursor.execute('''
+ SELECT
+ CASE
+ WHEN dkim_result = 'fail' AND spf_result = 'fail' THEN 'Both DKIM & SPF Failed'
+ WHEN dkim_result = 'fail' AND spf_result != 'fail' THEN 'DKIM Failed Only'
+ WHEN dkim_result != 'fail' AND spf_result = 'fail' THEN 'SPF Failed Only'
+ ELSE 'Both Passed'
+ END as failure_type,
+ SUM(count) as total_count,
+ COUNT(*) as record_count
+ FROM records
+ GROUP BY failure_type
+ ORDER BY total_count DESC
+ ''')
+ analysis['auth_failure_breakdown'] = cursor.fetchall()
+
+ # Failures by source IP
+ cursor.execute('''
+ SELECT source_ip, SUM(count) as total_count
+ FROM records
+ WHERE dkim_result = 'fail' OR spf_result = 'fail'
+ GROUP BY source_ip
+ ORDER BY total_count DESC
+ LIMIT 10
+ ''')
+ analysis['failures_by_ip'] = cursor.fetchall()
+
+ # Failures by domain
+ cursor.execute('''
+ SELECT r.domain, SUM(rec.count) as total_count
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
+ GROUP BY r.domain
+ ORDER BY total_count DESC
+ ''')
+ analysis['failures_by_domain'] = cursor.fetchall()
+
+ # Failures by reporting provider
+ cursor.execute('''
+ SELECT r.org_name, SUM(rec.count) as total_count
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
+ GROUP BY r.org_name
+ ORDER BY total_count DESC
+ ''')
+ analysis['failures_by_provider'] = cursor.fetchall()
+
+ # Detailed provider breakdown with dates
+ cursor.execute('''
+ SELECT
+ r.org_name,
+ DATE(r.date_begin) as report_date,
+ SUM(rec.count) as failed_count,
+ COUNT(DISTINCT rec.source_ip) as unique_ips
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
+ GROUP BY r.org_name, DATE(r.date_begin)
+ ORDER BY report_date DESC, failed_count DESC
+ ''')
+ analysis['provider_timeline'] = cursor.fetchall()
+
+ # Policy actions on failures
+ cursor.execute('''
+ SELECT disposition, SUM(count) as total_count
+ FROM records
+ WHERE dkim_result = 'fail' OR spf_result = 'fail'
+ GROUP BY disposition
+ ORDER BY total_count DESC
+ ''')
+ analysis['failure_dispositions'] = cursor.fetchall()
+
+ # Detailed failure records
+ cursor.execute('''
+ SELECT
+ r.domain,
+ rec.source_ip,
+ rec.count,
+ rec.disposition,
+ rec.dkim_result,
+ rec.spf_result,
+ rec.header_from,
+ r.date_begin,
+ r.org_name,
+ r.email as reporter_email
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
+ ORDER BY rec.count DESC
+ LIMIT 50
+ ''')
+ analysis['detailed_failures'] = cursor.fetchall()
+
+ conn.close()
+ return analysis
+
+ def _build_date_filter(self, date_filter):
+ """Build WHERE clause and parameters for date filtering"""
+ if not date_filter:
+ return "", []
+
+ conditions = []
+ params = []
+
+ if 'date_from' in date_filter:
+ conditions.append("DATE(date_begin) >= ?")
+ params.append(date_filter['date_from'].strftime('%Y-%m-%d'))
+
+ if 'date_to' in date_filter:
+ conditions.append("DATE(date_begin) <= ?")
+ params.append(date_filter['date_to'].strftime('%Y-%m-%d'))
+
+ where_clause = " AND ".join(conditions) if conditions else ""
+ return where_clause, params
+
+ def get_timeline_stats(self, date_filter=None):
+ """Get daily breakdown statistics"""
+ conn = sqlite3.connect(self.db_path)
+ cursor = conn.cursor()
+
+ date_where, date_params = self._build_date_filter(date_filter)
+ where_clause = f"WHERE {date_where}" if date_where else ""
+
+ cursor.execute(f'''
+ SELECT
+ DATE(r.date_begin) as report_date,
+ SUM(rec.count) as total_messages,
+ SUM(CASE WHEN rec.dkim_result = 'fail' OR rec.spf_result = 'fail' THEN rec.count ELSE 0 END) as failed_messages,
+ COUNT(DISTINCT r.org_name) as reporters
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ {where_clause}
+ GROUP BY DATE(r.date_begin)
+ ORDER BY report_date
+ ''', date_params)
+
+ timeline = cursor.fetchall()
+ conn.close()
+ return timeline \ No newline at end of file