diff --git a/dev_scripts/registry.py b/dev_scripts/registry.py index d883d81..c5e5f4a 100755 --- a/dev_scripts/registry.py +++ b/dev_scripts/registry.py @@ -7,11 +7,24 @@ import re import shutil import subprocess from base64 import b64decode +from pathlib import Path from tempfile import NamedTemporaryFile import click import requests +try: + import platformdirs +except ImportError: + import appdirs as platformdirs + + +def get_config_dir() -> str: + return Path(platformdirs.user_config_dir("dangerzone")) + + +SIGNATURES_PATH = get_config_dir() / "signatures" + DEFAULT_REPO = "freedomofpress/dangerzone" SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json" DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json" @@ -190,13 +203,37 @@ def verify_attestation( return True -def new_version_available(): +def new_image_release(): # XXX - Implement return True -def check_signature(signature_bundle, pub_key): - """Ensure that the signature bundle has been signed by the given public key.""" +def signature_to_bundle(sig): + # Convert cosign-download signatures to the format expected by cosign bundle. + bundle = sig["Bundle"] + payload = bundle["Payload"] + return { + "base64Signature": sig["Base64Signature"], + "Payload": sig["Payload"], + "cert": sig["Cert"], + "chain": sig["Chain"], + "rekorBundle": { + "SignedEntryTimestamp": bundle["SignedEntryTimestamp"], + "Payload": { + "body": payload["body"], + "integratedTime": payload["integratedTime"], + "logIndex": payload["logIndex"], + "logID": payload["logID"], + }, + }, + "RFC3161Timestamp": sig["RFC3161Timestamp"], + } + + +def verify_signature(signature, pubkey): + """Verify a signature against a given public key""" + + signature_bundle = signature_to_bundle(signature) # Put the value in files and verify with cosign with ( @@ -213,13 +250,14 @@ def check_signature(signature_bundle, pub_key): "cosign", "verify-blob", "--key", - pub_key, + pubkey, "--bundle", signature_file.name, payload_file.name, ] result = subprocess.run(cmd, capture_output=True) if result.returncode != 0: + # XXX Raise instead? return False return result.stderr == b"Verified OK\n" @@ -236,53 +274,116 @@ def container_pull(image): process.communicate() -def upgrade_container_image(image, tag, pub_key, registry: RegistryClient): - if not new_version_available(): +def upgrade_container_image(image, tag, pubkey, registry: RegistryClient): + if not new_image_release(): return hash = registry.get_manifest_hash(tag) signatures = get_signatures(image, hash) + if len(signatures) < 1: raise Exception("Unable to retrieve signatures") print(f"Found {len(signatures)} signature(s) for {image}") for signature in signatures: - signature_is_valid = check_signature(signature, pub_key) + signature_is_valid = verify_signature(signature, pubkey) if not signature_is_valid: raise Exception("Unable to verify signature") print("✅ Signature is valid") - # At this point, the signature is verified, let's upgrade + # At this point, the signatures are verified + # We store the signatures just now to avoid storing unverified signatures + store_signatures(signatures, hash, pubkey) + + # let's upgrade the image # XXX Use the hash here to avoid race conditions container_pull(image) +def get_file_hash(file): + with open(file, "rb") as f: + content = f.read() + return hashlib.sha256(content).hexdigest() + + +def load_signatures(image_hash, pubkey): + pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey) + if not pubkey_signatures.exists(): + msg = ( + f"Cannot find a '{pubkey_signatures}' folder." + "You might need to download the image signatures first." + ) + raise Exception(msg) + + with open(pubkey_signatures / f"{image_hash}.json") as f: + return json.load(f) + + +def store_signatures(signatures, image_hash, pubkey): + """ + Store signatures locally in the SIGNATURE_PATH folder, like this: + + ~/.config/dangerzone/signatures/ + └── + └── .json + └── .json + + The format used in the `.json` file is the one of `cosign download + signature`, which differs from the "bundle" one used afterwards. + + It can be converted to the one expected by cosign verify --bundle with + the `signature_to_bundle()` function. + """ + + def _get_digest(sig): + payload = json.loads(b64decode(sig["Payload"])) + return payload["critical"]["image"]["docker-manifest-digest"] + + # All the signatures should share the same hash. + hashes = list(map(_get_digest, signatures)) + if len(set(hashes)) != 1: + raise Exception("Signatures do not share the same image hash") + + if f"sha256:{image_hash}" != hashes[0]: + raise Exception("Signatures do not match the given image hash") + + pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey) + pubkey_signatures.mkdir(exist_ok=True) + + with open(pubkey_signatures / f"{image_hash}.json", "w") as f: + json.dump(signatures, f) + + +def verify_local_image_signature(image, pubkey): + """ + Verifies that a local image has a valid signature + """ + image_hash = get_image_hash(image) + signatures = load_signatures(image_hash, pubkey) + if len(signatures) < 1: + raise Exception("No signatures found") + + for signature in signatures: + if not verify_signature(signature, pubkey): + msg = f"Unable to verify signature for {image} with pubkey {pubkey}" + raise Exception(msg) + return True + + +def get_image_hash(image): + """ + Returns a image hash from a local image name + """ + cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"] + result = subprocess.run(cmd, capture_output=True, check=True) + return result.stdout.strip().decode().strip("sha256:") + + def get_signatures(image, hash): """ Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format. """ - def _to_bundle(sig): - # Convert cosign-download signatures to the format expected by cosign bundle. - bundle = sig["Bundle"] - payload = bundle["Payload"] - return { - "base64Signature": sig["Base64Signature"], - "Payload": sig["Payload"], - "cert": sig["Cert"], - "chain": sig["Chain"], - "rekorBundle": { - "SignedEntryTimestamp": bundle["SignedEntryTimestamp"], - "Payload": { - "body": payload["body"], - "integratedTime": payload["integratedTime"], - "logIndex": payload["logIndex"], - "logID": payload["logID"], - }, - }, - "RFC3161Timestamp": sig["RFC3161Timestamp"], - } - process = subprocess.run( ["cosign", "download", "signature", f"{image}@sha256:{hash}"], capture_output=True, @@ -290,32 +391,54 @@ def get_signatures(image, hash): ) # XXX: Check the output first. - signatures_raw = process.stdout.decode("utf-8").strip().split("\n") - # Remove the last return, split on newlines, convert from JSON - return [_to_bundle(json.loads(sig)) for sig in signatures_raw] + signatures_raw = process.stdout.decode("utf-8").strip().split("\n") + return list(map(json.loads, signatures_raw)) -def parse_image_location(input_string): - """Parses container image location into (registry, namespace, repository, tag)""" - pattern = ( - r"^" - r"(?P[a-zA-Z0-9.-]+)/" - r"(?P[a-zA-Z0-9-]+)/" - r"(?P[^:]+)" - r"(?::(?P[a-zA-Z0-9.-]+))?" - r"$" - ) - match = re.match(pattern, input_string) - if not match: - raise ValueError("Malformed image location") +class Image: + def __init__(self, registry, namespace, repository, tag="latest"): + self.registry = registry + self.namespace = namespace + self.repository = repository + self.tag = tag - return ( - match.group("registry"), - match.group("namespace"), - match.group("repository"), - match.group("tag") or "latest", - ) + def properties(self): + return (self.registry, self.namespace, self.repository, self.tag) + + @property + def name_without_tag(self): + return f"{self.registry}/{self.namespace}/{self.repository}" + + @property + def name_with_tag(self): + return f"{self.name_without_tag}:{self.tag}" + + @classmethod + def from_string(cls, input_string): + """Parses container image location into (registry, namespace, repository, tag)""" + pattern = ( + r"^" + r"(?P[a-zA-Z0-9.-]+)/" + r"(?P[a-zA-Z0-9-]+)/" + r"(?P[^:]+)" + r"(?::(?P[a-zA-Z0-9.-]+))?" + r"$" + ) + match = re.match(pattern, input_string) + if not match: + raise ValueError("Malformed image location") + + return cls( + match.group("registry"), + match.group("namespace"), + match.group("repository"), + match.group("tag") or "latest", + ) + + +def parse_image_location(string): + return Image.from_string(string).properties @click.group() @@ -327,12 +450,21 @@ def main(): @click.argument("image") @click.option("--pubkey", default="pub.key") def upgrade_image(image, pubkey): - registry, org, package, tag = parse_image_location(image) - registry_client = RegistryClient(registry, org, package) + registry, namespace, repository, tag = parse_image_location(image) + registry_client = RegistryClient(registry, namespace, repository) upgrade_container_image(image, tag, pubkey, registry_client) +@main.command() +@click.argument("image") +@click.option("--pubkey", default="pub.key") +def verify_local_image(image, pubkey): + # XXX remove a potentiel :tag + if verify_local_image_signature(image, pubkey): + click.echo(f"✅ The local image {image} has been signed with {pubkey}") + + @main.command() @click.argument("image") def list_tags(image):