"""DMARC XML report parser""" import xml.etree.ElementTree as ET from datetime import datetime def parse_dmarc_report(xml_content): """Parse DMARC XML report and extract relevant data Args: xml_content: String containing XML report data Returns: dict: Parsed report data or None if parsing fails """ try: root = ET.fromstring(xml_content) # Extract report metadata report_metadata = _parse_report_metadata(root) # Extract policy published policy_published = _parse_policy_published(root) # Extract records records = _parse_records(root) return { 'metadata': report_metadata, 'policy_published': policy_published, 'records': records } except Exception as e: print(f"Error parsing DMARC report: {e}") return None def _parse_report_metadata(root): """Parse report metadata section""" metadata = {} report_metadata = root.find('report_metadata') if report_metadata is not None: # Organization name org_name = report_metadata.find('org_name') if org_name is not None: metadata['org_name'] = org_name.text # Email email = report_metadata.find('email') if email is not None: metadata['email'] = email.text # Report ID report_id = report_metadata.find('report_id') if report_id is not None: metadata['report_id'] = report_id.text # Date range date_range = report_metadata.find('date_range') if date_range is not None: begin = date_range.find('begin') end = date_range.find('end') if begin is not None: metadata['date_begin'] = datetime.fromtimestamp(int(begin.text)) if end is not None: metadata['date_end'] = datetime.fromtimestamp(int(end.text)) return metadata def _parse_policy_published(root): """Parse policy published section""" policy = {} policy_published = root.find('policy_published') if policy_published is not None: # Domain domain = policy_published.find('domain') if domain is not None: policy['domain'] = domain.text # ADKIM (DKIM alignment) adkim = policy_published.find('adkim') if adkim is not None: policy['adkim'] = adkim.text # ASPF (SPF alignment) aspf = policy_published.find('aspf') if aspf is not None: policy['aspf'] = aspf.text # Policy p = policy_published.find('p') if p is not None: policy['p'] = p.text # Subdomain policy sp = policy_published.find('sp') if sp is not None: policy['sp'] = sp.text # Percentage pct = policy_published.find('pct') if pct is not None: policy['pct'] = int(pct.text) return policy def _parse_records(root): """Parse record section""" records = [] for record in root.findall('record'): record_data = {} # Row data row = record.find('row') if row is not None: source_ip = row.find('source_ip') if source_ip is not None: record_data['source_ip'] = source_ip.text count = row.find('count') if count is not None: record_data['count'] = int(count.text) # Policy evaluation policy_evaluated = row.find('policy_evaluated') if policy_evaluated is not None: disposition = policy_evaluated.find('disposition') if disposition is not None: record_data['disposition'] = disposition.text dkim = policy_evaluated.find('dkim') if dkim is not None: record_data['dkim_result'] = dkim.text spf = policy_evaluated.find('spf') if spf is not None: record_data['spf_result'] = spf.text # Identifiers identifiers = record.find('identifiers') if identifiers is not None: header_from = identifiers.find('header_from') if header_from is not None: record_data['header_from'] = header_from.text # Auth results auth_results = record.find('auth_results') if auth_results is not None: # DKIM auth results dkim_results = [] for dkim in auth_results.findall('dkim'): dkim_data = {} domain = dkim.find('domain') if domain is not None: dkim_data['domain'] = domain.text result = dkim.find('result') if result is not None: dkim_data['result'] = result.text if dkim_data: dkim_results.append(dkim_data) record_data['dkim_auth'] = dkim_results # SPF auth results spf_results = [] for spf in auth_results.findall('spf'): spf_data = {} domain = spf.find('domain') if domain is not None: spf_data['domain'] = domain.text result = spf.find('result') if result is not None: spf_data['result'] = result.text if spf_data: spf_results.append(spf_data) record_data['spf_auth'] = spf_results records.append(record_data) return records