Allow a different runtime on dangerzone-image commands.

This can be done with the newly added `--runtime` flag, which needs to
be passed to the first group, e.g:

```bash
dangerzone-cli --runtime docker COMMAND
```
This commit is contained in:
Alexis Métaireau 2025-03-04 10:09:27 +01:00
parent 46b88716d3
commit c39cd4ea47
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
3 changed files with 31 additions and 13 deletions

View file

@ -240,7 +240,6 @@ def container_pull(image: str, manifest_digest: str, callback: Callable):
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
text=True, text=True,
bufsize=1, bufsize=1,
universal_newlines=True,
) )
for line in process.stdout: # type: ignore for line in process.stdout: # type: ignore

View file

@ -1,9 +1,12 @@
#!/usr/bin/python #!/usr/bin/python
import functools
import logging import logging
import click import click
from .. import container_utils
from ..container_utils import get_runtime_name
from . import attestations, errors, log, registry, signatures from . import attestations, errors, log, registry, signatures
DEFAULT_REPOSITORY = "freedomofpress/dangerzone" DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
@ -13,7 +16,8 @@ DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone/dangerzone"
@click.group() @click.group()
@click.option("--debug", is_flag=True) @click.option("--debug", is_flag=True)
def main(debug: bool) -> None: @click.option("--runtime", default=get_runtime_name())
def main(debug: bool, runtime: str) -> None:
if debug: if debug:
click.echo("Debug mode enabled") click.echo("Debug mode enabled")
level = logging.DEBUG level = logging.DEBUG
@ -21,6 +25,10 @@ def main(debug: bool) -> None:
level = logging.INFO level = logging.INFO
logging.basicConfig(level=level) logging.basicConfig(level=level)
if runtime != get_runtime_name():
click.echo(f"Using container runtime: {runtime}")
container_utils.RUNTIME_NAME = runtime
@main.command() @main.command()
@click.argument("image", default=DEFAULT_IMAGE_NAME) @click.argument("image", default=DEFAULT_IMAGE_NAME)
@ -28,8 +36,10 @@ def main(debug: bool) -> None:
def upgrade(image: str, pubkey: str) -> None: def upgrade(image: str, pubkey: str) -> None:
"""Upgrade the image to the latest signed version.""" """Upgrade the image to the latest signed version."""
manifest_digest = registry.get_manifest_digest(image) manifest_digest = registry.get_manifest_digest(image)
try: try:
signatures.upgrade_container_image(image, manifest_digest, pubkey) callback = functools.partial(click.echo, nl=False)
signatures.upgrade_container_image(image, manifest_digest, pubkey, callback)
click.echo(f"✅ The local image {image} has been upgraded") click.echo(f"✅ The local image {image} has been upgraded")
click.echo(f"✅ The image has been signed with {pubkey}") click.echo(f"✅ The image has been signed with {pubkey}")
click.echo(f"✅ Signatures has been verified and stored locally") click.echo(f"✅ Signatures has been verified and stored locally")
@ -56,17 +66,23 @@ def store_signatures(image: str, pubkey: str) -> None:
@main.command() @main.command()
@click.argument("image_filename") @click.argument("image_filename")
@click.option("--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION) @click.option("--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION)
def load_archive(image_filename: str, pubkey: str) -> None: @click.option("--force", is_flag=True)
def load_archive(image_filename: str, pubkey: str, force: bool) -> None:
"""Upgrade the local image to the one in the archive.""" """Upgrade the local image to the one in the archive."""
try: try:
loaded_image = signatures.upgrade_container_image_airgapped( loaded_image = signatures.upgrade_container_image_airgapped(
image_filename, pubkey image_filename, pubkey, bypass_logindex=force
) )
click.echo( click.echo(
f"✅ Installed image {image_filename} on the system as {loaded_image}" f"✅ Installed image {image_filename} on the system as {loaded_image}"
) )
except errors.ImageAlreadyUpToDate as e: except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}") click.echo(f"{e}")
except errors.InvalidLogIndex as e:
click.echo(f"❌ Trying to install image older that the currently installed one")
raise click.Abort()
except Exception as e:
click.echo(f"{e}")
raise click.Abort() raise click.Abort()

View file

@ -215,7 +215,9 @@ def _get_blob(tmpdir: str, digest: str) -> Path:
return Path(tmpdir) / "blobs" / "sha256" / digest.replace("sha256:", "") return Path(tmpdir) / "blobs" / "sha256" / digest.replace("sha256:", "")
def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str: def upgrade_container_image_airgapped(
container_tar: str, pubkey: str, bypass_logindex: bool = False
) -> str:
""" """
Verify the given archive against its self-contained signatures, then Verify the given archive against its self-contained signatures, then
upgrade the image and retag it to the expected tag. upgrade the image and retag it to the expected tag.
@ -261,14 +263,15 @@ def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
image_name, signatures = convert_oci_images_signatures(json.load(f), tmpdir) image_name, signatures = convert_oci_images_signatures(json.load(f), tmpdir)
log.info(f"Found image name: {image_name}") log.info(f"Found image name: {image_name}")
# Ensure that we only upgrade if the log index is higher than the last known one if not bypass_logindex:
incoming_log_index = get_log_index_from_signatures(signatures) # Ensure that we only upgrade if the log index is higher than the last known one
last_log_index = get_last_log_index() incoming_log_index = get_log_index_from_signatures(signatures)
last_log_index = get_last_log_index()
if incoming_log_index < last_log_index: if incoming_log_index < last_log_index:
raise errors.InvalidLogIndex( raise errors.InvalidLogIndex(
"The log index is not higher than the last known one" "The log index is not higher than the last known one"
) )
image_digest = index_json["manifests"][0].get("digest").replace("sha256:", "") image_digest = index_json["manifests"][0].get("digest").replace("sha256:", "")