summaryrefslogtreecommitdiff
path: root/dmarc_analyzer/extractor.py
diff options
context:
space:
mode:
Diffstat (limited to 'dmarc_analyzer/extractor.py')
-rw-r--r--dmarc_analyzer/extractor.py65
1 files changed, 65 insertions, 0 deletions
diff --git a/dmarc_analyzer/extractor.py b/dmarc_analyzer/extractor.py
new file mode 100644
index 0000000..d12d67b
--- /dev/null
+++ b/dmarc_analyzer/extractor.py
@@ -0,0 +1,65 @@
+"""File extraction utilities for DMARC reports"""
+
+import gzip
+import zipfile
+import tempfile
+import os
+from pathlib import Path
+
+
def extract_files(file_path):
    """Return the XML text stored in a DMARC report file.

    The container format is chosen from the file extension, compared
    case-insensitively: ``.gz`` is read as gzip, ``.zip`` as a zip archive,
    and anything else is read directly as a text file.

    Args:
        file_path: Path to the report file (str or path-like).

    Returns:
        str: The file content if successful, None otherwise. Any error is
        printed rather than raised.
    """
    file_path = Path(file_path)
    # Normalise once so '.GZ' / '.ZIP' are handled like their lowercase forms.
    suffix = file_path.suffix.lower()

    try:
        if suffix == '.gz':
            return _extract_gzip(file_path)
        if suffix == '.zip':
            return _extract_zip(file_path)
        # Assume it's already XML. Decode explicitly as UTF-8 (the same
        # encoding the zip path uses) instead of the locale default.
        return file_path.read_text(encoding='utf-8')
    except Exception as e:
        # Best-effort contract: report the problem and signal failure with
        # None instead of propagating the exception to the caller.
        print(f"Error extracting {file_path}: {e}")
        return None
+
+
def _extract_gzip(file_path):
    """Return the decompressed content of a gzip file as text.

    Args:
        file_path: Path to the gzip-compressed file.

    Returns:
        str: The decompressed content decoded as UTF-8.
    """
    # Explicit encoding: without it, text mode falls back to the locale's
    # preferred encoding, which can differ between machines.
    with gzip.open(file_path, 'rt', encoding='utf-8') as f:
        return f.read()
+
+
def _extract_zip(file_path):
    """Return the text of the report stored in a zip archive.

    The first member whose name ends in ``.xml`` (case-insensitive) is
    returned. If no member has that extension, the first file in the archive
    is returned instead. Directory entries are never selected.

    Args:
        file_path: Path to the zip archive.

    Returns:
        str: The chosen member's content decoded as UTF-8, or None if the
        archive contains no files.
    """
    with zipfile.ZipFile(file_path, 'r') as zip_ref:
        # Directory entries carry no data; picking one in the fallback would
        # yield an empty string instead of the actual report.
        members = [
            info.filename
            for info in zip_ref.infolist()
            if not info.is_dir()
        ]
        if not members:
            return None

        # Prefer the first XML member, otherwise fall back to the first file.
        chosen = next(
            (name for name in members if name.lower().endswith('.xml')),
            members[0],
        )
        with zip_ref.open(chosen) as f:
            return f.read().decode('utf-8')