"""Report generation for DMARC analysis""" from datetime import datetime def generate_report(database, output_format='summary', show_failures_only=False, date_filter=None, show_timeline=False): """Generate terminal report from database""" stats = database.get_summary_stats(date_filter) # Get timeline data if requested timeline_data = None if show_timeline: timeline_data = database.get_timeline_stats(date_filter) if output_format == 'failures': failure_analysis = database.get_failure_analysis() return _generate_failure_report(failure_analysis, timeline_data) elif output_format == 'summary': return _generate_summary_report(stats, show_failures_only, date_filter, timeline_data) elif output_format == 'detailed': detailed_records = database.get_detailed_records() return _generate_detailed_report(stats, detailed_records, timeline_data) def _generate_summary_report(stats, show_failures_only=False, date_filter=None, timeline_data=None): """Generate summary report""" report = [] # Date range info if date_filter: if 'date_from' in date_filter: report.append(f"Date Range: From {date_filter['date_from'].strftime('%Y-%m-%d')}") if 'date_to' in date_filter: if 'date_from' in date_filter: report[-1] += f" to {date_filter['date_to'].strftime('%Y-%m-%d')}" else: report.append(f"Date Range: Up to {date_filter['date_to'].strftime('%Y-%m-%d')}") report.append("") # Overall statistics report.append(f"Total Reports: {stats['total_reports']}") report.append(f"Total Messages: {stats['total_messages']:,}") report.append("") # Timeline breakdown if requested if timeline_data: report.append("📅 DAILY BREAKDOWN:") for date_str, total_msgs, failed_msgs, reporters in timeline_data: failure_rate = (failed_msgs / total_msgs * 100) if total_msgs > 0 else 0 report.append(f" {date_str}: {total_msgs:,} messages ({failed_msgs:,} failed, {failure_rate:.1f}%) from {reporters} reporters") report.append("") # Messages by disposition if stats['by_disposition']: report.append("Messages by Disposition:") for disposition, count in stats['by_disposition']: percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0 report.append(f" {disposition}: {count:,} ({percentage:.1f}%)") report.append("") # Top domains if stats['by_domain']: report.append("Top Domains:") for domain, count in stats['by_domain']: percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0 report.append(f" {domain}: {count:,} ({percentage:.1f}%)") report.append("") # Top source IPs if stats['by_source_ip']: report.append("Top Source IPs:") for ip, count in stats['by_source_ip']: percentage = (count / stats['total_messages'] * 100) if stats['total_messages'] > 0 else 0 report.append(f" {ip}: {count:,} ({percentage:.1f}%)") report.append("") # DKIM Results if stats['dkim_results']: report.append("DKIM Results:") for result, count in stats['dkim_results']: report.append(f" {result}: {count}") report.append("") # SPF Results if stats['spf_results']: report.append("SPF Results:") for result, count in stats['spf_results']: report.append(f" {result}: {count}") return "\n".join(report) def _generate_detailed_report(stats, detailed_records, timeline_data=None): """Generate detailed report with individual records""" report = [] # Start with summary summary = _generate_summary_report(stats) report.append(summary) report.append("") report.append("="*80) report.append("DETAILED RECORDS (Top 100)") report.append("="*80) report.append("") # Table header header = f"{'Domain':<25} {'Source IP':<15} {'Count':<8} {'Disposition':<12} {'DKIM':<8} {'SPF':<8} {'From':<20}" report.append(header) report.append("-" * len(header)) # Records for record in detailed_records: (domain, org_name, source_ip, count, disposition, dkim_result, spf_result, header_from, date_begin, date_end) = record # Truncate long values domain = (domain or "")[:24] source_ip = (source_ip or "")[:14] disposition = (disposition or "")[:11] dkim_result = (dkim_result or "")[:7] spf_result = (spf_result or "")[:7] header_from = (header_from or "")[:19] line = f"{domain:<25} {source_ip:<15} {count:<8} {disposition:<12} {dkim_result:<8} {spf_result:<8} {header_from:<20}" report.append(line) return "\n".join(report) def _generate_failure_report(failure_analysis, timeline_data=None): """Generate detailed failure analysis report""" report = [] report.append("🚨 DMARC FAILURE ANALYSIS") report.append("="*50) report.append("") # Authentication failure breakdown if failure_analysis.get('auth_failure_breakdown'): report.append("Authentication Failure Breakdown:") for failure_type, total_count, record_count in failure_analysis['auth_failure_breakdown']: report.append(f" {failure_type}: {total_count:,} messages ({record_count} sources)") report.append("") # Policy actions on failures if failure_analysis.get('failure_dispositions'): report.append("Policy Actions on Failed Messages:") total_failures = sum(count for _, count in failure_analysis['failure_dispositions']) for disposition, count in failure_analysis['failure_dispositions']: percentage = (count / total_failures * 100) if total_failures > 0 else 0 report.append(f" {disposition}: {count:,} ({percentage:.1f}%)") report.append("") # Top failing source IPs if failure_analysis.get('failures_by_ip'): report.append("Top Failing Source IPs:") for ip, count in failure_analysis['failures_by_ip']: report.append(f" {ip}: {count:,} failed messages") report.append("") # Failures by domain if failure_analysis.get('failures_by_domain'): report.append("Failures by Domain:") for domain, count in failure_analysis['failures_by_domain']: report.append(f" {domain}: {count:,} failed messages") report.append("") # Failures by email provider (reporter) if failure_analysis.get('failures_by_provider'): report.append("🏢 Failures by Email Provider (Reporter):") for provider, count in failure_analysis['failures_by_provider']: report.append(f" {provider}: {count:,} failed messages") report.append("") # Provider timeline breakdown if failure_analysis.get('provider_timeline'): report.append("📊 Provider Timeline Breakdown:") current_date = None for provider, report_date, failed_count, unique_ips in failure_analysis['provider_timeline']: if report_date != current_date: if current_date is not None: report.append("") report.append(f" {report_date}:") current_date = report_date report.append(f" {provider}: {failed_count:,} failures from {unique_ips} unique IPs") report.append("") # Detailed failure records if failure_analysis.get('detailed_failures'): report.append("🔍 DETAILED FAILURE RECORDS (Top 50)") report.append("="*80) report.append("") # Table header header = f"{'Domain':<18} {'Source IP':<15} {'Count':<6} {'Action':<10} {'DKIM':<6} {'SPF':<6} {'Reporter':<15} {'Date':<10}" report.append(header) report.append("-" * len(header)) # Records for record in failure_analysis['detailed_failures']: (domain, source_ip, count, disposition, dkim_result, spf_result, header_from, date_begin, org_name, reporter_email) = record # Truncate long values domain = (domain or "")[:17] source_ip = (source_ip or "")[:14] disposition = (disposition or "")[:9] dkim_result = (dkim_result or "")[:5] spf_result = (spf_result or "")[:5] org_name = (org_name or "")[:14] # Format date try: if isinstance(date_begin, str): date_str = date_begin[:10] # Take just YYYY-MM-DD part else: date_str = date_begin.strftime('%Y-%m-%d') except: date_str = "N/A" # Highlight failures with emoji dkim_display = "❌" if dkim_result == 'fail' else "✅" spf_display = "❌" if spf_result == 'fail' else "✅" line = f"{domain:<18} {source_ip:<15} {count:<6} {disposition:<10} {dkim_display:<6} {spf_display:<6} {org_name:<15} {date_str:<10}" report.append(line) report.append("") report.append("💡 RECOMMENDATIONS:") report.append("- Investigate high-volume failing IPs for potential spoofing") report.append("- Review DKIM signing for domains with DKIM failures") report.append("- Check SPF records for domains with SPF failures") report.append("- Consider moving from 'none' to 'quarantine' policy if ready") return "\n".join(report) def format_table(headers, rows, max_width=None): """Helper function to format data as a table""" if not rows: return "" # Calculate column widths col_widths = [len(header) for header in headers] for row in rows: for i, cell in enumerate(row): if i < len(col_widths): col_widths[i] = max(col_widths[i], len(str(cell))) # Apply max width if specified if max_width: for i in range(len(col_widths)): col_widths[i] = min(col_widths[i], max_width) # Create format string format_str = " | ".join(f"{{:<{width}}}" for width in col_widths) # Generate table lines = [] # Header lines.append(format_str.format(*headers)) lines.append("-" * sum(col_widths) + "-" * (len(col_widths) - 1) * 3) # Rows for row in rows: formatted_row = [] for i, cell in enumerate(row): cell_str = str(cell) if max_width and len(cell_str) > col_widths[i]: cell_str = cell_str[:col_widths[i]-3] + "..." formatted_row.append(cell_str) lines.append(format_str.format(*formatted_row)) return "\n".join(lines)