#!/usr/bin/env python import argparse import os import subprocess import sys import typing as T import yaml ENCODING = "utf-8" HOSTS: T.Dict[T.Text, T.Dict[T.Text, T.Text]] = {} # From: https://stackoverflow.com/a/43060743 class DummyVault(yaml.YAMLObject): yaml_tag = "!vault" def __init__(self, cyphertext): self.cyphertext = "\n".join([e.strip() for e in cyphertext.split("\n")]) def __repr__(self): return f"{self.__class__.__name__}({self.cyphertext[:10]}...)" @classmethod def from_yaml(cls, loader, node): return DummyVault(node.value) @classmethod def to_yaml(cls, dumper, data): return dumper.represent_scalar(cls.yaml_tag, data.cyphertext, style="|") yaml.SafeLoader.add_constructor("!vault", DummyVault.from_yaml) yaml.SafeDumper.add_multi_representer(DummyVault, DummyVault.to_yaml) def load_hosts(inventory: T.Text, role: T.Optional[T.Text]) -> T.List[T.Text]: with open(inventory, "r") as f: data = yaml.load(f, Loader=yaml.SafeLoader) if role: return [k for k in data["all"]["children"][role]["hosts"].keys()] return [k for k in data["all"]["hosts"].keys()] def gen_key(name: T.Text) -> None: with open(f"{name}.sec", "w") as sec: subprocess.call(["wg", "genkey"], stdout=sec) with open(f"{name}.sec", "rb", 0) as sec_r, open(f"{name}.pub", "w") as pub: subprocess.call(["wg", "pubkey"], stdin=sec_r, stdout=pub) def _encrypt_string( plaintext: T.Text, passfile: T.Text, vault_id: T.Optional[T.Text], passfile_dir: T.Optional[T.Text], ) -> bytes: if passfile_dir is None: passfile_dir = os.getcwd() if vault_id is None: return _encrypt_string_simple(plaintext, passfile, passfile_dir) else: return _encrypt_string_vault(plaintext, f"{vault_id}@{passfile}", passfile_dir) def _encrypt_string_simple( plaintext: T.Text, passfile: T.Text, passfile_dir: T.Text ) -> bytes: return subprocess.check_output( [ "ansible-vault", "encrypt_string", f"--vault-password-file={passfile}", plaintext, ], cwd=passfile_dir, ) def _encrypt_string_vault( plaintext: T.Text, vault_passfile: T.Text, passfile_dir: T.Text ) -> bytes: return subprocess.check_output( ["ansible-vault", "encrypt_string", f"--vault-id={vault_passfile}", plaintext], cwd=passfile_dir, ) def to_vault( name: T.Text, passfile: T.Text, vault_id: T.Optional[T.Text], passfile_dir: T.Optional[T.Text], ) -> None: with open(f"{name}.pub", "r") as pub: pubkey = pub.readline().strip("\n") enc_pub = _encrypt_string(pubkey, passfile, vault_id, passfile_dir) with open(f"{name}.sec", "r") as sec: seckey = sec.readline().strip("\n") enc_sec = _encrypt_string(seckey, passfile, vault_id, passfile_dir) HOSTS[name] = { "public_key": yaml.load(enc_pub.decode(ENCODING), Loader=yaml.SafeLoader), "private_key": yaml.load(enc_sec.decode(ENCODING), Loader=yaml.SafeLoader), } if __name__ == "__main__": parser = argparse.ArgumentParser( description="Generates keys for wireguard connection of the dns servers" ) parser.add_argument( "inventory", metavar="INVENTORY", type=str, help="path to the inventory" ) parser.add_argument( "passfile", metavar="VAULT_PASSFILE", type=str, help="the name of the file that contains the passphrase for the inventory", ) parser.add_argument( "--vault-id", metavar="VAULT_ID", type=str, help="the name of the (existing) vault", ) parser.add_argument( "--passfile-dir", metavar="PASSFILE_DIR", type=str, help="path where the passfile is located", ) parser.add_argument( "--role", metavar="ROLE", type=str, help="an optional key to use to filter the search for hosts in the inventory", ) args = parser.parse_args() for host in load_hosts(args.inventory, args.role): gen_key(host) to_vault(host, args.passfile, args.vault_id, args.passfile_dir) result = yaml.dump(HOSTS, Dumper=yaml.SafeDumper) with open("result.yml", "w") as res: res.write(result) print(result)