summaryrefslogtreecommitdiff
path: root/dmarc_analyzer/reporter.py
blob: 484f97da698d767cf00b3e6ea537102d9c2e157e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
"""Report generation for DMARC analysis"""

from datetime import datetime


def generate_report(database, output_format='summary', show_failures_only=False, date_filter=None, show_timeline=False):
    """Build a terminal report from the database in the requested format.

    Args:
        database: Object providing ``get_summary_stats``,
            ``get_timeline_stats``, ``get_failure_analysis`` and
            ``get_detailed_records``.
        output_format: ``'summary'``, ``'detailed'`` or ``'failures'``.
        show_failures_only: Forwarded to the summary report only.
        date_filter: Passed to the summary and timeline queries and to the
            summary report.
        show_timeline: When true, timeline statistics are queried and handed
            to the selected report builder.

    Returns:
        The report text, or ``None`` when ``output_format`` is not one of
        the names listed above.
    """
    # Summary statistics are always queried first, whatever the format.
    summary_stats = database.get_summary_stats(date_filter)

    # The timeline query only runs when the caller asked for it.
    timeline = database.get_timeline_stats(date_filter) if show_timeline else None

    if output_format == 'failures':
        return _generate_failure_report(database.get_failure_analysis(), timeline)

    if output_format == 'summary':
        return _generate_summary_report(summary_stats, show_failures_only, date_filter, timeline)

    if output_format == 'detailed':
        return _generate_detailed_report(summary_stats, database.get_detailed_records(), timeline)

    # Unrecognised format names fall through without producing a report.
    return None


def _generate_summary_report(stats, show_failures_only=False, date_filter=None, timeline_data=None):
    """Render the summary view as newline-joined text.

    Args:
        stats: Mapping with the keys ``total_reports``, ``total_messages``,
            ``by_disposition``, ``by_domain``, ``by_source_ip``,
            ``dkim_results`` and ``spf_results``; the list-valued entries
            hold ``(label, count)`` pairs.
        show_failures_only: Accepted but not consulted by this function.
        date_filter: Optional mapping; ``date_from`` / ``date_to`` values
            must offer ``strftime`` and are shown as a heading.
        timeline_data: Optional rows of
            ``(date, total_messages, failed_messages, reporters)``.

    Returns:
        The report as a single string.
    """
    def share_of_total(count):
        # Percentage of all messages; 0 when there are no messages at all.
        total = stats['total_messages']
        return (count / total * 100) if total > 0 else 0

    lines = []

    # Heading describing the filtered period, when a filter is active.
    if date_filter:
        has_start = 'date_from' in date_filter
        has_end = 'date_to' in date_filter
        if has_start:
            heading = f"Date Range: From {date_filter['date_from'].strftime('%Y-%m-%d')}"
            if has_end:
                heading += f" to {date_filter['date_to'].strftime('%Y-%m-%d')}"
            lines.append(heading)
        elif has_end:
            lines.append(f"Date Range: Up to {date_filter['date_to'].strftime('%Y-%m-%d')}")
        lines.append("")

    # Headline totals.
    lines += [
        f"Total Reports: {stats['total_reports']}",
        f"Total Messages: {stats['total_messages']:,}",
        "",
    ]

    # Per-day figures, only when the caller supplied them.
    if timeline_data:
        lines.append("📅 DAILY BREAKDOWN:")
        for day, message_total, failed_total, reporter_count in timeline_data:
            if message_total > 0:
                rate = failed_total / message_total * 100
            else:
                rate = 0
            lines.append(f"  {day}: {message_total:,} messages ({failed_total:,} failed, {rate:.1f}%) from {reporter_count} reporters")
        lines.append("")

    # Sections that share the "label: count (percent of total)" layout.
    percent_sections = (
        ("Messages by Disposition:", 'by_disposition'),
        ("Top Domains:", 'by_domain'),
        ("Top Source IPs:", 'by_source_ip'),
    )
    for title, key in percent_sections:
        entries = stats[key]
        if not entries:
            continue
        lines.append(title)
        for label, count in entries:
            lines.append(f"  {label}: {count:,} ({share_of_total(count):.1f}%)")
        lines.append("")

    # Authentication results are listed as plain counts.
    dkim_rows = stats['dkim_results']
    if dkim_rows:
        lines.append("DKIM Results:")
        lines.extend(f"  {result}: {count}" for result, count in dkim_rows)
        lines.append("")

    # Last section: no trailing blank line after it.
    spf_rows = stats['spf_results']
    if spf_rows:
        lines.append("SPF Results:")
        lines.extend(f"  {result}: {count}" for result, count in spf_rows)

    return "\n".join(lines)


