diff --git a/dangerzone/updater/__init__.py b/dangerzone/updater/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dangerzone/updater/attestations.py b/dangerzone/updater/attestations.py new file mode 100644 index 0000000..8839e1c --- /dev/null +++ b/dangerzone/updater/attestations.py @@ -0,0 +1,41 @@ +import subprocess +from tempfile import NamedTemporaryFile + +from .utils import write + + +def verify_attestation( + manifest: bytes, attestation_bundle: bytes, image_tag: str, expected_repo: str +): + """ + Look up the image attestation to see if the image has been built + on Github runners, and from a given repository. + """ + + # Put the value in files and verify with cosign + with ( + NamedTemporaryFile(mode="wb") as manifest_json, + NamedTemporaryFile(mode="wb") as attestation_bundle_json, + ): + write(manifest_json, manifest) + write(attestation_bundle_json, attestation_bundle) + + # Call cosign with the temporary file paths + cmd = [ + "cosign", + "verify-blob-attestation", + "--bundle", + attestation_bundle_json.name, + "--new-bundle-format", + "--certificate-oidc-issuer", + "https://token.actions.githubusercontent.com", + "--certificate-identity-regexp", + f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign", + manifest_json.name, + ] + + result = subprocess.run(cmd, capture_output=True) + if result.returncode != 0: + error = result.stderr.decode() + raise Exception(f"Attestation cannot be verified. {error}") + return True diff --git a/dangerzone/updater/cli.py b/dangerzone/updater/cli.py new file mode 100644 index 0000000..6d3c9fb --- /dev/null +++ b/dangerzone/updater/cli.py @@ -0,0 +1,83 @@ +#!/usr/bin/python + +import click + +from . import registry +from .attestations import verify_attestation +from .signatures import upgrade_container_image, verify_offline_image_signature + +DEFAULT_REPO = "freedomofpress/dangerzone" + + +@click.group() +def main(): + pass + + +@main.command() +@click.argument("image") +@click.option("--pubkey", default="pub.key") +# XXX Add options to do airgap upgrade +def upgrade(image, pubkey): + manifest_hash = registry.get_manifest_hash(image) + if upgrade_container_image(image, manifest_hash, pubkey): + click.echo(f"✅ The local image {image} has been upgraded") + + +@main.command() +@click.argument("image") +@click.option("--pubkey", default="pub.key") +def verify_local(image, pubkey): + """ + XXX document + """ + # XXX remove a potentiel :tag + if verify_offline_image_signature(image, pubkey): + click.echo(f"✅ The local image {image} has been signed with {pubkey}") + + +@main.command() +@click.argument("image") +def list_tags(image): + click.echo(f"Existing tags for {client.image}") + for tag in registry.list_tags(image): + click.echo(tag) + + +@main.command() +@click.argument("image") +@click.argument("tag") +def get_manifest(image, tag): + click.echo(registry.get_manifest(image, tag)) + + +@main.command() +@click.argument("image") +@click.option( + "--repo", + default=DEFAULT_REPO, + help="The github repository to check the attestation for", +) +# XXX use a consistent naming for these cli commands +def attest(image: str, repo: str): + """ + Look up the image attestation to see if the image has been built + on Github runners, and from a given repository. + """ + # XXX put this inside a module + # if shutil.which("cosign") is None: + # click.echo("The cosign binary is needed but not installed.") + # raise click.Abort() + # XXX: refactor parse_image_location to return a dict. + _, _, _, image_tag = registry.parse_image_location(image) + manifest, bundle = registry.get_attestation(image) + + verified = verify_attestation(manifest, bundle, image_tag, repo) + if verified: + click.echo( + f"🎉 The image available at `{client.image}:{image_tag}` has been built by Github Runners from the `{repo}` repository" + ) + + +if __name__ == "__main__": + main() diff --git a/dangerzone/updater/registry.py b/dangerzone/updater/registry.py new file mode 100644 index 0000000..381124f --- /dev/null +++ b/dangerzone/updater/registry.py @@ -0,0 +1,193 @@ +import hashlib +import re +from typing import Dict, Optional, Tuple + +import requests + +__all__ = [ + "get_manifest_hash", + "list_tags", +] + +SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json" +DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json" +DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json" +OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json" + + +def parse_image_location(input_string: str) -> Tuple[str, str, str, str]: + """Parses container image location into (registry, namespace, repository, tag)""" + pattern = ( + r"^" + r"(?P[a-zA-Z0-9.-]+)/" + r"(?P[a-zA-Z0-9-]+)/" + r"(?P[^:]+)" + r"(?::(?P[a-zA-Z0-9.-]+))?" + r"$" + ) + match = re.match(pattern, input_string) + if not match: + raise ValueError("Malformed image location") + + return ( + match.group("registry"), + match.group("namespace"), + match.group("repository"), + match.group("tag") or "latest", + ) + + +class RegistryClient: + def __init__(self, registry: str, org: str, image: str): + self._registry = registry + self._org = org + self._image = image + self._auth_token = None + self._base_url = f"https://{registry}" + self._image_url = f"{self._base_url}/v2/{self._org}/{self._image}" + + @property + def image(self): + return f"{self._registry}/{self._org}/{self._image}" + + def get_auth_token(self) -> Optional[str]: + if not self._auth_token: + auth_url = f"{self._base_url}/token" + response = requests.get( + auth_url, + params={ + "service": f"{self._registry}", + "scope": f"repository:{self._org}/{self._image}:pull", + }, + ) + response.raise_for_status() + self._auth_token = response.json()["token"] + return self._auth_token + + def get_auth_header(self) -> Dict[str, str]: + return {"Authorization": f"Bearer {self.get_auth_token()}"} + + def list_tags(self) -> list: + url = f"{self._image_url}/tags/list" + response = requests.get(url, headers=self.get_auth_header()) + response.raise_for_status() + tags = response.json().get("tags", []) + return tags + + def get_manifest(self, tag, extra_headers=None) -> requests.Response: + """Get manifest information for a specific tag""" + manifest_url = f"{self._image_url}/manifests/{tag}" + headers = { + "Accept": DOCKER_MANIFEST_DISTRIBUTION, + "Authorization": f"Bearer {self.get_auth_token()}", + } + if extra_headers: + headers.update(extra_headers) + + response = requests.get(manifest_url, headers=headers) + response.raise_for_status() + return response + + def list_manifests(self, tag) -> list: + return ( + self.get_manifest( + tag, + { + "Accept": DOCKER_MANIFEST_INDEX, + }, + ) + .json() + .get("manifests") + ) + + def get_blob(self, hash) -> requests.Response: + url = f"{self._image_url}/blobs/{hash}" + response = requests.get( + url, + headers={ + "Authorization": f"Bearer {self.get_auth_token()}", + }, + ) + response.raise_for_status() + return response + + def get_manifest_hash(self, tag, tag_manifest_content=None) -> str: + if not tag_manifest_content: + tag_manifest_content = self.get_manifest(tag).content + + return hashlib.sha256(tag_manifest_content).hexdigest() + + def get_attestation(self, tag) -> Tuple[bytes, bytes]: + """ + Retrieve an attestation from a given tag. + + The attestation needs to be attached using the Cosign Bundle + Specification defined at: + + https://github.com/sigstore/cosign/blob/main/specs/BUNDLE_SPEC.md + + Returns a tuple with the tag manifest content and the bundle content. + """ + + def _find_sigstore_bundle_manifest(manifests): + for manifest in manifests: + if manifest["artifactType"] == SIGSTORE_BUNDLE: + return manifest["mediaType"], manifest["digest"] + + def _get_bundle_blob_digest(layers): + for layer in layers: + if layer.get("mediaType") == SIGSTORE_BUNDLE: + return layer["digest"] + + tag_manifest_content = self.get_manifest(tag).content + + # The attestation is available on the same container registry, with a + # specific tag named "sha256-{sha256(manifest)}" + tag_manifest_hash = self.get_manifest_hash(tag, tag_manifest_content) + + # This will get us a "list" of manifests... + manifests = self.list_manifests(f"sha256-{tag_manifest_hash}") + + # ... from which we want the sigstore bundle + bundle_manifest_mediatype, bundle_manifest_digest = ( + _find_sigstore_bundle_manifest(manifests) + ) + if not bundle_manifest_digest: + raise Exception("Not able to find sigstore bundle manifest info") + + bundle_manifest = self.get_manifest( + bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype} + ).json() + + # From there, we will get the attestation in a blob. + # It will be the first layer listed at this manifest hash location + layers = bundle_manifest.get("layers", []) + + blob_digest = _get_bundle_blob_digest(layers) + bundle = self.get_blob(blob_digest) + return tag_manifest_content, bundle.content + + +def get_manifest_hash(image: str) -> str: + registry, org, package, tag = parse_image_location(image) + client = RegistryClient(registry, org, package) + return client.get_manifest_hash(tag) + + +def list_tags(image: str) -> list: + registry, org, package, _ = parse_image_location(image) + client = RegistryClient(registry, org, package) + return client.list_tags() + + +def get_manifest(image: str, tag: str) -> bytes: + registry, org, package, _ = parse_image_location(image) + client = RegistryClient(registry, org, package) + resp = client.get_manifest(tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST}) + return resp.content + + +def get_attestation(image: str) -> Tuple[bytes, bytes]: + registry, org, package, tag = parse_image_location(image) + client = RegistryClient(registry, org, package) + return client.get_attestation(tag) diff --git a/dangerzone/updater/signatures.py b/dangerzone/updater/signatures.py new file mode 100644 index 0000000..42154c2 --- /dev/null +++ b/dangerzone/updater/signatures.py @@ -0,0 +1,233 @@ +import json +import platform +import re +import subprocess +from base64 import b64decode +from hashlib import sha256 +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import Dict, List, Tuple + +from .registry import RegistryClient +from .utils import write + +try: + import platformdirs +except ImportError: + import appdirs as platformdirs + + +def get_config_dir() -> Path: + return Path(platformdirs.user_config_dir("dangerzone")) + + +# XXX Store this somewhere else. +SIGNATURES_PATH = get_config_dir() / "signatures" +__all__ = [ + "verify_signature", + "load_signatures", + "store_signatures", + "verify_local_image_signature", +] + + +def signature_to_bundle(sig: Dict) -> Dict: + """Convert a cosign-download signature to the format expected by cosign bundle.""" + bundle = sig["Bundle"] + payload = bundle["Payload"] + return { + "base64Signature": sig["Base64Signature"], + "Payload": sig["Payload"], + "cert": sig["Cert"], + "chain": sig["Chain"], + "rekorBundle": { + "SignedEntryTimestamp": bundle["SignedEntryTimestamp"], + "Payload": { + "body": payload["body"], + "integratedTime": payload["integratedTime"], + "logIndex": payload["logIndex"], + "logID": payload["logID"], + }, + }, + "RFC3161Timestamp": sig["RFC3161Timestamp"], + } + + +def verify_signature(signature: dict, pubkey: str) -> bool: + """Verify a signature against a given public key""" + + signature_bundle = signature_to_bundle(signature) + + # Put the value in files and verify with cosign + with ( + NamedTemporaryFile(mode="w") as signature_file, + NamedTemporaryFile(mode="bw") as payload_file, + ): + json.dump(signature_bundle, signature_file) + signature_file.flush() + + payload_bytes = b64decode(signature_bundle["Payload"]) + write(payload_file, payload_bytes) + + cmd = [ + "cosign", + "verify-blob", + "--key", + pubkey, + "--bundle", + signature_file.name, + payload_file.name, + ] + result = subprocess.run(cmd, capture_output=True) + if result.returncode != 0: + # XXX Raise instead? + return False + return result.stderr == b"Verified OK\n" + + +def get_runtime_name() -> str: + if platform.system() == "Linux": + return "podman" + return "docker" + + +def container_pull(image: str): + # XXX - Move to container_utils.py + cmd = [get_runtime_name(), "pull", f"{image}"] + process = subprocess.Popen(cmd, stdout=subprocess.PIPE) + process.communicate() + return process.returncode == 0 + + +def new_image_release(): + # XXX - Implement + return True + + +def upgrade_container_image( + image: str, + manifest_hash: str, + pubkey: str, +): + if not new_image_release(): + return + + # manifest_hash = registry.get_manifest_hash(tag) + signatures = get_signatures(image, manifest_hash) + + if len(signatures) < 1: + raise Exception("Unable to retrieve signatures") + + for signature in signatures: + signature_is_valid = verify_signature(signature, pubkey) + if not signature_is_valid: + raise Exception("Unable to verify signature") + + # At this point, the signatures are verified + # We store the signatures just now to avoid storing unverified signatures + store_signatures(signatures, manifest_hash, pubkey) + + # let's upgrade the image + # XXX Use the hash here to avoid race conditions + return container_pull(image) + + +def get_file_hash(file: str) -> str: + with open(file, "rb") as f: + content = f.read() + return sha256(content).hexdigest() + + +def load_signatures(image_hash, pubkey): + """ + Load signatures from the local filesystem + + See store_signatures() for the expected format. + """ + pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey) + if not pubkey_signatures.exists(): + msg = ( + f"Cannot find a '{pubkey_signatures}' folder." + "You might need to download the image signatures first." + ) + raise Exception(msg) + + with open(pubkey_signatures / f"{image_hash}.json") as f: + return json.load(f) + + +def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str): + """ + Store signatures locally in the SIGNATURE_PATH folder, like this: + + ~/.config/dangerzone/signatures/ + └── + └── .json + └── .json + + The format used in the `.json` file is the one of `cosign download + signature`, which differs from the "bundle" one used afterwards. + + It can be converted to the one expected by cosign verify --bundle with + the `signature_to_bundle()` function. + """ + + def _get_digest(sig): + payload = json.loads(b64decode(sig["Payload"])) + return payload["critical"]["image"]["docker-manifest-digest"] + + # All the signatures should share the same hash. + hashes = list(map(_get_digest, signatures)) + if len(set(hashes)) != 1: + raise Exception("Signatures do not share the same image hash") + + if f"sha256:{image_hash}" != hashes[0]: + raise Exception("Signatures do not match the given image hash") + + pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey) + pubkey_signatures.mkdir(exist_ok=True) + + with open(pubkey_signatures / f"{image_hash}.json", "w") as f: + json.dump(signatures, f) + + +def verify_offline_image_signature(image: str, pubkey: str) -> bool: + """ + Verifies that a local image has a valid signature + """ + image_hash = load_image_hash(image) + signatures = load_signatures(image_hash, pubkey) + if len(signatures) < 1: + raise Exception("No signatures found") + + for signature in signatures: + if not verify_signature(signature, pubkey): + msg = f"Unable to verify signature for {image} with pubkey {pubkey}" + raise Exception(msg) + return True + + +def load_image_hash(image: str) -> str: + """ + Returns a image hash from a local image name + """ + cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"] + result = subprocess.run(cmd, capture_output=True, check=True) + return result.stdout.strip().decode().strip("sha256:") + + +def get_signatures(image, hash) -> List[Dict]: + """ + Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format. + """ + + process = subprocess.run( + ["cosign", "download", "signature", f"{image}@sha256:{hash}"], + capture_output=True, + check=True, + ) + + # XXX: Check the output first. + # Remove the last return, split on newlines, convert from JSON + signatures_raw = process.stdout.decode("utf-8").strip().split("\n") + return list(map(json.loads, signatures_raw)) diff --git a/dangerzone/updater/utils.py b/dangerzone/updater/utils.py new file mode 100644 index 0000000..fd7b989 --- /dev/null +++ b/dangerzone/updater/utils.py @@ -0,0 +1,3 @@ +def write(file, content: bytes | str): + file.write(content) + file.flush() diff --git a/dev_scripts/registry.py b/dev_scripts/registry.py deleted file mode 100755 index c5e5f4a..0000000 --- a/dev_scripts/registry.py +++ /dev/null @@ -1,517 +0,0 @@ -#!/usr/bin/python - -import hashlib -import json -import platform -import re -import shutil -import subprocess -from base64 import b64decode -from pathlib import Path -from tempfile import NamedTemporaryFile - -import click -import requests - -try: - import platformdirs -except ImportError: - import appdirs as platformdirs - - -def get_config_dir() -> str: - return Path(platformdirs.user_config_dir("dangerzone")) - - -SIGNATURES_PATH = get_config_dir() / "signatures" - -DEFAULT_REPO = "freedomofpress/dangerzone" -SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json" -DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json" -DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json" -OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json" - - -class RegistryClient: - def __init__(self, registry, org, image): - self._registry = registry - self._org = org - self._image = image - self._auth_token = None - self._base_url = f"https://{registry}" - self._image_url = f"{self._base_url}/v2/{self._org}/{self._image}" - - @property - def image(self): - return f"{self._registry}/{self._org}/{self._image}" - - def get_auth_token(self): - if not self._auth_token: - auth_url = f"{self._base_url}/token" - response = requests.get( - auth_url, - params={ - "service": f"{self._registry}", - "scope": f"repository:{self._org}/{self._image}:pull", - }, - ) - response.raise_for_status() - self._auth_token = response.json()["token"] - return self._auth_token - - def get_auth_header(self): - return {"Authorization": f"Bearer {self.get_auth_token()}"} - - def list_tags(self): - url = f"{self._image_url}/tags/list" - response = requests.get(url, headers=self.get_auth_header()) - response.raise_for_status() - tags = response.json().get("tags", []) - return tags - - def get_manifest(self, tag, extra_headers=None): - """Get manifest information for a specific tag""" - manifest_url = f"{self._image_url}/manifests/{tag}" - headers = { - "Accept": DOCKER_MANIFEST_DISTRIBUTION, - "Authorization": f"Bearer {self.get_auth_token()}", - } - if extra_headers: - headers.update(extra_headers) - - response = requests.get(manifest_url, headers=headers) - response.raise_for_status() - return response - - def list_manifests(self, tag): - return ( - self.get_manifest( - tag, - { - "Accept": DOCKER_MANIFEST_INDEX, - }, - ) - .json() - .get("manifests") - ) - - def get_blob(self, hash): - url = f"{self._image_url}/blobs/{hash}" - response = requests.get( - url, - headers={ - "Authorization": f"Bearer {self.get_auth_token()}", - }, - ) - response.raise_for_status() - return response - - def get_manifest_hash(self, tag, tag_manifest_content=None): - if not tag_manifest_content: - tag_manifest_content = self.get_manifest(tag).content - - return hashlib.sha256(tag_manifest_content).hexdigest() - - def get_attestation(self, tag): - """ - Retrieve an attestation from a given tag. - - The attestation needs to be attached using the Cosign Bundle - Specification defined at: - - https://github.com/sigstore/cosign/blob/main/specs/BUNDLE_SPEC.md - """ - - def _find_sigstore_bundle_manifest(manifests): - for manifest in manifests: - if manifest["artifactType"] == SIGSTORE_BUNDLE: - return manifest["mediaType"], manifest["digest"] - - def _get_bundle_blob_digest(layers): - for layer in layers: - if layer.get("mediaType") == SIGSTORE_BUNDLE: - return layer["digest"] - - tag_manifest_content = self.get_manifest(tag).content - - # The attestation is available on the same container registry, with a - # specific tag named "sha256-{sha256(manifest)}" - tag_manifest_hash = self.get_manifest_hash(tag, tag_manifest_content) - - # This will get us a "list" of manifests... - manifests = self.list_manifests(f"sha256-{tag_manifest_hash}") - - # ... from which we want the sigstore bundle - bundle_manifest_mediatype, bundle_manifest_digest = ( - _find_sigstore_bundle_manifest(manifests) - ) - if not bundle_manifest_digest: - raise Error("Not able to find sigstore bundle manifest info") - - bundle_manifest = self.get_manifest( - bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype} - ).json() - - # From there, we will get the attestation in a blob. - # It will be the first layer listed at this manifest hash location - layers = bundle_manifest.get("layers", []) - - blob_digest = _get_bundle_blob_digest(layers) - bundle = self.get_blob(blob_digest) - return tag_manifest_content, bundle.content - - -def _write(file, content): - file.write(content) - file.flush() - - -def verify_attestation( - registry_client: RegistryClient, image_tag: str, expected_repo: str -): - """ - Look up the image attestation to see if the image has been built - on Github runners, and from a given repository. - """ - manifest, bundle = registry_client.get_attestation(image_tag) - - # Put the value in files and verify with cosign - with ( - NamedTemporaryFile(mode="wb") as manifest_json, - NamedTemporaryFile(mode="wb") as bundle_json, - ): - _write(manifest_json, manifest) - _write(bundle_json, bundle) - - # Call cosign with the temporary file paths - cmd = [ - "cosign", - "verify-blob-attestation", - "--bundle", - bundle_json.name, - "--new-bundle-format", - "--certificate-oidc-issuer", - "https://token.actions.githubusercontent.com", - "--certificate-identity-regexp", - f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign", - manifest_json.name, - ] - - result = subprocess.run(cmd, capture_output=True) - if result.returncode != 0: - raise Exception(f"Attestation cannot be verified. {result.stderr}") - return True - - -def new_image_release(): - # XXX - Implement - return True - - -def signature_to_bundle(sig): - # Convert cosign-download signatures to the format expected by cosign bundle. - bundle = sig["Bundle"] - payload = bundle["Payload"] - return { - "base64Signature": sig["Base64Signature"], - "Payload": sig["Payload"], - "cert": sig["Cert"], - "chain": sig["Chain"], - "rekorBundle": { - "SignedEntryTimestamp": bundle["SignedEntryTimestamp"], - "Payload": { - "body": payload["body"], - "integratedTime": payload["integratedTime"], - "logIndex": payload["logIndex"], - "logID": payload["logID"], - }, - }, - "RFC3161Timestamp": sig["RFC3161Timestamp"], - } - - -def verify_signature(signature, pubkey): - """Verify a signature against a given public key""" - - signature_bundle = signature_to_bundle(signature) - - # Put the value in files and verify with cosign - with ( - NamedTemporaryFile(mode="w") as signature_file, - NamedTemporaryFile(mode="bw") as payload_file, - ): - json.dump(signature_bundle, signature_file) - signature_file.flush() - - payload_bytes = b64decode(signature_bundle["Payload"]) - _write(payload_file, payload_bytes) - - cmd = [ - "cosign", - "verify-blob", - "--key", - pubkey, - "--bundle", - signature_file.name, - payload_file.name, - ] - result = subprocess.run(cmd, capture_output=True) - if result.returncode != 0: - # XXX Raise instead? - return False - return result.stderr == b"Verified OK\n" - - -def get_runtime_name() -> str: - if platform.system() == "Linux": - return "podman" - return "docker" - - -def container_pull(image): - cmd = [get_runtime_name(), "pull", f"{image}"] - process = subprocess.Popen(cmd, stdout=subprocess.PIPE) - process.communicate() - - -def upgrade_container_image(image, tag, pubkey, registry: RegistryClient): - if not new_image_release(): - return - - hash = registry.get_manifest_hash(tag) - signatures = get_signatures(image, hash) - - if len(signatures) < 1: - raise Exception("Unable to retrieve signatures") - - print(f"Found {len(signatures)} signature(s) for {image}") - for signature in signatures: - signature_is_valid = verify_signature(signature, pubkey) - if not signature_is_valid: - raise Exception("Unable to verify signature") - print("✅ Signature is valid") - - # At this point, the signatures are verified - # We store the signatures just now to avoid storing unverified signatures - store_signatures(signatures, hash, pubkey) - - # let's upgrade the image - # XXX Use the hash here to avoid race conditions - container_pull(image) - - -def get_file_hash(file): - with open(file, "rb") as f: - content = f.read() - return hashlib.sha256(content).hexdigest() - - -def load_signatures(image_hash, pubkey): - pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey) - if not pubkey_signatures.exists(): - msg = ( - f"Cannot find a '{pubkey_signatures}' folder." - "You might need to download the image signatures first." - ) - raise Exception(msg) - - with open(pubkey_signatures / f"{image_hash}.json") as f: - return json.load(f) - - -def store_signatures(signatures, image_hash, pubkey): - """ - Store signatures locally in the SIGNATURE_PATH folder, like this: - - ~/.config/dangerzone/signatures/ - └── - └── .json - └── .json - - The format used in the `.json` file is the one of `cosign download - signature`, which differs from the "bundle" one used afterwards. - - It can be converted to the one expected by cosign verify --bundle with - the `signature_to_bundle()` function. - """ - - def _get_digest(sig): - payload = json.loads(b64decode(sig["Payload"])) - return payload["critical"]["image"]["docker-manifest-digest"] - - # All the signatures should share the same hash. - hashes = list(map(_get_digest, signatures)) - if len(set(hashes)) != 1: - raise Exception("Signatures do not share the same image hash") - - if f"sha256:{image_hash}" != hashes[0]: - raise Exception("Signatures do not match the given image hash") - - pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey) - pubkey_signatures.mkdir(exist_ok=True) - - with open(pubkey_signatures / f"{image_hash}.json", "w") as f: - json.dump(signatures, f) - - -def verify_local_image_signature(image, pubkey): - """ - Verifies that a local image has a valid signature - """ - image_hash = get_image_hash(image) - signatures = load_signatures(image_hash, pubkey) - if len(signatures) < 1: - raise Exception("No signatures found") - - for signature in signatures: - if not verify_signature(signature, pubkey): - msg = f"Unable to verify signature for {image} with pubkey {pubkey}" - raise Exception(msg) - return True - - -def get_image_hash(image): - """ - Returns a image hash from a local image name - """ - cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"] - result = subprocess.run(cmd, capture_output=True, check=True) - return result.stdout.strip().decode().strip("sha256:") - - -def get_signatures(image, hash): - """ - Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format. - """ - - process = subprocess.run( - ["cosign", "download", "signature", f"{image}@sha256:{hash}"], - capture_output=True, - check=True, - ) - - # XXX: Check the output first. - # Remove the last return, split on newlines, convert from JSON - signatures_raw = process.stdout.decode("utf-8").strip().split("\n") - return list(map(json.loads, signatures_raw)) - - -class Image: - def __init__(self, registry, namespace, repository, tag="latest"): - self.registry = registry - self.namespace = namespace - self.repository = repository - self.tag = tag - - def properties(self): - return (self.registry, self.namespace, self.repository, self.tag) - - @property - def name_without_tag(self): - return f"{self.registry}/{self.namespace}/{self.repository}" - - @property - def name_with_tag(self): - return f"{self.name_without_tag}:{self.tag}" - - @classmethod - def from_string(cls, input_string): - """Parses container image location into (registry, namespace, repository, tag)""" - pattern = ( - r"^" - r"(?P[a-zA-Z0-9.-]+)/" - r"(?P[a-zA-Z0-9-]+)/" - r"(?P[^:]+)" - r"(?::(?P[a-zA-Z0-9.-]+))?" - r"$" - ) - match = re.match(pattern, input_string) - if not match: - raise ValueError("Malformed image location") - - return cls( - match.group("registry"), - match.group("namespace"), - match.group("repository"), - match.group("tag") or "latest", - ) - - -def parse_image_location(string): - return Image.from_string(string).properties - - -@click.group() -def main(): - pass - - -@main.command() -@click.argument("image") -@click.option("--pubkey", default="pub.key") -def upgrade_image(image, pubkey): - registry, namespace, repository, tag = parse_image_location(image) - registry_client = RegistryClient(registry, namespace, repository) - - upgrade_container_image(image, tag, pubkey, registry_client) - - -@main.command() -@click.argument("image") -@click.option("--pubkey", default="pub.key") -def verify_local_image(image, pubkey): - # XXX remove a potentiel :tag - if verify_local_image_signature(image, pubkey): - click.echo(f"✅ The local image {image} has been signed with {pubkey}") - - -@main.command() -@click.argument("image") -def list_tags(image): - registry, org, package, _ = parse_image_location(image) - client = RegistryClient(registry, org, package) - tags = client.list_tags() - click.echo(f"Existing tags for {client.image}") - for tag in tags: - click.echo(tag) - - -@main.command() -@click.argument("image") -@click.argument("tag") -def get_manifest(image, tag): - registry, org, package, _ = parse_image_location(image) - client = RegistryClient(registry, org, package) - resp = client.get_manifest(tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST}) - click.echo(resp.content) - - -@main.command() -@click.argument("image") -@click.option( - "--repo", - default=DEFAULT_REPO, - help="The github repository to check the attestation for", -) -def attest(image: str, repo: str): - """ - Look up the image attestation to see if the image has been built - on Github runners, and from a given repository. - """ - if shutil.which("cosign") is None: - click.echo("The cosign binary is needed but not installed.") - raise click.Abort() - - registry, org, package, tag = parse_image_location(image) - tag = tag or "latest" - - client = RegistryClient(registry, org, package) - verified = verify_attestation(client, tag, repo) - if verified: - click.echo( - f"🎉 The image available at `{client.image}:{tag}` has been built by Github Runners from the `{repo}` repository" - ) - - -if __name__ == "__main__": - main() diff --git a/pyproject.toml b/pyproject.toml index 4bb4bb4..58093a4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ shiboken6 = [ [tool.poetry.scripts] dangerzone = 'dangerzone:main' dangerzone-cli = 'dangerzone:main' +dangerzone-image = "dangerzone.updater.cli:main" # Dependencies required for packaging the code on various platforms. [tool.poetry.group.package.dependencies]