"""Back up bare git repositories to S3 as `git bundle` files.

Every step is recorded in a CSV write-ahead log (see ``Wal``) so that an
interrupted or failed run can be diagnosed afterwards.
"""
import argparse
import collections
import csv
import datetime
import os
from functools import wraps
from pathlib import Path

import boto3
import sh
from sh.contrib import git

s3 = boto3.resource('s3')

parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='cmd', help='git snapshot commands')
parser_backup = subparsers.add_parser('backup', help='backup repositories')
parser_backup.add_argument('repositories', help='directory with repos')
args = parser.parse_args()

# Per-repository state handed to every backup step.
#   repo      -- `git` command baked with --git-dir for this repository
#   bucket    -- S3 bucket name
#   prefix    -- key prefix inside the bucket
#   bundle    -- unused by the steps in this file (always None here)
#   repo_name -- name used in log rows and bundle file names
#   wal       -- Wal instance the steps log to
Context = collections.namedtuple('Context', 'repo bucket prefix bundle repo_name wal')


class BackupStepError(Exception):
    """A logit-wrapped step failed; the failure is already in the WAL."""


def _utc_timestamp():
    """Return the current time as POSIX seconds.

    An aware datetime is used on purpose: calling ``.timestamp()`` on the
    naive value returned by ``utcnow()`` interprets it as *local* time and
    yields a skewed epoch on any host whose timezone is not UTC.
    """
    return datetime.datetime.now(datetime.timezone.utc).timestamp()


class Wal(object):
    """WAL writer for server processes.

    Appends CSV rows to ``WAL_FILE`` and fsyncs after every row. Use it as a
    context manager so the file is flushed and closed on exit.
    """

    # File name is taken from the WAL_FILE environment variable when set.
    # The default's spelling is kept as-is so existing log files keep growing.
    WAL_FILE = os.environ.get('WAL_FILE', 'git-snaphot.wal')
    wal = None
    fd = None

    def __init__(self):
        # newline='' lets the csv module control line endings itself, as the
        # csv documentation requires for files passed to csv.writer.
        self.fd = open(self.WAL_FILE, 'a', newline='')
        self.wal = csv.writer(self.fd, quotechar='|', quoting=csv.QUOTE_ALL)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.fd_to_disk()
        self.fd.close()

    def write(self, row):
        """Append ``row`` prefixed with the current UTC timestamp and sync it."""
        ts_row = [_utc_timestamp()]
        ts_row.extend(row)
        self.wal.writerow(ts_row)
        self.fd_to_disk()

    def fd_to_disk(self):
        """Flush Python's buffer and fsync the underlying file descriptor."""
        self.fd.flush()
        os.fsync(self.fd.fileno())


def logit(log_message):
    """Decorator factory that records a step's lifecycle in ``ctx.wal``.

    The wrapped function must take the ``Context`` as its first argument.
    A "started" row is written before the call and a "completed" row after
    a successful call. If the call raises, a "failed" row is written and
    ``BackupStepError`` is raised (chained to the original exception) so the
    caller cannot mistake a failed step for a successful one.
    """
    def logitargs(func):
        @wraps(func)
        def with_logging(ctx, *args, **kwargs):
            ctx.wal.write([log_message, f'started {ctx.repo_name}'])
            try:
                output = func(ctx, *args, **kwargs)
            except Exception as e:
                ctx.wal.write([log_message, f'failed {e}'])
                raise BackupStepError(
                    f'{log_message} failed for {ctx.repo_name}'
                ) from e
            # Only reached when the step really succeeded.
            ctx.wal.write([log_message, f' completed {ctx.repo_name}'])
            return output
        return with_logging
    return logitargs


@logit('s3 object upload')
def cloud_object(ctx, bundle):
    """Upload the bundle file at path ``bundle`` and return the S3 object.

    The object key is ``<ctx.prefix>/<bundle file name>`` in ``ctx.bucket``.
    """
    obj = s3.Object(ctx.bucket, f'{ctx.prefix}/{bundle.name}')
    obj.upload_file(str(bundle))
    return obj


@logit('repo bundle')
def create_bundle(ctx):
    """Run ``git bundle create ... --all`` and return the bundle's Path.

    The file is named ``<repo_name>.<utc timestamp>.bundle`` and is created
    relative to the current working directory.
    """
    # new repo or backup
    ts = _utc_timestamp()
    bundle_path = Path(f'{ctx.repo_name}.{ts}.bundle')
    ctx.repo.bundle('create', bundle_path, '--all')
    return bundle_path


@logit('repo checkpoint tag')
def tag_checkpoint(ctx):
    """Force-move the CHECKPOINT tag to the newest commit across all refs."""
    last_hash = ctx.repo('rev-list', '-n', 1, '--all').strip()
    ctx.repo.tag('-f', 'CHECKPOINT', last_hash)


@logit('ready for backup')
def requires_backup(ctx):
    """Return True when the repo has commits not covered by CHECKPOINT.

    Returns False for a repository with no commits, True when the
    CHECKPOINT tag cannot be resolved, and otherwise whether the newest
    commit differs from the commit CHECKPOINT points at.
    """
    last_hash = ctx.repo('rev-list', '-n', 1, '--all').strip()
    # empty repo
    if not last_hash:
        return False
    try:
        checkpoint = ctx.repo('rev-list', '-n', 1, 'CHECKPOINT').strip()
    # no checkpoint exists
    except sh.ErrorReturnCode:
        return True
    return last_hash != checkpoint


def run_restore(bundle):
    """Placeholder for restoring from ``bundle``; not implemented."""
    pass


def run_backup(ctx):
    """Bundle, upload and checkpoint one repository if it needs a backup.

    Raises:
        BackupStepError: when any step fails. The CHECKPOINT tag is only
            moved after both the bundle and the upload have succeeded.
    """
    if requires_backup(ctx):
        bundle_path = create_bundle(ctx)
        cloud_object(ctx, bundle_path)
        tag_checkpoint(ctx)


def run_backups(base):
    """Back up every ``*.git`` entry directly under the directory ``base``."""
    repo_base_path = Path(base)
    with Wal() as wal:
        for repo_path in repo_base_path.glob('*.git'):
            ctx = Context(
                repo=git.bake(f'--git-dir={repo_path}/'),
                # stem drops only the trailing ".git", so "my.repo.git"
                # stays "my.repo" instead of being cut at the first dot.
                repo_name=repo_path.stem,
                bucket='privategit',
                prefix='2',
                bundle=None,
                wal=wal,
            )
            try:
                run_backup(ctx)
            except BackupStepError:
                # Already written to the WAL by logit; move on to the next
                # repository. Other errors (e.g. WAL I/O) still propagate.
                continue


if args.cmd == 'backup':
    run_backups(args.repositories)
elif args.cmd == 'restore':
    # NOTE(review): no 'restore' sub-parser is registered in this file, so
    # argparse rejects that command and `args.bundle` is never defined;
    # this branch looks unreachable until one is added.
    run_restore(args.bundle)