Add a dangerzone-image prepare-archive command

This commit is contained in:
Alexis Métaireau 2025-02-04 12:38:26 +01:00
parent 4d27449351
commit c6f5e61e0b
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
8 changed files with 102 additions and 55 deletions

View file

@ -180,7 +180,6 @@ def get_image_id_by_digest(digest: str) -> str:
process = subprocess.run( process = subprocess.run(
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
) )
breakpoint()
# In case we have multiple lines, we only want the first one. # In case we have multiple lines, we only want the first one.
return process.stdout.decode().strip().split("\n")[0] return process.stdout.decode().strip().split("\n")[0]

View file

@ -1,17 +1,17 @@
import subprocess import subprocess
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from . import utils from . import cosign
def verify_attestation( def verify(
manifest: bytes, attestation_bundle: bytes, image_tag: str, expected_repo: str manifest: bytes, attestation_bundle: bytes, image_tag: str, expected_repo: str
) -> bool: ) -> bool:
""" """
Look up the image attestation to see if the image has been built Look up the image attestation to see if the image has been built
on Github runners, and from a given repository. on Github runners, and from a given repository.
""" """
utils.ensure_cosign() cosign.ensure_installed()
# Put the value in files and verify with cosign # Put the value in files and verify with cosign
with ( with (

View file

@ -5,13 +5,7 @@ import logging
import click import click
from ..util import get_resource_path from ..util import get_resource_path
from . import errors, log, registry from . import attestations, errors, log, registry, signatures
from .attestations import verify_attestation
from .signatures import (
upgrade_container_image,
upgrade_container_image_airgapped,
verify_offline_image_signature,
)
DEFAULT_REPOSITORY = "freedomofpress/dangerzone" DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone" DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone"
@ -36,7 +30,7 @@ def upgrade(image: str, pubkey: str) -> None:
"""Upgrade the image to the latest signed version.""" """Upgrade the image to the latest signed version."""
manifest_hash = registry.get_manifest_hash(image) manifest_hash = registry.get_manifest_hash(image)
try: try:
is_upgraded = upgrade_container_image(image, manifest_hash, pubkey) is_upgraded = signatures.upgrade_container_image(image, manifest_hash, pubkey)
click.echo(f"✅ The local image {image} has been upgraded") click.echo(f"✅ The local image {image} has been upgraded")
except errors.ImageAlreadyUpToDate as e: except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}") click.echo(f"{e}")
@ -47,25 +41,34 @@ def upgrade(image: str, pubkey: str) -> None:
@click.argument("image_filename") @click.argument("image_filename")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION) @click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
@click.option("--image-name", default=DEFAULT_IMAGE_NAME) @click.option("--image-name", default=DEFAULT_IMAGE_NAME)
def upgrade_airgapped(image_filename: str, pubkey: str, image_name: str) -> None: def load_archive(image_filename: str, pubkey: str, image_name: str) -> None:
"""Upgrade the image to the latest signed version.""" """Upgrade the local image to the one in the archive."""
try: try:
upgrade_container_image_airgapped(image_filename, pubkey, image_name) signatures.upgrade_container_image_airgapped(image_filename, pubkey, image_name)
click.echo(f"✅ Installed image {image_filename} on the system") click.echo(f"✅ Installed image {image_filename} on the system")
except errors.ImageAlreadyUpToDate as e: except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}") click.echo(f"{e}")
raise click.Abort() raise click.Abort()
@main.command()
@click.argument("image")
@click.option("--destination", default="dangerzone-airgapped.tar")
def prepare_archive(image: str, destination: str) -> None:
"""Prepare an archive to upgrade the dangerzone image on an airgapped environment."""
signatures.prepare_airgapped_archive(image, destination)
click.echo(f"✅ Archive {destination} created")
@main.command() @main.command()
@click.argument("image") @click.argument("image")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION) @click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
def verify_offline(image: str, pubkey: str) -> None: def verify_local(image: str, pubkey: str) -> None:
""" """
Verify the local image signature against a public key and the stored signatures. Verify the local image signature against a public key and the stored signatures.
""" """
# XXX remove a potentiel :tag # XXX remove a potentiel :tag
if verify_offline_image_signature(image, pubkey): if signatures.verify_local_image(image, pubkey):
click.echo( click.echo(
( (
f"Verifying the local image:\n\n" f"Verifying the local image:\n\n"
@ -109,7 +112,7 @@ def attest_provenance(image: str, repository: str) -> None:
parsed = registry.parse_image_location(image) parsed = registry.parse_image_location(image)
manifest, bundle = registry.get_attestation(image) manifest, bundle = registry.get_attestation(image)
verified = verify_attestation(manifest, bundle, parsed.tag, repository) verified = attestations.verify(manifest, bundle, parsed.tag, repository)
if verified: if verified:
click.echo( click.echo(
f"🎉 The image available at `{parsed.full_name}` has been built by Github Runners from the `{repository}` repository" f"🎉 The image available at `{parsed.full_name}` has been built by Github Runners from the `{repository}` repository"

View file

@ -0,0 +1,32 @@
import subprocess
from . import errors, log
def ensure_installed() -> None:
try:
subprocess.run(["cosign", "version"], capture_output=True, check=True)
except subprocess.CalledProcessError:
raise errors.CosignNotInstalledError()
def verify_local_image(oci_image_folder: str, pubkey: str) -> bool:
"""Verify the given path against the given public key"""
ensure_installed()
cmd = [
"cosign",
"verify",
"--key",
pubkey,
"--offline",
"--local-image",
oci_image_folder,
]
log.debug(" ".join(cmd))
result = subprocess.run(cmd, capture_output=True)
if result.returncode == 0:
log.debug("Signature verified")
return True
log.debug("Failed to verify signature", result.stderr)
return False

View file

@ -14,6 +14,10 @@ class RegistryError(UpdaterError):
pass pass
class AirgappedImageDownloadError(UpdaterError):
pass
class NoRemoteSignatures(SignatureError): class NoRemoteSignatures(SignatureError):
pass pass

View file

@ -11,7 +11,7 @@ from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
from .. import container_utils as runtime from .. import container_utils as runtime
from . import errors, log, registry, utils from . import cosign, errors, log, registry
try: try:
import platformdirs import platformdirs
@ -55,34 +55,12 @@ def signature_to_bundle(sig: Dict) -> Dict:
} }
def cosign_verify_local_image(oci_image_folder: str, pubkey: str) -> bool:
"""Verify the given path against the given public key"""
utils.ensure_cosign()
cmd = [
"cosign",
"verify",
"--key",
pubkey,
"--offline",
"--local-image",
oci_image_folder,
]
log.debug(" ".join(cmd))
result = subprocess.run(cmd, capture_output=True)
if result.returncode == 0:
log.debug("Signature verified")
return True
log.debug("Failed to verify signature", result.stderr)
return False
def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool: def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
"""Verify a signature against a given public key""" """Verify a signature against a given public key"""
# XXX - Also verfy the identity/docker-reference field against the expected value # XXX - Also verfy the identity/docker-reference field against the expected value
# e.g. ghcr.io/freedomofpress/dangerzone/dangerzone # e.g. ghcr.io/freedomofpress/dangerzone/dangerzone
utils.ensure_cosign() cosign.ensure_installed()
signature_bundle = signature_to_bundle(signature) signature_bundle = signature_to_bundle(signature)
payload_bytes = b64decode(signature_bundle["Payload"]) payload_bytes = b64decode(signature_bundle["Payload"])
@ -188,7 +166,7 @@ def upgrade_container_image_airgapped(
# XXX Check if the contained signatures match the given ones? # XXX Check if the contained signatures match the given ones?
# Or maybe store both signatures? # Or maybe store both signatures?
if not cosign_verify_local_image(tmpdir, pubkey): if not cosign.verify_local_image(tmpdir, pubkey):
raise errors.SignatureVerificationError() raise errors.SignatureVerificationError()
# Remove the signatures from the archive. # Remove the signatures from the archive.
@ -321,7 +299,7 @@ def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> No
json.dump(signatures, f) json.dump(signatures, f)
def verify_offline_image_signature(image: str, pubkey: str) -> bool: def verify_local_image(image: str, pubkey: str) -> bool:
""" """
Verifies that a local image has a valid signature Verifies that a local image has a valid signature
""" """
@ -341,7 +319,7 @@ def verify_offline_image_signature(image: str, pubkey: str) -> bool:
def get_remote_signatures(image: str, hash: str) -> List[Dict]: def get_remote_signatures(image: str, hash: str) -> List[Dict]:
"""Retrieve the signatures from the registry, via `cosign download`.""" """Retrieve the signatures from the registry, via `cosign download`."""
utils.ensure_cosign() cosign.ensure_installed()
process = subprocess.run( process = subprocess.run(
["cosign", "download", "signature", f"{image}@sha256:{hash}"], ["cosign", "download", "signature", f"{image}@sha256:{hash}"],
@ -356,3 +334,28 @@ def get_remote_signatures(image: str, hash: str) -> List[Dict]:
if len(signatures) < 1: if len(signatures) < 1:
raise errors.NoRemoteSignatures("No signatures found for the image") raise errors.NoRemoteSignatures("No signatures found for the image")
return signatures return signatures
def prepare_airgapped_archive(image_name, destination):
if "@sha256:" not in image_name:
raise errors.AirgappedImageDownloadError(
"The image name must include a digest, e.g. ghcr.io/freedomofpress/dangerzone/dangerzone@sha256:123456"
)
cosign.ensure_installed()
# Get the image from the registry
with TemporaryDirectory() as tmpdir:
msg = f"Downloading image {image_name}. \nIt might take a while."
log.info(msg)
process = subprocess.run(
["cosign", "save", image_name, "--dir", tmpdir],
capture_output=True,
check=True,
)
if process.returncode != 0:
raise errors.AirgappedImageDownloadError()
with tarfile.open(destination, "w") as archive:
archive.add(tmpdir, arcname=".")

View file

@ -1,10 +0,0 @@
import subprocess
from . import errors
def ensure_cosign() -> None:
try:
subprocess.run(["cosign", "version"], capture_output=True, check=True)
except subprocess.CalledProcessError:
raise errors.CosignNotInstalledError()

View file

@ -21,3 +21,19 @@ In case of sucess, it will report back:
``` ```
🎉 The image available at `ghcr.io/freedomofpress/dangerzone/dangerzone:latest` has been built by Github runners from the `freedomofpress/dangerzone` repository. 🎉 The image available at `ghcr.io/freedomofpress/dangerzone/dangerzone:latest` has been built by Github runners from the `freedomofpress/dangerzone` repository.
``` ```
## Container updates on air-gapped environments
In order to make updates on an air-gapped environment, you will need to prepare an archive for the air-gapped environment. This archive will contain all the needed material to validate that the new container image has been signed and is valid.
On the machine on which you prepare the packages:
```bash
dangerzone-image prepare-archive ghcr.io/almet/dangerzone/dangerzone@sha256:fa948726aac29a6ac49f01ec8fbbac18522b35b2491fdf716236a0b3502a2ca7
```
On the airgapped machine, copy the file and run the following command:
```bash
dangerzone-image load-archive
```