summaryrefslogtreecommitdiff
path: root/dmarc_analyzer/reporter.py
diff options
context:
space:
mode:
authorMax Resnick <max@ofmax.li>2025-12-02 21:21:50 -0800
committerMax Resnick <max@ofmax.li>2025-12-02 21:21:50 -0800
commit7d3432e055dc63935ce6df2b56d655aadf88938c (patch)
tree6d8e164e275116a605fcb6f0b2f5d9c0f88cb91d /dmarc_analyzer/reporter.py
downloaddmarc-tool-master.tar.gz
feat: init commit of toolHEADCHECKPOINTmaster
Diffstat (limited to 'dmarc_analyzer/reporter.py')
-rw-r--r--dmarc_analyzer/reporter.py276
1 files changed, 276 insertions, 0 deletions
diff --git a/dmarc_analyzer/reporter.py b/dmarc_analyzer/reporter.py
new file mode 100644
index 0000000..484f97d
--- /dev/null
+++ b/dmarc_analyzer/reporter.py
@@ -0,0 +1,276 @@
+"""Report generation for DMARC analysis"""
+
+from datetime import datetime
+
+
def generate_report(database, output_format='summary', show_failures_only=False, date_filter=None, show_timeline=False):
    """Generate a terminal report from the analysis database.

    Args:
        database: analysis store exposing get_summary_stats(), get_failure_analysis(),
            get_detailed_records() and get_timeline_stats() (see database module).
        output_format: one of 'summary', 'detailed' or 'failures'.
        show_failures_only: forwarded to the summary renderer.
        date_filter: optional dict with 'date_from'/'date_to' datetimes.
        show_timeline: when True, include the per-day breakdown.

    Returns:
        The rendered report as a single string.

    Raises:
        ValueError: if output_format is not one of the supported values
            (previously this fell through and silently returned None).
    """
    # Timeline data is shared by all formats, so fetch it up front when asked for.
    timeline_data = database.get_timeline_stats(date_filter) if show_timeline else None

    if output_format == 'failures':
        # Failure analysis has its own query; summary stats are not needed here.
        return _generate_failure_report(database.get_failure_analysis(), timeline_data)
    if output_format == 'summary':
        stats = database.get_summary_stats(date_filter)
        return _generate_summary_report(stats, show_failures_only, date_filter, timeline_data)
    if output_format == 'detailed':
        stats = database.get_summary_stats(date_filter)
        return _generate_detailed_report(stats, database.get_detailed_records(), timeline_data)

    raise ValueError(f"Unknown output_format: {output_format!r}")
