summaryrefslogtreecommitdiff
path: root/dmarc_analyzer/parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'dmarc_analyzer/parser.py')
-rw-r--r--dmarc_analyzer/parser.py185
1 files changed, 185 insertions, 0 deletions
diff --git a/dmarc_analyzer/parser.py b/dmarc_analyzer/parser.py
new file mode 100644
index 0000000..36a27a3
--- /dev/null
+++ b/dmarc_analyzer/parser.py
@@ -0,0 +1,185 @@
+"""DMARC XML report parser"""
+
+import xml.etree.ElementTree as ET
+from datetime import datetime
+
+
def parse_dmarc_report(xml_content):
    """Parse a DMARC XML report and extract relevant data.

    Args:
        xml_content: String containing XML report data

    Returns:
        dict: Parsed report data with the keys 'metadata',
            'policy_published' and 'records', or None if parsing fails
            (the error is printed, not raised).
    """
    try:
        # NOTE(review): xml.etree.ElementTree is not hardened against
        # maliciously constructed XML. If reports come from untrusted
        # mailboxes, consider a hardened parser - confirm with the caller.
        root = ET.fromstring(xml_content)

        # Reports that declare an XML namespace (the RFC 7489 schema defines
        # one) are parsed with tags of the form '{uri}name'. The section
        # parsers look elements up by plain name, so without this
        # normalisation such a report would silently yield empty results.
        for element in root.iter():
            tag = element.tag
            if isinstance(tag, str) and tag.startswith('{'):
                element.tag = tag.partition('}')[2]

        return {
            'metadata': _parse_report_metadata(root),
            'policy_published': _parse_policy_published(root),
            'records': _parse_records(root),
        }

    except Exception as e:
        # Boundary of the parser: any failure (malformed XML, non-numeric
        # counters, ...) is reported and turned into the documented None.
        print(f"Error parsing DMARC report: {e}")
        return None
+
+
def _parse_report_metadata(root):
    """Parse report metadata section.

    Args:
        root: Root element of the report; its 'report_metadata' child is
            looked up by plain tag name.

    Returns:
        dict: May contain 'org_name', 'email' and 'report_id' (element text,
            unmodified) plus 'date_begin' and 'date_end' (datetime). A key is
            only present when the corresponding element exists; the date
            keys additionally require non-empty text.

    Raises:
        ValueError: If a date element holds non-numeric text.
    """
    metadata = {}

    section = root.find('report_metadata')
    if section is None:
        return metadata

    # Plain text fields are copied through as-is (text may be None).
    for key in ('org_name', 'email', 'report_id'):
        node = section.find(key)
        if node is not None:
            metadata[key] = node.text

    date_range = section.find('date_range')
    if date_range is not None:
        for tag, key in (('begin', 'date_begin'), ('end', 'date_end')):
            node = date_range.find(tag)
            # An empty element (e.g. <begin/>) has no text; skip it instead
            # of letting int(None) abort parsing of the whole report.
            if node is None or not (node.text and node.text.strip()):
                continue
            # NOTE(review): fromtimestamp() without tz returns a naive
            # datetime in the host's local timezone - confirm that this is
            # what consumers expect.
            metadata[key] = datetime.fromtimestamp(int(node.text))

    return metadata
+
+
def _parse_policy_published(root):
    """Parse policy published section.

    Args:
        root: Root element of the report; its 'policy_published' child is
            looked up by plain tag name.

    Returns:
        dict: May contain 'domain', 'adkim', 'aspf', 'p' and 'sp' (element
            text, unmodified) and 'pct' (int). A key is only present when
            the corresponding element exists; 'pct' additionally requires
            non-empty text.

    Raises:
        ValueError: If 'pct' holds non-numeric text.
    """
    policy = {}

    section = root.find('policy_published')
    if section is None:
        return policy

    # Domain, DKIM/SPF alignment modes, policy and subdomain policy are
    # copied through as text (which may be None for an empty element).
    for key in ('domain', 'adkim', 'aspf', 'p', 'sp'):
        node = section.find(key)
        if node is not None:
            policy[key] = node.text

    pct = section.find('pct')
    # An empty <pct/> has no text; skip it instead of letting int(None)
    # abort parsing of the whole report.
    if pct is not None and pct.text and pct.text.strip():
        policy['pct'] = int(pct.text)

    return policy
+
+
def _parse_auth_entries(auth_results, tag):
    """Collect the 'domain'/'result' text of every *tag* child of auth_results.

    Entries that have neither a 'domain' nor a 'result' element are dropped.
    """
    entries = []
    for node in auth_results.findall(tag):
        entry = {}
        for key in ('domain', 'result'):
            child = node.find(key)
            if child is not None:
                entry[key] = child.text
        if entry:
            entries.append(entry)
    return entries


def _parse_records(root):
    """Parse record section.

    Args:
        root: Root element of the report; its 'record' children are looked
            up by plain tag name.

    Returns:
        list: One dict per 'record' element, in document order. Depending on
            which elements exist a dict may contain 'source_ip', 'count'
            (int), 'disposition', 'dkim_result', 'spf_result',
            'header_from', and - only when an 'auth_results' element is
            present - 'dkim_auth' and 'spf_auth' (lists of dicts with
            'domain' and/or 'result').

    Raises:
        ValueError: If a 'count' element holds non-numeric text.
    """
    records = []

    for record in root.findall('record'):
        record_data = {}

        # Row data
        row = record.find('row')
        if row is not None:
            source_ip = row.find('source_ip')
            if source_ip is not None:
                record_data['source_ip'] = source_ip.text

            count = row.find('count')
            # An empty <count/> has no text; skip it instead of letting
            # int(None) abort parsing of the whole report.
            if count is not None and count.text and count.text.strip():
                record_data['count'] = int(count.text)

            # Policy evaluation: map element name -> output key.
            policy_evaluated = row.find('policy_evaluated')
            if policy_evaluated is not None:
                for tag, key in (
                    ('disposition', 'disposition'),
                    ('dkim', 'dkim_result'),
                    ('spf', 'spf_result'),
                ):
                    node = policy_evaluated.find(tag)
                    if node is not None:
                        record_data[key] = node.text

        # Identifiers
        identifiers = record.find('identifiers')
        if identifiers is not None:
            header_from = identifiers.find('header_from')
            if header_from is not None:
                record_data['header_from'] = header_from.text

        # Auth results: both lists are always set (possibly empty) once the
        # 'auth_results' element exists.
        auth_results = record.find('auth_results')
        if auth_results is not None:
            record_data['dkim_auth'] = _parse_auth_entries(auth_results, 'dkim')
            record_data['spf_auth'] = _parse_auth_entries(auth_results, 'spf')

        records.append(record_data)

    return records