From 5921289454366310b463f056be45e6e03ab6e322 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexis=20M=C3=A9taireau?= Date: Wed, 29 Jan 2025 17:01:48 +0100 Subject: [PATCH] Refactoring of dangerzone/updater/* --- dangerzone/updater/__init__.py | 4 ++ dangerzone/updater/attestations.py | 10 +-- dangerzone/updater/cli.py | 36 +++++----- dangerzone/updater/registry.py | 110 ++++++++++++++++++----------- dangerzone/updater/signatures.py | 24 +++---- dangerzone/updater/utils.py | 3 - 6 files changed, 104 insertions(+), 83 deletions(-) delete mode 100644 dangerzone/updater/utils.py diff --git a/dangerzone/updater/__init__.py b/dangerzone/updater/__init__.py index e69de29..9ae9065 100644 --- a/dangerzone/updater/__init__.py +++ b/dangerzone/updater/__init__.py @@ -0,0 +1,4 @@ +import logging + +log = logging.getLogger(__name__) +log.setLevel(logging.INFO) diff --git a/dangerzone/updater/attestations.py b/dangerzone/updater/attestations.py index 8839e1c..c7d0a87 100644 --- a/dangerzone/updater/attestations.py +++ b/dangerzone/updater/attestations.py @@ -1,12 +1,10 @@ import subprocess from tempfile import NamedTemporaryFile -from .utils import write - def verify_attestation( manifest: bytes, attestation_bundle: bytes, image_tag: str, expected_repo: str -): +) -> bool: """ Look up the image attestation to see if the image has been built on Github runners, and from a given repository. @@ -17,8 +15,10 @@ def verify_attestation( NamedTemporaryFile(mode="wb") as manifest_json, NamedTemporaryFile(mode="wb") as attestation_bundle_json, ): - write(manifest_json, manifest) - write(attestation_bundle_json, attestation_bundle) + manifest_json.write(manifest) + manifest_json.flush() + attestation_bundle_json.write(attestation_bundle) + attestation_bundle_json.flush() # Call cosign with the temporary file paths cmd = [ diff --git a/dangerzone/updater/cli.py b/dangerzone/updater/cli.py index 6d3c9fb..e88817a 100644 --- a/dangerzone/updater/cli.py +++ b/dangerzone/updater/cli.py @@ -6,19 +6,20 @@ from . import registry from .attestations import verify_attestation from .signatures import upgrade_container_image, verify_offline_image_signature -DEFAULT_REPO = "freedomofpress/dangerzone" +DEFAULT_REPOSITORY = "freedomofpress/dangerzone" @click.group() -def main(): +def main() -> None: pass @main.command() -@click.argument("image") +@click.option("--image") @click.option("--pubkey", default="pub.key") +@click.option("--airgap", is_flag=True) # XXX Add options to do airgap upgrade -def upgrade(image, pubkey): +def upgrade(image: str, pubkey: str) -> None: manifest_hash = registry.get_manifest_hash(image) if upgrade_container_image(image, manifest_hash, pubkey): click.echo(f"✅ The local image {image} has been upgraded") @@ -27,9 +28,9 @@ def upgrade(image, pubkey): @main.command() @click.argument("image") @click.option("--pubkey", default="pub.key") -def verify_local(image, pubkey): +def verify_local(image: str, pubkey: str) -> None: """ - XXX document + Verify the local image signature against a public key and the stored signatures. """ # XXX remove a potentiel :tag if verify_offline_image_signature(image, pubkey): @@ -38,28 +39,26 @@ def verify_local(image, pubkey): @main.command() @click.argument("image") -def list_tags(image): - click.echo(f"Existing tags for {client.image}") +def list_remote_tags(image: str) -> None: + click.echo(f"Existing tags for {image}") for tag in registry.list_tags(image): click.echo(tag) @main.command() @click.argument("image") -@click.argument("tag") -def get_manifest(image, tag): - click.echo(registry.get_manifest(image, tag)) +def get_manifest(image: str) -> None: + click.echo(registry.get_manifest(image)) @main.command() @click.argument("image") @click.option( - "--repo", - default=DEFAULT_REPO, + "--repository", + default=DEFAULT_REPOSITORY, help="The github repository to check the attestation for", ) -# XXX use a consistent naming for these cli commands -def attest(image: str, repo: str): +def attest_provenance(image: str, repository: str) -> None: """ Look up the image attestation to see if the image has been built on Github runners, and from a given repository. @@ -68,14 +67,13 @@ def attest(image: str, repo: str): # if shutil.which("cosign") is None: # click.echo("The cosign binary is needed but not installed.") # raise click.Abort() - # XXX: refactor parse_image_location to return a dict. - _, _, _, image_tag = registry.parse_image_location(image) + parsed = registry.parse_image_location(image) manifest, bundle = registry.get_attestation(image) - verified = verify_attestation(manifest, bundle, image_tag, repo) + verified = verify_attestation(manifest, bundle, parsed.tag, repository) if verified: click.echo( - f"🎉 The image available at `{client.image}:{image_tag}` has been built by Github Runners from the `{repo}` repository" + f"🎉 The image available at `{parsed.full_name}` has been built by Github Runners from the `{repository}` repository" ) diff --git a/dangerzone/updater/registry.py b/dangerzone/updater/registry.py index 381124f..de919ea 100644 --- a/dangerzone/updater/registry.py +++ b/dangerzone/updater/registry.py @@ -1,12 +1,17 @@ import hashlib import re +from collections import namedtuple from typing import Dict, Optional, Tuple import requests +from . import log + __all__ = [ "get_manifest_hash", "list_tags", + "get_manifest", + "get_attestation", ] SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json" @@ -15,40 +20,51 @@ DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json" OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json" -def parse_image_location(input_string: str) -> Tuple[str, str, str, str]: - """Parses container image location into (registry, namespace, repository, tag)""" +class Image(namedtuple("Image", ["registry", "namespace", "image_name", "tag"])): + __slots__ = () + + @property + def full_name(self) -> str: + tag = f":{self.tag}" if self.tag else "" + return f"{self.registry}/{self.namespace}/{self.image_name}{tag}" + + +def parse_image_location(input_string: str) -> Image: + """Parses container image location into an Image namedtuple""" pattern = ( r"^" r"(?P[a-zA-Z0-9.-]+)/" r"(?P[a-zA-Z0-9-]+)/" - r"(?P[^:]+)" + r"(?P[^:]+)" r"(?::(?P[a-zA-Z0-9.-]+))?" r"$" ) match = re.match(pattern, input_string) if not match: raise ValueError("Malformed image location") - - return ( - match.group("registry"), - match.group("namespace"), - match.group("repository"), - match.group("tag") or "latest", + return Image( + registry=match.group("registry"), + namespace=match.group("namespace"), + image_name=match.group("image_name"), + tag=match.group("tag") or "latest", ) class RegistryClient: - def __init__(self, registry: str, org: str, image: str): - self._registry = registry - self._org = org - self._image = image - self._auth_token = None - self._base_url = f"https://{registry}" - self._image_url = f"{self._base_url}/v2/{self._org}/{self._image}" + def __init__( + self, + image: Image | str, + ): + if isinstance(image, str): + image = parse_image_location(image) - @property - def image(self): - return f"{self._registry}/{self._org}/{self._image}" + self._image = image + self._registry = image.registry + self._namespace = image.namespace + self._image_name = image.image_name + self._auth_token = None + self._base_url = f"https://{self._registry}" + self._image_url = f"{self._base_url}/v2/{self._namespace}/{self._image_name}" def get_auth_token(self) -> Optional[str]: if not self._auth_token: @@ -57,7 +73,7 @@ class RegistryClient: auth_url, params={ "service": f"{self._registry}", - "scope": f"repository:{self._org}/{self._image}:pull", + "scope": f"repository:{self._namespace}/{self._image_name}:pull", }, ) response.raise_for_status() @@ -74,7 +90,9 @@ class RegistryClient: tags = response.json().get("tags", []) return tags - def get_manifest(self, tag, extra_headers=None) -> requests.Response: + def get_manifest( + self, tag: str, extra_headers: Optional[dict] = None + ) -> requests.Response: """Get manifest information for a specific tag""" manifest_url = f"{self._image_url}/manifests/{tag}" headers = { @@ -88,7 +106,7 @@ class RegistryClient: response.raise_for_status() return response - def list_manifests(self, tag) -> list: + def list_manifests(self, tag: str) -> list: return ( self.get_manifest( tag, @@ -100,7 +118,7 @@ class RegistryClient: .get("manifests") ) - def get_blob(self, hash) -> requests.Response: + def get_blob(self, hash: str) -> requests.Response: url = f"{self._image_url}/blobs/{hash}" response = requests.get( url, @@ -111,13 +129,15 @@ class RegistryClient: response.raise_for_status() return response - def get_manifest_hash(self, tag, tag_manifest_content=None) -> str: + def get_manifest_hash( + self, tag: str, tag_manifest_content: Optional[bytes] = None + ) -> str: if not tag_manifest_content: tag_manifest_content = self.get_manifest(tag).content return hashlib.sha256(tag_manifest_content).hexdigest() - def get_attestation(self, tag) -> Tuple[bytes, bytes]: + def get_attestation(self, tag: str) -> Tuple[bytes, bytes]: """ Retrieve an attestation from a given tag. @@ -129,15 +149,20 @@ class RegistryClient: Returns a tuple with the tag manifest content and the bundle content. """ - def _find_sigstore_bundle_manifest(manifests): + # FIXME: do not only rely on the first layer + def _find_sigstore_bundle_manifest( + manifests: list, + ) -> Tuple[Optional[str], Optional[str]]: for manifest in manifests: if manifest["artifactType"] == SIGSTORE_BUNDLE: return manifest["mediaType"], manifest["digest"] + return None, None - def _get_bundle_blob_digest(layers): + def _get_bundle_blob_digest(layers: list) -> Optional[str]: for layer in layers: if layer.get("mediaType") == SIGSTORE_BUNDLE: return layer["digest"] + return None tag_manifest_content = self.get_manifest(tag).content @@ -164,30 +189,29 @@ class RegistryClient: layers = bundle_manifest.get("layers", []) blob_digest = _get_bundle_blob_digest(layers) + log.info(f"Found sigstore bundle blob digest: {blob_digest}") + if not blob_digest: + raise Exception("Not able to find sigstore bundle blob info") bundle = self.get_blob(blob_digest) return tag_manifest_content, bundle.content -def get_manifest_hash(image: str) -> str: - registry, org, package, tag = parse_image_location(image) - client = RegistryClient(registry, org, package) - return client.get_manifest_hash(tag) +def get_manifest_hash(image_str: str) -> str: + image = parse_image_location(image_str) + return RegistryClient(image).get_manifest_hash(image.tag) -def list_tags(image: str) -> list: - registry, org, package, _ = parse_image_location(image) - client = RegistryClient(registry, org, package) - return client.list_tags() +def list_tags(image_str: str) -> list: + return RegistryClient(image_str).list_tags() -def get_manifest(image: str, tag: str) -> bytes: - registry, org, package, _ = parse_image_location(image) - client = RegistryClient(registry, org, package) - resp = client.get_manifest(tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST}) +def get_manifest(image_str: str) -> bytes: + image = parse_image_location(image_str) + client = RegistryClient(image) + resp = client.get_manifest(image.tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST}) return resp.content -def get_attestation(image: str) -> Tuple[bytes, bytes]: - registry, org, package, tag = parse_image_location(image) - client = RegistryClient(registry, org, package) - return client.get_attestation(tag) +def get_attestation(image_str: str) -> Tuple[bytes, bytes]: + image = parse_image_location(image_str) + return RegistryClient(image).get_attestation(image.tag) diff --git a/dangerzone/updater/signatures.py b/dangerzone/updater/signatures.py index 42154c2..c8e61ee 100644 --- a/dangerzone/updater/signatures.py +++ b/dangerzone/updater/signatures.py @@ -8,13 +8,10 @@ from pathlib import Path from tempfile import NamedTemporaryFile from typing import Dict, List, Tuple -from .registry import RegistryClient -from .utils import write - try: import platformdirs except ImportError: - import appdirs as platformdirs + import appdirs as platformdirs # type: ignore[no-redef] def get_config_dir() -> Path: @@ -67,7 +64,8 @@ def verify_signature(signature: dict, pubkey: str) -> bool: signature_file.flush() payload_bytes = b64decode(signature_bundle["Payload"]) - write(payload_file, payload_bytes) + payload_file.write(payload_bytes) + payload_file.flush() cmd = [ "cosign", @@ -91,7 +89,7 @@ def get_runtime_name() -> str: return "docker" -def container_pull(image: str): +def container_pull(image: str) -> bool: # XXX - Move to container_utils.py cmd = [get_runtime_name(), "pull", f"{image}"] process = subprocess.Popen(cmd, stdout=subprocess.PIPE) @@ -99,7 +97,7 @@ def container_pull(image: str): return process.returncode == 0 -def new_image_release(): +def new_image_release() -> bool: # XXX - Implement return True @@ -108,9 +106,9 @@ def upgrade_container_image( image: str, manifest_hash: str, pubkey: str, -): +) -> bool: if not new_image_release(): - return + return False # manifest_hash = registry.get_manifest_hash(tag) signatures = get_signatures(image, manifest_hash) @@ -138,7 +136,7 @@ def get_file_hash(file: str) -> str: return sha256(content).hexdigest() -def load_signatures(image_hash, pubkey): +def load_signatures(image_hash: str, pubkey: str) -> List[Dict]: """ Load signatures from the local filesystem @@ -156,7 +154,7 @@ def load_signatures(image_hash, pubkey): return json.load(f) -def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str): +def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> None: """ Store signatures locally in the SIGNATURE_PATH folder, like this: @@ -172,7 +170,7 @@ def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str): the `signature_to_bundle()` function. """ - def _get_digest(sig): + def _get_digest(sig: Dict) -> str: payload = json.loads(b64decode(sig["Payload"])) return payload["critical"]["image"]["docker-manifest-digest"] @@ -216,7 +214,7 @@ def load_image_hash(image: str) -> str: return result.stdout.strip().decode().strip("sha256:") -def get_signatures(image, hash) -> List[Dict]: +def get_signatures(image: str, hash: str) -> List[Dict]: """ Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format. """ diff --git a/dangerzone/updater/utils.py b/dangerzone/updater/utils.py deleted file mode 100644 index fd7b989..0000000 --- a/dangerzone/updater/utils.py +++ /dev/null @@ -1,3 +0,0 @@ -def write(file, content: bytes | str): - file.write(content) - file.flush()