Some more refactoring

This commit is contained in:
Alexis Métaireau 2025-01-29 19:14:40 +01:00
parent 5921289454
commit 2e7af4aebf
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
8 changed files with 134 additions and 57 deletions

View file

@ -15,11 +15,9 @@ log = logging.getLogger(__name__)
def get_runtime_name() -> str: def get_runtime_name() -> str:
if platform.system() == "Linux": if platform.system() == "Linux":
runtime_name = "podman" return "podman"
else: # Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
# Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually return "docker"
runtime_name = "docker"
return runtime_name
def get_runtime_version() -> Tuple[int, int]: def get_runtime_version() -> Tuple[int, int]:
@ -147,3 +145,18 @@ def load_image_tarball() -> None:
) )
log.info("Successfully installed container image from") log.info("Successfully installed container image from")
def container_pull(image: str) -> bool:
"""Pull a container image from a registry."""
cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
process.communicate()
return process.returncode == 0
def load_image_hash(image: str) -> str:
"""Returns a image hash from a local image name"""
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout.strip().decode().strip("sha256:")

View file

@ -1,4 +1,3 @@
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

View file

@ -1,40 +1,63 @@
#!/usr/bin/python #!/usr/bin/python
import logging
import click import click
from . import registry from ..util import get_resource_path
from . import errors, log, registry
from .attestations import verify_attestation from .attestations import verify_attestation
from .signatures import upgrade_container_image, verify_offline_image_signature from .signatures import upgrade_container_image, verify_offline_image_signature
DEFAULT_REPOSITORY = "freedomofpress/dangerzone" DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
PUBKEY_DEFAULT_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
@click.group() @click.group()
def main() -> None: @click.option("--debug", is_flag=True)
pass def main(debug=False) -> None:
if debug:
click.echo("Debug mode enabled")
level = logging.DEBUG
else:
level = logging.INFO
logging.basicConfig(level=level)
@main.command() @main.command()
@click.option("--image") @click.option("--image")
@click.option("--pubkey", default="pub.key") @click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
@click.option("--airgap", is_flag=True) @click.option("--airgap", is_flag=True)
# XXX Add options to do airgap upgrade # XXX Add options to do airgap upgrade
def upgrade(image: str, pubkey: str) -> None: def upgrade(image: str, pubkey: str, airgap: bool) -> None:
"""Upgrade the image to the latest signed version."""
manifest_hash = registry.get_manifest_hash(image) manifest_hash = registry.get_manifest_hash(image)
if upgrade_container_image(image, manifest_hash, pubkey): try:
is_upgraded = upgrade_container_image(image, manifest_hash, pubkey)
click.echo(f"✅ The local image {image} has been upgraded") click.echo(f"✅ The local image {image} has been upgraded")
except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}")
raise click.Abort()
@main.command() @main.command()
@click.argument("image") @click.argument("image")
@click.option("--pubkey", default="pub.key") @click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
def verify_local(image: str, pubkey: str) -> None: def verify_offline(image: str, pubkey: str) -> None:
""" """
Verify the local image signature against a public key and the stored signatures. Verify the local image signature against a public key and the stored signatures.
""" """
# XXX remove a potentiel :tag # XXX remove a potentiel :tag
if verify_offline_image_signature(image, pubkey): if verify_offline_image_signature(image, pubkey):
click.echo(f"✅ The local image {image} has been signed with {pubkey}") click.echo(
(
f"Verifying the local image:\n\n"
f"pubkey: {pubkey}\n"
f"image: {image}\n\n"
f"✅ The local image {image} has been signed with {pubkey}"
)
)
@main.command() @main.command()

View file

@ -0,0 +1,38 @@
class UpdaterError(Exception):
pass
class ImageAlreadyUpToDate(UpdaterError):
pass
class SignatureError(UpdaterError):
pass
class RegistryError(UpdaterError):
pass
class NoRemoteSignatures(SignatureError):
pass
class SignatureVerificationError(SignatureError):
pass
class SignaturesFolderDoesNotExist(SignatureError):
pass
class InvalidSignatures(SignatureError):
pass
class SignatureMismatch(SignatureError):
pass
class LocalSignatureNotFound(SignatureError):
pass