+
+
+def _generate_summary_report(stats, show_failures_only=False, date_filter=None, timeline_data=None):
+ """Generate summary report"""
+ report = []
+
+ # Date range info
+ if date_filter:
+ if 'date_from' in date_filter:
+ report.append(f"Date Range: From {date_filter['date_from'].strftime('%Y-%m-%d')}")
+ if 'date_to' in date_filter:
+ if 'date_from' in date_filter:
+ report[-1] += f" to {date_filter['date_to'].strftime('%Y-%m-%d')}"
+ else:
+ report.append(f"Date Range: Up to {date_filter['date_to'].strftime('%Y-%m-%d')}")
+ report.append("")
+
+ # Overall statistics
+ report.append(f"Total Reports: {stats['total_reports']}")
+ report.append(f"Total Messages: {stats['total_messages']:,}")
+ report.append("")
+
+ # Timeline breakdown if requested
+ if timeline_data:
+ report.append("📅 DAILY BREAKDOWN:")
+ for date_str, total_msgs, failed_msgs, reporters in timeline_data:
+ failure_rate = (failed_msgs / total_msgs * 100) if total_msgs > 0 else 0
+ report.append(f" {date_str}: {total_msgs:,} messages ({failed_msgs:,} failed, {failure_rate:.1f}%) from {reporters} reporters")
+ report.append("")
+
+ # Messages by disposition
+ if stats['by_disposition']:
+ report.append("Messages by Disposition:")
+ for disposition, count in stats['by_disposition']:
+ percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0
+ report.append(f" {disposition}: {count:,} ({percentage:.1f}%)")
+ report.append("")
+
+ # Top domains
+ if stats['by_domain']:
+ report.append("Top Domains:")
+ for domain, count in stats['by_domain']:
+ percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0
+ report.append(f" {domain}: {count:,} ({percentage:.1f}%)")
+ report.append("")
+
+ # Top source IPs
+ if stats['by_source_ip']:
+ report.append("Top Source IPs:")
+ for ip, count in stats['by_source_ip']:
+ percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0
+ report.append(f" {ip}: {count:,} ({percentage:.1f}%)")
+ report.append("")
+
+ # DKIM Results
+ if stats['dkim_results']:
+ report.append("DKIM Results:")
+ for result, count in stats['dkim_results']:
+ report.append(f" {result}: {count}")
+ report.append("")
+
+ # SPF Results
+ if stats['spf_results']:
+ report.append("SPF Results:")
+ for result, count in stats['spf_results']:
+ report.append(f" {result}: {count}")
+
+ return "\n".join(report)
+
+
def _generate_detailed_report(stats, detailed_records, timeline_data=None):
    """Render the summary followed by a fixed-width table of individual records.

    NOTE(review): timeline_data is accepted for signature compatibility but
    is not used by this renderer.
    """
    # Column title, display width, and truncation limit (width - 1 so columns
    # stay separated by at least one space).
    columns = (
        ("Domain", 25), ("Source IP", 15), ("Count", 8), ("Disposition", 12),
        ("DKIM", 8), ("SPF", 8), ("From", 20),
    )

    out = [
        _generate_summary_report(stats),
        "",
        "=" * 80,
        "DETAILED RECORDS (Top 100)",
        "=" * 80,
        "",
    ]

    header = " ".join(f"{title:<{width}}" for title, width in columns)
    out.append(header)
    out.append("-" * len(header))

    for row in detailed_records:
        # org_name and the date bounds are present in the row but not shown.
        domain, _org, ip, count, action, dkim, spf, sender, _begin, _end = row
        cells = (
            (domain or "")[:24],
            (ip or "")[:14],
            count,
            (action or "")[:11],
            (dkim or "")[:7],
            (spf or "")[:7],
            (sender or "")[:19],
        )
        out.append(
            " ".join(f"{cell:<{width}}" for cell, (_, width) in zip(cells, columns))
        )

    return "\n".join(out)
