diff options
| author | Max Resnick <max@ofmax.li> | 2025-12-02 21:21:50 -0800 |
|---|---|---|
| committer | Max Resnick <max@ofmax.li> | 2025-12-02 21:21:50 -0800 |
| commit | 7d3432e055dc63935ce6df2b56d655aadf88938c (patch) | |
| tree | 6d8e164e275116a605fcb6f0b2f5d9c0f88cb91d /dmarc_analyzer/reporter.py | |
| download | dmarc-tool-7d3432e055dc63935ce6df2b56d655aadf88938c.tar.gz | |
feat: init commit of tool (refs: HEAD, CHECKPOINT, master)
Diffstat (limited to 'dmarc_analyzer/reporter.py')
| -rw-r--r-- | dmarc_analyzer/reporter.py | 276 |
1 file changed, 276 insertions(+), 0 deletions(-)
diff --git a/dmarc_analyzer/reporter.py b/dmarc_analyzer/reporter.py new file mode 100644 index 0000000..484f97d --- /dev/null +++ b/dmarc_analyzer/reporter.py @@ -0,0 +1,276 @@ +"""Report generation for DMARC analysis""" + +from datetime import datetime + + +def generate_report(database, output_format='summary', show_failures_only=False, date_filter=None, show_timeline=False): + """Generate terminal report from database""" + stats = database.get_summary_stats(date_filter) + + # Get timeline data if requested + timeline_data = None + if show_timeline: + timeline_data = database.get_timeline_stats(date_filter) + + if output_format == 'failures': + failure_analysis = database.get_failure_analysis() + return _generate_failure_report(failure_analysis, timeline_data) + elif output_format == 'summary': + return _generate_summary_report(stats, show_failures_only, date_filter, timeline_data) + elif output_format == 'detailed': + detailed_records = database.get_detailed_records() + return _generate_detailed_report(stats, detailed_records, timeline_data) + + +def _generate_summary_report(stats, show_failures_only=False, date_filter=None, timeline_data=None): + """Generate summary report""" + report = [] + + # Date range info + if date_filter: + if 'date_from' in date_filter: + report.append(f"Date Range: From {date_filter['date_from'].strftime('%Y-%m-%d')}") + if 'date_to' in date_filter: + if 'date_from' in date_filter: + report[-1] += f" to {date_filter['date_to'].strftime('%Y-%m-%d')}" + else: + report.append(f"Date Range: Up to {date_filter['date_to'].strftime('%Y-%m-%d')}") + report.append("") + + # Overall statistics + report.append(f"Total Reports: {stats['total_reports']}") + report.append(f"Total Messages: {stats['total_messages']:,}") + report.append("") + + # Timeline breakdown if requested + if timeline_data: + report.append("📅 DAILY BREAKDOWN:") + for date_str, total_msgs, failed_msgs, reporters in timeline_data: + failure_rate = (failed_msgs / total_msgs * 100) 
if total_msgs > 0 else 0 + report.append(f" {date_str}: {total_msgs:,} messages ({failed_msgs:,} failed, {failure_rate:.1f}%) from {reporters} reporters") + report.append("") + + # Messages by disposition + if stats['by_disposition']: + report.append("Messages by Disposition:") + for disposition, count in stats['by_disposition']: + percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0 + report.append(f" {disposition}: {count:,} ({percentage:.1f}%)") + report.append("") + + # Top domains + if stats['by_domain']: + report.append("Top Domains:") + for domain, count in stats['by_domain']: + percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0 + report.append(f" {domain}: {count:,} ({percentage:.1f}%)") + report.append("") + + # Top source IPs + if stats['by_source_ip']: + report.append("Top Source IPs:") + for ip, count in stats['by_source_ip']: + percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0 + report.append(f" {ip}: {count:,} ({percentage:.1f}%)") + report.append("") + + # DKIM Results + if stats['dkim_results']: + report.append("DKIM Results:") + for result, count in stats['dkim_results']: + report.append(f" {result}: {count}") + report.append("") + + # SPF Results + if stats['spf_results']: + report.append("SPF Results:") + for result, count in stats['spf_results']: + report.append(f" {result}: {count}") + + return "\n".join(report) + + +def _generate_detailed_report(stats, detailed_records, timeline_data=None): + """Generate detailed report with individual records""" + report = [] + + # Start with summary + summary = _generate_summary_report(stats) + report.append(summary) + report.append("") + report.append("="*80) + report.append("DETAILED RECORDS (Top 100)") + report.append("="*80) + report.append("") + + # Table header + header = f"{'Domain':<25} {'Source IP':<15} {'Count':<8} {'Disposition':<12} {'DKIM':<8} {'SPF':<8} {'From':<20}" + 
report.append(header) + report.append("-" * len(header)) + + # Records + for record in detailed_records: + (domain, org_name, source_ip, count, disposition, + dkim_result, spf_result, header_from, date_begin, date_end) = record + + # Truncate long values + domain = (domain or "")[:24] + source_ip = (source_ip or "")[:14] + disposition = (disposition or "")[:11] + dkim_result = (dkim_result or "")[:7] + spf_result = (spf_result or "")[:7] + header_from = (header_from or "")[:19] + + line = f"{domain:<25} {source_ip:<15} {count:<8} {disposition:<12} {dkim_result:<8} {spf_result:<8} {header_from:<20}" + report.append(line) + + return "\n".join(report) + + +def _generate_failure_report(failure_analysis, timeline_data=None): + """Generate detailed failure analysis report""" + report = [] + + report.append("🚨 DMARC FAILURE ANALYSIS") + report.append("="*50) + report.append("") + + # Authentication failure breakdown + if failure_analysis.get('auth_failure_breakdown'): + report.append("Authentication Failure Breakdown:") + for failure_type, total_count, record_count in failure_analysis['auth_failure_breakdown']: + report.append(f" {failure_type}: {total_count:,} messages ({record_count} sources)") + report.append("") + + # Policy actions on failures + if failure_analysis.get('failure_dispositions'): + report.append("Policy Actions on Failed Messages:") + total_failures = sum(count for _, count in failure_analysis['failure_dispositions']) + for disposition, count in failure_analysis['failure_dispositions']: + percentage = (count / total_failures * 100) if total_failures > 0 else 0 + report.append(f" {disposition}: {count:,} ({percentage:.1f}%)") + report.append("") + + # Top failing source IPs + if failure_analysis.get('failures_by_ip'): + report.append("Top Failing Source IPs:") + for ip, count in failure_analysis['failures_by_ip']: + report.append(f" {ip}: {count:,} failed messages") + report.append("") + + # Failures by domain + if 
failure_analysis.get('failures_by_domain'): + report.append("Failures by Domain:") + for domain, count in failure_analysis['failures_by_domain']: + report.append(f" {domain}: {count:,} failed messages") + report.append("") + + # Failures by email provider (reporter) + if failure_analysis.get('failures_by_provider'): + report.append("🏢 Failures by Email Provider (Reporter):") + for provider, count in failure_analysis['failures_by_provider']: + report.append(f" {provider}: {count:,} failed messages") + report.append("") + + # Provider timeline breakdown + if failure_analysis.get('provider_timeline'): + report.append("📊 Provider Timeline Breakdown:") + current_date = None + for provider, report_date, failed_count, unique_ips in failure_analysis['provider_timeline']: + if report_date != current_date: + if current_date is not None: + report.append("") + report.append(f" {report_date}:") + current_date = report_date + report.append(f" {provider}: {failed_count:,} failures from {unique_ips} unique IPs") + report.append("") + + # Detailed failure records + if failure_analysis.get('detailed_failures'): + report.append("🔍 DETAILED FAILURE RECORDS (Top 50)") + report.append("="*80) + report.append("") + + # Table header + header = f"{'Domain':<18} {'Source IP':<15} {'Count':<6} {'Action':<10} {'DKIM':<6} {'SPF':<6} {'Reporter':<15} {'Date':<10}" + report.append(header) + report.append("-" * len(header)) + + # Records + for record in failure_analysis['detailed_failures']: + (domain, source_ip, count, disposition, dkim_result, + spf_result, header_from, date_begin, org_name, reporter_email) = record + + # Truncate long values + domain = (domain or "")[:17] + source_ip = (source_ip or "")[:14] + disposition = (disposition or "")[:9] + dkim_result = (dkim_result or "")[:5] + spf_result = (spf_result or "")[:5] + org_name = (org_name or "")[:14] + + # Format date + try: + if isinstance(date_begin, str): + date_str = date_begin[:10] # Take just YYYY-MM-DD part + else: + date_str = 
date_begin.strftime('%Y-%m-%d') + except: + date_str = "N/A" + + # Highlight failures with emoji + dkim_display = "❌" if dkim_result == 'fail' else "✅" + spf_display = "❌" if spf_result == 'fail' else "✅" + + line = f"{domain:<18} {source_ip:<15} {count:<6} {disposition:<10} {dkim_display:<6} {spf_display:<6} {org_name:<15} {date_str:<10}" + report.append(line) + + report.append("") + report.append("💡 RECOMMENDATIONS:") + report.append("- Investigate high-volume failing IPs for potential spoofing") + report.append("- Review DKIM signing for domains with DKIM failures") + report.append("- Check SPF records for domains with SPF failures") + report.append("- Consider moving from 'none' to 'quarantine' policy if ready") + + return "\n".join(report) + + +def format_table(headers, rows, max_width=None): + """Helper function to format data as a table""" + if not rows: + return "" + + # Calculate column widths + col_widths = [len(header) for header in headers] + + for row in rows: + for i, cell in enumerate(row): + if i < len(col_widths): + col_widths[i] = max(col_widths[i], len(str(cell))) + + # Apply max width if specified + if max_width: + for i in range(len(col_widths)): + col_widths[i] = min(col_widths[i], max_width) + + # Create format string + format_str = " | ".join(f"{{:<{width}}}" for width in col_widths) + + # Generate table + lines = [] + + # Header + lines.append(format_str.format(*headers)) + lines.append("-" * sum(col_widths) + "-" * (len(col_widths) - 1) * 3) + + # Rows + for row in rows: + formatted_row = [] + for i, cell in enumerate(row): + cell_str = str(cell) + if max_width and len(cell_str) > col_widths[i]: + cell_str = cell_str[:col_widths[i]-3] + "..." + formatted_row.append(cell_str) + lines.append(format_str.format(*formatted_row)) + + return "\n".join(lines)
\ No newline at end of file |