def _generate_detailed_report(stats, detailed_records, timeline_data=None):
    """Generate detailed report with individual records.

    The output starts with the summary report for ``stats`` (including the
    daily breakdown when ``timeline_data`` is supplied), followed by a
    fixed-width table with one line per entry in ``detailed_records``.

    Args:
        stats: Summary statistics, passed unchanged to
            ``_generate_summary_report``.
        detailed_records: Iterable of 10-item records unpacked as
            ``(domain, org_name, source_ip, count, disposition,
            dkim_result, spf_result, header_from, date_begin, date_end)``.
        timeline_data: Optional per-day rows, forwarded to the summary
            section so a requested timeline is not dropped in this format.

    Returns:
        The report as a single newline-joined string.
    """
    report = []

    # Start with summary; forward the timeline so it is rendered here too.
    summary = _generate_summary_report(stats, timeline_data=timeline_data)
    report.append(summary)
    report.append("")
    report.append("=" * 80)
    # The "Top 100" label is fixed text; no limit is applied in this function.
    report.append("DETAILED RECORDS (Top 100)")
    report.append("=" * 80)
    report.append("")

    # Table header
    header = f"{'Domain':<25} {'Source IP':<15} {'Count':<8} {'Disposition':<12} {'DKIM':<8} {'SPF':<8} {'From':<20}"
    report.append(header)
    report.append("-" * len(header))

    # Records
    for record in detailed_records:
        # org_name, date_begin and date_end are part of the record shape but
        # are not shown in this table.
        (domain, org_name, source_ip, count, disposition,
         dkim_result, spf_result, header_from, date_begin, date_end) = record

        # Missing values become empty strings; long values are cut one
        # character short of the column width so columns stay separated.
        domain = (domain or "")[:24]
        source_ip = (source_ip or "")[:14]
        disposition = (disposition or "")[:11]
        dkim_result = (dkim_result or "")[:7]
        spf_result = (spf_result or "")[:7]
        header_from = (header_from or "")[:19]

        line = f"{domain:<25} {source_ip:<15} {count:<8} {disposition:<12} {dkim_result:<8} {spf_result:<8} {header_from:<20}"
        report.append(line)

    return "\n".join(report)


def _generate_failure_report(failure_analysis, timeline_data=None):
    """Generate detailed failure analysis report.

    Each section is emitted only when its key is present and non-empty in
    ``failure_analysis``.

    Args:
        failure_analysis: Mapping read with ``.get``. Recognised keys and
            the row shapes unpacked from them:
            ``auth_failure_breakdown`` -> ``(failure_type, total_count, record_count)``;
            ``failure_dispositions`` -> ``(disposition, count)``;
            ``failures_by_ip`` -> ``(ip, count)``;
            ``failures_by_domain`` -> ``(domain, count)``;
            ``failures_by_provider`` -> ``(provider, count)``;
            ``provider_timeline`` -> ``(provider, report_date, failed_count, unique_ips)``;
            ``detailed_failures`` -> ``(domain, source_ip, count, disposition,
            dkim_result, spf_result, header_from, date_begin, org_name,
            reporter_email)``.
        timeline_data: Accepted so the signature matches the other report
            builders; it is not rendered by this function.

    Returns:
        The report as a single newline-joined string.
    """
    report = []

    report.append("🚨 DMARC FAILURE ANALYSIS")
    report.append("=" * 50)
    report.append("")

    # Authentication failure breakdown
    if failure_analysis.get('auth_failure_breakdown'):
        report.append("Authentication Failure Breakdown:")
        for failure_type, total_count, record_count in failure_analysis['auth_failure_breakdown']:
            report.append(f"  {failure_type}: {total_count:,} messages ({record_count} sources)")
        report.append("")

    # Policy actions on failures, each as a share of all failed messages
    if failure_analysis.get('failure_dispositions'):
        report.append("Policy Actions on Failed Messages:")
        total_failures = sum(count for _, count in failure_analysis['failure_dispositions'])
        for disposition, count in failure_analysis['failure_dispositions']:
            percentage = (count / total_failures * 100) if total_failures > 0 else 0
            report.append(f"  {disposition}: {count:,} ({percentage:.1f}%)")
        report.append("")

    # Top failing source IPs
    if failure_analysis.get('failures_by_ip'):
        report.append("Top Failing Source IPs:")
        for ip, count in failure_analysis['failures_by_ip']:
            report.append(f"  {ip}: {count:,} failed messages")
        report.append("")

    # Failures by domain
    if failure_analysis.get('failures_by_domain'):
        report.append("Failures by Domain:")
        for domain, count in failure_analysis['failures_by_domain']:
            report.append(f"  {domain}: {count:,} failed messages")
        report.append("")

    # Failures by email provider (reporter)
    if failure_analysis.get('failures_by_provider'):
        report.append("🏢 Failures by Email Provider (Reporter):")
        for provider, count in failure_analysis['failures_by_provider']:
            report.append(f"  {provider}: {count:,} failed messages")
        report.append("")

    # Provider timeline breakdown
    if failure_analysis.get('provider_timeline'):
        report.append("📊 Provider Timeline Breakdown:")
        current_date = None
        for provider, report_date, failed_count, unique_ips in failure_analysis['provider_timeline']:
            # A new date heading is written whenever the date changes from
            # the previous row, with a blank line between date groups.
            if report_date != current_date:
                if current_date is not None:
                    report.append("")
                report.append(f"  {report_date}:")
                current_date = report_date
            report.append(f"    {provider}: {failed_count:,} failures from {unique_ips} unique IPs")
        report.append("")

    # Detailed failure records
    if failure_analysis.get('detailed_failures'):
        # The "Top 50" label is fixed text; no limit is applied in this function.
        report.append("🔍 DETAILED FAILURE RECORDS (Top 50)")
        report.append("=" * 80)
        report.append("")

        # Table header
        header = f"{'Domain':<18} {'Source IP':<15} {'Count':<6} {'Action':<10} {'DKIM':<6} {'SPF':<6} {'Reporter':<15} {'Date':<10}"
        report.append(header)
        report.append("-" * len(header))

        # Records
        for record in failure_analysis['detailed_failures']:
            # header_from and reporter_email are part of the record shape
            # but are not shown in this table.
            (domain, source_ip, count, disposition, dkim_result,
             spf_result, header_from, date_begin, org_name, reporter_email) = record

            # Missing values become empty strings; long values are cut one
            # character short of the column width.
            domain = (domain or "")[:17]
            source_ip = (source_ip or "")[:14]
            disposition = (disposition or "")[:9]
            dkim_result = (dkim_result or "")[:5]
            spf_result = (spf_result or "")[:5]
            org_name = (org_name or "")[:14]

            # Format date: strings are assumed to start with YYYY-MM-DD,
            # anything else is asked to format itself.
            if isinstance(date_begin, str):
                date_str = date_begin[:10]
            else:
                try:
                    date_str = date_begin.strftime('%Y-%m-%d')
                except (AttributeError, TypeError, ValueError):
                    # Missing or non-date value: keep the row, show a
                    # placeholder. Only these errors are caught, so
                    # interrupts and exits still propagate.
                    date_str = "N/A"

            # Highlight failures with emoji; any value other than 'fail'
            # is shown with the check mark.
            dkim_display = "❌" if dkim_result == 'fail' else "✅"
            spf_display = "❌" if spf_result == 'fail' else "✅"

            line = f"{domain:<18} {source_ip:<15} {count:<6} {disposition:<10} {dkim_display:<6} {spf_display:<6} {org_name:<15} {date_str:<10}"
            report.append(line)

        report.append("")
        report.append("💡 RECOMMENDATIONS:")
        report.append("- Investigate high-volume failing IPs for potential spoofing")
        report.append("- Review DKIM signing for domains with DKIM failures")
        report.append("- Check SPF records for domains with SPF failures")
        report.append("- Consider moving from 'none' to 'quarantine' policy if ready")

    return "\n".join(report)