+
+
+def _generate_failure_report(failure_analysis, timeline_data=None):
+ """Generate detailed failure analysis report"""
+ report = []
+
+ report.append("🚨 DMARC FAILURE ANALYSIS")
+ report.append("="*50)
+ report.append("")
+
+ # Authentication failure breakdown
+ if failure_analysis.get('auth_failure_breakdown'):
+ report.append("Authentication Failure Breakdown:")
+ for failure_type, total_count, record_count in failure_analysis['auth_failure_breakdown']:
+ report.append(f" {failure_type}: {total_count:,} messages ({record_count} sources)")
+ report.append("")
+
+ # Policy actions on failures
+ if failure_analysis.get('failure_dispositions'):
+ report.append("Policy Actions on Failed Messages:")
+ total_failures = sum(count for _, count in failure_analysis['failure_dispositions'])
+ for disposition, count in failure_analysis['failure_dispositions']:
+ percentage = (count / total_failures * 100) if total_failures > 0 else 0
+ report.append(f" {disposition}: {count:,} ({percentage:.1f}%)")
+ report.append("")
+
+ # Top failing source IPs
+ if failure_analysis.get('failures_by_ip'):
+ report.append("Top Failing Source IPs:")
+ for ip, count in failure_analysis['failures_by_ip']:
+ report.append(f" {ip}: {count:,} failed messages")
+ report.append("")
+
+ # Failures by domain
+ if failure_analysis.get('failures_by_domain'):
+ report.append("Failures by Domain:")
+ for domain, count in failure_analysis['failures_by_domain']:
+ report.append(f" {domain}: {count:,} failed messages")
+ report.append("")
+
+ # Failures by email provider (reporter)
+ if failure_analysis.get('failures_by_provider'):
+ report.append("🏢 Failures by Email Provider (Reporter):")
+ for provider, count in failure_analysis['failures_by_provider']:
+ report.append(f" {provider}: {count:,} failed messages")
+ report.append("")
+
+ # Provider timeline breakdown
+ if failure_analysis.get('provider_timeline'):
+ report.append("📊 Provider Timeline Breakdown:")
+ current_date = None
+ for provider, report_date, failed_count, unique_ips in failure_analysis['provider_timeline']:
+ if report_date != current_date:
+ if current_date is not None:
+ report.append("")
+ report.append(f" {report_date}:")
+ current_date = report_date
+ report.append(f" {provider}: {failed_count:,} failures from {unique_ips} unique IPs")
+ report.append("")
+
+ # Detailed failure records
+ if failure_analysis.get('detailed_failures'):
+ report.append("🔍 DETAILED FAILURE RECORDS (Top 50)")
+ report.append("="*80)
+ report.append("")
+
+ # Table header
+ header = f"{'Domain':<18} {'Source IP':<15} {'Count':<6} {'Action':<10} {'DKIM':<6} {'SPF':<6} {'Reporter':<15} {'Date':<10}"
+ report.append(header)
+ report.append("-" * len(header))
+
+ # Records
+ for record in failure_analysis['detailed_failures']:
+ (domain, source_ip, count, disposition, dkim_result,
+ spf_result, header_from, date_begin, org_name, reporter_email) = record
+
+ # Truncate long values
+ domain = (domain or "")[:17]
+ source_ip = (source_ip or "")[:14]
+ disposition = (disposition or "")[:9]
+ dkim_result = (dkim_result or "")[:5]
+ spf_result = (spf_result or "")[:5]
+ org_name = (org_name or "")[:14]
+
+ # Format date
+ try:
+ if isinstance(date_begin, str):
+ date_str = date_begin[:10] # Take just YYYY-MM-DD part
+ else:
+ date_str = date_begin.strftime('%Y-%m-%d')
+ except:
+ date_str = "N/A"
+
+ # Highlight failures with emoji
+ dkim_display = "❌" if dkim_result == 'fail' else "✅"
+ spf_display = "❌" if spf_result == 'fail' else "✅"
+
+ line = f"{domain:<18} {source_ip:<15} {count:<6} {disposition:<10} {dkim_display:<6} {spf_display:<6} {org_name:<15} {date_str:<10}"
+ report.append(line)
+
+ report.append("")
+ report.append("💡 RECOMMENDATIONS:")
+ report.append("- Investigate high-volume failing IPs for potential spoofing")
+ report.append("- Review DKIM signing for domains with DKIM failures")
+ report.append("- Check SPF records for domains with SPF failures")
+ report.append("- Consider moving from 'none' to 'quarantine' policy if ready")
+
+ return "\n".join(report)
+
+
+def format_table(headers, rows, max_width=None):
+ """Helper function to format data as a table"""
+ if not rows:
+ return ""
+
+ # Calculate column widths
+ col_widths = [len(header) for header in headers]
+
+ for row in rows:
+ for i, cell in enumerate(row):
+ if i < len(col_widths):
+ col_widths[i] = max(col_widths[i], len(str(cell)))
+
+ # Apply max width if specified
+ if max_width:
+ for i in range(len(col_widths)):
+ col_widths[i] = min(col_widths[i], max_width)
+
+ # Create format string
+ format_str = " | ".join(f"{{:<{width}}}" for width in col_widths)
+
+ # Generate table
+ lines = []
+
+ # Header
+ lines.append(format_str.format(*headers))
+ lines.append("-" * sum(col_widths) + "-" * (len(col_widths) - 1) * 3)
+
+ # Rows
+ for row in rows:
+ formatted_row = []
+ for i, cell in enumerate(row):
+ cell_str = str(cell)
+ if max_width and len(cell_str) > col_widths[i]:
+ cell_str = cell_str[:col_widths[i]-3] + "..."
+ formatted_row.append(cell_str)
+ lines.append(format_str.format(*formatted_row))
+
+ return "\n".join(lines) \ No newline at end of file