Some more refactoring

This commit is contained in:
Alexis Métaireau 2025-01-29 19:14:40 +01:00
parent fd1db717b7
commit 7991a5cb9c
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
8 changed files with 134 additions and 57 deletions

View file

@ -15,11 +15,9 @@ log = logging.getLogger(__name__)
def get_runtime_name() -> str:
if platform.system() == "Linux":
runtime_name = "podman"
else:
# Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
runtime_name = "docker"
return runtime_name
return "podman"
# Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
return "docker"
def get_runtime_version() -> Tuple[int, int]:
@ -147,3 +145,18 @@ def load_image_tarball() -> None:
)
log.info("Successfully installed container image from")
def container_pull(image: str) -> bool:
"""Pull a container image from a registry."""
cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
process.communicate()
return process.returncode == 0
def load_image_hash(image: str) -> str:
"""Returns a image hash from a local image name"""
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout.strip().decode().strip("sha256:")

View file

@ -1,4 +1,3 @@
import logging
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

View file

@ -1,40 +1,63 @@
#!/usr/bin/python
import logging
import click
from . import registry
from ..util import get_resource_path
from . import errors, log, registry
from .attestations import verify_attestation
from .signatures import upgrade_container_image, verify_offline_image_signature
DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
PUBKEY_DEFAULT_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
@click.group()
def main() -> None:
pass
@click.option("--debug", is_flag=True)
def main(debug=False) -> None:
if debug:
click.echo("Debug mode enabled")
level = logging.DEBUG
else:
level = logging.INFO
logging.basicConfig(level=level)
@main.command()
@click.option("--image")
@click.option("--pubkey", default="pub.key")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
@click.option("--airgap", is_flag=True)
# XXX Add options to do airgap upgrade
def upgrade(image: str, pubkey: str) -> None:
def upgrade(image: str, pubkey: str, airgap: bool) -> None:
"""Upgrade the image to the latest signed version."""
manifest_hash = registry.get_manifest_hash(image)
if upgrade_container_image(image, manifest_hash, pubkey):
try:
is_upgraded = upgrade_container_image(image, manifest_hash, pubkey)
click.echo(f"✅ The local image {image} has been upgraded")
except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}")
raise click.Abort()
@main.command()
@click.argument("image")
@click.option("--pubkey", default="pub.key")
def verify_local(image: str, pubkey: str) -> None:
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
def verify_offline(image: str, pubkey: str) -> None:
"""
Verify the local image signature against a public key and the stored signatures.
"""
# XXX remove a potentiel :tag
if verify_offline_image_signature(image, pubkey):
click.echo(f"✅ The local image {image} has been signed with {pubkey}")
click.echo(
(
f"Verifying the local image:\n\n"
f"pubkey: {pubkey}\n"
f"image: {image}\n\n"
f"✅ The local image {image} has been signed with {pubkey}"
)
)
@main.command()

View file

@ -0,0 +1,38 @@
class UpdaterError(Exception):
pass
class ImageAlreadyUpToDate(UpdaterError):
pass
class SignatureError(UpdaterError):
pass
class RegistryError(UpdaterError):
pass
class NoRemoteSignatures(SignatureError):
pass
class SignatureVerificationError(SignatureError):
pass
class SignaturesFolderDoesNotExist(SignatureError):
pass
class InvalidSignatures(SignatureError):
pass
class SignatureMismatch(SignatureError):
pass
class LocalSignatureNotFound(SignatureError):
pass

View file

@ -5,7 +5,7 @@ from typing import Dict, Optional, Tuple
import requests
from . import log
from . import errors, log
__all__ = [
"get_manifest_hash",
@ -178,7 +178,7 @@ class RegistryClient:
_find_sigstore_bundle_manifest(manifests)
)
if not bundle_manifest_digest:
raise Exception("Not able to find sigstore bundle manifest info")
raise errors.RegistryError("Not able to find sigstore bundle manifest info")
bundle_manifest = self.get_manifest(
bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype}
@ -191,7 +191,7 @@ class RegistryClient:
blob_digest = _get_bundle_blob_digest(layers)
log.info(f"Found sigstore bundle blob digest: {blob_digest}")
if not blob_digest:
raise Exception("Not able to find sigstore bundle blob info")
raise errors.RegistryError("Not able to find sigstore bundle blob info")
bundle = self.get_blob(blob_digest)
return tag_manifest_content, bundle.content

View file

