author     Max Resnick <max@ofmax.li>  2025-12-02 21:21:50 -0800
committer  Max Resnick <max@ofmax.li>  2025-12-02 21:21:50 -0800
commit     7d3432e055dc63935ce6df2b56d655aadf88938c (patch)
tree       6d8e164e275116a605fcb6f0b2f5d9c0f88cb91d
download   dmarc-tool-7d3432e055dc63935ce6df2b56d655aadf88938c.tar.gz

feat: init commit of tool (HEAD, CHECKPOINT, master)
-rw-r--r--  dmarc_analyzer/__init__.py     3
-rw-r--r--  dmarc_analyzer/database.py   382
-rw-r--r--  dmarc_analyzer/extractor.py   65
-rw-r--r--  dmarc_analyzer/main.py       108
-rw-r--r--  dmarc_analyzer/parser.py     185
-rw-r--r--  dmarc_analyzer/reporter.py   276
-rw-r--r--  pyproject.toml                15
-rw-r--r--  uv.lock                       58
8 files changed, 1092 insertions, 0 deletions
diff --git a/dmarc_analyzer/__init__.py b/dmarc_analyzer/__init__.py
new file mode 100644
index 0000000..ff23532
--- /dev/null
+++ b/dmarc_analyzer/__init__.py
@@ -0,0 +1,3 @@
+"""DMARC Report Analyzer"""
+
+__version__ = "0.1.0" \ No newline at end of file
diff --git a/dmarc_analyzer/database.py b/dmarc_analyzer/database.py
new file mode 100644
index 0000000..3ca33f2
--- /dev/null
+++ b/dmarc_analyzer/database.py
@@ -0,0 +1,382 @@
+"""SQLite database operations for DMARC reports"""
+
+import sqlite3
+import json
+from pathlib import Path
+
+
+class Database:
+ def __init__(self, db_path):
+ self.db_path = db_path
+
+ def init_db(self):
+ """Initialize the database schema"""
+ conn = sqlite3.connect(self.db_path)
+ cursor = conn.cursor()
+
+ # Create reports table
+ cursor.execute('''
+ CREATE TABLE IF NOT EXISTS reports (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ org_name TEXT,
+ email TEXT,
+ report_id TEXT UNIQUE,
+ date_begin DATETIME,
+ date_end DATETIME,
+ domain TEXT,
+ policy_p TEXT,
+ policy_sp TEXT,
+ policy_pct INTEGER,
+ policy_adkim TEXT,
+ policy_aspf TEXT,
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP
+ )
+ ''')
+
+ # Create records table
+ cursor.execute('''
+ CREATE TABLE IF NOT EXISTS records (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ report_id INTEGER,
+ source_ip TEXT,
+ count INTEGER,
+ disposition TEXT,
+ dkim_result TEXT,
+ spf_result TEXT,
+ header_from TEXT,
+ dkim_auth TEXT, -- JSON array
+ spf_auth TEXT, -- JSON array
+ FOREIGN KEY (report_id) REFERENCES reports (id)
+ )
+ ''')
+
+ # Create indexes for better performance
+ cursor.execute('CREATE INDEX IF NOT EXISTS idx_reports_domain ON reports (domain)')
+ cursor.execute('CREATE INDEX IF NOT EXISTS idx_records_source_ip ON records (source_ip)')
+ cursor.execute('CREATE INDEX IF NOT EXISTS idx_records_disposition ON records (disposition)')
+
+ conn.commit()
+ conn.close()
+
+ def store_report(self, report_data):
+ """Store parsed DMARC report data
+
+ Returns:
+ bool: True if report was stored, False if it was a duplicate
+ """
+ conn = sqlite3.connect(self.db_path)
+ cursor = conn.cursor()
+
+ try:
+ metadata = report_data.get('metadata', {})
+ policy = report_data.get('policy_published', {})
+ records = report_data.get('records', [])
+
+ # Check if report already exists
+ cursor.execute('SELECT id FROM reports WHERE report_id = ?', (metadata.get('report_id'),))
+ if cursor.fetchone():
+ conn.close()
+ return False # Duplicate found
+
+ # Insert report metadata
+ cursor.execute('''
+ INSERT INTO reports (
+ org_name, email, report_id, date_begin, date_end,
+ domain, policy_p, policy_sp, policy_pct, policy_adkim, policy_aspf
+ ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ ''', (
+ metadata.get('org_name'),
+ metadata.get('email'),
+ metadata.get('report_id'),
+ metadata.get('date_begin'),
+ metadata.get('date_end'),
+ policy.get('domain'),
+ policy.get('p'),
+ policy.get('sp'),
+ policy.get('pct'),
+ policy.get('adkim'),
+ policy.get('aspf')
+ ))
+
+ report_id = cursor.lastrowid
+
+ # Insert records
+ for record in records:
+ cursor.execute('''
+ INSERT INTO records (
+ report_id, source_ip, count, disposition, dkim_result, spf_result,
+ header_from, dkim_auth, spf_auth
+ ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+ ''', (
+ report_id,
+ record.get('source_ip'),
+ record.get('count'),
+ record.get('disposition'),
+ record.get('dkim_result'),
+ record.get('spf_result'),
+ record.get('header_from'),
+ json.dumps(record.get('dkim_auth', [])),
+ json.dumps(record.get('spf_auth', []))
+ ))
+
+ conn.commit()
+ return True # Successfully stored
+
+ except Exception as e:
+ conn.rollback()
+ raise
+ finally:
+ conn.close()
+
+ def get_summary_stats(self, date_filter=None):
+ """Get summary statistics"""
+ conn = sqlite3.connect(self.db_path)
+ cursor = conn.cursor()
+
+ stats = {}
+
+ # Build date filter conditions
+ date_where, date_params = self._build_date_filter(date_filter)
+ reports_where = f"WHERE {date_where}" if date_where else ""
+ join_where = f"JOIN reports r ON r.id = rec.report_id WHERE {date_where}" if date_where else "JOIN reports r ON r.id = rec.report_id"
+
+ # Total reports
+ cursor.execute(f'SELECT COUNT(*) FROM reports {reports_where}', date_params)
+ stats['total_reports'] = cursor.fetchone()[0]
+
+ # Total messages
+ cursor.execute(f'SELECT SUM(rec.count) FROM records rec {join_where}', date_params)
+ result = cursor.fetchone()[0]
+ stats['total_messages'] = result if result else 0
+
+ # Messages by disposition
+ cursor.execute(f'''
+ SELECT disposition, SUM(rec.count) as total
+ FROM records rec
+ {join_where}
+ GROUP BY disposition
+ ORDER BY total DESC
+ ''', date_params)
+ stats['by_disposition'] = cursor.fetchall()
+
+ # Messages by domain
+ cursor.execute(f'''
+ SELECT r.domain, SUM(rec.count) as total
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ {"WHERE " + date_where if date_where else ""}
+ GROUP BY r.domain
+ ORDER BY total DESC
+ LIMIT 10
+ ''', date_params)
+ stats['by_domain'] = cursor.fetchall()
+
+ # Messages by source IP (top 10)
+ cursor.execute(f'''
+ SELECT source_ip, SUM(rec.count) as total
+ FROM records rec
+ {join_where}
+ GROUP BY source_ip
+ ORDER BY total DESC
+ LIMIT 10
+ ''', date_params)
+ stats['by_source_ip'] = cursor.fetchall()
+
+ # DKIM/SPF results
+ dkim_where = f"{join_where} AND rec.dkim_result IS NOT NULL" if join_where else "JOIN reports r ON r.id = rec.report_id WHERE rec.dkim_result IS NOT NULL"
+ cursor.execute(f'''
+ SELECT dkim_result, COUNT(*) as count
+ FROM records rec
+ {dkim_where}
+ GROUP BY dkim_result
+ ''', date_params)
+ stats['dkim_results'] = cursor.fetchall()
+
+ spf_where = f"{join_where} AND rec.spf_result IS NOT NULL" if join_where else "JOIN reports r ON r.id = rec.report_id WHERE rec.spf_result IS NOT NULL"
+ cursor.execute(f'''
+ SELECT spf_result, COUNT(*) as count
+ FROM records rec
+ {spf_where}
+ GROUP BY spf_result
+ ''', date_params)
+ stats['spf_results'] = cursor.fetchall()
+
+ conn.close()
+ return stats
+
+ def get_detailed_records(self, limit=100):
+ """Get detailed record view"""
+ conn = sqlite3.connect(self.db_path)
+ cursor = conn.cursor()
+
+ cursor.execute('''
+ SELECT
+ r.domain,
+ r.org_name,
+ rec.source_ip,
+ rec.count,
+ rec.disposition,
+ rec.dkim_result,
+ rec.spf_result,
+ rec.header_from,
+ r.date_begin,
+ r.date_end
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ ORDER BY rec.count DESC
+ LIMIT ?
+ ''', (limit,))
+
+ records = cursor.fetchall()
+ conn.close()
+ return records
+
+ def get_failure_analysis(self):
+ """Get detailed failure analysis"""
+ conn = sqlite3.connect(self.db_path)
+ cursor = conn.cursor()
+
+ analysis = {}
+
+ # Authentication failure breakdown
+ cursor.execute('''
+ SELECT
+ CASE
+ WHEN dkim_result = 'fail' AND spf_result = 'fail' THEN 'Both DKIM & SPF Failed'
+ WHEN dkim_result = 'fail' AND spf_result != 'fail' THEN 'DKIM Failed Only'
+ WHEN dkim_result != 'fail' AND spf_result = 'fail' THEN 'SPF Failed Only'
+ ELSE 'Both Passed'
+ END as failure_type,
+ SUM(count) as total_count,
+ COUNT(*) as record_count
+ FROM records
+ GROUP BY failure_type
+ ORDER BY total_count DESC
+ ''')
+ analysis['auth_failure_breakdown'] = cursor.fetchall()
+
+ # Failures by source IP
+ cursor.execute('''
+ SELECT source_ip, SUM(count) as total_count
+ FROM records
+ WHERE dkim_result = 'fail' OR spf_result = 'fail'
+ GROUP BY source_ip
+ ORDER BY total_count DESC
+ LIMIT 10
+ ''')
+ analysis['failures_by_ip'] = cursor.fetchall()
+
+ # Failures by domain
+ cursor.execute('''
+ SELECT r.domain, SUM(rec.count) as total_count
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
+ GROUP BY r.domain
+ ORDER BY total_count DESC
+ ''')
+ analysis['failures_by_domain'] = cursor.fetchall()
+
+ # Failures by reporting provider
+ cursor.execute('''
+ SELECT r.org_name, SUM(rec.count) as total_count
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
+ GROUP BY r.org_name
+ ORDER BY total_count DESC
+ ''')
+ analysis['failures_by_provider'] = cursor.fetchall()
+
+ # Detailed provider breakdown with dates
+ cursor.execute('''
+ SELECT
+ r.org_name,
+ DATE(r.date_begin) as report_date,
+ SUM(rec.count) as failed_count,
+ COUNT(DISTINCT rec.source_ip) as unique_ips
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
+ GROUP BY r.org_name, DATE(r.date_begin)
+ ORDER BY report_date DESC, failed_count DESC
+ ''')
+ analysis['provider_timeline'] = cursor.fetchall()
+
+ # Policy actions on failures
+ cursor.execute('''
+ SELECT disposition, SUM(count) as total_count
+ FROM records
+ WHERE dkim_result = 'fail' OR spf_result = 'fail'
+ GROUP BY disposition
+ ORDER BY total_count DESC
+ ''')
+ analysis['failure_dispositions'] = cursor.fetchall()
+
+ # Detailed failure records
+ cursor.execute('''
+ SELECT
+ r.domain,
+ rec.source_ip,
+ rec.count,
+ rec.disposition,
+ rec.dkim_result,
+ rec.spf_result,
+ rec.header_from,
+ r.date_begin,
+ r.org_name,
+ r.email as reporter_email
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ WHERE rec.dkim_result = 'fail' OR rec.spf_result = 'fail'
+ ORDER BY rec.count DESC
+ LIMIT 50
+ ''')
+ analysis['detailed_failures'] = cursor.fetchall()
+
+ conn.close()
+ return analysis
+
+ def _build_date_filter(self, date_filter):
+ """Build WHERE clause and parameters for date filtering"""
+ if not date_filter:
+ return "", []
+
+ conditions = []
+ params = []
+
+ if 'date_from' in date_filter:
+ conditions.append("DATE(date_begin) >= ?")
+ params.append(date_filter['date_from'].strftime('%Y-%m-%d'))
+
+ if 'date_to' in date_filter:
+ conditions.append("DATE(date_begin) <= ?")
+ params.append(date_filter['date_to'].strftime('%Y-%m-%d'))
+
+ where_clause = " AND ".join(conditions) if conditions else ""
+ return where_clause, params
+
+ def get_timeline_stats(self, date_filter=None):
+ """Get daily breakdown statistics"""
+ conn = sqlite3.connect(self.db_path)
+ cursor = conn.cursor()
+
+ date_where, date_params = self._build_date_filter(date_filter)
+ where_clause = f"WHERE {date_where}" if date_where else ""
+
+ cursor.execute(f'''
+ SELECT
+ DATE(r.date_begin) as report_date,
+ SUM(rec.count) as total_messages,
+ SUM(CASE WHEN rec.dkim_result = 'fail' OR rec.spf_result = 'fail' THEN rec.count ELSE 0 END) as failed_messages,
+ COUNT(DISTINCT r.org_name) as reporters
+ FROM reports r
+ JOIN records rec ON r.id = rec.report_id
+ {where_clause}
+ GROUP BY DATE(r.date_begin)
+ ORDER BY report_date
+ ''', date_params)
+
+ timeline = cursor.fetchall()
+ conn.close()
+ return timeline
\ No newline at end of file
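
A minimal usage sketch of the Database class above, assuming the dict shape produced by parser.parse_dmarc_report; the sample values are invented for illustration and are not part of this commit:

from dmarc_analyzer.database import Database

db = Database("dmarc_reports.db")
db.init_db()

# Same shape as parse_dmarc_report() returns; ISO strings keep the sketch simple
# (the parser itself passes datetime objects, which SQLite also accepts).
report_data = {
    "metadata": {
        "org_name": "example.net",
        "email": "noreply-dmarc@example.net",
        "report_id": "abc-123",
        "date_begin": "2025-11-30 00:00:00",
        "date_end": "2025-11-30 23:59:59",
    },
    "policy_published": {"domain": "example.com", "p": "none", "pct": 100},
    "records": [
        {
            "source_ip": "192.0.2.10",
            "count": 4,
            "disposition": "none",
            "dkim_result": "pass",
            "spf_result": "fail",
            "header_from": "example.com",
        }
    ],
}

if db.store_report(report_data):          # False would mean a duplicate report_id
    stats = db.get_summary_stats()
    print(stats["total_reports"], stats["total_messages"])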
diff --git a/dmarc_analyzer/extractor.py b/dmarc_analyzer/extractor.py
new file mode 100644
index 0000000..d12d67b
--- /dev/null
+++ b/dmarc_analyzer/extractor.py
@@ -0,0 +1,65 @@
+"""File extraction utilities for DMARC reports"""
+
+import gzip
+import zipfile
+import tempfile
+import os
+from pathlib import Path
+
+
+def extract_files(file_path):
+ """Extract XML content from gzip or zip files
+
+ Args:
+ file_path: Path to the compressed file
+
+ Returns:
+ str: XML content if successful, None otherwise
+ """
+ file_path = Path(file_path)
+
+ try:
+ if file_path.suffix == '.gz':
+ return _extract_gzip(file_path)
+ elif file_path.suffix == '.zip':
+ return _extract_zip(file_path)
+ else:
+ # Assume it's already XML
+ return file_path.read_text()
+ except Exception as e:
+ print(f"Error extracting {file_path}: {e}")
+ return None
+
+
+def _extract_gzip(file_path):
+ """Extract content from gzip file"""
+ with gzip.open(file_path, 'rt') as f:
+ return f.read()
+
+
+def _extract_zip(file_path):
+ """Extract content from zip file
+
+ For zip files, we look for XML files inside and return the first one
+ """
+ with zipfile.ZipFile(file_path, 'r') as zip_ref:
+ # List all files in the zip
+ file_list = zip_ref.namelist()
+
+ # Find the first XML file
+ xml_file = None
+ for filename in file_list:
+ if filename.lower().endswith('.xml'):
+ xml_file = filename
+ break
+
+ if xml_file:
+ with zip_ref.open(xml_file) as f:
+ return f.read().decode('utf-8')
+ else:
+ # If no XML file found, try the first file
+ if file_list:
+ with zip_ref.open(file_list[0]) as f:
+ return f.read().decode('utf-8')
+
+ return None
\ No newline at end of file
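
A short sketch of calling extract_files directly; the file name below is a hypothetical provider report archive, not one shipped with this repository:

from dmarc_analyzer.extractor import extract_files

# Handles .gz, .zip, or plain .xml; returns None (and prints the error) on failure.
xml_content = extract_files("google.com!example.com!1733011200!1733097599.zip")
if xml_content is not None:
    print(xml_content[:60])   # start of the raw <feedback> XML for parse_dmarc_report()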
diff --git a/dmarc_analyzer/main.py b/dmarc_analyzer/main.py
new file mode 100644
index 0000000..18431d0
--- /dev/null
+++ b/dmarc_analyzer/main.py
@@ -0,0 +1,108 @@
+#!/usr/bin/env python3
+"""Main CLI entry point for DMARC analyzer"""
+
+import click
+import os
+from pathlib import Path
+
+from .extractor import extract_files
+from .parser import parse_dmarc_report
+from .database import Database
+from .reporter import generate_report
+
+
+@click.command()
+@click.argument('paths', nargs=-1, type=click.Path(exists=True))
+@click.option('--db', default='dmarc_reports.db', help='Database file path')
+@click.option('--output-format', type=click.Choice(['summary', 'detailed', 'failures']), default='summary', help='Report output format')
+@click.option('--show-failures-only', is_flag=True, help='Show only records with authentication failures')
+@click.option('--report-only', is_flag=True, help='Generate report from existing data without processing files')
+@click.option('--date-from', type=click.DateTime(['%Y-%m-%d']), help='Filter reports from this date (YYYY-MM-DD)')
+@click.option('--date-to', type=click.DateTime(['%Y-%m-%d']), help='Filter reports to this date (YYYY-MM-DD)')
+@click.option('--show-timeline', is_flag=True, help='Show daily breakdown in reports')
+def cli(paths, db, output_format, show_failures_only, report_only, date_from, date_to, show_timeline):
+ """Analyze DMARC reports from gzip/zip files or directories"""
+ database = Database(db)
+ database.init_db()
+
+ # Report-only mode: skip file processing
+ if report_only:
+ if paths:
+ click.echo("Warning: Paths provided but --report-only specified. Ignoring paths.", err=True)
+
+ click.echo("Generating report from existing database...")
+ click.echo("\n" + "="*50)
+ click.echo("DMARC REPORT SUMMARY")
+ click.echo("="*50)
+
+ # Create date filter
+ date_filter = {}
+ if date_from:
+ date_filter['date_from'] = date_from
+ if date_to:
+ date_filter['date_to'] = date_to
+
+ report = generate_report(database, output_format, show_failures_only, date_filter, show_timeline)
+ click.echo(report)
+ return
+
+ # Validate paths are provided for processing mode
+ if not paths:
+ click.echo("Error: Paths are required unless using --report-only", err=True)
+ return
+
+ processed_count = 0
+
+ for path in paths:
+ path = Path(path)
+
+ if path.is_file():
+ files = [path]
+ else:
+ # Find all gzip and zip files in directory
+ files = list(path.glob('*.gz')) + list(path.glob('*.zip'))
+
+ for file_path in files:
+ try:
+ click.echo(f"Processing: {file_path}")
+
+ # Extract and parse
+ xml_content = extract_files(file_path)
+ if xml_content:
+ report_data = parse_dmarc_report(xml_content)
+ if report_data:
+ was_stored = database.store_report(report_data)
+ if was_stored:
+ processed_count += 1
+ click.echo(f" ✓ Processed successfully")
+ else:
+ click.echo(f" ⊝ Skipped (duplicate)")
+ else:
+ click.echo(f" ⚠ Failed to parse DMARC report", err=True)
+ else:
+ click.echo(f" ⚠ Failed to extract file", err=True)
+
+ except Exception as e:
+ click.echo(f" ✗ Error processing {file_path}: {e}", err=True)
+
+ click.echo(f"\nProcessed {processed_count} reports")
+
+ # Generate summary report
+ if processed_count > 0:
+ click.echo("\n" + "="*50)
+ click.echo("DMARC REPORT SUMMARY")
+ click.echo("="*50)
+
+ # Create date filter for processing mode too
+ date_filter = {}
+ if date_from:
+ date_filter['date_from'] = date_from
+ if date_to:
+ date_filter['date_to'] = date_to
+
+ report = generate_report(database, output_format, show_failures_only, date_filter, show_timeline)
+ click.echo(report)
+
+
+if __name__ == '__main__':
+ cli()
\ No newline at end of file
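
A sketch of driving the click command in-process (handy for testing); click.testing ships with click, and the flags below mirror the options defined above:

from click.testing import CliRunner
from dmarc_analyzer.main import cli

runner = CliRunner()
# Equivalent to: dmarc-analyzer --report-only --show-timeline --date-from 2025-11-01
result = runner.invoke(cli, ["--report-only", "--show-timeline", "--date-from", "2025-11-01"])
print(result.output)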
diff --git a/dmarc_analyzer/parser.py b/dmarc_analyzer/parser.py
new file mode 100644
index 0000000..36a27a3
--- /dev/null
+++ b/dmarc_analyzer/parser.py
@@ -0,0 +1,185 @@
+"""DMARC XML report parser"""
+
+import xml.etree.ElementTree as ET
+from datetime import datetime
+
+
+def parse_dmarc_report(xml_content):
+ """Parse DMARC XML report and extract relevant data
+
+ Args:
+ xml_content: String containing XML report data
+
+ Returns:
+ dict: Parsed report data or None if parsing fails
+ """
+ try:
+ root = ET.fromstring(xml_content)
+
+ # Extract report metadata
+ report_metadata = _parse_report_metadata(root)
+
+ # Extract policy published
+ policy_published = _parse_policy_published(root)
+
+ # Extract records
+ records = _parse_records(root)
+
+ return {
+ 'metadata': report_metadata,
+ 'policy_published': policy_published,
+ 'records': records
+ }
+
+ except Exception as e:
+ print(f"Error parsing DMARC report: {e}")
+ return None
+
+
+def _parse_report_metadata(root):
+ """Parse report metadata section"""
+ metadata = {}
+
+ report_metadata = root.find('report_metadata')
+ if report_metadata is not None:
+ # Organization name
+ org_name = report_metadata.find('org_name')
+ if org_name is not None:
+ metadata['org_name'] = org_name.text
+
+ # Email
+ email = report_metadata.find('email')
+ if email is not None:
+ metadata['email'] = email.text
+
+ # Report ID
+ report_id = report_metadata.find('report_id')
+ if report_id is not None:
+ metadata['report_id'] = report_id.text
+
+ # Date range
+ date_range = report_metadata.find('date_range')
+ if date_range is not None:
+ begin = date_range.find('begin')
+ end = date_range.find('end')
+ if begin is not None:
+ metadata['date_begin'] = datetime.fromtimestamp(int(begin.text))
+ if end is not None:
+ metadata['date_end'] = datetime.fromtimestamp(int(end.text))
+
+ return metadata
+
+
+def _parse_policy_published(root):
+ """Parse policy published section"""
+ policy = {}
+
+ policy_published = root.find('policy_published')
+ if policy_published is not None:
+ # Domain
+ domain = policy_published.find('domain')
+ if domain is not None:
+ policy['domain'] = domain.text
+
+ # ADKIM (DKIM alignment)
+ adkim = policy_published.find('adkim')
+ if adkim is not None:
+ policy['adkim'] = adkim.text
+
+ # ASPF (SPF alignment)
+ aspf = policy_published.find('aspf')
+ if aspf is not None:
+ policy['aspf'] = aspf.text
+
+ # Policy
+ p = policy_published.find('p')
+ if p is not None:
+ policy['p'] = p.text
+
+ # Subdomain policy
+ sp = policy_published.find('sp')
+ if sp is not None:
+ policy['sp'] = sp.text
+
+ # Percentage
+ pct = policy_published.find('pct')
+ if pct is not None:
+ policy['pct'] = int(pct.text)
+
+ return policy
+
+
+def _parse_records(root):
+ """Parse record section"""
+ records = []
+
+ for record in root.findall('record'):
+ record_data = {}
+
+ # Row data
+ row = record.find('row')
+ if row is not None:
+ source_ip = row.find('source_ip')
+ if source_ip is not None:
+ record_data['source_ip'] = source_ip.text
+
+ count = row.find('count')
+ if count is not None:
+ record_data['count'] = int(count.text)
+
+ # Policy evaluation
+ policy_evaluated = row.find('policy_evaluated')
+ if policy_evaluated is not None:
+ disposition = policy_evaluated.find('disposition')
+ if disposition is not None:
+ record_data['disposition'] = disposition.text
+
+ dkim = policy_evaluated.find('dkim')
+ if dkim is not None:
+ record_data['dkim_result'] = dkim.text
+
+ spf = policy_evaluated.find('spf')
+ if spf is not None:
+ record_data['spf_result'] = spf.text
+
+ # Identifiers
+ identifiers = record.find('identifiers')
+ if identifiers is not None:
+ header_from = identifiers.find('header_from')
+ if header_from is not None:
+ record_data['header_from'] = header_from.text
+
+ # Auth results
+ auth_results = record.find('auth_results')
+ if auth_results is not None:
+ # DKIM auth results
+ dkim_results = []
+ for dkim in auth_results.findall('dkim'):
+ dkim_data = {}
+ domain = dkim.find('domain')
+ if domain is not None:
+ dkim_data['domain'] = domain.text
+ result = dkim.find('result')
+ if result is not None:
+ dkim_data['result'] = result.text
+ if dkim_data:
+ dkim_results.append(dkim_data)
+ record_data['dkim_auth'] = dkim_results
+
+ # SPF auth results
+ spf_results = []
+ for spf in auth_results.findall('spf'):
+ spf_data = {}
+ domain = spf.find('domain')
+ if domain is not None:
+ spf_data['domain'] = domain.text
+ result = spf.find('result')
+ if result is not None:
+ spf_data['result'] = result.text
+ if spf_data:
+ spf_results.append(spf_data)
+ record_data['spf_auth'] = spf_results
+
+ records.append(record_data)
+
+ return records
\ No newline at end of file
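
A small self-contained check of the parser above, using a stripped-down aggregate report; the XML is a minimal made-up example, not a real provider report:

from dmarc_analyzer.parser import parse_dmarc_report

SAMPLE = """<feedback>
  <report_metadata>
    <org_name>example.net</org_name>
    <report_id>abc-123</report_id>
    <date_range><begin>1733011200</begin><end>1733097599</end></date_range>
  </report_metadata>
  <policy_published><domain>example.com</domain><p>none</p><pct>100</pct></policy_published>
  <record>
    <row>
      <source_ip>192.0.2.10</source_ip><count>4</count>
      <policy_evaluated><disposition>none</disposition><dkim>pass</dkim><spf>fail</spf></policy_evaluated>
    </row>
    <identifiers><header_from>example.com</header_from></identifiers>
    <auth_results><spf><domain>example.com</domain><result>softfail</result></spf></auth_results>
  </record>
</feedback>"""

data = parse_dmarc_report(SAMPLE)
print(data["records"][0]["spf_result"])   # "fail" -- the policy_evaluated result, not the raw auth_results SPF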
diff --git a/dmarc_analyzer/reporter.py b/dmarc_analyzer/reporter.py
new file mode 100644
index 0000000..484f97d
--- /dev/null
+++ b/dmarc_analyzer/reporter.py
@@ -0,0 +1,276 @@
+"""Report generation for DMARC analysis"""
+
+from datetime import datetime
+
+
+def generate_report(database, output_format='summary', show_failures_only=False, date_filter=None, show_timeline=False):
+ """Generate terminal report from database"""
+ stats = database.get_summary_stats(date_filter)
+
+ # Get timeline data if requested
+ timeline_data = None
+ if show_timeline:
+ timeline_data = database.get_timeline_stats(date_filter)
+
+ if output_format == 'failures':
+ failure_analysis = database.get_failure_analysis()
+ return _generate_failure_report(failure_analysis, timeline_data)
+ elif output_format == 'summary':
+ return _generate_summary_report(stats, show_failures_only, date_filter, timeline_data)
+ elif output_format == 'detailed':
+ detailed_records = database.get_detailed_records()
+ return _generate_detailed_report(stats, detailed_records, timeline_data)
+
+
+def _generate_summary_report(stats, show_failures_only=False, date_filter=None, timeline_data=None):
+ """Generate summary report"""
+ report = []
+
+ # Date range info
+ if date_filter:
+ if 'date_from' in date_filter:
+ report.append(f"Date Range: From {date_filter['date_from'].strftime('%Y-%m-%d')}")
+ if 'date_to' in date_filter:
+ if 'date_from' in date_filter:
+ report[-1] += f" to {date_filter['date_to'].strftime('%Y-%m-%d')}"
+ else:
+ report.append(f"Date Range: Up to {date_filter['date_to'].strftime('%Y-%m-%d')}")
+ report.append("")
+
+ # Overall statistics
+ report.append(f"Total Reports: {stats['total_reports']}")
+ report.append(f"Total Messages: {stats['total_messages']:,}")
+ report.append("")
+
+ # Timeline breakdown if requested
+ if timeline_data:
+ report.append("📅 DAILY BREAKDOWN:")
+ for date_str, total_msgs, failed_msgs, reporters in timeline_data:
+ failure_rate = (failed_msgs / total_msgs * 100) if total_msgs > 0 else 0
+ report.append(f" {date_str}: {total_msgs:,} messages ({failed_msgs:,} failed, {failure_rate:.1f}%) from {reporters} reporters")
+ report.append("")
+
+ # Messages by disposition
+ if stats['by_disposition']:
+ report.append("Messages by Disposition:")
+ for disposition, count in stats['by_disposition']:
+ percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0
+ report.append(f" {disposition}: {count:,} ({percentage:.1f}%)")
+ report.append("")
+
+ # Top domains
+ if stats['by_domain']:
+ report.append("Top Domains:")
+ for domain, count in stats['by_domain']:
+ percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0
+ report.append(f" {domain}: {count:,} ({percentage:.1f}%)")
+ report.append("")
+
+ # Top source IPs
+ if stats['by_source_ip']:
+ report.append("Top Source IPs:")
+ for ip, count in stats['by_source_ip']:
+ percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0
+ report.append(f" {ip}: {count:,} ({percentage:.1f}%)")
+ report.append("")
+
+ # DKIM Results
+ if stats['dkim_results']:
+ report.append("DKIM Results:")
+ for result, count in stats['dkim_results']:
+ report.append(f" {result}: {count}")
+ report.append("")
+
+ # SPF Results
+ if stats['spf_results']:
+ report.append("SPF Results:")
+ for result, count in stats['spf_results']:
+ report.append(f" {result}: {count}")
+
+ return "\n".join(report)
+
+
+def _generate_detailed_report(stats, detailed_records, timeline_data=None):
+ """Generate detailed report with individual records"""
+ report = []
+
+ # Start with summary
+ summary = _generate_summary_report(stats)
+ report.append(summary)
+ report.append("")
+ report.append("="*80)
+ report.append("DETAILED RECORDS (Top 100)")
+ report.append("="*80)
+ report.append("")
+
+ # Table header
+ header = f"{'Domain':<25} {'Source IP':<15} {'Count':<8} {'Disposition':<12} {'DKIM':<8} {'SPF':<8} {'From':<20}"
+ report.append(header)
+ report.append("-" * len(header))
+
+ # Records
+ for record in detailed_records:
+ (domain, org_name, source_ip, count, disposition,
+ dkim_result, spf_result, header_from, date_begin, date_end) = record
+
+ # Truncate long values
+ domain = (domain or "")[:24]
+ source_ip = (source_ip or "")[:14]
+ disposition = (disposition or "")[:11]
+ dkim_result = (dkim_result or "")[:7]
+ spf_result = (spf_result or "")[:7]
+ header_from = (header_from or "")[:19]
+
+ line = f"{domain:<25} {source_ip:<15} {count:<8} {disposition:<12} {dkim_result:<8} {spf_result:<8} {header_from:<20}"
+ report.append(line)
+
+ return "\n".join(report)
+
+
+def _generate_failure_report(failure_analysis, timeline_data=None):
+ """Generate detailed failure analysis report"""
+ report = []
+
+ report.append("🚨 DMARC FAILURE ANALYSIS")
+ report.append("="*50)
+ report.append("")
+
+ # Authentication failure breakdown
+ if failure_analysis.get('auth_failure_breakdown'):
+ report.append("Authentication Failure Breakdown:")
+ for failure_type, total_count, record_count in failure_analysis['auth_failure_breakdown']:
+ report.append(f" {failure_type}: {total_count:,} messages ({record_count} sources)")
+ report.append("")
+
+ # Policy actions on failures
+ if failure_analysis.get('failure_dispositions'):
+ report.append("Policy Actions on Failed Messages:")
+ total_failures = sum(count for _, count in failure_analysis['failure_dispositions'])
+ for disposition, count in failure_analysis['failure_dispositions']:
+ percentage = (count / total_failures * 100) if total_failures > 0 else 0
+ report.append(f" {disposition}: {count:,} ({percentage:.1f}%)")
+ report.append("")
+
+ # Top failing source IPs
+ if failure_analysis.get('failures_by_ip'):
+ report.append("Top Failing Source IPs:")
+ for ip, count in failure_analysis['failures_by_ip']:
+ report.append(f" {ip}: {count:,} failed messages")
+ report.append("")
+
+ # Failures by domain
+ if failure_analysis.get('failures_by_domain'):
+ report.append("Failures by Domain:")
+ for domain, count in failure_analysis['failures_by_domain']:
+ report.append(f" {domain}: {count:,} failed messages")
+ report.append("")
+
+ # Failures by email provider (reporter)
+ if failure_analysis.get('failures_by_provider'):
+ report.append("🏢 Failures by Email Provider (Reporter):")
+ for provider, count in failure_analysis['failures_by_provider']:
+ report.append(f" {provider}: {count:,} failed messages")
+ report.append("")
+
+ # Provider timeline breakdown
+ if failure_analysis.get('provider_timeline'):
+ report.append("📊 Provider Timeline Breakdown:")
+ current_date = None
+ for provider, report_date, failed_count, unique_ips in failure_analysis['provider_timeline']:
+ if report_date != current_date:
+ if current_date is not None:
+ report.append("")
+ report.append(f" {report_date}:")
+ current_date = report_date
+ report.append(f" {provider}: {failed_count:,} failures from {unique_ips} unique IPs")
+ report.append("")
+
+ # Detailed failure records
+ if failure_analysis.get('detailed_failures'):
+ report.append("🔍 DETAILED FAILURE RECORDS (Top 50)")
+ report.append("="*80)
+ report.append("")
+
+ # Table header
+ header = f"{'Domain':<18} {'Source IP':<15} {'Count':<6} {'Action':<10} {'DKIM':<6} {'SPF':<6} {'Reporter':<15} {'Date':<10}"
+ report.append(header)
+ report.append("-" * len(header))
+
+ # Records
+ for record in failure_analysis['detailed_failures']:
+ (domain, source_ip, count, disposition, dkim_result,
+ spf_result, header_from, date_begin, org_name, reporter_email) = record
+
+ # Truncate long values
+ domain = (domain or "")[:17]
+ source_ip = (source_ip or "")[:14]
+ disposition = (disposition or "")[:9]
+ dkim_result = (dkim_result or "")[:5]
+ spf_result = (spf_result or "")[:5]
+ org_name = (org_name or "")[:14]
+
+ # Format date
+ try:
+ if isinstance(date_begin, str):
+ date_str = date_begin[:10] # Take just YYYY-MM-DD part
+ else:
+ date_str = date_begin.strftime('%Y-%m-%d')
+ except Exception:
+ date_str = "N/A"
+
+ # Highlight failures with emoji
+ dkim_display = "❌" if dkim_result == 'fail' else "✅"
+ spf_display = "❌" if spf_result == 'fail' else "✅"
+
+ line = f"{domain:<18} {source_ip:<15} {count:<6} {disposition:<10} {dkim_display:<6} {spf_display:<6} {org_name:<15} {date_str:<10}"
+ report.append(line)
+
+ report.append("")
+ report.append("💡 RECOMMENDATIONS:")
+ report.append("- Investigate high-volume failing IPs for potential spoofing")
+ report.append("- Review DKIM signing for domains with DKIM failures")
+ report.append("- Check SPF records for domains with SPF failures")
+ report.append("- Consider moving from 'none' to 'quarantine' policy if ready")
+
+ return "\n".join(report)
+
+
+def format_table(headers, rows, max_width=None):
+ """Helper function to format data as a table"""
+ if not rows:
+ return ""
+
+ # Calculate column widths
+ col_widths = [len(header) for header in headers]
+
+ for row in rows:
+ for i, cell in enumerate(row):
+ if i < len(col_widths):
+ col_widths[i] = max(col_widths[i], len(str(cell)))
+
+ # Apply max width if specified
+ if max_width:
+ for i in range(len(col_widths)):
+ col_widths[i] = min(col_widths[i], max_width)
+
+ # Create format string
+ format_str = " | ".join(f"{{:<{width}}}" for width in col_widths)
+
+ # Generate table
+ lines = []
+
+ # Header
+ lines.append(format_str.format(*headers))
+ lines.append("-" * sum(col_widths) + "-" * (len(col_widths) - 1) * 3)
+
+ # Rows
+ for row in rows:
+ formatted_row = []
+ for i, cell in enumerate(row):
+ cell_str = str(cell)
+ if max_width and len(cell_str) > col_widths[i]:
+ cell_str = cell_str[:col_widths[i]-3] + "..."
+ formatted_row.append(cell_str)
+ lines.append(format_str.format(*formatted_row))
+
+ return "\n".join(lines) \ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..9d7585c
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,15 @@
+[project]
+name = "dmarc-analyzer"
+version = "0.1.0"
+description = "A simple DMARC report analyzer tool"
+requires-python = ">=3.8"
+dependencies = [
+ "click>=8.0.0",
+]
+
+[project.scripts]
+dmarc-analyzer = "dmarc_analyzer.main:cli"
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
\ No newline at end of file
diff --git a/uv.lock b/uv.lock
new file mode 100644
index 0000000..48bc407
--- /dev/null
+++ b/uv.lock
@@ -0,0 +1,58 @@
+version = 1
+revision = 3
+requires-python = ">=3.8"
+resolution-markers = [
+ "python_full_version >= '3.10'",
+ "python_full_version < '3.10'",
+]
+
+[[package]]
+name = "click"
+version = "8.1.8"
+source = { registry = "https://pypi.org/simple" }
+resolution-markers = [
+ "python_full_version < '3.10'",
+]
+dependencies = [
+ { name = "colorama", marker = "python_full_version < '3.10' and sys_platform == 'win32'" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/b9/2e/0090cbf739cee7d23781ad4b89a9894a41538e4fcf4c31dcdd705b78eb8b/click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a", size = 226593, upload-time = "2024-12-21T18:38:44.339Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/7e/d4/7ebdbd03970677812aac39c869717059dbb71a4cfc033ca6e5221787892c/click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2", size = 98188, upload-time = "2024-12-21T18:38:41.666Z" },
+]
+
+[[package]]
+name = "click"
+version = "8.2.1"
+source = { registry = "https://pypi.org/simple" }
+resolution-markers = [
+ "python_full_version >= '3.10'",
+]
+dependencies = [
+ { name = "colorama", marker = "python_full_version >= '3.10' and sys_platform == 'win32'" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/60/6c/8ca2efa64cf75a977a0d7fac081354553ebe483345c734fb6b6515d96bbc/click-8.2.1.tar.gz", hash = "sha256:27c491cc05d968d271d5a1db13e3b5a184636d9d930f148c50b038f0d0646202", size = 286342, upload-time = "2025-05-20T23:19:49.832Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/85/32/10bb5764d90a8eee674e9dc6f4db6a0ab47c8c4d0d83c27f7c39ac415a4d/click-8.2.1-py3-none-any.whl", hash = "sha256:61a3265b914e850b85317d0b3109c7f8cd35a670f963866005d6ef1d5175a12b", size = 102215, upload-time = "2025-05-20T23:19:47.796Z" },
+]
+
+[[package]]
+name = "colorama"
+version = "0.4.6"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
+]
+
+[[package]]
+name = "dmarc-analyzer"
+version = "0.1.0"
+source = { editable = "." }
+dependencies = [
+ { name = "click", version = "8.1.8", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" },
+ { name = "click", version = "8.2.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" },
+]
+
+[package.metadata]
+requires-dist = [{ name = "click", specifier = ">=8.0.0" }]