View file

@ -5,7 +5,7 @@ from typing import Dict, Optional, Tuple
import requests import requests
from . import log from . import errors, log
__all__ = [ __all__ = [
"get_manifest_hash", "get_manifest_hash",
@ -178,7 +178,7 @@ class RegistryClient:
_find_sigstore_bundle_manifest(manifests) _find_sigstore_bundle_manifest(manifests)
) )
if not bundle_manifest_digest: if not bundle_manifest_digest:
raise Exception("Not able to find sigstore bundle manifest info") raise errors.RegistryError("Not able to find sigstore bundle manifest info")
bundle_manifest = self.get_manifest( bundle_manifest = self.get_manifest(
bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype} bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype}
@ -191,7 +191,7 @@ class RegistryClient:
blob_digest = _get_bundle_blob_digest(layers) blob_digest = _get_bundle_blob_digest(layers)
log.info(f"Found sigstore bundle blob digest: {blob_digest}") log.info(f"Found sigstore bundle blob digest: {blob_digest}")
if not blob_digest: if not blob_digest:
raise Exception("Not able to find sigstore bundle blob info") raise errors.RegistryError("Not able to find sigstore bundle blob info")
bundle = self.get_blob(blob_digest) bundle = self.get_blob(blob_digest)
return tag_manifest_content, bundle.content return tag_manifest_content, bundle.content

View file