@ -8,6 +8,10 @@ from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Dict, List, Tuple
from ..container_utils import container_pull, load_image_hash
from . import errors, log
from .registry import get_manifest_hash
try:
import platformdirs
except ImportError:
@ -24,10 +28,18 @@ __all__ = [
"verify_signature",
"load_signatures",
"store_signatures",
"verify_local_image_signature",
"verify_offline_image_signature",
]
def is_cosign_installed() -> bool:
try:
subprocess.run(["cosign", "version"], capture_output=True, check=True)
return True
except subprocess.CalledProcessError:
return False
def signature_to_bundle(sig: Dict) -> Dict:
"""Convert a cosign-download signature to the format expected by cosign bundle."""
bundle = sig["Bundle"]
@ -55,7 +67,6 @@ def verify_signature(signature: dict, pubkey: str) -> bool:
signature_bundle = signature_to_bundle(signature)
# Put the value in files and verify with cosign
with (
NamedTemporaryFile(mode="w") as signature_file,
NamedTemporaryFile(mode="bw") as payload_file,
@ -76,30 +87,24 @@ def verify_signature(signature: dict, pubkey: str) -> bool:
signature_file.name,
payload_file.name,
]
log.debug(" ".join(cmd))
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
# XXX Raise instead?
log.debug("Failed to verify signature", result.stderr)
return False
return result.stderr == b"Verified OK\n"
if result.stderr == b"Verified OK\n":
log.debug("Signature verified")
return True
return False
def get_runtime_name() -> str:
if platform.system() == "Linux":
return "podman"
return "docker"
def container_pull(image: str) -> bool:
# XXX - Move to container_utils.py
cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
process.communicate()
return process.returncode == 0
def new_image_release() -> bool:
# XXX - Implement
return True
def new_image_release(image) -> bool:
remote_hash = get_manifest_hash(image)
local_hash = load_image_hash(image)
log.debug("Remote hash: %s", remote_hash)
log.debug("Local hash: %s", local_hash)
return remote_hash != local_hash
def upgrade_container_image(
@ -107,19 +112,20 @@ def upgrade_container_image(
manifest_hash: str,
pubkey: str,
) -> bool:
if not new_image_release():
if not new_image_release(image):
raise errors.ImageAlreadyUpToDate("The image is already up to date")
return False
# manifest_hash = registry.get_manifest_hash(tag)
signatures = get_signatures(image, manifest_hash)
log.debug("Signatures: %s", signatures)
if len(signatures) < 1:
raise Exception("Unable to retrieve signatures")
raise errors.NoRemoteSignatures("No remote signatures found")
for signature in signatures:
signature_is_valid = verify_signature(signature, pubkey)
if not signature_is_valid:
raise Exception("Unable to verify signature")
raise errors.SignatureVerificationError()
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
@ -148,9 +154,10 @@ def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
f"Cannot find a '{pubkey_signatures}' folder."
"You might need to download the image signatures first."
)
raise Exception(msg)
raise errors.SignaturesFolderDoesNotExist(msg)
with open(pubkey_signatures / f"{image_hash}.json") as f:
log.debug("Loading signatures from %s", f.name)
return json.load(f)
@ -177,15 +184,18 @@ def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> No
# All the signatures should share the same hash.
hashes = list(map(_get_digest, signatures))
if len(set(hashes)) != 1:
raise Exception("Signatures do not share the same image hash")
raise errors.InvalidSignatures("Signatures do not share the same image hash")
if f"sha256:{image_hash}" != hashes[0]:
raise Exception("Signatures do not match the given image hash")
raise errors.SignatureMismatch("Signatures do not match the given image hash")
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
pubkey_signatures.mkdir(exist_ok=True)
with open(pubkey_signatures / f"{image_hash}.json", "w") as f:
log.debug(
f"Storing signatures for {image_hash} in {pubkey_signatures}/{image_hash}.json"
)
json.dump(signatures, f)
@ -193,27 +203,20 @@ def verify_offline_image_signature(image: str, pubkey: str) -> bool:
"""
Verifies that a local image has a valid signature
"""
log.info(f"Verifying local image {image} against pubkey {pubkey}")
image_hash = load_image_hash(image)
log.debug(f"Image hash: {image_hash}")
signatures = load_signatures(image_hash, pubkey)
if len(signatures) < 1:
raise Exception("No signatures found")
raise errors.LocalSignatureNotFound("No signatures found")
for signature in signatures:
if not verify_signature(signature, pubkey):
msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
raise Exception(msg)
raise errors.SignatureVerificationError(msg)
return True
def load_image_hash(image: str) -> str:
"""
Returns a image hash from a local image name
"""
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout.strip().decode().strip("sha256:")
def get_signatures(image: str, hash: str) -> List[Dict]:
"""
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.

View file

@ -8,7 +8,7 @@ import unicodedata
try:
import platformdirs
except ImportError:
import appdirs as platformdirs
import appdirs as platformdirs # type: ignore[no-redef]
def get_config_dir() -> str:

View file

@ -0,0 +1 @@
dangerzone