From 66ac7e56f8375a9d68defacf684793bded33efcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexis=20M=C3=A9taireau?= Date: Wed, 22 Jan 2025 15:21:10 +0100 Subject: [PATCH] Add a script to verify Github attestations --- dev_scripts/registry.py | 240 ++++++++++++++++++ .../independent-container-updates.md | 23 ++ 2 files changed, 263 insertions(+) create mode 100755 dev_scripts/registry.py create mode 100644 docs/developer/independent-container-updates.md diff --git a/dev_scripts/registry.py b/dev_scripts/registry.py new file mode 100755 index 0000000..9b26420 --- /dev/null +++ b/dev_scripts/registry.py @@ -0,0 +1,240 @@ +#!/usr/bin/python + +import hashlib +import re +import shutil +import subprocess +from tempfile import NamedTemporaryFile + +import click +import requests + +DEFAULT_REPO = "freedomofpress/dangerzone" +SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json" +DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json" +DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json" +OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json" + + +class RegistryClient: + def __init__(self, registry, org, image): + self._registry = registry + self._org = org + self._image = image + self._auth_token = None + self._base_url = f"https://{registry}" + self._image_url = f"{self._base_url}/v2/{self._org}/{self._image}" + + @property + def image(self): + return f"{self._registry}/{self._org}/{self._image}" + + def get_auth_token(self): + if not self._auth_token: + auth_url = f"{self._base_url}/token" + response = requests.get( + auth_url, + params={ + "service": f"{self._registry}", + "scope": f"repository:{self._org}/{self._image}:pull", + }, + ) + response.raise_for_status() + self._auth_token = response.json()["token"] + return self._auth_token + + def get_auth_header(self): + return {"Authorization": f"Bearer {self.get_auth_token()}"} + + def list_tags(self): + url = f"{self._image_url}/tags/list" + response = requests.get(url, headers=self.get_auth_header()) + response.raise_for_status() + tags = response.json().get("tags", []) + return tags + + def get_manifest(self, tag, extra_headers=None): + """Get manifest information for a specific tag""" + manifest_url = f"{self._image_url}/manifests/{tag}" + headers = { + "Accept": DOCKER_MANIFEST_DISTRIBUTION, + "Authorization": f"Bearer {self.get_auth_token()}", + } + if extra_headers: + headers.update(extra_headers) + + response = requests.get(manifest_url, headers=headers) + response.raise_for_status() + return response + + def list_manifests(self, tag): + return ( + self.get_manifest( + tag, + { + "Accept": DOCKER_MANIFEST_INDEX, + }, + ) + .json() + .get("manifests") + ) + + def get_blob(self, hash): + url = f"{self._image_url}/blobs/{hash}" + response = requests.get( + url, + headers={ + "Authorization": f"Bearer {self.get_auth_token()}", + }, + ) + response.raise_for_status() + return response + + def get_attestation(self, tag): + """ + Retrieve an attestation from a given tag. + + The attestation needs to be attached using the Cosign Bundle + Specification defined at: + + https://github.com/sigstore/cosign/blob/main/specs/BUNDLE_SPEC.md + """ + + def _find_sigstore_bundle_manifest(manifests): + for manifest in manifests: + if manifest["artifactType"] == SIGSTORE_BUNDLE: + return manifest["mediaType"], manifest["digest"] + + def _get_bundle_blob_digest(layers): + for layer in layers: + if layer.get("mediaType") == SIGSTORE_BUNDLE: + return layer["digest"] + + tag_manifest_content = self.get_manifest(tag).content + + # The attestation is available on the same container registry, with a + # specific tag named "sha256-{sha256(manifest)}" + tag_manifest_hash = hashlib.sha256(tag_manifest_content).hexdigest() + + # This will get us a "list" of manifests... + manifests = self.list_manifests(f"sha256-{tag_manifest_hash}") + + # ... from which we want the sigstore bundle + bundle_manifest_mediatype, bundle_manifest_digest = ( + _find_sigstore_bundle_manifest(manifests) + ) + if not bundle_manifest_digest: + raise Error("Not able to find sigstore bundle manifest info") + + bundle_manifest = self.get_manifest( + bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype} + ).json() + + # From there, we will get the attestation in a blob. + # It will be the first layer listed at this manifest hash location + layers = bundle_manifest.get("layers", []) + + blob_digest = _get_bundle_blob_digest(layers) + bundle = self.get_blob(blob_digest) + return tag_manifest_content, bundle.content + + def verify_attestation(self, image_tag: str, expected_repo: str): + """ + Look up the image attestation to see if the image has been built + on Github runners, and from a given repository. + """ + manifest, bundle = self.get_attestation(image_tag) + + def _write(file, content): + file.write(content) + file.flush() + + # Put the value in files and verify with cosign + with ( + NamedTemporaryFile(mode="wb") as manifest_json, + NamedTemporaryFile(mode="wb") as bundle_json, + ): + _write(manifest_json, manifest) + _write(bundle_json, bundle) + + # Call cosign with the temporary file paths + cmd = [ + "cosign", + "verify-blob-attestation", + "--bundle", + bundle_json.name, + "--new-bundle-format", + "--certificate-oidc-issuer", + "https://token.actions.githubusercontent.com", + "--certificate-identity-regexp", + f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign", + manifest_json.name, + ] + + result = subprocess.run(cmd, capture_output=True) + if result.returncode != 0: + raise Exception(f"Attestation cannot be verified. {result.stderr}") + return True + + +def parse_image_location(input_string): + """Parses container image location into (registry, namespace, repository, tag)""" + pattern = ( + r"^" + r"(?P[a-zA-Z0-9.-]+)/" + r"(?P[a-zA-Z0-9-]+)/" + r"(?P[^:]+)" + r"(?::(?P[a-zA-Z0-9.-]+))?" + r"$" + ) + match = re.match(pattern, input_string) + if not match: + raise ValueError("Malformed image location") + return match.group("registry", "namespace", "repository", "tag") + + +@click.group() +def main(): + pass + + +@main.command() +@click.argument("image") +def list_tags(image): + registry, org, package, _ = parse_image_location(image) + client = RegistryClient(registry, org, package) + tags = client.list_tags() + click.echo(f"Existing tags for {client.image}") + for tag in tags: + click.echo(tag) + + +@main.command() +@click.argument("image") +@click.option( + "--repo", + default=DEFAULT_REPO, + help="The github repository to check the attestation for", +) +def attest(image: str, repo: str): + """ + Look up the image attestation to see if the image has been built + on Github runners, and from a given repository. + """ + if shutil.which("cosign") is None: + click.echo("The cosign binary is needed but not installed.") + raise click.Abort() + + registry, org, package, tag = parse_image_location(image) + tag = tag or "latest" + + client = RegistryClient(registry, org, package) + verified = client.verify_attestation(tag, repo) + if verified: + click.echo( + f"🎉 The image available at `{client.image}:{tag}` has been built by Github Runners from the `{repo}` repository" + ) + + +if __name__ == "__main__": + main() diff --git a/docs/developer/independent-container-updates.md b/docs/developer/independent-container-updates.md new file mode 100644 index 0000000..25a7d43 --- /dev/null +++ b/docs/developer/independent-container-updates.md @@ -0,0 +1,23 @@ +# Independent Container Updates + +Since version 0.9.0, Dangerzone is able to ship container images independently +from issuing a new release of the software. + +This is useful as images need to be kept updated with the latest security fixes. + +## Nightly images and attestations + +Each night, new images are built and pushed to our container registry, alongside +with a provenance attestation, enabling anybody to ensure that the image has +been originally built by Github CI runners, from a defined source repository (in our case `freedomofpress/dangerzone`). + +To verify the attestations against our expectations, use the following command: +```bash +poetry run ./dev_scripts/registry.py attest ghcr.io/freedomofpress/dangerzone/dangerzone:latest --repo freedomofpress/dangerzone +``` + +In case of sucess, it will report back: + +``` +🎉 The image available at `ghcr.io/freedomofpress/dangerzone/dangerzone:latest` has been built by Github runners from the `freedomofpress/dangerzone` repository. +```