diff --git a/dangerzone/updater/attestations.py b/dangerzone/updater/attestations.py index 5581e7d..ade878d 100644 --- a/dangerzone/updater/attestations.py +++ b/dangerzone/updater/attestations.py @@ -4,37 +4,82 @@ from tempfile import NamedTemporaryFile from . import cosign +# NOTE: You can grab the SLSA attestation for an image/tag pair with the following +# commands: +# +# IMAGE=ghcr.io/apyrgio/dangerzone/dangerzone +# TAG=20250129-0.8.0-149-gbf2f5ac +# DIGEST=$(crane digest ${IMAGE?}:${TAG?}) +# ATT_MANIFEST=${IMAGE?}:${DIGEST/:/-}.att +# ATT_BLOB=${IMAGE?}@$(crane manifest ${ATT_MANIFEST?} | jq -r '.layers[0].digest') +# crane blob ${ATT_BLOB?} | jq -r '.payload' | base64 -d | jq +CUE_POLICY = r""" +// The predicateType field must match this string +predicateType: "https://slsa.dev/provenance/v0.2" + +predicate: {{ + // This condition verifies that the builder is the builder we + // expect and trust. The following condition can be used + // unmodified. It verifies that the builder is the container + // workflow. + builder: {{ + id: =~"^https://github.com/slsa-framework/slsa-github-generator/.github/workflows/generator_container_slsa3.yml@refs/tags/v[0-9]+.[0-9]+.[0-9]+$" + }} + invocation: {{ + configSource: {{ + // This condition verifies the entrypoint of the workflow. + // Replace with the relative path to your workflow in your + // repository. + entryPoint: "{workflow}" + + // This condition verifies that the image was generated from + // the source repository we expect. Replace this with your + // repository. + uri: =~"^git\\+https://github.com/{repo}@refs/heads/{branch}" + // Add a condition to check for a specific commit hash + digest: {{ + sha1: "{commit}" + }} + }} + }} +}} +""" + + +def generate_cue_policy(repo, workflow, commit, branch): + return CUE_POLICY.format(repo=repo, workflow=workflow, commit=commit, branch=branch) + + def verify( - manifest: bytes, attestation_bundle: bytes, image_tag: str, expected_repo: str + image_name: str, branch: str, commit: str, repository: str, workflow: str, ) -> bool: """ Look up the image attestation to see if the image has been built on Github runners, and from a given repository. """ cosign.ensure_installed() + policy = generate_cue_policy(repository, workflow, commit, branch) # Put the value in files and verify with cosign with ( - NamedTemporaryFile(mode="wb") as manifest_json, - NamedTemporaryFile(mode="wb") as attestation_bundle_json, + NamedTemporaryFile(mode="w", suffix=".cue") as policy_f, ): - manifest_json.write(manifest) - manifest_json.flush() - attestation_bundle_json.write(attestation_bundle) - attestation_bundle_json.flush() + policy_f.write(policy) + policy_f.flush() # Call cosign with the temporary file paths cmd = [ "cosign", - "verify-blob-attestation", - "--bundle", - attestation_bundle_json.name, - "--new-bundle-format", + "verify-attestation", + "--type", + "slsaprovenance", + "--policy", + policy_f.name, "--certificate-oidc-issuer", "https://token.actions.githubusercontent.com", "--certificate-identity-regexp", - f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign", - manifest_json.name, + "^https://github.com/slsa-framework/slsa-github-generator/.github/workflows/generator_container_slsa3.yml@refs/tags/v[0-9]+.[0-9]+.[0-9]+$", + image_name, ] result = subprocess.run(cmd, capture_output=True) diff --git a/dangerzone/updater/cli.py b/dangerzone/updater/cli.py index 76e466a..f4ec2ac 100644 --- a/dangerzone/updater/cli.py +++ b/dangerzone/updater/cli.py @@ -8,6 +8,7 @@ from ..util import get_resource_path from . import attestations, errors, log, registry, signatures DEFAULT_REPOSITORY = "freedomofpress/dangerzone" +DEFAULT_BRANCH = "main" DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone/dangerzone" PUBKEY_DEFAULT_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key") @@ -103,29 +104,52 @@ def get_manifest(image: str) -> None: @main.command() -@click.argument("image") +@click.argument("image_name") +# XXX: Do we really want to check against this? +@click.option( + "--branch", + default=DEFAULT_BRANCH, + help="The Git branch that the image was built from", +) +@click.option( + "--commit", + required=True, + help="The Git commit the image was built from", +) @click.option( "--repository", default=DEFAULT_REPOSITORY, help="The github repository to check the attestation for", ) -def attest_provenance(image: str, repository: str) -> None: +@click.option( + "--workflow", + default=".github/workflows/multi_arch_build.yml", + help="The path of the GitHub actions workflow this image was created from", +) +def attest_provenance( + image_name: str, + branch: str, + commit: str, + repository: str, + workflow: str, +) -> None: """ Look up the image attestation to see if the image has been built on Github runners, and from a given repository. """ - # XXX put this inside a module - # if shutil.which("cosign") is None: - # click.echo("The cosign binary is needed but not installed.") - # raise click.Abort() - parsed = registry.parse_image_location(image) - manifest, bundle = registry.get_attestation(image) + # TODO: Parse image and make sure it has a tag. Might even check for a digest. + # parsed = registry.parse_image_location(image) - verified = attestations.verify(manifest, bundle, parsed.tag, repository) + verified = attestations.verify(image_name, branch, commit, repository, workflow) if verified: click.echo( - f"🎉 The image available at `{parsed.full_name}` has been built by Github Runners from the `{repository}` repository" + f"🎉 Successfully verified image '{image_name}' and its associated claims:" ) + click.echo(f"- ✅ SLSA Level 3 provenance") + click.echo(f"- ✅ GitHub repo: {repository}") + click.echo(f"- ✅ GitHub actions workflow: {workflow}") + click.echo(f"- ✅ Git branch: {branch}") + click.echo(f"- ✅ Git commit: {commit}") if __name__ == "__main__": diff --git a/dangerzone/updater/registry.py b/dangerzone/updater/registry.py index a0285c1..3b81ae9 100644 --- a/dangerzone/updater/registry.py +++ b/dangerzone/updater/registry.py @@ -11,14 +11,11 @@ __all__ = [ "get_manifest_hash", "list_tags", "get_manifest", - "get_attestation", "parse_image_location", ] SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json" -DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json" -DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json" -OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json" +ACCEPT_MANIFESTS_HEADER="application/vnd.docker.distribution.manifest.v1+json,application/vnd.docker.distribution.manifest.v1+prettyjws,application/vnd.docker.distribution.manifest.v2+json,application/vnd.oci.image.manifest.v1+json,application/vnd.docker.distribution.manifest.list.v2+json,application/vnd.oci.image.index.v1+json" class Image(namedtuple("Image", ["registry", "namespace", "image_name", "tag"])): @@ -92,16 +89,14 @@ class RegistryClient: return tags def get_manifest( - self, tag: str, extra_headers: Optional[dict] = None + self, tag: str, ) -> requests.Response: """Get manifest information for a specific tag""" manifest_url = f"{self._image_url}/manifests/{tag}" headers = { - "Accept": DOCKER_MANIFEST_DISTRIBUTION, + "Accept": ACCEPT_MANIFESTS_HEADER, "Authorization": f"Bearer {self.get_auth_token()}", } - if extra_headers: - headers.update(extra_headers) response = requests.get(manifest_url, headers=headers) response.raise_for_status() @@ -111,9 +106,6 @@ class RegistryClient: return ( self.get_manifest( tag, - { - "Accept": DOCKER_MANIFEST_INDEX, - }, ) .json() .get("manifests") @@ -138,65 +130,6 @@ class RegistryClient: return hashlib.sha256(tag_manifest_content).hexdigest() - def get_attestation(self, tag: str) -> Tuple[bytes, bytes]: - """ - Retrieve an attestation from a given tag. - - The attestation needs to be attached using the Cosign Bundle - Specification defined at: - - https://github.com/sigstore/cosign/blob/main/specs/BUNDLE_SPEC.md - - Returns a tuple with the tag manifest content and the bundle content. - """ - - # FIXME: do not only rely on the first layer - def _find_sigstore_bundle_manifest( - manifests: list, - ) -> Tuple[Optional[str], Optional[str]]: - for manifest in manifests: - if manifest["artifactType"] == SIGSTORE_BUNDLE: - return manifest["mediaType"], manifest["digest"] - return None, None - - def _get_bundle_blob_digest(layers: list) -> Optional[str]: - for layer in layers: - if layer.get("mediaType") == SIGSTORE_BUNDLE: - return layer["digest"] - return None - - tag_manifest_content = self.get_manifest(tag).content - - # The attestation is available on the same container registry, with a - # specific tag named "sha256-{sha256(manifest)}" - tag_manifest_hash = self.get_manifest_hash(tag, tag_manifest_content) - - # This will get us a "list" of manifests... - manifests = self.list_manifests(f"sha256-{tag_manifest_hash}") - - # ... from which we want the sigstore bundle - bundle_manifest_mediatype, bundle_manifest_digest = ( - _find_sigstore_bundle_manifest(manifests) - ) - if not bundle_manifest_digest: - raise errors.RegistryError("Not able to find sigstore bundle manifest info") - - bundle_manifest = self.get_manifest( - bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype} - ).json() - - # From there, we will get the attestation in a blob. - # It will be the first layer listed at this manifest hash location - layers = bundle_manifest.get("layers", []) - - blob_digest = _get_bundle_blob_digest(layers) - log.info(f"Found sigstore bundle blob digest: {blob_digest}") - if not blob_digest: - raise errors.RegistryError("Not able to find sigstore bundle blob info") - bundle = self.get_blob(blob_digest) - return tag_manifest_content, bundle.content - - def get_manifest_hash(image_str: str) -> str: image = parse_image_location(image_str) return RegistryClient(image).get_manifest_hash(image.tag) @@ -209,10 +142,5 @@ def list_tags(image_str: str) -> list: def get_manifest(image_str: str) -> bytes: image = parse_image_location(image_str) client = RegistryClient(image) - resp = client.get_manifest(image.tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST}) + resp = client.get_manifest(image.tag) return resp.content - - -def get_attestation(image_str: str) -> Tuple[bytes, bytes]: - image = parse_image_location(image_str) - return RegistryClient(image).get_attestation(image.tag) diff --git a/docs/developer/independent-container-updates.md b/docs/developer/independent-container-updates.md index 51c40df..d644f64 100644 --- a/docs/developer/independent-container-updates.md +++ b/docs/developer/independent-container-updates.md @@ -19,7 +19,14 @@ dangerzone-image attest-provenance ghcr.io/freedomofpress/dangerzone/dangerzone In case of sucess, it will report back: ``` -🎉 The image available at `ghcr.io/freedomofpress/dangerzone/dangerzone:latest` has been built by Github runners from the `freedomofpress/dangerzone` repository. +🎉 Successfully verified image +'ghcr.io/apyrgio/dangerzone/dangerzone:20250129-0.8.0-149-gbf2f5ac@sha256:4da441235e84e93518778827a5c5745d532d7a4079886e1647924bee7ef1c14d' +and its associated claims: +- ✅ SLSA Level 3 provenance +- ✅ GitHub repo: apyrgio/dangerzone +- ✅ GitHub actions workflow: .github/workflows/multi_arch_build.yml +- ✅ Git branch: test/multi-arch +- ✅ Git commit: bf2f5accc24bd15a4f5c869a7f0b03b8fe48dfb6 ``` ## Install updates @@ -27,7 +34,7 @@ In case of sucess, it will report back: To check if a new container image has been released, and update your local installation with it, you can use the following commands: ```bash -./dev_scripts/dangerzone-image --debug upgrade ghcr.io/almet/dangerzone/dangerzone +dangerzone-image upgrade ghcr.io/almet/dangerzone/dangerzone ``` ## Verify local @@ -53,4 +60,3 @@ On the airgapped machine, copy the file and run the following command: ```bash dangerzone-image load-archive dz-fa94872.tar ``` -