Compare commits

...

5 commits

Author SHA1 Message Date
77dc106eba
Merge 545e2156b5 into 83be5fb151 2025-04-17 15:27:19 +00:00
Alexis Métaireau
545e2156b5
Skip container signature verification during the tests
This is not required, and skipping them allows to make the whole
test-suite run faster.
2025-04-17 17:26:11 +02:00
Alexis Métaireau
77be24858c
Provide a simple function to install the shipped tarball.
It leaves in `dangerzone.updater.install_local_container_tar()`
2025-04-17 17:23:31 +02:00
Alexis Métaireau
b61a65db01
dangerzone.updater exposes a few funtions, constants and exceptions
This is done to avoid looking at the internal logic of
`dangerzone.updater`. Only the features that actually are part of
the exposed API are exposed, and do not require deep knowledge of the
updater's logic to be used.
2025-04-17 17:19:04 +02:00
Alexis Métaireau
11adba8ab7
Update container installation logic to allow in-place updates
The isolation provider `install()` method is now passed a
`should_upgrade` argument, which is read from the settings and
represents the user decision about updates.

The tests have been updated to reflect these changes.
2025-04-17 17:16:10 +02:00
11 changed files with 164 additions and 81 deletions

View file

@ -71,8 +71,8 @@ def cli_main(
) -> None: ) -> None:
setup_logging() setup_logging()
display_banner() display_banner()
settings = Settings()
if set_container_runtime: if set_container_runtime:
settings = Settings()
if set_container_runtime == "default": if set_container_runtime == "default":
settings.unset_custom_runtime() settings.unset_custom_runtime()
click.echo( click.echo(
@ -117,7 +117,8 @@ def cli_main(
sys.exit(1) sys.exit(1)
# Ensure container is installed # Ensure container is installed
dangerzone.isolation_provider.install() should_upgrade = bool(settings.get("updater_check_all"))
dangerzone.isolation_provider.install(should_upgrade)
# Convert the document # Convert the document
print_header("Converting document to safe PDF") print_header("Converting document to safe PDF")

View file

@ -450,9 +450,9 @@ class InstallContainerThread(QtCore.QThread):
def run(self) -> None: def run(self) -> None:
error = None error = None
try: try:
should_upgrade = self.dangerzone.settings.get("updater_check_all") should_upgrade = bool(self.dangerzone.settings.get("updater_check_all"))
installed = self.dangerzone.isolation_provider.install( installed = self.dangerzone.isolation_provider.install(
should_upgrade=bool(should_upgrade), callback=self.process_stdout.emit should_upgrade=should_upgrade, callback=self.process_stdout.emit
) )
except Exception as e: except Exception as e:
log.error("Container installation problem") log.error("Container installation problem")

View file

@ -3,11 +3,20 @@ import os
import platform import platform
import shlex import shlex
import subprocess import subprocess
from typing import Callable, List, Tuple import sys
from typing import Callable, List, Optional, Tuple
from .. import container_utils, errors, updater from .. import container_utils, errors
from ..container_utils import Runtime from ..container_utils import CONTAINER_NAME, Runtime
from ..document import Document from ..document import Document
from ..updater import (
DEFAULT_PUBKEY_LOCATION,
UpdaterError,
install_local_container_tar,
is_update_available,
upgrade_container_image,
verify_local_image,
)
from ..util import get_resource_path, get_subprocess_startupinfo from ..util import get_resource_path, get_subprocess_startupinfo
from .base import IsolationProvider, terminate_process_group from .base import IsolationProvider, terminate_process_group
@ -95,31 +104,49 @@ class Container(IsolationProvider):
@staticmethod @staticmethod
def install( def install(
should_upgrade: bool, callback: Callable, last_try: bool = False should_upgrade: bool,
callback: Optional[Callable] = sys.stdout.write,
last_try: bool = False,
) -> bool: ) -> bool:
"""Check if an update is available and install it if necessary.""" """
Install a (local or remote) container image.
Use the local `container.tar` image if:
- No image is currently installed and `should_upgrade` is set to False
- No image is currently installed and no upgrades are available
Upgrade to the last remote container image if:
- An upgrade is available and `should_upgrade` is set to True
"""
installed_tags = container_utils.list_image_tags()
if not should_upgrade: if not should_upgrade:
log.debug("Skipping container upgrade check as requested by the settings") log.debug("Skipping container upgrade check as requested by the settings")
if not installed_tags:
install_local_container_tar()
else: else:
update_available, image_digest = updater.is_update_available( update_available, image_digest = is_update_available(
container_utils.CONTAINER_NAME, CONTAINER_NAME,
updater.DEFAULT_PUBKEY_LOCATION, DEFAULT_PUBKEY_LOCATION,
) )
if update_available and image_digest: if update_available and image_digest:
log.debug("Upgrading container image to %s", image_digest) log.debug("Upgrading container image to %s", image_digest)
updater.upgrade_container_image( upgrade_container_image(
container_utils.CONTAINER_NAME, CONTAINER_NAME,
image_digest, image_digest,
updater.DEFAULT_PUBKEY_LOCATION, DEFAULT_PUBKEY_LOCATION,
callback=callback, callback=callback,
) )
else: else:
log.debug("No update available for the container") log.debug("No update available for the container.")
if not installed_tags:
install_local_container_tar()
try: try:
updater.verify_local_image( verify_local_image(CONTAINER_NAME)
container_utils.CONTAINER_NAME, updater.DEFAULT_PUBKEY_LOCATION except UpdaterError:
) # delete_image()
except errors.ImageNotPresentException:
if last_try: if last_try:
raise raise
log.debug("Container image not found, trying to install it.") log.debug("Container image not found, trying to install it.")
@ -210,13 +237,8 @@ class Container(IsolationProvider):
) -> subprocess.Popen: ) -> subprocess.Popen:
runtime = Runtime() runtime = Runtime()
image_digest = container_utils.get_local_image_digest( image_digest = container_utils.get_local_image_digest(CONTAINER_NAME)
container_utils.CONTAINER_NAME verify_local_image(CONTAINER_NAME)
)
updater.verify_local_image(
container_utils.CONTAINER_NAME,
updater.DEFAULT_PUBKEY_LOCATION,
)
security_args = self.get_runtime_security_args() security_args = self.get_runtime_security_args()
debug_args = [] debug_args = []
if self.debug: if self.debug:
@ -225,7 +247,7 @@ class Container(IsolationProvider):
enable_stdin = ["-i"] enable_stdin = ["-i"]
set_name = ["--name", name] set_name = ["--name", name]
prevent_leakage_args = ["--rm"] prevent_leakage_args = ["--rm"]
image_name = [container_utils.CONTAINER_NAME + "@sha256:" + image_digest] image_name = [CONTAINER_NAME + "@sha256:" + image_digest]
args = ( args = (
["run"] ["run"]
+ security_args + security_args

View file

@ -36,7 +36,7 @@ class Dummy(IsolationProvider):
) )
super().__init__() super().__init__()
def install(self) -> bool: def install(self, *args, **kwargs) -> bool:
return True return True
@staticmethod @staticmethod

View file

@ -18,7 +18,7 @@ log = logging.getLogger(__name__)
class Qubes(IsolationProvider): class Qubes(IsolationProvider):
"""Uses a disposable qube for performing the conversion""" """Uses a disposable qube for performing the conversion"""
def install(self) -> bool: def install(self, *args, **kwargs) -> bool:
return True return True
@staticmethod @staticmethod

View file

@ -1,3 +1,12 @@
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
from .errors import SignatureError, UpdaterError
from .signatures import (
DEFAULT_PUBKEY_LOCATION,
install_local_container_tar,
is_update_available,
upgrade_container_image,
verify_local_image,
)

View file

@ -423,7 +423,7 @@ def store_signatures(
write_log_index(get_log_index_from_signatures(signatures)) write_log_index(get_log_index_from_signatures(signatures))
def verify_local_image(image: str, pubkey: str) -> bool: def verify_local_image(image: str, pubkey: str = DEFAULT_PUBKEY_LOCATION) -> bool:
""" """
Verifies that a local image has a valid signature Verifies that a local image has a valid signature
""" """
@ -498,3 +498,11 @@ def upgrade_container_image(
# Store the signatures just now to avoid storing them unverified # Store the signatures just now to avoid storing them unverified
store_signatures(signatures, manifest_digest, pubkey) store_signatures(signatures, manifest_digest, pubkey)
return manifest_digest return manifest_digest
def install_local_container_tar(
pubkey: Optional[str] = DEFAULT_PUBKEY_LOCATION,
) -> None:
tarball_path = get_resource_path("container.tar")
log.debug("Installing container image %s", tarball_path)
upgrade_container_image_airgapped(tarball_path, pubkey)

View file

@ -9,11 +9,11 @@ import pytest
from dangerzone.document import SAFE_EXTENSION from dangerzone.document import SAFE_EXTENSION
from dangerzone.gui import Application from dangerzone.gui import Application
from dangerzone.isolation_provider import container
sys.dangerzone_dev = True # type: ignore[attr-defined] sys.dangerzone_dev = True # type: ignore[attr-defined]
# Use this fixture to make `pytest-qt` invoke our custom QApplication. # Use this fixture to make `pytest-qt` invoke our custom QApplication.
# See https://pytest-qt.readthedocs.io/en/latest/qapplication.html#testing-custom-qapplications # See https://pytest-qt.readthedocs.io/en/latest/qapplication.html#testing-custom-qapplications
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
@ -112,6 +112,14 @@ def sample_pdf() -> str:
return str(test_docs_dir.joinpath(BASIC_SAMPLE_PDF)) return str(test_docs_dir.joinpath(BASIC_SAMPLE_PDF))
@pytest.fixture
def skip_image_verification(monkeypatch):
def noop(*args, **kwargs):
return True
monkeypatch.setattr(container, "verify_local_image", noop)
SAMPLE_DIRECTORY = "test_docs" SAMPLE_DIRECTORY = "test_docs"
BASIC_SAMPLE_PDF = "sample-pdf.pdf" BASIC_SAMPLE_PDF = "sample-pdf.pdf"
BASIC_SAMPLE_DOC = "sample-doc.doc" BASIC_SAMPLE_DOC = "sample-doc.doc"
@ -134,7 +142,6 @@ for_each_doc = pytest.mark.parametrize(
) )
# External Docs - base64 docs encoded for externally sourced documents # External Docs - base64 docs encoded for externally sourced documents
# XXX to reduce the chance of accidentally opening them # XXX to reduce the chance of accidentally opening them
test_docs_external_dir = Path(__file__).parent.joinpath(SAMPLE_EXTERNAL_DIRECTORY) test_docs_external_dir = Path(__file__).parent.joinpath(SAMPLE_EXTERNAL_DIRECTORY)

View file

@ -283,13 +283,13 @@ def test_update_errors(
) -> None: ) -> None:
"""Test update check errors.""" """Test update check errors."""
settings = updater.dangerzone.settings settings = updater.dangerzone.settings
# Always assume that we can perform multiple update checks in a row.
monkeypatch.setattr(releases, "_should_postpone_update_check", lambda _: False)
# Mock requests.get(). # Mock requests.get().
mocker.patch("dangerzone.updater.releases.requests.get") mocker.patch("dangerzone.updater.releases.requests.get")
requests_mock = releases.requests.get requests_mock = releases.requests.get
# Always assume that we can perform multiple update checks in a row.
monkeypatch.setattr(releases, "_should_postpone_update_check", lambda: False)
# Test 1 - Check that request exceptions are being detected as errors. # Test 1 - Check that request exceptions are being detected as errors.
requests_mock.side_effect = Exception("bad url") # type: ignore [attr-defined] requests_mock.side_effect = Exception("bad url") # type: ignore [attr-defined]
report = releases.check_for_updates(settings) report = releases.check_for_updates(settings)

View file

@ -6,9 +6,10 @@ from pytest_mock import MockerFixture
from pytest_subprocess import FakeProcess from pytest_subprocess import FakeProcess
from dangerzone import errors from dangerzone import errors
from dangerzone.container_utils import Runtime from dangerzone.container_utils import CONTAINER_NAME, Runtime
from dangerzone.isolation_provider.container import Container from dangerzone.isolation_provider.container import Container
from dangerzone.isolation_provider.qubes import is_qubes_native_conversion from dangerzone.isolation_provider.qubes import is_qubes_native_conversion
from dangerzone.updater import SignatureError, UpdaterError
from dangerzone.util import get_resource_path from dangerzone.util import get_resource_path
from .base import IsolationProviderTermination, IsolationProviderTest from .base import IsolationProviderTermination, IsolationProviderTest
@ -57,8 +58,13 @@ class TestContainer(IsolationProviderTest):
) )
provider.is_available() provider.is_available()
def test_install_raise_if_image_cant_be_installed( def test_install_raise_if_local_image_cant_be_installed(
self, provider: Container, fp: FakeProcess, runtime_path: str self,
provider: Container,
fp: FakeProcess,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
) -> None: ) -> None:
"""When an image installation fails, an exception should be raised""" """When an image installation fails, an exception should be raised"""
@ -74,60 +80,85 @@ class TestContainer(IsolationProviderTest):
"list", "list",
"--format", "--format",
"{{ .Tag }}", "{{ .Tag }}",
"dangerzone.rocks/dangerzone", CONTAINER_NAME,
], ],
occurrences=2, occurrences=2,
) )
mocker.patch(
fp.register_subprocess( "dangerzone.isolation_provider.container.install_local_container_tar",
[ side_effect=UpdaterError,
runtime_path,
"load",
"-i",
get_resource_path("container.tar").absolute(),
],
returncode=-1,
) )
with pytest.raises(errors.ImageInstallationException): with pytest.raises(UpdaterError):
provider.install() provider.install(should_upgrade=False)
def test_install_raises_if_still_not_installed( def test_install_raise_if_local_image_cant_be_verified(
self, provider: Container, fp: FakeProcess, runtime_path: str self,
provider: Container,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
) -> None: ) -> None:
"""When an image keep being not installed, it should return False""" """In case an image has been installed but its signature cannot be verified, an exception should be raised"""
fp.register_subprocess(
[runtime_path, "version", "-f", "{{.Client.Version}}"], mocker.patch(
stdout="4.0.0", "dangerzone.isolation_provider.container.container_utils.list_image_tags",
return_value=["a-tag"],
)
mocker.patch(
"dangerzone.isolation_provider.container.verify_local_image",
side_effect=SignatureError,
) )
fp.register_subprocess( with pytest.raises(SignatureError):
[runtime_path, "image", "ls"], provider.install(should_upgrade=False)
def test_install_raise_if_local_image_install_works_on_second_try(
self,
provider: Container,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
) -> None:
"""In case an image has been installed but its signature cannot be verified, an exception should be raised"""
mocker.patch(
"dangerzone.isolation_provider.container.container_utils.list_image_tags",
return_value=["a-tag"],
)
mocker.patch(
"dangerzone.isolation_provider.container.verify_local_image",
side_effect=[SignatureError, True],
) )
# First check should return nothing. provider.install(should_upgrade=False)
fp.register_subprocess(
[ def test_install_upgrades_if_available(
runtime_path, self,
"image", provider: Container,
"list", runtime_path: str,
"--format", skip_image_verification,
"{{ .Tag }}", mocker: MockerFixture,
"dangerzone.rocks/dangerzone", ) -> None:
], """In case an image has been installed but its signature cannot be verified, an exception should be raised"""
occurrences=2,
mocker.patch(
"dangerzone.isolation_provider.container.container_utils.list_image_tags",
return_value=["a-tag"],
)
mocker.patch(
"dangerzone.isolation_provider.container.is_update_available",
return_value=(True, "digest"),
)
upgrade = mocker.patch(
"dangerzone.isolation_provider.container.upgrade_container_image",
)
mocker.patch(
"dangerzone.isolation_provider.container.verify_local_image",
) )
fp.register_subprocess( provider.install(should_upgrade=True)
[ upgrade.assert_called()
runtime_path,
"load",
"-i",
get_resource_path("container.tar").absolute(),
],
)
with pytest.raises(errors.ImageNotPresentException):
provider.install()
@pytest.mark.skipif( @pytest.mark.skipif(
platform.system() not in ("Windows", "Darwin"), platform.system() not in ("Windows", "Darwin"),

View file

@ -202,7 +202,12 @@ class TestCliConversion(TestCliBasic):
result.assert_success() result.assert_success()
@for_each_doc @for_each_doc
def test_formats(self, doc: Path, tmp_path_factory: pytest.TempPathFactory) -> None: def test_formats(
self,
doc: Path,
tmp_path_factory: pytest.TempPathFactory,
skip_image_verification: pytest.FixtureRequest,
) -> None:
reference = (doc.parent / "reference" / doc.stem).with_suffix(".pdf") reference = (doc.parent / "reference" / doc.stem).with_suffix(".pdf")
destination = tmp_path_factory.mktemp(doc.stem).with_suffix(".pdf") destination = tmp_path_factory.mktemp(doc.stem).with_suffix(".pdf")