| author | Max Resnick <max@ofmax.li> | 2025-12-02 21:21:50 -0800 |
|---|---|---|
| committer | Max Resnick <max@ofmax.li> | 2025-12-02 21:21:50 -0800 |
| commit | 7d3432e055dc63935ce6df2b56d655aadf88938c | |
| tree | 6d8e164e275116a605fcb6f0b2f5d9c0f88cb91d | |
feat: init commit of tool (HEAD, CHECKPOINT, master)
| Mode | File | Insertions |
|---|---|---|
| -rw-r--r-- | dmarc_analyzer/__init__.py | 3 |
| -rw-r--r-- | dmarc_analyzer/database.py | 382 |
| -rw-r--r-- | dmarc_analyzer/extractor.py | 65 |
| -rw-r--r-- | dmarc_analyzer/main.py | 108 |
| -rw-r--r-- | dmarc_analyzer/parser.py | 185 |
| -rw-r--r-- | dmarc_analyzer/reporter.py | 276 |
| -rw-r--r-- | pyproject.toml | 15 |
| -rw-r--r-- | uv.lock | 58 |
8 files changed, 1092 insertions, 0 deletions
diff --git a/dmarc_analyzer/__init__.py b/dmarc_analyzer/__init__.py
new file mode 100644
index 0000000..ff23532
--- /dev/null
+++ b/dmarc_analyzer/__init__.py
@@ -0,0 +1,3 @@
+"""DMARC Report Analyzer"""
+
+__version__ = "0.1.0"
\ No newline at end of file
diff --git a/dmarc_analyzer/database.py b/dmarc_analyzer/database.py
new file mode 100644
index 0000000..3ca33f2
--- /dev/null
+++ b/dmarc_analyzer/database.py
@@ -0,0 +1,382 @@
+"""SQLite database operations for DMARC reports"""
+
+import sqlite3
+import json
+from pathlib import Path
+
+
+class Database:
+    def __init__(self, db_path):
+        self.db_path = db_path
+
+    def init_db(self):
+        """Initialize the database schema"""
+        conn = sqlite3.connect(self.db_path)
+        cursor = conn.cursor()
+
+        # Create reports table
+        cursor.execute('''
+            CREATE TABLE IF NOT EXISTS reports (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                org_name TEXT,
+                email TEXT,
+                report_id TEXT UNIQUE,
+                date_begin DATETIME,
+                date_end DATETIME,
+                domain TEXT,
+                policy_p TEXT,
+                policy_sp TEXT,
+                policy_pct INTEGER,
+                policy_adkim TEXT,
+                policy_aspf TEXT,
+                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
+            )
+        ''')
+
+        # Create records table
+        cursor.execute('''
+            CREATE TABLE IF NOT EXISTS records (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                report_id INTEGER,
+                source_ip TEXT,
+                count INTEGER,
+                disposition TEXT,
+                dkim_result TEXT,
+                spf_result TEXT,
+                header_from TEXT,
+                dkim_auth TEXT, -- JSON array
+                spf_auth TEXT, -- JSON array
+                FOREIGN KEY (report_id) REFERENCES reports (id)
+            )
+        ''')
+
+        # Create indexes for better performance
+        cursor.execute('CREATE INDEX IF NOT EXISTS idx_reports_domain ON reports (domain)')
+        cursor.execute('CREATE INDEX IF NOT EXISTS idx_records_source_ip ON records (source_ip)')
+        cursor.execute('CREATE INDEX IF NOT EXISTS idx_records_disposition ON records (disposition)')
+
+        conn.commit()
+        conn.close()
+
+    def store_report(self, report_data):
+        """Store parsed DMARC report data
+
+        Returns:
+            bool: True if report was stored, False if it was a duplicate
+        """
+        conn = sqlite3.connect(self.db_path)
+        cursor = conn.cursor()
+
+        try:
+            metadata = report_data.get('metadata', {})
+            policy = report_data.get('policy_published', {})
+            records = report_data.get('records', [])
+
+            # Check if report already exists
+            cursor.execute('SELECT id FROM reports WHERE report_id = ?', (metadata.get('report_id'),))
+            if cursor.fetchone():
+                conn.close()
+                return False  # Duplicate found
+
+            # Insert report metadata
+            cursor.execute('''
+                INSERT INTO reports (
+                    org_name, email, report_id, date_begin, date_end,
+                    domain, policy_p, policy_sp, policy_pct, policy_adkim, policy_aspf
+                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+            ''', (
+                metadata.get('org_name'),
+                metadata.get('email'),
+                metadata.get('report_id'),
+                metadata.get('date_begin'),
+                metadata.get('date_end'),
+                policy.get('domain'),
+                policy.get('p'),
+                policy.get('sp'),
+                policy.get('pct'),
+                policy.get('adkim'),
+                policy.get('aspf')
+            ))
+
+            report_id = cursor.lastrowid
+
+            # Insert records
+            for record in records:
+                cursor.execute('''
+                    INSERT INTO records (
+                        report_id, source_ip, count, disposition, dkim_result, spf_result,
+                        header_from, dkim_auth, spf_auth
+                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+                ''', (
+                    report_id,
+                    record.get('source_ip'),
+                    record.get('count'),
+                    record.get('disposition'),
+                    record.get('dkim_result'),
+                    record.get('spf_result'),
+                    record.get('header_from'),
+                    json.dumps(record.get('dkim_auth', [])),
+                    json.dumps(record.get('spf_auth', []))
+                ))
+
+            conn.commit()
+            return True  # Successfully stored
+
+        except Exception as e:
+            conn.rollback()
+            raise
+        finally:
+            conn.close()
+
+    def get_summary_stats(self, date_filter=None):
+        """Get summary statistics"""
+        conn = sqlite3.connect(self.db_path)
+        cursor = conn.cursor()
+
+        stats = {}
+
+        # Build date filter conditions
+        date_where, date_params = self._build_date_filter(date_filter)
+        reports_where = f"WHERE {date_where}" if date_where else ""
+        join_where = f"JOIN reports r ON r.id = rec.report_id WHERE {date_where}" if date_where else "JOIN reports r ON r.id = rec.report_id"
+
+        # Total reports
+        cursor.execute(f'SELECT COUNT(*) FROM reports {reports_where}', date_params)
+        stats['total_reports'] = cursor.fetchone()[0]
+
+        # Total messages
+        cursor.execute(f'SELECT SUM(rec.count) FROM records rec {join_where}', date_params)
+        result = cursor.fetchone()[0]
+        stats['total_messages'] = result if result else 0
+
+        # Messages by disposition
+        cursor.execute(f'''
+            SELECT disposition, SUM(rec.count) as total
+            FROM records rec
+            {join_where}
+            GROUP BY disposition
+            ORDER BY total DESC
+        ''', date_params)
+        stats['by_disposition'] = cursor.fetchall()
+
+        # Messages by domain
+        cursor.execute(f'''
+            SELECT r.domain, SUM(rec.count) as total
+            FROM reports r
+            JOIN records rec ON r.id = rec.report_id
+            {"WHERE " + date_where if date_where else ""}
+            GROUP BY r.domain
+            ORDER BY total DESC
+            LIMIT 10
+        ''', date_params)
+        stats['by_domain'] = cursor.fetchall()
+
+        # Messages by source IP (top 10)
+        cursor.execute(f'''
+            SELECT source_ip, SUM(rec.count) as total
+            FROM records rec
+            {join_where}
+            GROUP BY source_ip
+            ORDER BY total DESC
+            LIMIT 10
+        ''', date_params)
+        stats['by_source_ip'] = cursor.fetchall()
+
+        # DKIM/SPF results
+        dkim_where = f"{join_where} AND rec.dkim_result IS NOT NULL" if join_where else "JOIN reports r ON r.id = rec.report_id WHERE rec.dkim_result IS NOT NULL"
+        cursor.execute(f'''
+            SELECT dkim_result, COUNT(*) as count
+            FROM records rec
+            {dkim_where}
+            GROUP BY dkim_result
+        ''', date_params)
+        stats['dkim_results'] = cursor.fetchall()
+
+        spf_where = f"{join_where} AND rec.spf_result IS NOT NULL" if join_where else "JOIN reports r ON r.id = rec.report_id WHERE rec.spf_result IS NOT NULL"
+        cursor.execute(f'''
+            SELECT spf_result, COUNT(*) as count
+            FROM records rec
+            {spf_where}
+            GROUP BY spf_result
+        ''', date_params)
+        stats['spf_results'] = cursor.fetchall()
+
+        conn.close()
+        return stats
+
+    def get_detailed_records(self, limit=100):
+        """Get detailed record view"""
+        conn = sqlite3.connect(self.db_path)
+        cursor = conn.cursor()
+
+        cursor.execute('''
+            SELECT
+                r.domain,
+                r.org_name,
+                rec.source_ip,
+                rec.count,
+                rec.disposition,
+                rec.dkim_result,
+                rec.spf_result,
+                rec.header_from,
+                r.date_begin,
+                r.date_end
+            FROM reports r
+            JOIN records rec ON r.id = rec.report_id
+            ORDER BY rec.count DESC
+            LIMIT ?
+        ''', (limit,))
+
+        records = cursor.fetchall()
+        conn.close()
+        return records
+
+    def get_failure_analysis(self):
+        """Get detailed failure analysis"""
+        conn = sqlite3.connect(self.db_path)
+        cursor = conn.cursor()
+
+        analysis = {}
+
+        # Authentication failure breakdown
+        cursor.execute('''
+            SELECT
+                CASE
+                    WHEN dkim_result = 'fail' AND spf_result = 'fail' THEN 'Both DKIM & SPF Failed'
+                    WHEN dkim_result = 'fail' AND spf_result != 'fail' THEN 'DKIM Failed Only'
+                    WHEN dkim_result != 'fail' AND spf_result = 'fail' THEN 'SPF Failed Only'
+                    ELSE 'Both Passed'
+                END as failure_type,
+                SUM(count) as total_count,
+                COUNT(*) as record_count
+            FROM records
+            GROUP BY failure_type
+            ORDER BY total_count DESC
+        ''')
+        analysis['auth_failure_breakdown'] = cursor.fetchall()
+
+        # Failures by source IP
+        cursor.execute('''
+            SELECT source_ip, SUM(count) as total_count
+            FROM records
+            WHERE dkim_result = 'fail' OR spf_result = 'fail'
+            GROUP BY source_ip
+            ORDER BY total_count DESC
+            LIMIT 10
+        ''')
+        analysis['failures_by_ip'] = cursor.fetchall()
+
+        # Failures by domain
+        cursor.execute('''
+            SELECT r.domain, SUM(rec.count) as total_count
+            FROM reports r
+            JOIN records rec ON r.id = rec.report_id
+            WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
+            GROUP BY r.domain
+            ORDER BY total_count DESC
+        ''')
+        analysis['failures_by_domain'] = cursor.fetchall()
+
+        # Failures by reporting provider
+        cursor.execute('''
+            SELECT r.org_name, SUM(rec.count) as total_count
+            FROM reports r
+            JOIN records rec ON r.id = rec.report_id
+            WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
+            GROUP BY r.org_name
+            ORDER BY total_count DESC
+        ''')
+        analysis['failures_by_provider'] = cursor.fetchall()
+
+        # Detailed provider breakdown with dates
+        cursor.execute('''
+            SELECT
+                r.org_name,
+                DATE(r.date_begin) as report_date,
+                SUM(rec.count) as failed_count,
+                COUNT(DISTINCT rec.source_ip) as unique_ips
+            FROM reports r
+            JOIN records rec ON r.id = rec.report_id
+            WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
+            GROUP BY r.org_name, DATE(r.date_begin)
+            ORDER BY report_date DESC, failed_count DESC
+        ''')
+        analysis['provider_timeline'] = cursor.fetchall()
+
+        # Policy actions on failures
+        cursor.execute('''
+            SELECT disposition, SUM(count) as total_count
+            FROM records
+            WHERE dkim_result = 'fail' OR spf_result = 'fail'
+            GROUP BY disposition
+            ORDER BY total_count DESC
+        ''')
+        analysis['failure_dispositions'] = cursor.fetchall()
+
+        # Detailed failure records
+        cursor.execute('''
+            SELECT
+                r.domain,
+                rec.source_ip,
+                rec.count,
+                rec.disposition,
+                rec.dkim_result,
+                rec.spf_result,
+                rec.header_from,
+                r.date_begin,
+                r.org_name,
+                r.email as reporter_email
+            FROM reports r
+            JOIN records rec ON r.id = rec.report_id
+            WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
+            ORDER BY rec.count DESC
+            LIMIT 50
+        ''')
+        analysis['detailed_failures'] = cursor.fetchall()
+
+        conn.close()
+        return analysis
+
+    def _build_date_filter(self, date_filter):
+        """Build WHERE clause and parameters for date filtering"""
+        if not date_filter:
+            return "", []
+
+        conditions = []
+        params = []
+
+        if 'date_from' in date_filter:
+            conditions.append("DATE(date_begin) >= ?")
+            params.append(date_filter['date_from'].strftime('%Y-%m-%d'))
+
+        if 'date_to' in date_filter:
+            conditions.append("DATE(date_begin) <= ?")
+            params.append(date_filter['date_to'].strftime('%Y-%m-%d'))
+
+        where_clause = " AND ".join(conditions) if conditions else ""
+        return where_clause, params
+
+    def get_timeline_stats(self, date_filter=None):
+        """Get daily breakdown statistics"""
+        conn = sqlite3.connect(self.db_path)
+        cursor = conn.cursor()
+
+        date_where, date_params = self._build_date_filter(date_filter)
+        where_clause = f"WHERE {date_where}" if date_where else ""
+
+        cursor.execute(f'''
+            SELECT
+                DATE(r.date_begin) as report_date,
+                SUM(rec.count) as total_messages,
+                SUM(CASE WHEN rec.dkim_result = 'fail' OR rec.spf_result = 'fail' THEN rec.count ELSE 0 END) as failed_messages,
+                COUNT(DISTINCT r.org_name) as reporters
+            FROM reports r
+            JOIN records rec ON r.id = rec.report_id
+            {where_clause}
+            GROUP BY DATE(r.date_begin)
+            ORDER BY report_date
+        ''', date_params)
+
+        timeline = cursor.fetchall()
+        conn.close()
+        return timeline
\ No newline at end of file
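Note: the `Database` class can be exercised on its own. Below is a minimal sketch (not part of the commit) that stores one hand-built report dict shaped like the output of `parser.parse_dmarc_report` and reads back the summary; the file name and sample values are hypothetical.

```python
from datetime import datetime
from dmarc_analyzer.database import Database

# Hypothetical database file; any path works.
db = Database("example_dmarc.db")
db.init_db()

# Dict shaped like parse_dmarc_report() output (values are invented).
report = {
    "metadata": {
        "org_name": "example.net",
        "email": "noreply@example.net",
        "report_id": "12345",
        "date_begin": datetime(2025, 11, 30),
        "date_end": datetime(2025, 12, 1),
    },
    "policy_published": {"domain": "example.org", "p": "none", "pct": 100},
    "records": [
        {
            "source_ip": "192.0.2.10",
            "count": 4,
            "disposition": "none",
            "dkim_result": "fail",
            "spf_result": "pass",
            "header_from": "example.org",
            "dkim_auth": [{"domain": "example.org", "result": "fail"}],
            "spf_auth": [{"domain": "example.org", "result": "pass"}],
        }
    ],
}

print(db.store_report(report))                    # True on first insert, False on a re-run (duplicate report_id)
print(db.get_summary_stats()["total_messages"])   # 4
```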
diff --git a/dmarc_analyzer/extractor.py b/dmarc_analyzer/extractor.py
new file mode 100644
index 0000000..d12d67b
--- /dev/null
+++ b/dmarc_analyzer/extractor.py
@@ -0,0 +1,65 @@
+"""File extraction utilities for DMARC reports"""
+
+import gzip
+import zipfile
+import tempfile
+import os
+from pathlib import Path
+
+
+def extract_files(file_path):
+    """Extract XML content from gzip or zip files
+
+    Args:
+        file_path: Path to the compressed file
+
+    Returns:
+        str: XML content if successful, None otherwise
+    """
+    file_path = Path(file_path)
+
+    try:
+        if file_path.suffix == '.gz':
+            return _extract_gzip(file_path)
+        elif file_path.suffix == '.zip':
+            return _extract_zip(file_path)
+        else:
+            # Assume it's already XML
+            return file_path.read_text()
+    except Exception as e:
+        print(f"Error extracting {file_path}: {e}")
+        return None
+
+
+def _extract_gzip(file_path):
+    """Extract content from gzip file"""
+    with gzip.open(file_path, 'rt') as f:
+        return f.read()
+
+
+def _extract_zip(file_path):
+    """Extract content from zip file
+
+    For zip files, we look for XML files inside and return the first one
+    """
+    with zipfile.ZipFile(file_path, 'r') as zip_ref:
+        # List all files in the zip
+        file_list = zip_ref.namelist()
+
+        # Find the first XML file
+        xml_file = None
+        for filename in file_list:
+            if filename.lower().endswith('.xml'):
+                xml_file = filename
+                break
+
+        if xml_file:
+            with zip_ref.open(xml_file) as f:
+                return f.read().decode('utf-8')
+        else:
+            # If no XML file found, try the first file
+            if file_list:
+                with zip_ref.open(file_list[0]) as f:
+                    return f.read().decode('utf-8')
+
+    return None
\ No newline at end of file
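Note: a quick round-trip check for `extract_files` (not part of the commit; the fixture path and payload are made up). It writes a gzipped XML fixture and reads it back through the same code path providers' `.xml.gz` attachments take.

```python
import gzip
from dmarc_analyzer.extractor import extract_files

# Hypothetical gzipped fixture mimicking a delivered aggregate report.
payload = "<feedback><report_metadata><org_name>example.net</org_name></report_metadata></feedback>"
with gzip.open("sample_report.xml.gz", "wt") as f:
    f.write(payload)

# extract_files dispatches on the suffix: .gz -> gzip, .zip -> zipfile, anything else is read as plain XML.
assert extract_files("sample_report.xml.gz") == payload
```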
diff --git a/dmarc_analyzer/main.py b/dmarc_analyzer/main.py
new file mode 100644
index 0000000..18431d0
--- /dev/null
+++ b/dmarc_analyzer/main.py
@@ -0,0 +1,108 @@
+#!/usr/bin/env python3
+"""Main CLI entry point for DMARC analyzer"""
+
+import click
+import os
+from pathlib import Path
+
+from .extractor import extract_files
+from .parser import parse_dmarc_report
+from .database import Database
+from .reporter import generate_report
+
+
+@click.command()
+@click.argument('paths', nargs=-1, type=click.Path(exists=True))
+@click.option('--db', default='dmarc_reports.db', help='Database file path')
+@click.option('--output-format', type=click.Choice(['summary', 'detailed', 'failures']), default='summary', help='Report output format')
+@click.option('--show-failures-only', is_flag=True, help='Show only records with authentication failures')
+@click.option('--report-only', is_flag=True, help='Generate report from existing data without processing files')
+@click.option('--date-from', type=click.DateTime(['%Y-%m-%d']), help='Filter reports from this date (YYYY-MM-DD)')
+@click.option('--date-to', type=click.DateTime(['%Y-%m-%d']), help='Filter reports to this date (YYYY-MM-DD)')
+@click.option('--show-timeline', is_flag=True, help='Show daily breakdown in reports')
+def cli(paths, db, output_format, show_failures_only, report_only, date_from, date_to, show_timeline):
+    """Analyze DMARC reports from gzip/zip files or directories"""
+    database = Database(db)
+    database.init_db()
+
+    # Report-only mode: skip file processing
+    if report_only:
+        if paths:
+            click.echo("Warning: Paths provided but --report-only specified. Ignoring paths.", err=True)
+
+        click.echo("Generating report from existing database...")
+        click.echo("\n" + "="*50)
+        click.echo("DMARC REPORT SUMMARY")
+        click.echo("="*50)
+
+        # Create date filter
+        date_filter = {}
+        if date_from:
+            date_filter['date_from'] = date_from
+        if date_to:
+            date_filter['date_to'] = date_to
+
+        report = generate_report(database, output_format, show_failures_only, date_filter, show_timeline)
+        click.echo(report)
+        return
+
+    # Validate paths are provided for processing mode
+    if not paths:
+        click.echo("Error: Paths are required unless using --report-only", err=True)
+        return
+
+    processed_count = 0
+
+    for path in paths:
+        path = Path(path)
+
+        if path.is_file():
+            files = [path]
+        else:
+            # Find all gzip and zip files in directory
+            files = list(path.glob('*.gz')) + list(path.glob('*.zip'))
+
+        for file_path in files:
+            try:
+                click.echo(f"Processing: {file_path}")
+
+                # Extract and parse
+                xml_content = extract_files(file_path)
+                if xml_content:
+                    report_data = parse_dmarc_report(xml_content)
+                    if report_data:
+                        was_stored = database.store_report(report_data)
+                        if was_stored:
+                            processed_count += 1
+                            click.echo(f"  ✓ Processed successfully")
+                        else:
+                            click.echo(f"  ⊝ Skipped (duplicate)")
+                    else:
+                        click.echo(f"  ⚠ Failed to parse DMARC report", err=True)
+                else:
+                    click.echo(f"  ⚠ Failed to extract file", err=True)
+
+            except Exception as e:
+                click.echo(f"  ✗ Error processing {file_path}: {e}", err=True)
+
+    click.echo(f"\nProcessed {processed_count} reports")
+
+    # Generate summary report
+    if processed_count > 0:
+        click.echo("\n" + "="*50)
+        click.echo("DMARC REPORT SUMMARY")
+        click.echo("="*50)
+
+        # Create date filter for processing mode too
+        date_filter = {}
+        if date_from:
+            date_filter['date_from'] = date_from
+        if date_to:
+            date_filter['date_to'] = date_to
+
+        report = generate_report(database, output_format, show_failures_only, date_filter, show_timeline)
+        click.echo(report)
+
+
+if __name__ == '__main__':
+    cli()
\ No newline at end of file
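Note: since the entry point is a Click command, it can be driven in-process with Click's test runner. A minimal sketch (not part of the commit; the database path is the default one and hypothetical):

```python
from click.testing import CliRunner
from dmarc_analyzer.main import cli

runner = CliRunner()
# --report-only skips file processing and reports on whatever is already in the database.
result = runner.invoke(cli, ["--report-only", "--db", "dmarc_reports.db", "--output-format", "summary"])
print(result.exit_code)  # 0 on success
print(result.output)     # the rendered summary report
```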
diff --git a/dmarc_analyzer/parser.py b/dmarc_analyzer/parser.py
new file mode 100644
index 0000000..36a27a3
--- /dev/null
+++ b/dmarc_analyzer/parser.py
@@ -0,0 +1,185 @@
+"""DMARC XML report parser"""
+
+import xml.etree.ElementTree as ET
+from datetime import datetime
+
+
+def parse_dmarc_report(xml_content):
+    """Parse DMARC XML report and extract relevant data
+
+    Args:
+        xml_content: String containing XML report data
+
+    Returns:
+        dict: Parsed report data or None if parsing fails
+    """
+    try:
+        root = ET.fromstring(xml_content)
+
+        # Extract report metadata
+        report_metadata = _parse_report_metadata(root)
+
+        # Extract policy published
+        policy_published = _parse_policy_published(root)
+
+        # Extract records
+        records = _parse_records(root)
+
+        return {
+            'metadata': report_metadata,
+            'policy_published': policy_published,
+            'records': records
+        }
+
+    except Exception as e:
+        print(f"Error parsing DMARC report: {e}")
+        return None
+
+
+def _parse_report_metadata(root):
+    """Parse report metadata section"""
+    metadata = {}
+
+    report_metadata = root.find('report_metadata')
+    if report_metadata is not None:
+        # Organization name
+        org_name = report_metadata.find('org_name')
+        if org_name is not None:
+            metadata['org_name'] = org_name.text
+
+        # Email
+        email = report_metadata.find('email')
+        if email is not None:
+            metadata['email'] = email.text
+
+        # Report ID
+        report_id = report_metadata.find('report_id')
+        if report_id is not None:
+            metadata['report_id'] = report_id.text
+
+        # Date range
+        date_range = report_metadata.find('date_range')
+        if date_range is not None:
+            begin = date_range.find('begin')
+            end = date_range.find('end')
+            if begin is not None:
+                metadata['date_begin'] = datetime.fromtimestamp(int(begin.text))
+            if end is not None:
+                metadata['date_end'] = datetime.fromtimestamp(int(end.text))
+
+    return metadata
+
+
+def _parse_policy_published(root):
+    """Parse policy published section"""
+    policy = {}
+
+    policy_published = root.find('policy_published')
+    if policy_published is not None:
+        # Domain
+        domain = policy_published.find('domain')
+        if domain is not None:
+            policy['domain'] = domain.text
+
+        # ADKIM (DKIM alignment)
+        adkim = policy_published.find('adkim')
+        if adkim is not None:
+            policy['adkim'] = adkim.text
+
+        # ASPF (SPF alignment)
+        aspf = policy_published.find('aspf')
+        if aspf is not None:
+            policy['aspf'] = aspf.text
+
+        # Policy
+        p = policy_published.find('p')
+        if p is not None:
+            policy['p'] = p.text
+
+        # Subdomain policy
+        sp = policy_published.find('sp')
+        if sp is not None:
+            policy['sp'] = sp.text
+
+        # Percentage
+        pct = policy_published.find('pct')
+        if pct is not None:
+            policy['pct'] = int(pct.text)
+
+    return policy
+
+
+def _parse_records(root):
+    """Parse record section"""
+    records = []
+
+    for record in root.findall('record'):
+        record_data = {}
+
+        # Row data
+        row = record.find('row')
+        if row is not None:
+            source_ip = row.find('source_ip')
+            if source_ip is not None:
+                record_data['source_ip'] = source_ip.text
+
+            count = row.find('count')
+            if count is not None:
+                record_data['count'] = int(count.text)
+
+            # Policy evaluation
+            policy_evaluated = row.find('policy_evaluated')
+            if policy_evaluated is not None:
+                disposition = policy_evaluated.find('disposition')
+                if disposition is not None:
+                    record_data['disposition'] = disposition.text
+
+                dkim = policy_evaluated.find('dkim')
+                if dkim is not None:
+                    record_data['dkim_result'] = dkim.text
+
+                spf = policy_evaluated.find('spf')
+                if spf is not None:
+                    record_data['spf_result'] = spf.text
+
+        # Identifiers
+        identifiers = record.find('identifiers')
+        if identifiers is not None:
+            header_from = identifiers.find('header_from')
+            if header_from is not None:
+                record_data['header_from'] = header_from.text
+
+        # Auth results
+        auth_results = record.find('auth_results')
+        if auth_results is not None:
+            # DKIM auth results
+            dkim_results = []
+            for dkim in auth_results.findall('dkim'):
+                dkim_data = {}
+                domain = dkim.find('domain')
+                if domain is not None:
+                    dkim_data['domain'] = domain.text
+                result = dkim.find('result')
+                if result is not None:
+                    dkim_data['result'] = result.text
+                if dkim_data:
+                    dkim_results.append(dkim_data)
+            record_data['dkim_auth'] = dkim_results
+
+            # SPF auth results
+            spf_results = []
+            for spf in auth_results.findall('spf'):
+                spf_data = {}
+                domain = spf.find('domain')
+                if domain is not None:
+                    spf_data['domain'] = domain.text
+                result = spf.find('result')
+                if result is not None:
+                    spf_data['result'] = result.text
+                if spf_data:
+                    spf_results.append(spf_data)
+            record_data['spf_auth'] = spf_results
+
+        records.append(record_data)
+
+    return records
\ No newline at end of file
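Note: the parser reads an RFC 7489-style aggregate report with un-namespaced elements. A minimal sketch (not part of the commit, sample values invented) of the shape it expects:

```python
from dmarc_analyzer.parser import parse_dmarc_report

# Minimal aggregate-report XML covering the elements the parser looks for.
xml = """
<feedback>
  <report_metadata>
    <org_name>example.net</org_name>
    <email>noreply@example.net</email>
    <report_id>12345</report_id>
    <date_range><begin>1733011200</begin><end>1733097600</end></date_range>
  </report_metadata>
  <policy_published>
    <domain>example.org</domain><adkim>r</adkim><aspf>r</aspf>
    <p>none</p><sp>none</sp><pct>100</pct>
  </policy_published>
  <record>
    <row>
      <source_ip>192.0.2.10</source_ip>
      <count>4</count>
      <policy_evaluated><disposition>none</disposition><dkim>fail</dkim><spf>pass</spf></policy_evaluated>
    </row>
    <identifiers><header_from>example.org</header_from></identifiers>
    <auth_results>
      <dkim><domain>example.org</domain><result>fail</result></dkim>
      <spf><domain>example.org</domain><result>pass</result></spf>
    </auth_results>
  </record>
</feedback>
"""

parsed = parse_dmarc_report(xml)
print(parsed["metadata"]["org_name"])       # example.net
print(parsed["records"][0]["dkim_result"])  # fail
```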
diff --git a/dmarc_analyzer/reporter.py b/dmarc_analyzer/reporter.py
new file mode 100644
index 0000000..484f97d
--- /dev/null
+++ b/dmarc_analyzer/reporter.py
@@ -0,0 +1,276 @@
+"""Report generation for DMARC analysis"""
+
+from datetime import datetime
+
+
+def generate_report(database, output_format='summary', show_failures_only=False, date_filter=None, show_timeline=False):
+    """Generate terminal report from database"""
+    stats = database.get_summary_stats(date_filter)
+
+    # Get timeline data if requested
+    timeline_data = None
+    if show_timeline:
+        timeline_data = database.get_timeline_stats(date_filter)
+
+    if output_format == 'failures':
+        failure_analysis = database.get_failure_analysis()
+        return _generate_failure_report(failure_analysis, timeline_data)
+    elif output_format == 'summary':
+        return _generate_summary_report(stats, show_failures_only, date_filter, timeline_data)
+    elif output_format == 'detailed':
+        detailed_records = database.get_detailed_records()
+        return _generate_detailed_report(stats, detailed_records, timeline_data)
+
+
+def _generate_summary_report(stats, show_failures_only=False, date_filter=None, timeline_data=None):
+    """Generate summary report"""
+    report = []
+
+    # Date range info
+    if date_filter:
+        if 'date_from' in date_filter:
+            report.append(f"Date Range: From {date_filter['date_from'].strftime('%Y-%m-%d')}")
+        if 'date_to' in date_filter:
+            if 'date_from' in date_filter:
+                report[-1] += f" to {date_filter['date_to'].strftime('%Y-%m-%d')}"
+            else:
+                report.append(f"Date Range: Up to {date_filter['date_to'].strftime('%Y-%m-%d')}")
+        report.append("")
+
+    # Overall statistics
+    report.append(f"Total Reports: {stats['total_reports']}")
+    report.append(f"Total Messages: {stats['total_messages']:,}")
+    report.append("")
+
+    # Timeline breakdown if requested
+    if timeline_data:
+        report.append("📅 DAILY BREAKDOWN:")
+        for date_str, total_msgs, failed_msgs, reporters in timeline_data:
+            failure_rate = (failed_msgs / total_msgs * 100) if total_msgs > 0 else 0
+            report.append(f"  {date_str}: {total_msgs:,} messages ({failed_msgs:,} failed, {failure_rate:.1f}%) from {reporters} reporters")
+        report.append("")
+
+    # Messages by disposition
+    if stats['by_disposition']:
+        report.append("Messages by Disposition:")
+        for disposition, count in stats['by_disposition']:
+            percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0
+            report.append(f"  {disposition}: {count:,} ({percentage:.1f}%)")
+        report.append("")
+
+    # Top domains
+    if stats['by_domain']:
+        report.append("Top Domains:")
+        for domain, count in stats['by_domain']:
+            percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0
+            report.append(f"  {domain}: {count:,} ({percentage:.1f}%)")
+        report.append("")
+
+    # Top source IPs
+    if stats['by_source_ip']:
+        report.append("Top Source IPs:")
+        for ip, count in stats['by_source_ip']:
+            percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0
+            report.append(f"  {ip}: {count:,} ({percentage:.1f}%)")
+        report.append("")
+
+    # DKIM Results
+    if stats['dkim_results']:
+        report.append("DKIM Results:")
+        for result, count in stats['dkim_results']:
+            report.append(f"  {result}: {count}")
+        report.append("")
+
+    # SPF Results
+    if stats['spf_results']:
+        report.append("SPF Results:")
+        for result, count in stats['spf_results']:
+            report.append(f"  {result}: {count}")
+
+    return "\n".join(report)
+
+
+def _generate_detailed_report(stats, detailed_records, timeline_data=None):
+    """Generate detailed report with individual records"""
+    report = []
+
+    # Start with summary
+    summary = _generate_summary_report(stats)
+    report.append(summary)
+    report.append("")
+    report.append("="*80)
+    report.append("DETAILED RECORDS (Top 100)")
+    report.append("="*80)
+    report.append("")
+
+    # Table header
+    header = f"{'Domain':<25} {'Source IP':<15} {'Count':<8} {'Disposition':<12} {'DKIM':<8} {'SPF':<8} {'From':<20}"
+    report.append(header)
+    report.append("-" * len(header))
+
+    # Records
+    for record in detailed_records:
+        (domain, org_name, source_ip, count, disposition,
+         dkim_result, spf_result, header_from, date_begin, date_end) = record
+
+        # Truncate long values
+        domain = (domain or "")[:24]
+        source_ip = (source_ip or "")[:14]
+        disposition = (disposition or "")[:11]
+        dkim_result = (dkim_result or "")[:7]
+        spf_result = (spf_result or "")[:7]
+        header_from = (header_from or "")[:19]
+
+        line = f"{domain:<25} {source_ip:<15} {count:<8} {disposition:<12} {dkim_result:<8} {spf_result:<8} {header_from:<20}"
+        report.append(line)
+
+    return "\n".join(report)
+
+
+def _generate_failure_report(failure_analysis, timeline_data=None):
+    """Generate detailed failure analysis report"""
+    report = []
+
+    report.append("🚨 DMARC FAILURE ANALYSIS")
+    report.append("="*50)
+    report.append("")
+
+    # Authentication failure breakdown
+    if failure_analysis.get('auth_failure_breakdown'):
+        report.append("Authentication Failure Breakdown:")
+        for failure_type, total_count, record_count in failure_analysis['auth_failure_breakdown']:
+            report.append(f"  {failure_type}: {total_count:,} messages ({record_count} sources)")
+        report.append("")
+
+    # Policy actions on failures
+    if failure_analysis.get('failure_dispositions'):
+        report.append("Policy Actions on Failed Messages:")
+        total_failures = sum(count for _, count in failure_analysis['failure_dispositions'])
+        for disposition, count in failure_analysis['failure_dispositions']:
+            percentage = (count / total_failures * 100) if total_failures > 0 else 0
+            report.append(f"  {disposition}: {count:,} ({percentage:.1f}%)")
+        report.append("")
+
+    # Top failing source IPs
+    if failure_analysis.get('failures_by_ip'):
+        report.append("Top Failing Source IPs:")
+        for ip, count in failure_analysis['failures_by_ip']:
+            report.append(f"  {ip}: {count:,} failed messages")
+        report.append("")
+
+    # Failures by domain
+    if failure_analysis.get('failures_by_domain'):
+        report.append("Failures by Domain:")
+        for domain, count in failure_analysis['failures_by_domain']:
+            report.append(f"  {domain}: {count:,} failed messages")
+        report.append("")
+
+    # Failures by email provider (reporter)
+    if failure_analysis.get('failures_by_provider'):
+        report.append("🏢 Failures by Email Provider (Reporter):")
+        for provider, count in failure_analysis['failures_by_provider']:
+            report.append(f"  {provider}: {count:,} failed messages")
+        report.append("")
+
+    # Provider timeline breakdown
+    if failure_analysis.get('provider_timeline'):
+        report.append("📊 Provider Timeline Breakdown:")
+        current_date = None
+        for provider, report_date, failed_count, unique_ips in failure_analysis['provider_timeline']:
+            if report_date != current_date:
+                if current_date is not None:
+                    report.append("")
+                report.append(f"  {report_date}:")
+                current_date = report_date
+            report.append(f"    {provider}: {failed_count:,} failures from {unique_ips} unique IPs")
+        report.append("")
+
+    # Detailed failure records
+    if failure_analysis.get('detailed_failures'):
+        report.append("🔍 DETAILED FAILURE RECORDS (Top 50)")
+        report.append("="*80)
+        report.append("")
+
+        # Table header
+        header = f"{'Domain':<18} {'Source IP':<15} {'Count':<6} {'Action':<10} {'DKIM':<6} {'SPF':<6} {'Reporter':<15} {'Date':<10}"
+        report.append(header)
+        report.append("-" * len(header))
+
+        # Records
+        for record in failure_analysis['detailed_failures']:
+            (domain, source_ip, count, disposition, dkim_result,
+             spf_result, header_from, date_begin, org_name, reporter_email) = record
+
+            # Truncate long values
+            domain = (domain or "")[:17]
+            source_ip = (source_ip or "")[:14]
+            disposition = (disposition or "")[:9]
+            dkim_result = (dkim_result or "")[:5]
+            spf_result = (spf_result or "")[:5]
+            org_name = (org_name or "")[:14]
+
+            # Format date
+            try:
+                if isinstance(date_begin, str):
+                    date_str = date_begin[:10]  # Take just YYYY-MM-DD part
+                else:
+                    date_str = date_begin.strftime('%Y-%m-%d')
+            except:
+                date_str = "N/A"
+
+            # Highlight failures with emoji
+            dkim_display = "❌" if dkim_result == 'fail' else "✅"
+            spf_display = "❌" if spf_result == 'fail' else "✅"
+
+            line = f"{domain:<18} {source_ip:<15} {count:<6} {disposition:<10} {dkim_display:<6} {spf_display:<6} {org_name:<15} {date_str:<10}"
+            report.append(line)
+
+    report.append("")
+    report.append("💡 RECOMMENDATIONS:")
+    report.append("- Investigate high-volume failing IPs for potential spoofing")
+    report.append("- Review DKIM signing for domains with DKIM failures")
+    report.append("- Check SPF records for domains with SPF failures")
+    report.append("- Consider moving from 'none' to 'quarantine' policy if ready")
+
+    return "\n".join(report)
+
+
+def format_table(headers, rows, max_width=None):
+    """Helper function to format data as a table"""
+    if not rows:
+        return ""
+
+    # Calculate column widths
+    col_widths = [len(header) for header in headers]
+
+    for row in rows:
+        for i, cell in enumerate(row):
+            if i < len(col_widths):
+                col_widths[i] = max(col_widths[i], len(str(cell)))
+
+    # Apply max width if specified
+    if max_width:
+        for i in range(len(col_widths)):
+            col_widths[i] = min(col_widths[i], max_width)
+
+    # Create format string
+    format_str = " | ".join(f"{{:<{width}}}" for width in col_widths)
+
+    # Generate table
+    lines = []
+
+    # Header
+    lines.append(format_str.format(*headers))
+    lines.append("-" * sum(col_widths) + "-" * (len(col_widths) - 1) * 3)
+
+    # Rows
+    for row in rows:
+        formatted_row = []
+        for i, cell in enumerate(row):
+            cell_str = str(cell)
+            if max_width and len(cell_str) > col_widths[i]:
+                cell_str = cell_str[:col_widths[i]-3] + "..."
+            formatted_row.append(cell_str)
+        lines.append(format_str.format(*formatted_row))
+
+    return "\n".join(lines)
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..9d7585c
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,15 @@
+[project]
+name = "dmarc-analyzer"
+version = "0.1.0"
+description = "A simple DMARC report analyzer tool"
+requires-python = ">=3.8"
+dependencies = [
+    "click>=8.0.0",
+]
+
+[project.scripts]
+dmarc-analyzer = "dmarc_analyzer.main:cli"
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
\ No newline at end of file
diff --git a/uv.lock b/uv.lock
new file mode 100644
--- /dev/null
+++ b/uv.lock
@@ -0,0 +1,58 @@
+version = 1
+revision = 3
+requires-python = ">=3.8"
+resolution-markers = [
+    "python_full_version >= '3.10'",
+    "python_full_version < '3.10'",
+]
+
+[[package]]
+name = "click"
+version = "8.1.8"
+source = { registry = "https://pypi.org/simple" }
+resolution-markers = [
+    "python_full_version < '3.10'",
+]
+dependencies = [
+    { name = "colorama", marker = "python_full_version < '3.10' and sys_platform == 'win32'" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/b9/2e/0090cbf739cee7d23781ad4b89a9894a41538e4fcf4c31dcdd705b78eb8b/click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a", size = 226593, upload-time = "2024-12-21T18:38:44.339Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/7e/d4/7ebdbd03970677812aac39c869717059dbb71a4cfc033ca6e5221787892c/click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2", size = 98188, upload-time = "2024-12-21T18:38:41.666Z" },
+]
+
+[[package]]
+name = "click"
+version = "8.2.1"
+source = { registry = "https://pypi.org/simple" }
+resolution-markers = [
+    "python_full_version >= '3.10'",
+]
+dependencies = [
+    { name = "colorama", marker = "python_full_version >= '3.10' and sys_platform == 'win32'" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/60/6c/8ca2efa64cf75a977a0d7fac081354553ebe483345c734fb6b6515d96bbc/click-8.2.1.tar.gz", hash = "sha256:27c491cc05d968d271d5a1db13e3b5a184636d9d930f148c50b038f0d0646202", size = 286342, upload-time = "2025-05-20T23:19:49.832Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/85/32/10bb5764d90a8eee674e9dc6f4db6a0ab47c8c4d0d83c27f7c39ac415a4d/click-8.2.1-py3-none-any.whl", hash = "sha256:61a3265b914e850b85317d0b3109c7f8cd35a670f963866005d6ef1d5175a12b", size = 102215, upload-time = "2025-05-20T23:19:47.796Z" },
+]
+
+[[package]]
+name = "colorama"
+version = "0.4.6"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
+]
+
+[[package]]
+name = "dmarc-analyzer"
+version = "0.1.0"
+source = { editable = "." }
+dependencies = [
+    { name = "click", version = "8.1.8", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" },
+    { name = "click", version = "8.2.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" },
+]
+
+[package.metadata]
+requires-dist = [{ name = "click", specifier = ">=8.0.0" }]