@ -8,6 +8,10 @@ from pathlib import Path
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import Dict, List, Tuple from typing import Dict, List, Tuple
from ..container_utils import container_pull, load_image_hash
from . import errors, log
from .registry import get_manifest_hash
try: try:
import platformdirs import platformdirs
except ImportError: except ImportError:
@ -24,10 +28,18 @@ __all__ = [
"verify_signature", "verify_signature",
"load_signatures", "load_signatures",
"store_signatures", "store_signatures",
"verify_local_image_signature", "verify_offline_image_signature",
] ]
def is_cosign_installed() -> bool:
try:
subprocess.run(["cosign", "version"], capture_output=True, check=True)
return True
except subprocess.CalledProcessError:
return False
def signature_to_bundle(sig: Dict) -> Dict: def signature_to_bundle(sig: Dict) -> Dict:
"""Convert a cosign-download signature to the format expected by cosign bundle.""" """Convert a cosign-download signature to the format expected by cosign bundle."""
bundle = sig["Bundle"] bundle = sig["Bundle"]
@ -55,7 +67,6 @@ def verify_signature(signature: dict, pubkey: str) -> bool:
signature_bundle = signature_to_bundle(signature) signature_bundle = signature_to_bundle(signature)
# Put the value in files and verify with cosign
with ( with (
NamedTemporaryFile(mode="w") as signature_file, NamedTemporaryFile(mode="w") as signature_file,
NamedTemporaryFile(mode="bw") as payload_file, NamedTemporaryFile(mode="bw") as payload_file,
@ -76,30 +87,24 @@ def verify_signature(signature: dict, pubkey: str) -> bool:
signature_file.name, signature_file.name,
payload_file.name, payload_file.name,
] ]
log.debug(" ".join(cmd))
result = subprocess.run(cmd, capture_output=True) result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0: if result.returncode != 0:
# XXX Raise instead? # XXX Raise instead?
log.debug("Failed to verify signature", result.stderr)
return False return False
return result.stderr == b"Verified OK\n" if result.stderr == b"Verified OK\n":
log.debug("Signature verified")
return True
return False
def get_runtime_name() -> str: def new_image_release(image) -> bool:
if platform.system() == "Linux": remote_hash = get_manifest_hash(image)
return "podman" local_hash = load_image_hash(image)
return "docker" log.debug("Remote hash: %s", remote_hash)
log.debug("Local hash: %s", local_hash)
return remote_hash != local_hash
def container_pull(image: str) -> bool:
# XXX - Move to container_utils.py
cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
process.communicate()
return process.returncode == 0
def new_image_release() -> bool:
# XXX - Implement
return True
def upgrade_container_image( def upgrade_container_image(
@ -107,19 +112,20 @@ def upgrade_container_image(
manifest_hash: str, manifest_hash: str,
pubkey: str, pubkey: str,
) -> bool: ) -> bool:
if not new_image_release(): if not new_image_release(image):
raise errors.ImageAlreadyUpToDate("The image is already up to date")
return False return False
# manifest_hash = registry.get_manifest_hash(tag)
signatures = get_signatures(image, manifest_hash) signatures = get_signatures(image, manifest_hash)
log.debug("Signatures: %s", signatures)
if len(signatures) < 1: if len(signatures) < 1:
raise Exception("Unable to retrieve signatures") raise errors.NoRemoteSignatures("No remote signatures found")
for signature in signatures: for signature in signatures:
signature_is_valid = verify_signature(signature, pubkey) signature_is_valid = verify_signature(signature, pubkey)
if not signature_is_valid: if not signature_is_valid:
raise Exception("Unable to verify signature") raise errors.SignatureVerificationError()
# At this point, the signatures are verified # At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures # We store the signatures just now to avoid storing unverified signatures
@ -148,9 +154,10 @@ def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
f"Cannot find a '{pubkey_signatures}' folder." f"Cannot find a '{pubkey_signatures}' folder."
"You might need to download the image signatures first." "You might need to download the image signatures first."
) )
raise Exception(msg) raise errors.SignaturesFolderDoesNotExist(msg)
with open(pubkey_signatures / f"{image_hash}.json") as f: with open(pubkey_signatures / f"{image_hash}.json") as f:
log.debug("Loading signatures from %s", f.name)
return json.load(f) return json.load(f)
@ -177,15 +184,18 @@ def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> No
# All the signatures should share the same hash. # All the signatures should share the same hash.
hashes = list(map(_get_digest, signatures)) hashes = list(map(_get_digest, signatures))
if len(set(hashes)) != 1: if len(set(hashes)) != 1:
raise Exception("Signatures do not share the same image hash") raise errors.InvalidSignatures("Signatures do not share the same image hash")
if f"sha256:{image_hash}" != hashes[0]: if f"sha256:{image_hash}" != hashes[0]:
raise Exception("Signatures do not match the given image hash") raise errors.SignatureMismatch("Signatures do not match the given image hash")
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey) pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
pubkey_signatures.mkdir(exist_ok=True) pubkey_signatures.mkdir(exist_ok=True)
with open(pubkey_signatures / f"{image_hash}.json", "w") as f: with open(pubkey_signatures / f"{image_hash}.json", "w") as f:
log.debug(
f"Storing signatures for {image_hash} in {pubkey_signatures}/{image_hash}.json"
)
json.dump(signatures, f) json.dump(signatures, f)
@ -193,27 +203,20 @@ def verify_offline_image_signature(image: str, pubkey: str) -> bool:
""" """
Verifies that a local image has a valid signature Verifies that a local image has a valid signature
""" """
log.info(f"Verifying local image {image} against pubkey {pubkey}")
image_hash = load_image_hash(image) image_hash = load_image_hash(image)
log.debug(f"Image hash: {image_hash}")
signatures = load_signatures(image_hash, pubkey) signatures = load_signatures(image_hash, pubkey)
if len(signatures) < 1: if len(signatures) < 1:
raise Exception("No signatures found") raise errors.LocalSignatureNotFound("No signatures found")
for signature in signatures: for signature in signatures:
if not verify_signature(signature, pubkey): if not verify_signature(signature, pubkey):
msg = f"Unable to verify signature for {image} with pubkey {pubkey}" msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
raise Exception(msg) raise errors.SignatureVerificationError(msg)
return True return True
def load_image_hash(image: str) -> str:
"""
Returns a image hash from a local image name
"""
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout.strip().decode().strip("sha256:")
def get_signatures(image: str, hash: str) -> List[Dict]: def get_signatures(image: str, hash: str) -> List[Dict]:
""" """
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format. Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.

View file

@ -8,7 +8,7 @@ import unicodedata
try: try:
import platformdirs import platformdirs
except ImportError: except ImportError:
import appdirs as platformdirs import appdirs as platformdirs # type: ignore[no-redef]
def get_config_dir() -> str: def get_config_dir() -> str:

View file

@ -0,0 +1 @@
dangerzone