WIP: Make verify-attestation work for SLSA 3 attestations

This commit is contained in:
Alex Pyrgiotis 2025-02-04 18:35:21 +02:00
parent aedfc3b9a2
commit a77fc938fd
No known key found for this signature in database
GPG key ID: B6C15EBA0357C9AA
4 changed files with 105 additions and 102 deletions

View file

@ -4,37 +4,82 @@ from tempfile import NamedTemporaryFile
from . import cosign from . import cosign
# NOTE: You can grab the SLSA attestation for an image/tag pair with the following
# commands:
#
# IMAGE=ghcr.io/apyrgio/dangerzone/dangerzone
# TAG=20250129-0.8.0-149-gbf2f5ac
# DIGEST=$(crane digest ${IMAGE?}:${TAG?})
# ATT_MANIFEST=${IMAGE?}:${DIGEST/:/-}.att
# ATT_BLOB=${IMAGE?}@$(crane manifest ${ATT_MANIFEST?} | jq -r '.layers[0].digest')
# crane blob ${ATT_BLOB?} | jq -r '.payload' | base64 -d | jq
CUE_POLICY = r"""
// The predicateType field must match this string
predicateType: "https://slsa.dev/provenance/v0.2"
predicate: {{
// This condition verifies that the builder is the builder we
// expect and trust. The following condition can be used
// unmodified. It verifies that the builder is the container
// workflow.
builder: {{
id: =~"^https://github.com/slsa-framework/slsa-github-generator/.github/workflows/generator_container_slsa3.yml@refs/tags/v[0-9]+.[0-9]+.[0-9]+$"
}}
invocation: {{
configSource: {{
// This condition verifies the entrypoint of the workflow.
// Replace with the relative path to your workflow in your
// repository.
entryPoint: "{workflow}"
// This condition verifies that the image was generated from
// the source repository we expect. Replace this with your
// repository.
uri: =~"^git\\+https://github.com/{repo}@refs/heads/{branch}"
// Add a condition to check for a specific commit hash
digest: {{
sha1: "{commit}"
}}
}}
}}
}}
"""
def generate_cue_policy(repo, workflow, commit, branch):
return CUE_POLICY.format(repo=repo, workflow=workflow, commit=commit, branch=branch)
def verify( def verify(
manifest: bytes, attestation_bundle: bytes, image_tag: str, expected_repo: str image_name: str, branch: str, commit: str, repository: str, workflow: str,
) -> bool: ) -> bool:
""" """
Look up the image attestation to see if the image has been built Look up the image attestation to see if the image has been built
on Github runners, and from a given repository. on Github runners, and from a given repository.
""" """
cosign.ensure_installed() cosign.ensure_installed()
policy = generate_cue_policy(repository, workflow, commit, branch)
# Put the value in files and verify with cosign # Put the value in files and verify with cosign
with ( with (
NamedTemporaryFile(mode="wb") as manifest_json, NamedTemporaryFile(mode="w", suffix=".cue") as policy_f,
NamedTemporaryFile(mode="wb") as attestation_bundle_json,
): ):
manifest_json.write(manifest) policy_f.write(policy)
manifest_json.flush() policy_f.flush()
attestation_bundle_json.write(attestation_bundle)
attestation_bundle_json.flush()
# Call cosign with the temporary file paths # Call cosign with the temporary file paths
cmd = [ cmd = [
"cosign", "cosign",
"verify-blob-attestation", "verify-attestation",
"--bundle", "--type",
attestation_bundle_json.name, "slsaprovenance",
"--new-bundle-format", "--policy",
policy_f.name,
"--certificate-oidc-issuer", "--certificate-oidc-issuer",
"https://token.actions.githubusercontent.com", "https://token.actions.githubusercontent.com",
"--certificate-identity-regexp", "--certificate-identity-regexp",
f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign", "^https://github.com/slsa-framework/slsa-github-generator/.github/workflows/generator_container_slsa3.yml@refs/tags/v[0-9]+.[0-9]+.[0-9]+$",
manifest_json.name, image_name,
] ]
result = subprocess.run(cmd, capture_output=True) result = subprocess.run(cmd, capture_output=True)

View file

