summaryrefslogtreecommitdiff
path: root/dmarc_analyzer/parser.py
blob: 36a27a36fddfb6776fe193e3c9f564190399a1cb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""DMARC XML report parser"""

import xml.etree.ElementTree as ET
from datetime import datetime


def parse_dmarc_report(xml_content):
    """Parse DMARC XML report and extract relevant data

    The section parsers look elements up by their bare tag name, so any
    XML namespace qualification (for example a default ``xmlns`` declared
    on the root element) is stripped from the parsed tree first. Without
    that step a namespaced report matches none of the lookups and comes
    back with empty metadata, policy and records.

    Args:
        xml_content: String containing XML report data

    Returns:
        dict: Parsed report data with the keys 'metadata',
            'policy_published' and 'records', or None if parsing fails
    """
    # NOTE(review): presumably reports come from external senders; the
    # stdlib XML modules are documented as not secure against maliciously
    # constructed data -- confirm whether a hardened parser is needed.
    try:
        root = ET.fromstring(xml_content)

        # ElementTree stores namespaced tags as '{uri}local'; keep only the
        # local part so find()/findall() by plain name work on every node.
        for element in root.iter():
            tag = element.tag
            if isinstance(tag, str) and tag.startswith('{'):
                element.tag = tag.partition('}')[2]

        return {
            'metadata': _parse_report_metadata(root),
            'policy_published': _parse_policy_published(root),
            'records': _parse_records(root)
        }

    except Exception as e:
        # Best-effort contract: any failure (malformed XML, non-numeric
        # field, ...) is reported on stdout and signalled with None.
        print(f"Error parsing DMARC report: {e}")
        return None


def _parse_report_metadata(root):
    """Collect the fields of the <report_metadata> section into a dict.

    Copies the text of <org_name>, <email> and <report_id> under keys of
    the same name, and converts <date_range>/<begin> and <end> (integer
    epoch seconds) to datetime objects stored as 'date_begin' and
    'date_end'. Elements that are absent produce no key; a missing
    section yields an empty dict.
    """
    section = root.find('report_metadata')
    if section is None:
        return {}

    parsed = {}

    # Plain text fields: the dict key equals the tag name.
    for tag in ('org_name', 'email', 'report_id'):
        node = section.find(tag)
        if node is not None:
            parsed[tag] = node.text

    # Reporting window boundaries; fromtimestamp() without a tz argument
    # produces naive datetimes in the local timezone.
    window = section.find('date_range')
    if window is not None:
        for tag, key in (('begin', 'date_begin'), ('end', 'date_end')):
            node = window.find(tag)
            if node is not None:
                parsed[key] = datetime.fromtimestamp(int(node.text))

    return parsed


def _parse_policy_published(root):
    """Collect the fields of the <policy_published> section into a dict.

    <domain>, <adkim>, <aspf>, <p> and <sp> are copied as text under keys
    of the same name; <pct> is converted to an int. Elements that are
    absent produce no key; a missing section yields an empty dict.
    """
    section = root.find('policy_published')
    if section is None:
        return {}

    published = {}

    # Text-valued settings: domain, DKIM/SPF alignment modes, policy and
    # subdomain policy.
    for tag in ('domain', 'adkim', 'aspf', 'p', 'sp'):
        node = section.find(tag)
        if node is not None:
            published[tag] = node.text

    # Percentage is the only numeric field in this section.
    pct_node = section.find('pct')
    if pct_node is not None:
        published['pct'] = int(pct_node.text)

    return published


def _parse_records(root):
    """Convert every <record> child of *root* into a dict.

    Each dict may contain 'source_ip', 'count' (int), 'disposition',
    'dkim_result', 'spf_result' (from <row>), 'header_from' (from
    <identifiers>) and, when <auth_results> is present, the lists
    'dkim_auth' and 'spf_auth' of {'domain', 'result'} dicts. Elements
    that are absent produce no key, so a bare <record/> yields {}.
    """

    def collect_auth(container, mechanism):
        """Return domain/result dicts for each <mechanism> in container."""
        entries = []
        for node in container.findall(mechanism):
            entry = {}
            for field in ('domain', 'result'):
                child = node.find(field)
                if child is not None:
                    entry[field] = child.text
            # Entries carrying neither field are dropped.
            if entry:
                entries.append(entry)
        return entries

    # (tag inside <policy_evaluated>, key in the output dict)
    evaluated_fields = (
        ('disposition', 'disposition'),
        ('dkim', 'dkim_result'),
        ('spf', 'spf_result'),
    )

    parsed = []
    for rec in root.findall('record'):
        item = {}

        # <row>: sending IP, message count and the evaluated policy
        row = rec.find('row')
        if row is not None:
            ip_node = row.find('source_ip')
            if ip_node is not None:
                item['source_ip'] = ip_node.text

            count_node = row.find('count')
            if count_node is not None:
                item['count'] = int(count_node.text)

            evaluated = row.find('policy_evaluated')
            if evaluated is not None:
                for tag, key in evaluated_fields:
                    node = evaluated.find(tag)
                    if node is not None:
                        item[key] = node.text

        # <identifiers>: only header_from is extracted
        identifiers = rec.find('identifiers')
        if identifiers is not None:
            from_node = identifiers.find('header_from')
            if from_node is not None:
                item['header_from'] = from_node.text

        # <auth_results>: both lists are always set when the section
        # exists, even if they end up empty
        auth = rec.find('auth_results')
        if auth is not None:
            item['dkim_auth'] = collect_auth(auth, 'dkim')
            item['spf_auth'] = collect_auth(auth, 'spf')

        parsed.append(item)

    return parsed