Compare commits


No commits in common. "f7069a9c163464513c81835f526026890d5b8c1b" and "cbd4795bf612dbb14f20f435ecf947cc93c3c30e" have entirely different histories.

12 changed files with 523 additions and 683 deletions

View file

@@ -15,9 +15,11 @@ log = logging.getLogger(__name__)
 def get_runtime_name() -> str:
     if platform.system() == "Linux":
-        return "podman"
+        runtime_name = "podman"
     else:
         # Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
-        return "docker"
+        runtime_name = "docker"
+    return runtime_name


 def get_runtime_version() -> Tuple[int, int]:
@@ -145,18 +147,3 @@ def load_image_tarball() -> None:
     )
     log.info("Successfully installed container image from")
-
-
-def container_pull(image: str) -> bool:
-    """Pull a container image from a registry."""
-    cmd = [get_runtime_name(), "pull", f"{image}"]
-    process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
-    process.communicate()
-    return process.returncode == 0
-
-
-def load_image_hash(image: str) -> str:
-    """Returns an image hash from a local image name"""
-    cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
-    result = subprocess.run(cmd, capture_output=True, check=True)
-    return result.stdout.strip().decode().strip("sha256:")
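For context, the two container_utils helpers in this hunk (container_pull and load_image_hash) are thin wrappers around the container runtime CLI. A minimal usage sketch, assuming a branch where dangerzone.container_utils exports them, a working podman/docker install, and a placeholder image name:

from dangerzone.container_utils import container_pull, load_image_hash

image = "ghcr.io/freedomofpress/dangerzone/dangerzone"  # placeholder image name

if container_pull(image):
    # load_image_hash shells out to `<runtime> image inspect -f {{.Digest}}`
    print("Local image digest:", load_image_hash(image))
else:
    print("Pull failed")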

View file

@@ -1,3 +0,0 @@
import logging
log = logging.getLogger(__name__)

View file

@@ -1,44 +0,0 @@
import subprocess
from tempfile import NamedTemporaryFile
from . import utils
def verify_attestation(
manifest: bytes, attestation_bundle: bytes, image_tag: str, expected_repo: str
) -> bool:
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
utils.ensure_cosign()
# Put the value in files and verify with cosign
with (
NamedTemporaryFile(mode="wb") as manifest_json,
NamedTemporaryFile(mode="wb") as attestation_bundle_json,
):
manifest_json.write(manifest)
manifest_json.flush()
attestation_bundle_json.write(attestation_bundle)
attestation_bundle_json.flush()
# Call cosign with the temporary file paths
cmd = [
"cosign",
"verify-blob-attestation",
"--bundle",
attestation_bundle_json.name,
"--new-bundle-format",
"--certificate-oidc-issuer",
"https://token.actions.githubusercontent.com",
"--certificate-identity-regexp",
f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign",
manifest_json.name,
]
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
error = result.stderr.decode()
raise Exception(f"Attestation cannot be verified. {error}")
return True
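A hedged sketch of how verify_attestation() can be wired to the registry helpers that appear later in this diff (registry.get_attestation and parse_image_location); the image location and repository are placeholders:

from dangerzone.updater import registry
from dangerzone.updater.attestations import verify_attestation

image = "ghcr.io/freedomofpress/dangerzone/dangerzone:latest"  # placeholder
expected_repo = "freedomofpress/dangerzone"

parsed = registry.parse_image_location(image)
manifest, bundle = registry.get_attestation(image)

# Raises an Exception carrying cosign's stderr on failure, returns True otherwise.
if verify_attestation(manifest, bundle, parsed.tag, expected_repo):
    print(f"{parsed.full_name} was attested as built from {expected_repo}")

This mirrors what the attest-provenance CLI command below does.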

View file

@@ -1,104 +0,0 @@
#!/usr/bin/python
import logging
import click
from ..util import get_resource_path
from . import errors, log, registry
from .attestations import verify_attestation
from .signatures import upgrade_container_image, verify_offline_image_signature
DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
PUBKEY_DEFAULT_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
@click.group()
@click.option("--debug", is_flag=True)
def main(debug=False) -> None:
if debug:
click.echo("Debug mode enabled")
level = logging.DEBUG
else:
level = logging.INFO
logging.basicConfig(level=level)
@main.command()
@click.option("--image")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
@click.option("--airgap", is_flag=True)
# XXX Add options to do airgap upgrade
def upgrade(image: str, pubkey: str, airgap: bool) -> None:
"""Upgrade the image to the latest signed version."""
manifest_hash = registry.get_manifest_hash(image)
try:
is_upgraded = upgrade_container_image(image, manifest_hash, pubkey)
click.echo(f"✅ The local image {image} has been upgraded")
except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}")
raise click.Abort()
@main.command()
@click.argument("image")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
def verify_offline(image: str, pubkey: str) -> None:
"""
Verify the local image signature against a public key and the stored signatures.
"""
# XXX remove a potential :tag
if verify_offline_image_signature(image, pubkey):
click.echo(
(
f"Verifying the local image:\n\n"
f"pubkey: {pubkey}\n"
f"image: {image}\n\n"
f"✅ The local image {image} has been signed with {pubkey}"
)
)
@main.command()
@click.argument("image")
def list_remote_tags(image: str) -> None:
click.echo(f"Existing tags for {image}")
for tag in registry.list_tags(image):
click.echo(tag)
@main.command()
@click.argument("image")
def get_manifest(image: str) -> None:
click.echo(registry.get_manifest(image))
@main.command()
@click.argument("image")
@click.option(
"--repository",
default=DEFAULT_REPOSITORY,
help="The github repository to check the attestation for",
)
def attest_provenance(image: str, repository: str) -> None:
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
# XXX put this inside a module
# if shutil.which("cosign") is None:
# click.echo("The cosign binary is needed but not installed.")
# raise click.Abort()
parsed = registry.parse_image_location(image)
manifest, bundle = registry.get_attestation(image)
verified = verify_attestation(manifest, bundle, parsed.tag, repository)
if verified:
click.echo(
f"🎉 The image available at `{parsed.full_name}` has been built by Github Runners from the `{repository}` repository"
)
if __name__ == "__main__":
main()
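Because these are ordinary click commands, they can be exercised without installing the dangerzone-image entry point, e.g. via click's test runner. A small sketch (the image argument is a placeholder; with click >= 7 the list_remote_tags function is exposed as the list-remote-tags subcommand):

from click.testing import CliRunner

from dangerzone.updater.cli import main

runner = CliRunner()
result = runner.invoke(
    main, ["list-remote-tags", "ghcr.io/freedomofpress/dangerzone/dangerzone"]
)
print(result.exit_code)
print(result.output)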

View file

@@ -1,42 +0,0 @@
class UpdaterError(Exception):
pass
class ImageAlreadyUpToDate(UpdaterError):
pass
class SignatureError(UpdaterError):
pass
class RegistryError(UpdaterError):
pass
class NoRemoteSignatures(SignatureError):
pass
class SignatureVerificationError(SignatureError):
pass
class SignaturesFolderDoesNotExist(SignatureError):
pass
class InvalidSignatures(SignatureError):
pass
class SignatureMismatch(SignatureError):
pass
class LocalSignatureNotFound(SignatureError):
pass
class CosignNotInstalledError(SignatureError):
pass
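Since everything derives from UpdaterError, callers can catch broadly or narrowly. A two-level sketch, assuming the signatures module from this same diff (the image and pubkey paths are placeholders):

from dangerzone.updater import errors, signatures

try:
    signatures.verify_offline_image_signature("<local-image>", "<pubkey-path>")
except errors.SignatureError as e:
    # Covers SignatureVerificationError, LocalSignatureNotFound,
    # SignaturesFolderDoesNotExist, CosignNotInstalledError, etc.
    print(f"Signature problem: {e}")
except errors.UpdaterError as e:
    print(f"Other updater problem: {e}")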

View file

@@ -1,219 +0,0 @@
import hashlib
import re
from collections import namedtuple
from typing import Dict, Optional, Tuple
import requests
from . import errors, log
__all__ = [
"get_manifest_hash",
"list_tags",
"get_manifest",
"get_attestation",
"Image",
"parse_image_location",
]
SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json"
DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json"
DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json"
OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json"
class Image(namedtuple("Image", ["registry", "namespace", "image_name", "tag"])):
__slots__ = ()
@property
def full_name(self) -> str:
tag = f":{self.tag}" if self.tag else ""
return f"{self.registry}/{self.namespace}/{self.image_name}{tag}"
def parse_image_location(input_string: str) -> Image:
"""Parses container image location into an Image namedtuple"""
pattern = (
r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<image_name>[^:]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"$"
)
match = re.match(pattern, input_string)
if not match:
raise ValueError("Malformed image location")
return Image(
registry=match.group("registry"),
namespace=match.group("namespace"),
image_name=match.group("image_name"),
tag=match.group("tag") or "latest",
)
class RegistryClient:
def __init__(
self,
image: Image | str,
):
if isinstance(image, str):
image = parse_image_location(image)
self._image = image
self._registry = image.registry
self._namespace = image.namespace
self._image_name = image.image_name
self._auth_token = None
self._base_url = f"https://{self._registry}"
self._image_url = f"{self._base_url}/v2/{self._namespace}/{self._image_name}"
def get_auth_token(self) -> Optional[str]:
if not self._auth_token:
auth_url = f"{self._base_url}/token"
response = requests.get(
auth_url,
params={
"service": f"{self._registry}",
"scope": f"repository:{self._namespace}/{self._image_name}:pull",
},
)
response.raise_for_status()
self._auth_token = response.json()["token"]
return self._auth_token
def get_auth_header(self) -> Dict[str, str]:
return {"Authorization": f"Bearer {self.get_auth_token()}"}
def list_tags(self) -> list:
url = f"{self._image_url}/tags/list"
response = requests.get(url, headers=self.get_auth_header())
response.raise_for_status()
tags = response.json().get("tags", [])
return tags
def get_manifest(
self, tag: str, extra_headers: Optional[dict] = None
) -> requests.Response:
"""Get manifest information for a specific tag"""
manifest_url = f"{self._image_url}/manifests/{tag}"
headers = {
"Accept": DOCKER_MANIFEST_DISTRIBUTION,
"Authorization": f"Bearer {self.get_auth_token()}",
}
if extra_headers:
headers.update(extra_headers)
response = requests.get(manifest_url, headers=headers)
response.raise_for_status()
return response
def list_manifests(self, tag: str) -> list:
return (
self.get_manifest(
tag,
{
"Accept": DOCKER_MANIFEST_INDEX,
},
)
.json()
.get("manifests")
)
def get_blob(self, hash: str) -> requests.Response:
url = f"{self._image_url}/blobs/{hash}"
response = requests.get(
url,
headers={
"Authorization": f"Bearer {self.get_auth_token()}",
},
)
response.raise_for_status()
return response
def get_manifest_hash(
self, tag: str, tag_manifest_content: Optional[bytes] = None
) -> str:
if not tag_manifest_content:
tag_manifest_content = self.get_manifest(tag).content
return hashlib.sha256(tag_manifest_content).hexdigest()
def get_attestation(self, tag: str) -> Tuple[bytes, bytes]:
"""
Retrieve an attestation from a given tag.
The attestation needs to be attached using the Cosign Bundle
Specification defined at:
https://github.com/sigstore/cosign/blob/main/specs/BUNDLE_SPEC.md
Returns a tuple with the tag manifest content and the bundle content.
"""
# FIXME: do not only rely on the first layer
def _find_sigstore_bundle_manifest(
manifests: list,
) -> Tuple[Optional[str], Optional[str]]:
for manifest in manifests:
if manifest["artifactType"] == SIGSTORE_BUNDLE:
return manifest["mediaType"], manifest["digest"]
return None, None
def _get_bundle_blob_digest(layers: list) -> Optional[str]:
for layer in layers:
if layer.get("mediaType") == SIGSTORE_BUNDLE:
return layer["digest"]
return None
tag_manifest_content = self.get_manifest(tag).content
# The attestation is available on the same container registry, with a
# specific tag named "sha256-{sha256(manifest)}"
tag_manifest_hash = self.get_manifest_hash(tag, tag_manifest_content)
# This will get us a "list" of manifests...
manifests = self.list_manifests(f"sha256-{tag_manifest_hash}")
# ... from which we want the sigstore bundle
bundle_manifest_mediatype, bundle_manifest_digest = (
_find_sigstore_bundle_manifest(manifests)
)
if not bundle_manifest_digest:
raise errors.RegistryError("Not able to find sigstore bundle manifest info")
bundle_manifest = self.get_manifest(
bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype}
).json()
# From there, we will get the attestation in a blob.
# It will be the first layer listed at this manifest hash location
layers = bundle_manifest.get("layers", [])
blob_digest = _get_bundle_blob_digest(layers)
log.info(f"Found sigstore bundle blob digest: {blob_digest}")
if not blob_digest:
raise errors.RegistryError("Not able to find sigstore bundle blob info")
bundle = self.get_blob(blob_digest)
return tag_manifest_content, bundle.content
def get_manifest_hash(image_str: str) -> str:
image = parse_image_location(image_str)
return RegistryClient(image).get_manifest_hash(image.tag)
def list_tags(image_str: str) -> list:
return RegistryClient(image_str).list_tags()
def get_manifest(image_str: str) -> bytes:
image = parse_image_location(image_str)
client = RegistryClient(image)
resp = client.get_manifest(image.tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST})
return resp.content
def get_attestation(image_str: str) -> Tuple[bytes, bytes]:
image = parse_image_location(image_str)
return RegistryClient(image).get_attestation(image.tag)
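A short usage sketch of the module-level helpers above; the image location is a placeholder:

from dangerzone.updater import registry

location = "ghcr.io/freedomofpress/dangerzone/dangerzone:latest"  # placeholder

image = registry.parse_image_location(location)
print(image.registry, image.namespace, image.image_name, image.tag)

# sha256 of the tag's manifest, as computed by RegistryClient.get_manifest_hash()
print(registry.get_manifest_hash(location))

# All tags published for this image
for tag in registry.list_tags(location):
    print(tag)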

View file

@@ -1,228 +0,0 @@
import json
import platform
import re
import subprocess
from base64 import b64decode
from hashlib import sha256
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Dict, List, Tuple
from ..container_utils import container_pull, load_image_hash
from . import errors, log, utils
from .registry import get_manifest_hash
try:
import platformdirs
except ImportError:
import appdirs as platformdirs # type: ignore[no-redef]
def get_config_dir() -> Path:
return Path(platformdirs.user_config_dir("dangerzone"))
# XXX Store this somewhere else.
SIGNATURES_PATH = get_config_dir() / "signatures"
__all__ = [
"verify_signature",
"load_signatures",
"store_signatures",
"verify_offline_image_signature",
]
def signature_to_bundle(sig: Dict) -> Dict:
"""Convert a cosign-download signature to the format expected by cosign bundle."""
bundle = sig["Bundle"]
payload = bundle["Payload"]
return {
"base64Signature": sig["Base64Signature"],
"Payload": sig["Payload"],
"cert": sig["Cert"],
"chain": sig["Chain"],
"rekorBundle": {
"SignedEntryTimestamp": bundle["SignedEntryTimestamp"],
"Payload": {
"body": payload["body"],
"integratedTime": payload["integratedTime"],
"logIndex": payload["logIndex"],
"logID": payload["logID"],
},
},
"RFC3161Timestamp": sig["RFC3161Timestamp"],
}
def verify_signature(signature: dict, pubkey: str) -> bool:
"""Verify a signature against a given public key"""
utils.ensure_cosign()
signature_bundle = signature_to_bundle(signature)
with (
NamedTemporaryFile(mode="w") as signature_file,
NamedTemporaryFile(mode="bw") as payload_file,
):
json.dump(signature_bundle, signature_file)
signature_file.flush()
payload_bytes = b64decode(signature_bundle["Payload"])
payload_file.write(payload_bytes)
payload_file.flush()
cmd = [
"cosign",
"verify-blob",
"--key",
pubkey,
"--bundle",
signature_file.name,
payload_file.name,
]
log.debug(" ".join(cmd))
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
# XXX Raise instead?
log.debug("Failed to verify signature", result.stderr)
return False
if result.stderr == b"Verified OK\n":
log.debug("Signature verified")
return True
return False
def new_image_release(image) -> bool:
remote_hash = get_manifest_hash(image)
local_hash = load_image_hash(image)
log.debug("Remote hash: %s", remote_hash)
log.debug("Local hash: %s", local_hash)
return remote_hash != local_hash
def upgrade_container_image(
image: str,
manifest_hash: str,
pubkey: str,
) -> bool:
if not new_image_release(image):
raise errors.ImageAlreadyUpToDate("The image is already up to date")
return False
signatures = get_signatures(image, manifest_hash)
log.debug("Signatures: %s", signatures)
if len(signatures) < 1:
raise errors.NoRemoteSignatures("No remote signatures found")
for signature in signatures:
signature_is_valid = verify_signature(signature, pubkey)
if not signature_is_valid:
raise errors.SignatureVerificationError()
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
store_signatures(signatures, manifest_hash, pubkey)
# let's upgrade the image
# XXX Use the hash here to avoid race conditions
return container_pull(image)
def get_file_hash(file: str) -> str:
with open(file, "rb") as f:
content = f.read()
return sha256(content).hexdigest()
def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
"""
Load signatures from the local filesystem
See store_signatures() for the expected format.
"""
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
if not pubkey_signatures.exists():
msg = (
f"Cannot find a '{pubkey_signatures}' folder."
"You might need to download the image signatures first."
)
raise errors.SignaturesFolderDoesNotExist(msg)
with open(pubkey_signatures / f"{image_hash}.json") as f:
log.debug("Loading signatures from %s", f.name)
return json.load(f)
def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> None:
"""
Store signatures locally in the SIGNATURE_PATH folder, like this:
~/.config/dangerzone/signatures/
<pubkey-hash>
<image-hash>.json
<image-hash>.json
The format used in the `.json` file is the one of `cosign download
signature`, which differs from the "bundle" one used afterwards.
It can be converted to the one expected by cosign verify --bundle with
the `signature_to_bundle()` function.
"""
def _get_digest(sig: Dict) -> str:
payload = json.loads(b64decode(sig["Payload"]))
return payload["critical"]["image"]["docker-manifest-digest"]
# All the signatures should share the same hash.
hashes = list(map(_get_digest, signatures))
if len(set(hashes)) != 1:
raise errors.InvalidSignatures("Signatures do not share the same image hash")
if f"sha256:{image_hash}" != hashes[0]:
raise errors.SignatureMismatch("Signatures do not match the given image hash")
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
pubkey_signatures.mkdir(exist_ok=True)
with open(pubkey_signatures / f"{image_hash}.json", "w") as f:
log.debug(
f"Storing signatures for {image_hash} in {pubkey_signatures}/{image_hash}.json"
)
json.dump(signatures, f)
def verify_offline_image_signature(image: str, pubkey: str) -> bool:
"""
Verifies that a local image has a valid signature
"""
log.info(f"Verifying local image {image} against pubkey {pubkey}")
image_hash = load_image_hash(image)
log.debug(f"Image hash: {image_hash}")
signatures = load_signatures(image_hash, pubkey)
if len(signatures) < 1:
raise errors.LocalSignatureNotFound("No signatures found")
for signature in signatures:
if not verify_signature(signature, pubkey):
msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
raise errors.SignatureVerificationError(msg)
return True
def get_signatures(image: str, hash: str) -> List[Dict]:
"""
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.
"""
utils.ensure_cosign()
process = subprocess.run(
["cosign", "download", "signature", f"{image}@sha256:{hash}"],
capture_output=True,
check=True,
)
# XXX: Check the output first.
# Remove the last return, split on newlines, convert from JSON
signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
return list(map(json.loads, signatures_raw))
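To make the signature_to_bundle() conversion concrete, here is a sketch with a dummy signature in the `cosign download signature` shape assumed above; every value is a placeholder, not real signature material:

from dangerzone.updater.signatures import signature_to_bundle

sig = {
    "Base64Signature": "MEUCIQD...",          # placeholder
    "Payload": "eyJjcml0aWNhbCI6IC4uLn0=",    # placeholder simple-signing payload
    "Cert": None,
    "Chain": None,
    "Bundle": {
        "SignedEntryTimestamp": "MEYCIQ...",  # placeholder
        "Payload": {
            "body": "eyJhcGlWZXJzaW9uIjogIi4uLiJ9",
            "integratedTime": 1700000000,
            "logIndex": 123456,
            "logID": "c0d23d6a...",
        },
    },
    "RFC3161Timestamp": None,
}

# Re-keyed into the layout expected by `cosign verify-blob --bundle`.
print(signature_to_bundle(sig))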

View file

@@ -1,10 +0,0 @@
import subprocess
from . import errors
def ensure_cosign() -> None:
try:
subprocess.run(["cosign", "version"], capture_output=True, check=True)
except subprocess.CalledProcessError:
raise errors.CosignNotInstalledError()

View file

@@ -8,7 +8,7 @@ import unicodedata
 try:
     import platformdirs
 except ImportError:
-    import appdirs as platformdirs  # type: ignore[no-redef]
+    import appdirs as platformdirs


 def get_config_dir() -> str:

View file

@@ -1,13 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
# Load dangerzone module and resources from the source code tree
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.dangerzone_dev = True
from dangerzone.updater import cli
cli.main()

dev_scripts/registry.py (new executable file, 517 additions)
View file

@@ -0,0 +1,517 @@
#!/usr/bin/python
import hashlib
import json
import platform
import re
import shutil
import subprocess
from base64 import b64decode
from pathlib import Path
from tempfile import NamedTemporaryFile
import click
import requests
try:
import platformdirs
except ImportError:
import appdirs as platformdirs
def get_config_dir() -> Path:
return Path(platformdirs.user_config_dir("dangerzone"))
SIGNATURES_PATH = get_config_dir() / "signatures"
DEFAULT_REPO = "freedomofpress/dangerzone"
SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json"
DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json"
DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json"
OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json"
class RegistryClient:
def __init__(self, registry, org, image):
self._registry = registry
self._org = org
self._image = image
self._auth_token = None
self._base_url = f"https://{registry}"
self._image_url = f"{self._base_url}/v2/{self._org}/{self._image}"
@property
def image(self):
return f"{self._registry}/{self._org}/{self._image}"
def get_auth_token(self):
if not self._auth_token:
auth_url = f"{self._base_url}/token"
response = requests.get(
auth_url,
params={
"service": f"{self._registry}",
"scope": f"repository:{self._org}/{self._image}:pull",
},
)
response.raise_for_status()
self._auth_token = response.json()["token"]
return self._auth_token
def get_auth_header(self):
return {"Authorization": f"Bearer {self.get_auth_token()}"}
def list_tags(self):
url = f"{self._image_url}/tags/list"
response = requests.get(url, headers=self.get_auth_header())
response.raise_for_status()
tags = response.json().get("tags", [])
return tags
def get_manifest(self, tag, extra_headers=None):
"""Get manifest information for a specific tag"""
manifest_url = f"{self._image_url}/manifests/{tag}"
headers = {
"Accept": DOCKER_MANIFEST_DISTRIBUTION,
"Authorization": f"Bearer {self.get_auth_token()}",
}
if extra_headers:
headers.update(extra_headers)
response = requests.get(manifest_url, headers=headers)
response.raise_for_status()
return response
def list_manifests(self, tag):
return (
self.get_manifest(
tag,
{
"Accept": DOCKER_MANIFEST_INDEX,
},
)
.json()
.get("manifests")
)
def get_blob(self, hash):
url = f"{self._image_url}/blobs/{hash}"
response = requests.get(
url,
headers={
"Authorization": f"Bearer {self.get_auth_token()}",
},
)
response.raise_for_status()
return response
def get_manifest_hash(self, tag, tag_manifest_content=None):
if not tag_manifest_content:
tag_manifest_content = self.get_manifest(tag).content
return hashlib.sha256(tag_manifest_content).hexdigest()
def get_attestation(self, tag):
"""
Retrieve an attestation from a given tag.
The attestation needs to be attached using the Cosign Bundle
Specification defined at:
https://github.com/sigstore/cosign/blob/main/specs/BUNDLE_SPEC.md
"""
def _find_sigstore_bundle_manifest(manifests):
for manifest in manifests:
if manifest["artifactType"] == SIGSTORE_BUNDLE:
return manifest["mediaType"], manifest["digest"]
def _get_bundle_blob_digest(layers):
for layer in layers:
if layer.get("mediaType") == SIGSTORE_BUNDLE:
return layer["digest"]
tag_manifest_content = self.get_manifest(tag).content
# The attestation is available on the same container registry, with a
# specific tag named "sha256-{sha256(manifest)}"
tag_manifest_hash = self.get_manifest_hash(tag, tag_manifest_content)
# This will get us a "list" of manifests...
manifests = self.list_manifests(f"sha256-{tag_manifest_hash}")
# ... from which we want the sigstore bundle
bundle_manifest_mediatype, bundle_manifest_digest = (
_find_sigstore_bundle_manifest(manifests)
)
if not bundle_manifest_digest:
raise Exception("Not able to find sigstore bundle manifest info")
bundle_manifest = self.get_manifest(
bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype}
).json()
# From there, we will get the attestation in a blob.
# It will be the first layer listed at this manifest hash location
layers = bundle_manifest.get("layers", [])
blob_digest = _get_bundle_blob_digest(layers)
bundle = self.get_blob(blob_digest)
return tag_manifest_content, bundle.content
def _write(file, content):
file.write(content)
file.flush()
def verify_attestation(
registry_client: RegistryClient, image_tag: str, expected_repo: str
):
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
manifest, bundle = registry_client.get_attestation(image_tag)
# Put the value in files and verify with cosign
with (
NamedTemporaryFile(mode="wb") as manifest_json,
NamedTemporaryFile(mode="wb") as bundle_json,
):
_write(manifest_json, manifest)
_write(bundle_json, bundle)
# Call cosign with the temporary file paths
cmd = [
"cosign",
"verify-blob-attestation",
"--bundle",
bundle_json.name,
"--new-bundle-format",
"--certificate-oidc-issuer",
"https://token.actions.githubusercontent.com",
"--certificate-identity-regexp",
f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign",
manifest_json.name,
]
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
raise Exception(f"Attestation cannot be verified. {result.stderr}")
return True
def new_image_release():
# XXX - Implement
return True
def signature_to_bundle(sig):
# Convert cosign-download signatures to the format expected by cosign bundle.
bundle = sig["Bundle"]
payload = bundle["Payload"]
return {
"base64Signature": sig["Base64Signature"],
"Payload": sig["Payload"],
"cert": sig["Cert"],
"chain": sig["Chain"],
"rekorBundle": {
"SignedEntryTimestamp": bundle["SignedEntryTimestamp"],
"Payload": {
"body": payload["body"],
"integratedTime": payload["integratedTime"],
"logIndex": payload["logIndex"],
"logID": payload["logID"],
},
},
"RFC3161Timestamp": sig["RFC3161Timestamp"],
}
def verify_signature(signature, pubkey):
"""Verify a signature against a given public key"""
signature_bundle = signature_to_bundle(signature)
# Put the value in files and verify with cosign
with (
NamedTemporaryFile(mode="w") as signature_file,
NamedTemporaryFile(mode="bw") as payload_file,
):
json.dump(signature_bundle, signature_file)
signature_file.flush()
payload_bytes = b64decode(signature_bundle["Payload"])
_write(payload_file, payload_bytes)
cmd = [
"cosign",
"verify-blob",
"--key",
pubkey,
"--bundle",
signature_file.name,
payload_file.name,
]
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
# XXX Raise instead?
return False
return result.stderr == b"Verified OK\n"
def get_runtime_name() -> str:
if platform.system() == "Linux":
return "podman"
return "docker"
def container_pull(image):
cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
process.communicate()
def upgrade_container_image(image, tag, pubkey, registry: RegistryClient):
if not new_image_release():
return
hash = registry.get_manifest_hash(tag)
signatures = get_signatures(image, hash)
if len(signatures) < 1:
raise Exception("Unable to retrieve signatures")
print(f"Found {len(signatures)} signature(s) for {image}")
for signature in signatures:
signature_is_valid = verify_signature(signature, pubkey)
if not signature_is_valid:
raise Exception("Unable to verify signature")
print("✅ Signature is valid")
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
store_signatures(signatures, hash, pubkey)
# let's upgrade the image
# XXX Use the hash here to avoid race conditions
container_pull(image)
def get_file_hash(file):
with open(file, "rb") as f:
content = f.read()
return hashlib.sha256(content).hexdigest()
def load_signatures(image_hash, pubkey):
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
if not pubkey_signatures.exists():
msg = (
f"Cannot find a '{pubkey_signatures}' folder."
"You might need to download the image signatures first."
)
raise Exception(msg)
with open(pubkey_signatures / f"{image_hash}.json") as f:
return json.load(f)
def store_signatures(signatures, image_hash, pubkey):
"""
Store signatures locally in the SIGNATURE_PATH folder, like this:
~/.config/dangerzone/signatures/
<pubkey-hash>
<image-hash>.json
<image-hash>.json
The format used in the `.json` file is the one of `cosign download
signature`, which differs from the "bundle" one used afterwards.
It can be converted to the one expected by cosign verify --bundle with
the `signature_to_bundle()` function.
"""
def _get_digest(sig):
payload = json.loads(b64decode(sig["Payload"]))
return payload["critical"]["image"]["docker-manifest-digest"]
# All the signatures should share the same hash.
hashes = list(map(_get_digest, signatures))
if len(set(hashes)) != 1:
raise Exception("Signatures do not share the same image hash")
if f"sha256:{image_hash}" != hashes[0]:
raise Exception("Signatures do not match the given image hash")
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
pubkey_signatures.mkdir(exist_ok=True)
with open(pubkey_signatures / f"{image_hash}.json", "w") as f:
json.dump(signatures, f)
def verify_local_image_signature(image, pubkey):
"""
Verifies that a local image has a valid signature
"""
image_hash = get_image_hash(image)
signatures = load_signatures(image_hash, pubkey)
if len(signatures) < 1:
raise Exception("No signatures found")
for signature in signatures:
if not verify_signature(signature, pubkey):
msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
raise Exception(msg)
return True
def get_image_hash(image):
"""
Returns an image hash from a local image name
"""
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout.strip().decode().strip("sha256:")
def get_signatures(image, hash):
"""
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.
"""
process = subprocess.run(
["cosign", "download", "signature", f"{image}@sha256:{hash}"],
capture_output=True,
check=True,
)
# XXX: Check the output first.
# Remove the last return, split on newlines, convert from JSON
signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
return list(map(json.loads, signatures_raw))
class Image:
def __init__(self, registry, namespace, repository, tag="latest"):
self.registry = registry
self.namespace = namespace
self.repository = repository
self.tag = tag
def properties(self):
return (self.registry, self.namespace, self.repository, self.tag)
@property
def name_without_tag(self):
return f"{self.registry}/{self.namespace}/{self.repository}"
@property
def name_with_tag(self):
return f"{self.name_without_tag}:{self.tag}"
@classmethod
def from_string(cls, input_string):
"""Parses container image location into (registry, namespace, repository, tag)"""
pattern = (
r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<repository>[^:]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"$"
)
match = re.match(pattern, input_string)
if not match:
raise ValueError("Malformed image location")
return cls(
match.group("registry"),
match.group("namespace"),
match.group("repository"),
match.group("tag") or "latest",
)
def parse_image_location(string):
return Image.from_string(string).properties()
@click.group()
def main():
pass
@main.command()
@click.argument("image")
@click.option("--pubkey", default="pub.key")
def upgrade_image(image, pubkey):
registry, namespace, repository, tag = parse_image_location(image)
registry_client = RegistryClient(registry, namespace, repository)
upgrade_container_image(image, tag, pubkey, registry_client)
@main.command()
@click.argument("image")
@click.option("--pubkey", default="pub.key")
def verify_local_image(image, pubkey):
# XXX remove a potential :tag
if verify_local_image_signature(image, pubkey):
click.echo(f"✅ The local image {image} has been signed with {pubkey}")
@main.command()
@click.argument("image")
def list_tags(image):
registry, org, package, _ = parse_image_location(image)
client = RegistryClient(registry, org, package)
tags = client.list_tags()
click.echo(f"Existing tags for {client.image}")
for tag in tags:
click.echo(tag)
@main.command()
@click.argument("image")
@click.argument("tag")
def get_manifest(image, tag):
registry, org, package, _ = parse_image_location(image)
client = RegistryClient(registry, org, package)
resp = client.get_manifest(tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST})
click.echo(resp.content)
@main.command()
@click.argument("image")
@click.option(
"--repo",
default=DEFAULT_REPO,
help="The github repository to check the attestation for",
)
def attest(image: str, repo: str):
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
if shutil.which("cosign") is None:
click.echo("The cosign binary is needed but not installed.")
raise click.Abort()
registry, org, package, tag = parse_image_location(image)
tag = tag or "latest"
client = RegistryClient(registry, org, package)
verified = verify_attestation(client, tag, repo)
if verified:
click.echo(
f"🎉 The image available at `{client.image}:{tag}` has been built by Github Runners from the `{repo}` repository"
)
if __name__ == "__main__":
main()
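The sha256-{digest} attestation tag convention described in get_attestation() can be reproduced by hand with the same client. A sketch using the RegistryClient defined in this script; the image coordinates are placeholders:

import hashlib

client = RegistryClient("ghcr.io", "freedomofpress", "dangerzone/dangerzone")

# get_manifest_hash() is simply the sha256 of the tag's manifest bytes...
digest = client.get_manifest_hash("latest")
assert digest == hashlib.sha256(client.get_manifest("latest").content).hexdigest()

# ...and the attestation manifests are listed under a tag derived from it.
attestation_tag = f"sha256-{digest}"
print(client.list_manifests(attestation_tag))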

View file

@@ -34,7 +34,6 @@ shiboken6 = [
 [tool.poetry.scripts]
 dangerzone = 'dangerzone:main'
 dangerzone-cli = 'dangerzone:main'
-dangerzone-image = "dangerzone.updater.cli:main"

 # Dependencies required for packaging the code on various platforms.
 [tool.poetry.group.package.dependencies]