Use a share/image-name.txt file which defines what image name we should expect

This is especially useful when running the tests against some
non-production version of the image. Note that the signature for this
image will need to match the key at `share/freedomofpress-dangerzone-pub.key`
This commit is contained in:
Alexis Métaireau 2025-04-28 17:02:23 +02:00
parent d678568ace
commit e9b399baf5
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
4 changed files with 45 additions and 32 deletions

View file

@ -11,7 +11,6 @@ from .settings import Settings
from .util import get_resource_path, get_subprocess_startupinfo from .util import get_resource_path, get_subprocess_startupinfo
OLD_CONTAINER_NAME = "dangerzone.rocks/dangerzone" OLD_CONTAINER_NAME = "dangerzone.rocks/dangerzone"
CONTAINER_NAME = "ghcr.io/freedomofpress/dangerzone/dangerzone"
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -110,6 +109,7 @@ def list_image_tags() -> List[str]:
and which image ID does the "latest" tag point to. and which image ID does the "latest" tag point to.
""" """
runtime = Runtime() runtime = Runtime()
container_name = expected_image_name()
return ( return (
subprocess.check_output( subprocess.check_output(
[ [
@ -118,7 +118,7 @@ def list_image_tags() -> List[str]:
"list", "list",
"--format", "--format",
"{{ .Tag }}", "{{ .Tag }}",
CONTAINER_NAME, container_name,
], ],
text=True, text=True,
startupinfo=get_subprocess_startupinfo(), startupinfo=get_subprocess_startupinfo(),
@ -154,9 +154,10 @@ def delete_image_tag(tag: str) -> None:
) )
def load_image_tarball() -> None: def load_image_tarball(tarball_path: Optional[Path] = None) -> None:
runtime = Runtime() runtime = Runtime()
log.info("Installing Dangerzone container image...") log.info("Installing Dangerzone container image...")
if not tarball_path:
tarball_path = get_resource_path("container.tar") tarball_path = get_resource_path("container.tar")
try: try:
res = subprocess.run( res = subprocess.run(
@ -185,17 +186,18 @@ def load_image_tarball() -> None:
# `share/image-id.txt` and delete the incorrect tag. # `share/image-id.txt` and delete the incorrect tag.
# #
# [1] https://github.com/containers/podman/issues/16490 # [1] https://github.com/containers/podman/issues/16490
if runtime.name == "podman" and get_runtime_version(runtime) == (3, 4): # if runtime.name == "podman" and get_runtime_version(runtime) == (3, 4):
expected_tag = get_expected_tag() # FIXME image-id.txt has been removed, this needs to be adapted.
bad_tag = f"localhost/{expected_tag}:latest" # expected_tag = get_expected_tag()
good_tag = f"{CONTAINER_NAME}:{expected_tag}" # bad_tag = f"localhost/{expected_tag}:latest"
# good_tag = f"{CONTAINER_NAME}:{expected_tag}"
log.debug( # log.debug(
f"Dangerzone images loaded in Podman v3.4 usually have an invalid tag." # f"Dangerzone images loaded in Podman v3.4 usually have an invalid tag."
" Fixing it..." # " Fixing it..."
) # )
add_image_tag(bad_tag, good_tag) # add_image_tag(bad_tag, good_tag)
delete_image_tag(bad_tag) # delete_image_tag(bad_tag)
def tag_image_by_digest(digest: str, tag: str) -> None: def tag_image_by_digest(digest: str, tag: str) -> None:
@ -228,6 +230,11 @@ def get_image_id_by_digest(digest: str) -> str:
return process.stdout.decode().strip().split("\n")[0] return process.stdout.decode().strip().split("\n")[0]
def expected_image_name():
image_name_path = get_resource_path("image-name.txt")
return image_name_path.read_text().strip("\n")
def container_pull( def container_pull(
image: str, manifest_digest: str, callback: Optional[Callable] = None image: str, manifest_digest: str, callback: Optional[Callable] = None
): ):
@ -253,10 +260,11 @@ def container_pull(
) )
def get_local_image_digest(image: str) -> str: def get_local_image_digest(image: Optional[str] = None) -> str:
""" """
Returns a image hash from a local image name Returns a image hash from a local image name
""" """
image = image or expected_image_name()
# Get the image hash from the "podman images" command. # Get the image hash from the "podman images" command.
# It's not possible to use "podman inspect" here as it # It's not possible to use "podman inspect" here as it
# returns the digest of the architecture-bound image # returns the digest of the architecture-bound image

View file

@ -7,7 +7,7 @@ import sys
from typing import Callable, List, Optional, Tuple from typing import Callable, List, Optional, Tuple
from .. import container_utils, errors from .. import container_utils, errors
from ..container_utils import CONTAINER_NAME, Runtime from ..container_utils import Runtime
from ..document import Document from ..document import Document
from ..updater import ( from ..updater import (
DEFAULT_PUBKEY_LOCATION, DEFAULT_PUBKEY_LOCATION,
@ -127,24 +127,20 @@ class Container(IsolationProvider):
if not installed_tags: if not installed_tags:
install_local_container_tar() install_local_container_tar()
else: else:
container_name = container_utils.expected_image_name()
update_available, image_digest = is_update_available( update_available, image_digest = is_update_available(
CONTAINER_NAME, container_name,
DEFAULT_PUBKEY_LOCATION, DEFAULT_PUBKEY_LOCATION,
) )
if update_available and image_digest: if update_available and image_digest:
log.debug("Upgrading container image to %s", image_digest) log.debug("Upgrading container image to %s", image_digest)
upgrade_container_image( upgrade_container_image(image_digest, callback=callback)
CONTAINER_NAME,
image_digest,
DEFAULT_PUBKEY_LOCATION,
callback=callback,
)
else: else:
log.debug("No update available for the container.") log.debug("No update available for the container.")
if not installed_tags: if not installed_tags:
install_local_container_tar() install_local_container_tar()
try: try:
verify_local_image(CONTAINER_NAME) verify_local_image()
except UpdaterError: except UpdaterError:
# delete_image() # delete_image()
if last_try: if last_try:
@ -236,9 +232,11 @@ class Container(IsolationProvider):
name: str, name: str,
) -> subprocess.Popen: ) -> subprocess.Popen:
runtime = Runtime() runtime = Runtime()
container_name = container_utils.expected_image_name()
image_digest = container_utils.get_local_image_digest(CONTAINER_NAME) # FIXME: Image digest is also computed inside the verify_local_image
verify_local_image(CONTAINER_NAME) image_digest = container_utils.get_local_image_digest()
verify_local_image()
security_args = self.get_runtime_security_args() security_args = self.get_runtime_security_args()
debug_args = [] debug_args = []
if self.debug: if self.debug:
@ -247,7 +245,7 @@ class Container(IsolationProvider):
enable_stdin = ["-i"] enable_stdin = ["-i"]
set_name = ["--name", name] set_name = ["--name", name]
prevent_leakage_args = ["--rm"] prevent_leakage_args = ["--rm"]
image_name = [CONTAINER_NAME + "@sha256:" + image_digest] image_name = [container_name + "@sha256:" + image_digest]
args = ( args = (
["run"] ["run"]
+ security_args + security_args

View file

@ -287,7 +287,7 @@ def upgrade_container_image_airgapped(
archive.add(Path(tmpdir) / "oci-layout", arcname="oci-layout") archive.add(Path(tmpdir) / "oci-layout", arcname="oci-layout")
archive.add(Path(tmpdir) / "blobs", arcname="blobs") archive.add(Path(tmpdir) / "blobs", arcname="blobs")
runtime.load_image_tarball_from_tar(temporary_tar.name) runtime.load_image_tarball(temporary_tar.name)
runtime.tag_image_by_digest(image_digest, image_name) runtime.tag_image_by_digest(image_digest, image_name)
store_signatures(signatures, image_digest, pubkey) store_signatures(signatures, image_digest, pubkey)
@ -433,10 +433,13 @@ def store_signatures(
write_log_index(get_log_index_from_signatures(signatures)) write_log_index(get_log_index_from_signatures(signatures))
def verify_local_image(image: str, pubkey: str = DEFAULT_PUBKEY_LOCATION) -> bool: def verify_local_image(
image: Optional[str] = None, pubkey: str = DEFAULT_PUBKEY_LOCATION
) -> bool:
""" """
Verifies that a local image has a valid signature Verifies that a local image has a valid signature
""" """
image = image or runtime.expected_image_name()
log.info(f"Verifying local image {image} against pubkey {pubkey}") log.info(f"Verifying local image {image} against pubkey {pubkey}")
try: try:
image_digest = runtime.get_local_image_digest(image) image_digest = runtime.get_local_image_digest(image)
@ -495,9 +498,13 @@ def prepare_airgapped_archive(image_name: str, destination: str) -> None:
def upgrade_container_image( def upgrade_container_image(
image: str, manifest_digest: str, pubkey: str, callback: Optional[Callable] = None manifest_digest: str,
image: Optional[str] = None,
pubkey: Optional[str] = DEFAULT_PUBKEY_LOCATION,
callback: Optional[Callable] = None,
) -> str: ) -> str:
"""Verify and upgrade the image to the latest, if signed.""" """Verify and upgrade the image to the latest, if signed."""
image = image or runtime.expected_image_name()
update_available, remote_digest = registry.is_new_remote_image_available(image) update_available, remote_digest = registry.is_new_remote_image_available(image)
if not update_available: if not update_available:
raise errors.ImageAlreadyUpToDate("The image is already up to date") raise errors.ImageAlreadyUpToDate("The image is already up to date")

View file

@ -6,7 +6,7 @@ from pytest_mock import MockerFixture
from pytest_subprocess import FakeProcess from pytest_subprocess import FakeProcess
from dangerzone import errors from dangerzone import errors
from dangerzone.container_utils import CONTAINER_NAME, Runtime from dangerzone.container_utils import Runtime, expected_image_name
from dangerzone.isolation_provider.container import Container from dangerzone.isolation_provider.container import Container
from dangerzone.isolation_provider.qubes import is_qubes_native_conversion from dangerzone.isolation_provider.qubes import is_qubes_native_conversion
from dangerzone.updater import SignatureError, UpdaterError from dangerzone.updater import SignatureError, UpdaterError
@ -80,7 +80,7 @@ class TestContainer(IsolationProviderTest):
"list", "list",
"--format", "--format",
"{{ .Tag }}", "{{ .Tag }}",
CONTAINER_NAME, expected_image_name(),
], ],
occurrences=2, occurrences=2,
) )