@ -8,6 +8,7 @@ from ..util import get_resource_path
from . import attestations, errors, log, registry, signatures from . import attestations, errors, log, registry, signatures
DEFAULT_REPOSITORY = "freedomofpress/dangerzone" DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
DEFAULT_BRANCH = "main"
DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone/dangerzone" DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone/dangerzone"
PUBKEY_DEFAULT_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key") PUBKEY_DEFAULT_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
@ -103,29 +104,52 @@ def get_manifest(image: str) -> None:
@main.command() @main.command()
@click.argument("image") @click.argument("image_name")
# XXX: Do we really want to check against this?
@click.option(
"--branch",
default=DEFAULT_BRANCH,
help="The Git branch that the image was built from",
)
@click.option(
"--commit",
required=True,
help="The Git commit the image was built from",
)
@click.option( @click.option(
"--repository", "--repository",
default=DEFAULT_REPOSITORY, default=DEFAULT_REPOSITORY,
help="The github repository to check the attestation for", help="The github repository to check the attestation for",
) )
def attest_provenance(image: str, repository: str) -> None: @click.option(
"--workflow",
default=".github/workflows/multi_arch_build.yml",
help="The path of the GitHub actions workflow this image was created from",
)
def attest_provenance(
image_name: str,
branch: str,
commit: str,
repository: str,
workflow: str,
) -> None:
""" """
Look up the image attestation to see if the image has been built Look up the image attestation to see if the image has been built
on Github runners, and from a given repository. on Github runners, and from a given repository.
""" """
# XXX put this inside a module # TODO: Parse image and make sure it has a tag. Might even check for a digest.
# if shutil.which("cosign") is None: # parsed = registry.parse_image_location(image)
# click.echo("The cosign binary is needed but not installed.")
# raise click.Abort()
parsed = registry.parse_image_location(image)
manifest, bundle = registry.get_attestation(image)
verified = attestations.verify(manifest, bundle, parsed.tag, repository) verified = attestations.verify(image_name, branch, commit, repository, workflow)
if verified: if verified:
click.echo( click.echo(
f"🎉 The image available at `{parsed.full_name}` has been built by Github Runners from the `{repository}` repository" f"🎉 Successfully verified image '{image_name}' and its associated claims:"
) )
click.echo(f"- ✅ SLSA Level 3 provenance")
click.echo(f"- ✅ GitHub repo: {repository}")
click.echo(f"- ✅ GitHub actions workflow: {workflow}")
click.echo(f"- ✅ Git branch: {branch}")
click.echo(f"- ✅ Git commit: {commit}")
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -11,14 +11,11 @@ __all__ = [
"get_manifest_hash", "get_manifest_hash",
"list_tags", "list_tags",
"get_manifest", "get_manifest",
"get_attestation",
"parse_image_location", "parse_image_location",
] ]
SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json" SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json"
DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json" ACCEPT_MANIFESTS_HEADER="application/vnd.docker.distribution.manifest.v1+json,application/vnd.docker.distribution.manifest.v1+prettyjws,application/vnd.docker.distribution.manifest.v2+json,application/vnd.oci.image.manifest.v1+json,application/vnd.docker.distribution.manifest.list.v2+json,application/vnd.oci.image.index.v1+json"
DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json"
OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json"
class Image(namedtuple("Image", ["registry", "namespace", "image_name", "tag"])): class Image(namedtuple("Image", ["registry", "namespace", "image_name", "tag"])):
@ -92,16 +89,14 @@ class RegistryClient:
return tags return tags
def get_manifest( def get_manifest(
self, tag: str, extra_headers: Optional[dict] = None self, tag: str,
) -> requests.Response: ) -> requests.Response:
"""Get manifest information for a specific tag""" """Get manifest information for a specific tag"""
manifest_url = f"{self._image_url}/manifests/{tag}" manifest_url = f"{self._image_url}/manifests/{tag}"
headers = { headers = {
"Accept": DOCKER_MANIFEST_DISTRIBUTION, "Accept": ACCEPT_MANIFESTS_HEADER,
"Authorization": f"Bearer {self.get_auth_token()}", "Authorization": f"Bearer {self.get_auth_token()}",
} }
if extra_headers:
headers.update(extra_headers)
response = requests.get(manifest_url, headers=headers) response = requests.get(manifest_url, headers=headers)
response.raise_for_status() response.raise_for_status()
@ -111,9 +106,6 @@ class RegistryClient:
return ( return (
self.get_manifest( self.get_manifest(
tag, tag,
{
"Accept": DOCKER_MANIFEST_INDEX,
},
) )
.json() .json()
.get("manifests") .get("manifests")
@ -138,65 +130,6 @@ class RegistryClient:
return hashlib.sha256(tag_manifest_content).hexdigest() return hashlib.sha256(tag_manifest_content).hexdigest()
def get_attestation(self, tag: str) -> Tuple[bytes, bytes]:
"""
Retrieve an attestation from a given tag.
The attestation needs to be attached using the Cosign Bundle
Specification defined at:
https://github.com/sigstore/cosign/blob/main/specs/BUNDLE_SPEC.md
Returns a tuple with the tag manifest content and the bundle content.
"""
# FIXME: do not only rely on the first layer
def _find_sigstore_bundle_manifest(
manifests: list,
) -> Tuple[Optional[str], Optional[str]]:
for manifest in manifests:
if manifest["artifactType"] == SIGSTORE_BUNDLE:
return manifest["mediaType"], manifest["digest"]
return None, None
def _get_bundle_blob_digest(layers: list) -> Optional[str]:
for layer in layers:
if layer.get("mediaType") == SIGSTORE_BUNDLE:
return layer["digest"]
return None
tag_manifest_content = self.get_manifest(tag).content
# The attestation is available on the same container registry, with a
# specific tag named "sha256-{sha256(manifest)}"
tag_manifest_hash = self.get_manifest_hash(tag, tag_manifest_content)
# This will get us a "list" of manifests...
manifests = self.list_manifests(f"sha256-{tag_manifest_hash}")
# ... from which we want the sigstore bundle
bundle_manifest_mediatype, bundle_manifest_digest = (
_find_sigstore_bundle_manifest(manifests)
)
if not bundle_manifest_digest:
raise errors.RegistryError("Not able to find sigstore bundle manifest info")
bundle_manifest = self.get_manifest(
bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype}
).json()
# From there, we will get the attestation in a blob.
# It will be the first layer listed at this manifest hash location
layers = bundle_manifest.get("layers", [])
blob_digest = _get_bundle_blob_digest(layers)
log.info(f"Found sigstore bundle blob digest: {blob_digest}")
if not blob_digest:
raise errors.RegistryError("Not able to find sigstore bundle blob info")
bundle = self.get_blob(blob_digest)
return tag_manifest_content, bundle.content
def get_manifest_hash(image_str: str) -> str: def get_manifest_hash(image_str: str) -> str:
image = parse_image_location(image_str) image = parse_image_location(image_str)
return RegistryClient(image).get_manifest_hash(image.tag) return RegistryClient(image).get_manifest_hash(image.tag)
@ -209,10 +142,5 @@ def list_tags(image_str: str) -> list:
def get_manifest(image_str: str) -> bytes: def get_manifest(image_str: str) -> bytes:
image = parse_image_location(image_str) image = parse_image_location(image_str)
client = RegistryClient(image) client = RegistryClient(image)
resp = client.get_manifest(image.tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST}) resp = client.get_manifest(image.tag)
return resp.content return resp.content
def get_attestation(image_str: str) -> Tuple[bytes, bytes]:
image = parse_image_location(image_str)
return RegistryClient(image).get_attestation(image.tag)