def _fit_cell(text, width):
    """Return ``text`` shortened so it occupies at most ``width`` characters."""
    if len(text) <= width:
        return text
    if width < 3:
        # Too narrow to hold an ellipsis; a hard cut keeps the column aligned.
        return text[:width]
    return text[:width - 3] + "..."


def format_table(headers, rows, max_width=None):
    """Helper function to format data as a table.

    Columns are left-aligned, separated by ``" | "``, and sized to the
    widest header or cell, optionally capped at ``max_width``.

    Args:
        headers: Column titles; their number fixes the number of columns.
        rows: Sequence of row sequences. Cells are converted with ``str``.
            Rows shorter than ``headers`` are padded with empty cells and
            cells beyond the last header are ignored.
        max_width: Optional cap applied to every column. Headers and cells
            longer than the cap are shortened, ending in ``"..."`` when the
            column is at least three characters wide.

    Returns:
        The table as a newline-joined string (header, dashed separator,
        one line per row), or ``""`` when ``rows`` is empty.
    """
    if not rows:
        return ""

    header_cells = [str(header) for header in headers]
    column_count = len(header_cells)

    # Stringify once and normalise every row to exactly one cell per header
    # so ragged input cannot raise while sizing or formatting.
    text_rows = []
    for row in rows:
        cells = [str(cell) for cell in row][:column_count]
        cells.extend([""] * (column_count - len(cells)))
        text_rows.append(cells)

    # Calculate column widths from the header and every cell in the column.
    col_widths = [
        max(len(text) for text in column)
        for column in zip(header_cells, *text_rows)
    ]

    # Apply max width if specified
    if max_width:
        col_widths = [min(width, max_width) for width in col_widths]

    # Create format string
    format_str = " | ".join(f"{{:<{width}}}" for width in col_widths)

    def render(cells):
        # Without a cap no cell exceeds its column, so fitting is a no-op.
        fitted = [_fit_cell(text, width) for text, width in zip(cells, col_widths)]
        return format_str.format(*fitted)

    # Header, separator spanning all columns plus the " | " joints, rows.
    lines = [render(header_cells)]
    lines.append("-" * (sum(col_widths) + 3 * (len(col_widths) - 1)))
    lines.extend(render(cells) for cells in text_rows)

    return "\n".join(lines)