diff --git a/dev_scripts/registry.py b/dev_scripts/registry.py index b688056..d883d81 100755 --- a/dev_scripts/registry.py +++ b/dev_scripts/registry.py @@ -1,9 +1,12 @@ #!/usr/bin/python import hashlib +import json +import platform import re import shutil import subprocess +from base64 import b64decode from tempfile import NamedTemporaryFile import click @@ -90,6 +93,12 @@ class RegistryClient: response.raise_for_status() return response + def get_manifest_hash(self, tag, tag_manifest_content=None): + if not tag_manifest_content: + tag_manifest_content = self.get_manifest(tag).content + + return hashlib.sha256(tag_manifest_content).hexdigest() + def get_attestation(self, tag): """ Retrieve an attestation from a given tag. @@ -114,7 +123,7 @@ class RegistryClient: # The attestation is available on the same container registry, with a # specific tag named "sha256-{sha256(manifest)}" - tag_manifest_hash = hashlib.sha256(tag_manifest_content).hexdigest() + tag_manifest_hash = self.get_manifest_hash(tag, tag_manifest_content) # This will get us a "list" of manifests... manifests = self.list_manifests(f"sha256-{tag_manifest_hash}") @@ -138,43 +147,153 @@ class RegistryClient: bundle = self.get_blob(blob_digest) return tag_manifest_content, bundle.content - def verify_attestation(self, image_tag: str, expected_repo: str): - """ - Look up the image attestation to see if the image has been built - on Github runners, and from a given repository. - """ - manifest, bundle = self.get_attestation(image_tag) - def _write(file, content): - file.write(content) - file.flush() +def _write(file, content): + file.write(content) + file.flush() - # Put the value in files and verify with cosign - with ( - NamedTemporaryFile(mode="wb") as manifest_json, - NamedTemporaryFile(mode="wb") as bundle_json, - ): - _write(manifest_json, manifest) - _write(bundle_json, bundle) - # Call cosign with the temporary file paths - cmd = [ - "cosign", - "verify-blob-attestation", - "--bundle", - bundle_json.name, - "--new-bundle-format", - "--certificate-oidc-issuer", - "https://token.actions.githubusercontent.com", - "--certificate-identity-regexp", - f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign", - manifest_json.name, - ] +def verify_attestation( + registry_client: RegistryClient, image_tag: str, expected_repo: str +): + """ + Look up the image attestation to see if the image has been built + on Github runners, and from a given repository. + """ + manifest, bundle = registry_client.get_attestation(image_tag) - result = subprocess.run(cmd, capture_output=True) - if result.returncode != 0: - raise Exception(f"Attestation cannot be verified. {result.stderr}") - return True + # Put the value in files and verify with cosign + with ( + NamedTemporaryFile(mode="wb") as manifest_json, + NamedTemporaryFile(mode="wb") as bundle_json, + ): + _write(manifest_json, manifest) + _write(bundle_json, bundle) + + # Call cosign with the temporary file paths + cmd = [ + "cosign", + "verify-blob-attestation", + "--bundle", + bundle_json.name, + "--new-bundle-format", + "--certificate-oidc-issuer", + "https://token.actions.githubusercontent.com", + "--certificate-identity-regexp", + f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign", + manifest_json.name, + ] + + result = subprocess.run(cmd, capture_output=True) + if result.returncode != 0: + raise Exception(f"Attestation cannot be verified. {result.stderr}") + return True + + +def new_version_available(): + # XXX - Implement + return True + + +def check_signature(signature_bundle, pub_key): + """Ensure that the signature bundle has been signed by the given public key.""" + + # Put the value in files and verify with cosign + with ( + NamedTemporaryFile(mode="w") as signature_file, + NamedTemporaryFile(mode="bw") as payload_file, + ): + json.dump(signature_bundle, signature_file) + signature_file.flush() + + payload_bytes = b64decode(signature_bundle["Payload"]) + _write(payload_file, payload_bytes) + + cmd = [ + "cosign", + "verify-blob", + "--key", + pub_key, + "--bundle", + signature_file.name, + payload_file.name, + ] + result = subprocess.run(cmd, capture_output=True) + if result.returncode != 0: + return False + return result.stderr == b"Verified OK\n" + + +def get_runtime_name() -> str: + if platform.system() == "Linux": + return "podman" + return "docker" + + +def container_pull(image): + cmd = [get_runtime_name(), "pull", f"{image}"] + process = subprocess.Popen(cmd, stdout=subprocess.PIPE) + process.communicate() + + +def upgrade_container_image(image, tag, pub_key, registry: RegistryClient): + if not new_version_available(): + return + + hash = registry.get_manifest_hash(tag) + signatures = get_signatures(image, hash) + if len(signatures) < 1: + raise Exception("Unable to retrieve signatures") + + print(f"Found {len(signatures)} signature(s) for {image}") + for signature in signatures: + signature_is_valid = check_signature(signature, pub_key) + if not signature_is_valid: + raise Exception("Unable to verify signature") + print("✅ Signature is valid") + + # At this point, the signature is verified, let's upgrade + # XXX Use the hash here to avoid race conditions + container_pull(image) + + +def get_signatures(image, hash): + """ + Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format. + """ + + def _to_bundle(sig): + # Convert cosign-download signatures to the format expected by cosign bundle. + bundle = sig["Bundle"] + payload = bundle["Payload"] + return { + "base64Signature": sig["Base64Signature"], + "Payload": sig["Payload"], + "cert": sig["Cert"], + "chain": sig["Chain"], + "rekorBundle": { + "SignedEntryTimestamp": bundle["SignedEntryTimestamp"], + "Payload": { + "body": payload["body"], + "integratedTime": payload["integratedTime"], + "logIndex": payload["logIndex"], + "logID": payload["logID"], + }, + }, + "RFC3161Timestamp": sig["RFC3161Timestamp"], + } + + process = subprocess.run( + ["cosign", "download", "signature", f"{image}@sha256:{hash}"], + capture_output=True, + check=True, + ) + + # XXX: Check the output first. + signatures_raw = process.stdout.decode("utf-8").strip().split("\n") + + # Remove the last return, split on newlines, convert from JSON + return [_to_bundle(json.loads(sig)) for sig in signatures_raw] def parse_image_location(input_string): @@ -190,7 +309,13 @@ def parse_image_location(input_string): match = re.match(pattern, input_string) if not match: raise ValueError("Malformed image location") - return match.group("registry", "namespace", "repository", "tag") + + return ( + match.group("registry"), + match.group("namespace"), + match.group("repository"), + match.group("tag") or "latest", + ) @click.group() @@ -198,6 +323,16 @@ def main(): pass +@main.command() +@click.argument("image") +@click.option("--pubkey", default="pub.key") +def upgrade_image(image, pubkey): + registry, org, package, tag = parse_image_location(image) + registry_client = RegistryClient(registry, org, package) + + upgrade_container_image(image, tag, pubkey, registry_client) + + @main.command() @click.argument("image") def list_tags(image): @@ -239,7 +374,7 @@ def attest(image: str, repo: str): tag = tag or "latest" client = RegistryClient(registry, org, package) - verified = client.verify_attestation(tag, repo) + verified = verify_attestation(client, tag, repo) if verified: click.echo( f"🎉 The image available at `{client.image}:{tag}` has been built by Github Runners from the `{repo}` repository"