Refactoring of dangerzone/updater/*

This commit is contained in:
Alexis Métaireau 2025-01-29 17:01:48 +01:00
parent ab15d25a18
commit 5921289454
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
6 changed files with 104 additions and 83 deletions

View file

@ -0,0 +1,4 @@
import logging
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

View file

@ -1,12 +1,10 @@
import subprocess import subprocess
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from .utils import write
def verify_attestation( def verify_attestation(
manifest: bytes, attestation_bundle: bytes, image_tag: str, expected_repo: str manifest: bytes, attestation_bundle: bytes, image_tag: str, expected_repo: str
): ) -> bool:
""" """
Look up the image attestation to see if the image has been built Look up the image attestation to see if the image has been built
on Github runners, and from a given repository. on Github runners, and from a given repository.
@ -17,8 +15,10 @@ def verify_attestation(
NamedTemporaryFile(mode="wb") as manifest_json, NamedTemporaryFile(mode="wb") as manifest_json,
NamedTemporaryFile(mode="wb") as attestation_bundle_json, NamedTemporaryFile(mode="wb") as attestation_bundle_json,
): ):
write(manifest_json, manifest) manifest_json.write(manifest)
write(attestation_bundle_json, attestation_bundle) manifest_json.flush()
attestation_bundle_json.write(attestation_bundle)
attestation_bundle_json.flush()
# Call cosign with the temporary file paths # Call cosign with the temporary file paths
cmd = [ cmd = [

View file

@ -6,19 +6,20 @@ from . import registry
from .attestations import verify_attestation from .attestations import verify_attestation
from .signatures import upgrade_container_image, verify_offline_image_signature from .signatures import upgrade_container_image, verify_offline_image_signature
DEFAULT_REPO = "freedomofpress/dangerzone" DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
@click.group() @click.group()
def main(): def main() -> None:
pass pass
@main.command() @main.command()
@click.argument("image") @click.option("--image")
@click.option("--pubkey", default="pub.key") @click.option("--pubkey", default="pub.key")
@click.option("--airgap", is_flag=True)
# XXX Add options to do airgap upgrade # XXX Add options to do airgap upgrade
def upgrade(image, pubkey): def upgrade(image: str, pubkey: str) -> None:
manifest_hash = registry.get_manifest_hash(image) manifest_hash = registry.get_manifest_hash(image)
if upgrade_container_image(image, manifest_hash, pubkey): if upgrade_container_image(image, manifest_hash, pubkey):
click.echo(f"✅ The local image {image} has been upgraded") click.echo(f"✅ The local image {image} has been upgraded")
@ -27,9 +28,9 @@ def upgrade(image, pubkey):
@main.command() @main.command()
@click.argument("image") @click.argument("image")
@click.option("--pubkey", default="pub.key") @click.option("--pubkey", default="pub.key")
def verify_local(image, pubkey): def verify_local(image: str, pubkey: str) -> None:
""" """
XXX document Verify the local image signature against a public key and the stored signatures.
""" """
# XXX remove a potentiel :tag # XXX remove a potentiel :tag
if verify_offline_image_signature(image, pubkey): if verify_offline_image_signature(image, pubkey):
@ -38,28 +39,26 @@ def verify_local(image, pubkey):
@main.command() @main.command()
@click.argument("image") @click.argument("image")
def list_tags(image): def list_remote_tags(image: str) -> None:
click.echo(f"Existing tags for {client.image}") click.echo(f"Existing tags for {image}")
for tag in registry.list_tags(image): for tag in registry.list_tags(image):
click.echo(tag) click.echo(tag)
@main.command() @main.command()
@click.argument("image") @click.argument("image")
@click.argument("tag") def get_manifest(image: str) -> None:
def get_manifest(image, tag): click.echo(registry.get_manifest(image))
click.echo(registry.get_manifest(image, tag))
@main.command() @main.command()
@click.argument("image") @click.argument("image")
@click.option( @click.option(
"--repo", "--repository",
default=DEFAULT_REPO, default=DEFAULT_REPOSITORY,
help="The github repository to check the attestation for", help="The github repository to check the attestation for",
) )
# XXX use a consistent naming for these cli commands def attest_provenance(image: str, repository: str) -> None:
def attest(image: str, repo: str):
""" """
Look up the image attestation to see if the image has been built Look up the image attestation to see if the image has been built
on Github runners, and from a given repository. on Github runners, and from a given repository.
@ -68,14 +67,13 @@ def attest(image: str, repo: str):
# if shutil.which("cosign") is None: # if shutil.which("cosign") is None:
# click.echo("The cosign binary is needed but not installed.") # click.echo("The cosign binary is needed but not installed.")
# raise click.Abort() # raise click.Abort()
# XXX: refactor parse_image_location to return a dict. parsed = registry.parse_image_location(image)
_, _, _, image_tag = registry.parse_image_location(image)
manifest, bundle = registry.get_attestation(image) manifest, bundle = registry.get_attestation(image)
verified = verify_attestation(manifest, bundle, image_tag, repo) verified = verify_attestation(manifest, bundle, parsed.tag, repository)
if verified: if verified:
click.echo( click.echo(
f"🎉 The image available at `{client.image}:{image_tag}` has been built by Github Runners from the `{repo}` repository" f"🎉 The image available at `{parsed.full_name}` has been built by Github Runners from the `{repository}` repository"
) )

View file

@ -1,12 +1,17 @@
import hashlib import hashlib
import re import re
from collections import namedtuple
from typing import Dict, Optional, Tuple from typing import Dict, Optional, Tuple
import requests import requests
from . import log
__all__ = [ __all__ = [
"get_manifest_hash", "get_manifest_hash",
"list_tags", "list_tags",
"get_manifest",
"get_attestation",
] ]
SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json" SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json"
@ -15,40 +20,51 @@ DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json"
OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json" OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json"
def parse_image_location(input_string: str) -> Tuple[str, str, str, str]: class Image(namedtuple("Image", ["registry", "namespace", "image_name", "tag"])):
"""Parses container image location into (registry, namespace, repository, tag)""" __slots__ = ()
@property
def full_name(self) -> str:
tag = f":{self.tag}" if self.tag else ""
return f"{self.registry}/{self.namespace}/{self.image_name}{tag}"
def parse_image_location(input_string: str) -> Image:
"""Parses container image location into an Image namedtuple"""
pattern = ( pattern = (
r"^" r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/" r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/" r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<repository>[^:]+)" r"(?P<image_name>[^:]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?" r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"$" r"$"
) )
match = re.match(pattern, input_string) match = re.match(pattern, input_string)
if not match: if not match:
raise ValueError("Malformed image location") raise ValueError("Malformed image location")
return Image(
return ( registry=match.group("registry"),
match.group("registry"), namespace=match.group("namespace"),
match.group("namespace"), image_name=match.group("image_name"),
match.group("repository"), tag=match.group("tag") or "latest",
match.group("tag") or "latest",
) )
class RegistryClient: class RegistryClient:
def __init__(self, registry: str, org: str, image: str): def __init__(
self._registry = registry self,
self._org = org image: Image | str,
self._image = image ):
self._auth_token = None if isinstance(image, str):
self._base_url = f"https://{registry}" image = parse_image_location(image)
self._image_url = f"{self._base_url}/v2/{self._org}/{self._image}"
@property self._image = image
def image(self): self._registry = image.registry
return f"{self._registry}/{self._org}/{self._image}" self._namespace = image.namespace
self._image_name = image.image_name
self._auth_token = None
self._base_url = f"https://{self._registry}"
self._image_url = f"{self._base_url}/v2/{self._namespace}/{self._image_name}"
def get_auth_token(self) -> Optional[str]: def get_auth_token(self) -> Optional[str]:
if not self._auth_token: if not self._auth_token:
@ -57,7 +73,7 @@ class RegistryClient:
auth_url, auth_url,
params={ params={
"service": f"{self._registry}", "service": f"{self._registry}",
"scope": f"repository:{self._org}/{self._image}:pull", "scope": f"repository:{self._namespace}/{self._image_name}:pull",
}, },
) )
response.raise_for_status() response.raise_for_status()
@ -74,7 +90,9 @@ class RegistryClient:
tags = response.json().get("tags", []) tags = response.json().get("tags", [])
return tags return tags
def get_manifest(self, tag, extra_headers=None) -> requests.Response: def get_manifest(
self, tag: str, extra_headers: Optional[dict] = None
) -> requests.Response:
"""Get manifest information for a specific tag""" """Get manifest information for a specific tag"""
manifest_url = f"{self._image_url}/manifests/{tag}" manifest_url = f"{self._image_url}/manifests/{tag}"
headers = { headers = {
@ -88,7 +106,7 @@ class RegistryClient:
response.raise_for_status() response.raise_for_status()
return response return response
def list_manifests(self, tag) -> list: def list_manifests(self, tag: str) -> list:
return ( return (
self.get_manifest( self.get_manifest(
tag, tag,
@ -100,7 +118,7 @@ class RegistryClient:
.get("manifests") .get("manifests")
) )
def get_blob(self, hash) -> requests.Response: def get_blob(self, hash: str) -> requests.Response:
url = f"{self._image_url}/blobs/{hash}" url = f"{self._image_url}/blobs/{hash}"
response = requests.get( response = requests.get(
url, url,
@ -111,13 +129,15 @@ class RegistryClient:
response.raise_for_status() response.raise_for_status()
return response return response
def get_manifest_hash(self, tag, tag_manifest_content=None) -> str: def get_manifest_hash(
self, tag: str, tag_manifest_content: Optional[bytes] = None
) -> str:
if not tag_manifest_content: if not tag_manifest_content:
tag_manifest_content = self.get_manifest(tag).content tag_manifest_content = self.get_manifest(tag).content
return hashlib.sha256(tag_manifest_content).hexdigest() return hashlib.sha256(tag_manifest_content).hexdigest()
def get_attestation(self, tag) -> Tuple[bytes, bytes]: def get_attestation(self, tag: str) -> Tuple[bytes, bytes]:
""" """
Retrieve an attestation from a given tag. Retrieve an attestation from a given tag.
@ -129,15 +149,20 @@ class RegistryClient:
Returns a tuple with the tag manifest content and the bundle content. Returns a tuple with the tag manifest content and the bundle content.
""" """
def _find_sigstore_bundle_manifest(manifests): # FIXME: do not only rely on the first layer
def _find_sigstore_bundle_manifest(
manifests: list,
) -> Tuple[Optional[str], Optional[str]]:
for manifest in manifests: for manifest in manifests:
if manifest["artifactType"] == SIGSTORE_BUNDLE: if manifest["artifactType"] == SIGSTORE_BUNDLE:
return manifest["mediaType"], manifest["digest"] return manifest["mediaType"], manifest["digest"]
return None, None
def _get_bundle_blob_digest(layers): def _get_bundle_blob_digest(layers: list) -> Optional[str]:
for layer in layers: for layer in layers:
if layer.get("mediaType") == SIGSTORE_BUNDLE: if layer.get("mediaType") == SIGSTORE_BUNDLE:
return layer["digest"] return layer["digest"]
return None
tag_manifest_content = self.get_manifest(tag).content tag_manifest_content = self.get_manifest(tag).content
@ -164,30 +189,29 @@ class RegistryClient:
layers = bundle_manifest.get("layers", []) layers = bundle_manifest.get("layers", [])
blob_digest = _get_bundle_blob_digest(layers) blob_digest = _get_bundle_blob_digest(layers)
log.info(f"Found sigstore bundle blob digest: {blob_digest}")
if not blob_digest:
raise Exception("Not able to find sigstore bundle blob info")
bundle = self.get_blob(blob_digest) bundle = self.get_blob(blob_digest)
return tag_manifest_content, bundle.content return tag_manifest_content, bundle.content
def get_manifest_hash(image: str) -> str: def get_manifest_hash(image_str: str) -> str:
registry, org, package, tag = parse_image_location(image) image = parse_image_location(image_str)
client = RegistryClient(registry, org, package) return RegistryClient(image).get_manifest_hash(image.tag)
return client.get_manifest_hash(tag)
def list_tags(image: str) -> list: def list_tags(image_str: str) -> list:
registry, org, package, _ = parse_image_location(image) return RegistryClient(image_str).list_tags()
client = RegistryClient(registry, org, package)
return client.list_tags()
def get_manifest(image: str, tag: str) -> bytes: def get_manifest(image_str: str) -> bytes:
registry, org, package, _ = parse_image_location(image) image = parse_image_location(image_str)
client = RegistryClient(registry, org, package) client = RegistryClient(image)
resp = client.get_manifest(tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST}) resp = client.get_manifest(image.tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST})
return resp.content return resp.content
def get_attestation(image: str) -> Tuple[bytes, bytes]: def get_attestation(image_str: str) -> Tuple[bytes, bytes]:
registry, org, package, tag = parse_image_location(image) image = parse_image_location(image_str)
client = RegistryClient(registry, org, package) return RegistryClient(image).get_attestation(image.tag)
return client.get_attestation(tag)

View file

@ -8,13 +8,10 @@ from pathlib import Path
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import Dict, List, Tuple from typing import Dict, List, Tuple
from .registry import RegistryClient
from .utils import write
try: try:
import platformdirs import platformdirs
except ImportError: except ImportError:
import appdirs as platformdirs import appdirs as platformdirs # type: ignore[no-redef]
def get_config_dir() -> Path: def get_config_dir() -> Path:
@ -67,7 +64,8 @@ def verify_signature(signature: dict, pubkey: str) -> bool:
signature_file.flush() signature_file.flush()
payload_bytes = b64decode(signature_bundle["Payload"]) payload_bytes = b64decode(signature_bundle["Payload"])
write(payload_file, payload_bytes) payload_file.write(payload_bytes)
payload_file.flush()
cmd = [ cmd = [
"cosign", "cosign",
@ -91,7 +89,7 @@ def get_runtime_name() -> str:
return "docker" return "docker"
def container_pull(image: str): def container_pull(image: str) -> bool:
# XXX - Move to container_utils.py # XXX - Move to container_utils.py
cmd = [get_runtime_name(), "pull", f"{image}"] cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE) process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
@ -99,7 +97,7 @@ def container_pull(image: str):
return process.returncode == 0 return process.returncode == 0
def new_image_release(): def new_image_release() -> bool:
# XXX - Implement # XXX - Implement
return True return True
@ -108,9 +106,9 @@ def upgrade_container_image(
image: str, image: str,
manifest_hash: str, manifest_hash: str,
pubkey: str, pubkey: str,
): ) -> bool:
if not new_image_release(): if not new_image_release():
return return False
# manifest_hash = registry.get_manifest_hash(tag) # manifest_hash = registry.get_manifest_hash(tag)
signatures = get_signatures(image, manifest_hash) signatures = get_signatures(image, manifest_hash)
@ -138,7 +136,7 @@ def get_file_hash(file: str) -> str:
return sha256(content).hexdigest() return sha256(content).hexdigest()
def load_signatures(image_hash, pubkey): def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
""" """
Load signatures from the local filesystem Load signatures from the local filesystem
@ -156,7 +154,7 @@ def load_signatures(image_hash, pubkey):
return json.load(f) return json.load(f)
def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str): def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> None:
""" """
Store signatures locally in the SIGNATURE_PATH folder, like this: Store signatures locally in the SIGNATURE_PATH folder, like this:
@ -172,7 +170,7 @@ def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str):
the `signature_to_bundle()` function. the `signature_to_bundle()` function.
""" """
def _get_digest(sig): def _get_digest(sig: Dict) -> str:
payload = json.loads(b64decode(sig["Payload"])) payload = json.loads(b64decode(sig["Payload"]))
return payload["critical"]["image"]["docker-manifest-digest"] return payload["critical"]["image"]["docker-manifest-digest"]
@ -216,7 +214,7 @@ def load_image_hash(image: str) -> str:
return result.stdout.strip().decode().strip("sha256:") return result.stdout.strip().decode().strip("sha256:")
def get_signatures(image, hash) -> List[Dict]: def get_signatures(image: str, hash: str) -> List[Dict]:
""" """
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format. Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.
""" """

View file

@ -1,3 +0,0 @@
def write(file, content: bytes | str):
file.write(content)
file.flush()