summaryrefslogtreecommitdiff
path: root/dmarc_analyzer/main.py
blob: 18431d04da0b28fe87533be8bd298fd76a96677a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/env python3
"""Main CLI entry point for DMARC analyzer"""

import click
import os
from pathlib import Path

from .extractor import extract_files
from .parser import parse_dmarc_report
from .database import Database
from .reporter import generate_report


def _build_date_filter(date_from, date_to):
    """Return a dict containing only the date bounds that were supplied."""
    bounds = {'date_from': date_from, 'date_to': date_to}
    return {key: value for key, value in bounds.items() if value}


def _echo_report(database, output_format, show_failures_only, date_from, date_to, show_timeline):
    """Print the summary banner followed by the report built from the database."""
    click.echo("\n" + "=" * 50)
    click.echo("DMARC REPORT SUMMARY")
    click.echo("=" * 50)

    date_filter = _build_date_filter(date_from, date_to)
    click.echo(generate_report(database, output_format, show_failures_only, date_filter, show_timeline))


def _collect_report_files(path):
    """Return the files to process for one CLI path (a file or a directory)."""
    path = Path(path)
    if path.is_file():
        return [path]
    # glob() yields entries in an unspecified order; sort each group so runs
    # are reproducible while keeping the original "gzip first, then zip" order.
    return sorted(path.glob('*.gz')) + sorted(path.glob('*.zip'))


def _process_file(database, file_path):
    """Extract, parse and store one file; return True only if a report was stored."""
    click.echo(f"Processing: {file_path}")

    xml_content = extract_files(file_path)
    if not xml_content:
        click.echo("  ⚠ Failed to extract file", err=True)
        return False

    report_data = parse_dmarc_report(xml_content)
    if not report_data:
        click.echo("  ⚠ Failed to parse DMARC report", err=True)
        return False

    # A falsy result from store_report is reported as a duplicate.
    if not database.store_report(report_data):
        click.echo("  ⊝ Skipped (duplicate)")
        return False

    click.echo("  ✓ Processed successfully")
    return True


@click.command()
@click.argument('paths', nargs=-1, type=click.Path(exists=True))
@click.option('--db', default='dmarc_reports.db', help='Database file path')
@click.option('--output-format', type=click.Choice(['summary', 'detailed', 'failures']), default='summary', help='Report output format')
@click.option('--show-failures-only', is_flag=True, help='Show only records with authentication failures')
@click.option('--report-only', is_flag=True, help='Generate report from existing data without processing files')
@click.option('--date-from', type=click.DateTime(['%Y-%m-%d']), help='Filter reports from this date (YYYY-MM-DD)')
@click.option('--date-to', type=click.DateTime(['%Y-%m-%d']), help='Filter reports to this date (YYYY-MM-DD)')
@click.option('--show-timeline', is_flag=True, help='Show daily breakdown in reports')
def cli(paths, db, output_format, show_failures_only, report_only, date_from, date_to, show_timeline):
    """Analyze DMARC reports from gzip/zip files or directories"""
    # NOTE: the docstring above doubles as the command's --help text, so the
    # developer-facing notes live in comments instead.
    #
    # Two modes:
    #   * --report-only: print a report from the existing database and stop;
    #     any paths given are ignored with a warning.
    #   * default: process every path (a file, or the *.gz / *.zip files
    #     directly inside a directory), store the parsed reports, then print
    #     a report if at least one new report was stored.
    #
    # The date bounds and display flags are forwarded unchanged to
    # generate_report in both modes.
    database = Database(db)
    database.init_db()

    # Report-only mode: skip file processing
    if report_only:
        if paths:
            click.echo("Warning: Paths provided but --report-only specified. Ignoring paths.", err=True)

        click.echo("Generating report from existing database...")
        _echo_report(database, output_format, show_failures_only, date_from, date_to, show_timeline)
        return

    # Validate paths are provided for processing mode
    if not paths:
        click.echo("Error: Paths are required unless using --report-only", err=True)
        # Exit with a non-zero status so shells and CI can detect the misuse;
        # previously this error path still exited with status 0.
        click.get_current_context().exit(1)

    processed_count = 0

    for path in paths:
        for file_path in _collect_report_files(path):
            # Best-effort boundary: one unreadable or corrupt file must not
            # abort the rest of the batch, so report the error and continue.
            try:
                if _process_file(database, file_path):
                    processed_count += 1
            except Exception as e:
                click.echo(f"  ✗ Error processing {file_path}: {e}", err=True)

    click.echo(f"\nProcessed {processed_count} reports")

    # Generate summary report only when this run stored something new
    if processed_count > 0:
        _echo_report(database, output_format, show_failures_only, date_from, date_to, show_timeline)


# Run the CLI when this module is executed directly as a script; the guard
# keeps cli() from being invoked when the module is merely imported.
if __name__ == '__main__':
    cli()