diff options
Diffstat (limited to 'dmarc_analyzer/parser.py')
| -rw-r--r-- | dmarc_analyzer/parser.py | 185 |
1 files changed, 185 insertions, 0 deletions
"""DMARC XML report parser"""

import xml.etree.ElementTree as ET
from datetime import datetime

# (child tag, output key) pairs: the text of each child element that is
# present is copied verbatim into the output dict under the given key.
_METADATA_TEXT_FIELDS = (
    ('org_name', 'org_name'),
    ('email', 'email'),
    ('report_id', 'report_id'),
)
_POLICY_TEXT_FIELDS = (
    ('domain', 'domain'),
    ('adkim', 'adkim'),  # DKIM alignment
    ('aspf', 'aspf'),    # SPF alignment
    ('p', 'p'),          # policy
    ('sp', 'sp'),        # subdomain policy
)
_ROW_TEXT_FIELDS = (
    ('source_ip', 'source_ip'),
)
_EVALUATED_TEXT_FIELDS = (
    ('disposition', 'disposition'),
    ('dkim', 'dkim_result'),
    ('spf', 'spf_result'),
)
_IDENTIFIER_TEXT_FIELDS = (
    ('header_from', 'header_from'),
)
_AUTH_TEXT_FIELDS = (
    ('domain', 'domain'),
    ('result', 'result'),
)


def parse_dmarc_report(xml_content):
    """Parse DMARC XML report and extract relevant data

    Reports are handled the same way whether or not the root element
    declares an XML namespace (for example
    ``xmlns="http://dmarc.org/dmarc-xml/0.1"``): namespace qualifiers are
    stripped right after parsing so the plain tag names match.

    Args:
        xml_content: String containing XML report data

    Returns:
        dict: Parsed report data with the keys ``'metadata'``,
        ``'policy_published'`` and ``'records'``, or None if parsing fails
        (malformed XML, or a non-integer ``begin``/``end``/``pct``/``count``
        value). The error is printed to stdout in that case.
    """
    try:
        # NOTE(security): xml.etree.ElementTree is not hardened against
        # maliciously constructed documents; if reports come from untrusted
        # senders, consider a hardened parser.
        root = ET.fromstring(xml_content)
        _strip_namespaces(root)

        return {
            'metadata': _parse_report_metadata(root),
            'policy_published': _parse_policy_published(root),
            'records': _parse_records(root),
        }

    except Exception as e:
        # Deliberate catch-all boundary: callers rely on getting None
        # instead of an exception for any unparsable report.
        print(f"Error parsing DMARC report: {e}")
        return None


def _strip_namespaces(root):
    """Remove the ``{namespace}`` prefix from every element tag, in place."""
    for element in root.iter():
        tag = element.tag
        # Tags of special nodes (comments, processing instructions) are not
        # strings; leave those untouched.
        if isinstance(tag, str) and tag.startswith('{'):
            element.tag = tag.partition('}')[2]


def _copy_text_fields(parent, fields, target):
    """Copy the text of each child listed in *fields* into *target*.

    A key is only set when the child element exists; its value is the
    element's ``.text`` as-is (which is None for an empty element).
    """
    for tag, key in fields:
        child = parent.find(tag)
        if child is not None:
            target[key] = child.text


def _parse_report_metadata(root):
    """Parse report metadata section

    Returns a dict that may contain ``org_name``, ``email``, ``report_id``,
    ``date_begin`` and ``date_end``; absent elements yield no key.
    """
    metadata = {}

    report_metadata = root.find('report_metadata')
    if report_metadata is None:
        return metadata

    _copy_text_fields(report_metadata, _METADATA_TEXT_FIELDS, metadata)

    date_range = report_metadata.find('date_range')
    if date_range is not None:
        # NOTE: fromtimestamp() without a tz yields a naive datetime in the
        # machine's local timezone, so values depend on where this runs.
        for tag, key in (('begin', 'date_begin'), ('end', 'date_end')):
            bound = date_range.find(tag)
            if bound is not None:
                metadata[key] = datetime.fromtimestamp(int(bound.text))

    return metadata


def _parse_policy_published(root):
    """Parse policy published section

    Returns a dict that may contain ``domain``, ``adkim``, ``aspf``, ``p``,
    ``sp`` (all text) and ``pct`` (int); absent elements yield no key.
    """
    policy = {}

    policy_published = root.find('policy_published')
    if policy_published is None:
        return policy

    _copy_text_fields(policy_published, _POLICY_TEXT_FIELDS, policy)

    # Percentage
    pct = policy_published.find('pct')
    if pct is not None:
        policy['pct'] = int(pct.text)

    return policy


def _parse_auth_entries(auth_results, tag):
    """Collect the domain/result pairs of every *tag* child of auth_results.

    Entries that carry neither a domain nor a result element are skipped.
    """
    entries = []
    for node in auth_results.findall(tag):
        entry = {}
        _copy_text_fields(node, _AUTH_TEXT_FIELDS, entry)
        if entry:
            entries.append(entry)
    return entries


def _parse_record(record):
    """Parse a single ``record`` element into a flat dict."""
    record_data = {}

    # Row data
    row = record.find('row')
    if row is not None:
        _copy_text_fields(row, _ROW_TEXT_FIELDS, record_data)

        count = row.find('count')
        if count is not None:
            record_data['count'] = int(count.text)

        # Policy evaluation
        policy_evaluated = row.find('policy_evaluated')
        if policy_evaluated is not None:
            _copy_text_fields(
                policy_evaluated, _EVALUATED_TEXT_FIELDS, record_data
            )

    # Identifiers
    identifiers = record.find('identifiers')
    if identifiers is not None:
        _copy_text_fields(identifiers, _IDENTIFIER_TEXT_FIELDS, record_data)

    # Auth results: both lists are present (possibly empty) whenever the
    # auth_results element exists.
    auth_results = record.find('auth_results')
    if auth_results is not None:
        record_data['dkim_auth'] = _parse_auth_entries(auth_results, 'dkim')
        record_data['spf_auth'] = _parse_auth_entries(auth_results, 'spf')

    return record_data


def _parse_records(root):
    """Parse record section

    Returns one dict per ``record`` child of *root*, in document order.
    """
    return [_parse_record(record) for record in root.findall('record')]
\ No newline at end of file |