View file

@ -19,7 +19,14 @@ dangerzone-image attest-provenance ghcr.io/freedomofpress/dangerzone/dangerzone
In case of sucess, it will report back: In case of sucess, it will report back:
``` ```
🎉 The image available at `ghcr.io/freedomofpress/dangerzone/dangerzone:latest` has been built by Github runners from the `freedomofpress/dangerzone` repository. 🎉 Successfully verified image
'ghcr.io/apyrgio/dangerzone/dangerzone:20250129-0.8.0-149-gbf2f5ac@sha256:4da441235e84e93518778827a5c5745d532d7a4079886e1647924bee7ef1c14d'
and its associated claims:
- ✅ SLSA Level 3 provenance
- ✅ GitHub repo: apyrgio/dangerzone
- ✅ GitHub actions workflow: .github/workflows/multi_arch_build.yml
- ✅ Git branch: test/multi-arch
- ✅ Git commit: bf2f5accc24bd15a4f5c869a7f0b03b8fe48dfb6
``` ```
## Install updates ## Install updates
@ -27,7 +34,7 @@ In case of sucess, it will report back:
To check if a new container image has been released, and update your local installation with it, you can use the following commands: To check if a new container image has been released, and update your local installation with it, you can use the following commands:
```bash ```bash
./dev_scripts/dangerzone-image --debug upgrade ghcr.io/almet/dangerzone/dangerzone dangerzone-image upgrade ghcr.io/almet/dangerzone/dangerzone
``` ```
## Verify local ## Verify local
@ -53,4 +60,3 @@ On the airgapped machine, copy the file and run the following command:
```bash ```bash
dangerzone-image load-archive dz-fa94872.tar dangerzone-image load-archive dz-fa94872.tar
``` ```