Compare commits

...

5 commits

Author SHA1 Message Date
77dc106eba
Merge 545e2156b5 into 83be5fb151 2025-04-17 15:27:19 +00:00
Alexis Métaireau
545e2156b5
Skip container signature verification during the tests
This is not required, and skipping them allows to make the whole
test-suite run faster.
2025-04-17 17:26:11 +02:00
Alexis Métaireau
77be24858c
Provide a simple function to install the shipped tarball.
It leaves in `dangerzone.updater.install_local_container_tar()`
2025-04-17 17:23:31 +02:00
Alexis Métaireau
b61a65db01
dangerzone.updater exposes a few funtions, constants and exceptions
This is done to avoid looking at the internal logic of
`dangerzone.updater`. Only the features that actually are part of
the exposed API are exposed, and do not require deep knowledge of the
updater's logic to be used.
2025-04-17 17:19:04 +02:00
Alexis Métaireau
11adba8ab7
Update container installation logic to allow in-place updates
The isolation provider `install()` method is now passed a
`should_upgrade` argument, which is read from the settings and
represents the user decision about updates.

The tests have been updated to reflect these changes.
2025-04-17 17:16:10 +02:00
11 changed files with 164 additions and 81 deletions

View file

@ -71,8 +71,8 @@ def cli_main(
) -> None:
setup_logging()
display_banner()
settings = Settings()
if set_container_runtime:
settings = Settings()
if set_container_runtime == "default":
settings.unset_custom_runtime()
click.echo(
@ -117,7 +117,8 @@ def cli_main(
sys.exit(1)
# Ensure container is installed
dangerzone.isolation_provider.install()
should_upgrade = bool(settings.get("updater_check_all"))
dangerzone.isolation_provider.install(should_upgrade)
# Convert the document
print_header("Converting document to safe PDF")

View file

@ -450,9 +450,9 @@ class InstallContainerThread(QtCore.QThread):
def run(self) -> None:
error = None
try:
should_upgrade = self.dangerzone.settings.get("updater_check_all")
should_upgrade = bool(self.dangerzone.settings.get("updater_check_all"))
installed = self.dangerzone.isolation_provider.install(
should_upgrade=bool(should_upgrade), callback=self.process_stdout.emit
should_upgrade=should_upgrade, callback=self.process_stdout.emit
)
except Exception as e:
log.error("Container installation problem")

View file

@ -3,11 +3,20 @@ import os
import platform
import shlex
import subprocess
from typing import Callable, List, Tuple
import sys
from typing import Callable, List, Optional, Tuple
from .. import container_utils, errors, updater
from ..container_utils import Runtime
from .. import container_utils, errors
from ..container_utils import CONTAINER_NAME, Runtime
from ..document import Document
from ..updater import (
DEFAULT_PUBKEY_LOCATION,
UpdaterError,
install_local_container_tar,
is_update_available,
upgrade_container_image,
verify_local_image,
)
from ..util import get_resource_path, get_subprocess_startupinfo
from .base import IsolationProvider, terminate_process_group
@ -95,31 +104,49 @@ class Container(IsolationProvider):
@staticmethod
def install(
should_upgrade: bool, callback: Callable, last_try: bool = False
should_upgrade: bool,
callback: Optional[Callable] = sys.stdout.write,
last_try: bool = False,
) -> bool:
"""Check if an update is available and install it if necessary."""
"""
Install a (local or remote) container image.
Use the local `container.tar` image if:
- No image is currently installed and `should_upgrade` is set to False
- No image is currently installed and no upgrades are available
Upgrade to the last remote container image if:
- An upgrade is available and `should_upgrade` is set to True
"""
installed_tags = container_utils.list_image_tags()
if not should_upgrade:
log.debug("Skipping container upgrade check as requested by the settings")
if not installed_tags:
install_local_container_tar()
else:
update_available, image_digest = updater.is_update_available(
container_utils.CONTAINER_NAME,
updater.DEFAULT_PUBKEY_LOCATION,
update_available, image_digest = is_update_available(
CONTAINER_NAME,
DEFAULT_PUBKEY_LOCATION,
)
if update_available and image_digest:
log.debug("Upgrading container image to %s", image_digest)
updater.upgrade_container_image(
container_utils.CONTAINER_NAME,
upgrade_container_image(
CONTAINER_NAME,
image_digest,
updater.DEFAULT_PUBKEY_LOCATION,
DEFAULT_PUBKEY_LOCATION,
callback=callback,
)
else:
log.debug("No update available for the container")
log.debug("No update available for the container.")
if not installed_tags:
install_local_container_tar()
try:
updater.verify_local_image(
container_utils.CONTAINER_NAME, updater.DEFAULT_PUBKEY_LOCATION
)
except errors.ImageNotPresentException:
verify_local_image(CONTAINER_NAME)
except UpdaterError:
# delete_image()
if last_try:
raise
log.debug("Container image not found, trying to install it.")
@ -210,13 +237,8 @@ class Container(IsolationProvider):
) -> subprocess.Popen:
runtime = Runtime()
image_digest = container_utils.get_local_image_digest(
container_utils.CONTAINER_NAME
)
updater.verify_local_image(
container_utils.CONTAINER_NAME,
updater.DEFAULT_PUBKEY_LOCATION,
)
image_digest = container_utils.get_local_image_digest(CONTAINER_NAME)
verify_local_image(CONTAINER_NAME)
security_args = self.get_runtime_security_args()
debug_args = []
if self.debug:
@ -225,7 +247,7 @@ class Container(IsolationProvider):
enable_stdin = ["-i"]
set_name = ["--name", name]
prevent_leakage_args = ["--rm"]
image_name = [container_utils.CONTAINER_NAME + "@sha256:" + image_digest]
image_name = [CONTAINER_NAME + "@sha256:" + image_digest]
args = (
["run"]
+ security_args

View file

@ -36,7 +36,7 @@ class Dummy(IsolationProvider):
)
super().__init__()
def install(self) -> bool:
def install(self, *args, **kwargs) -> bool:
return True
@staticmethod

View file

@ -18,7 +18,7 @@ log = logging.getLogger(__name__)
class Qubes(IsolationProvider):
"""Uses a disposable qube for performing the conversion"""
def install(self) -> bool:
def install(self, *args, **kwargs) -> bool:
return True
@staticmethod

View file

@ -1,3 +1,12 @@
import logging
log = logging.getLogger(__name__)
from .errors import SignatureError, UpdaterError
from .signatures import (
DEFAULT_PUBKEY_LOCATION,
install_local_container_tar,
is_update_available,
upgrade_container_image,
verify_local_image,
)

View file

@ -423,7 +423,7 @@ def store_signatures(
write_log_index(get_log_index_from_signatures(signatures))
def verify_local_image(image: str, pubkey: str) -> bool:
def verify_local_image(image: str, pubkey: str = DEFAULT_PUBKEY_LOCATION) -> bool:
"""
Verifies that a local image has a valid signature
"""
@ -498,3 +498,11 @@ def upgrade_container_image(
# Store the signatures just now to avoid storing them unverified
store_signatures(signatures, manifest_digest, pubkey)
return manifest_digest
def install_local_container_tar(
pubkey: Optional[str] = DEFAULT_PUBKEY_LOCATION,
) -> None:
tarball_path = get_resource_path("container.tar")
log.debug("Installing container image %s", tarball_path)
upgrade_container_image_airgapped(tarball_path, pubkey)

View file

@ -9,11 +9,11 @@ import pytest
from dangerzone.document import SAFE_EXTENSION
from dangerzone.gui import Application
from dangerzone.isolation_provider import container
sys.dangerzone_dev = True # type: ignore[attr-defined]
# Use this fixture to make `pytest-qt` invoke our custom QApplication.
# See https://pytest-qt.readthedocs.io/en/latest/qapplication.html#testing-custom-qapplications
@pytest.fixture(scope="session")
@ -112,6 +112,14 @@ def sample_pdf() -> str:
return str(test_docs_dir.joinpath(BASIC_SAMPLE_PDF))
@pytest.fixture
def skip_image_verification(monkeypatch):
def noop(*args, **kwargs):
return True
monkeypatch.setattr(container, "verify_local_image", noop)
SAMPLE_DIRECTORY = "test_docs"
BASIC_SAMPLE_PDF = "sample-pdf.pdf"
BASIC_SAMPLE_DOC = "sample-doc.doc"
@ -134,7 +142,6 @@ for_each_doc = pytest.mark.parametrize(
)
# External Docs - base64 docs encoded for externally sourced documents
# XXX to reduce the chance of accidentally opening them
test_docs_external_dir = Path(__file__).parent.joinpath(SAMPLE_EXTERNAL_DIRECTORY)

View file

@ -283,13 +283,13 @@ def test_update_errors(
) -> None:
"""Test update check errors."""
settings = updater.dangerzone.settings
# Always assume that we can perform multiple update checks in a row.
monkeypatch.setattr(releases, "_should_postpone_update_check", lambda _: False)
# Mock requests.get().
mocker.patch("dangerzone.updater.releases.requests.get")
requests_mock = releases.requests.get
# Always assume that we can perform multiple update checks in a row.
monkeypatch.setattr(releases, "_should_postpone_update_check", lambda: False)
# Test 1 - Check that request exceptions are being detected as errors.
requests_mock.side_effect = Exception("bad url") # type: ignore [attr-defined]
report = releases.check_for_updates(settings)

View file

@ -6,9 +6,10 @@ from pytest_mock import MockerFixture
from pytest_subprocess import FakeProcess
from dangerzone import errors
from dangerzone.container_utils import Runtime
from dangerzone.container_utils import CONTAINER_NAME, Runtime
from dangerzone.isolation_provider.container import Container
from dangerzone.isolation_provider.qubes import is_qubes_native_conversion
from dangerzone.updater import SignatureError, UpdaterError
from dangerzone.util import get_resource_path
from .base import IsolationProviderTermination, IsolationProviderTest
@ -57,8 +58,13 @@ class TestContainer(IsolationProviderTest):
)
provider.is_available()
def test_install_raise_if_image_cant_be_installed(
self, provider: Container, fp: FakeProcess, runtime_path: str
def test_install_raise_if_local_image_cant_be_installed(
self,
provider: Container,
fp: FakeProcess,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
) -> None:
"""When an image installation fails, an exception should be raised"""
@ -74,60 +80,85 @@ class TestContainer(IsolationProviderTest):
"list",
"--format",
"{{ .Tag }}",
"dangerzone.rocks/dangerzone",
CONTAINER_NAME,
],
occurrences=2,
)
fp.register_subprocess(
[
runtime_path,
"load",
"-i",
get_resource_path("container.tar").absolute(),
],
returncode=-1,
mocker.patch(
"dangerzone.isolation_provider.container.install_local_container_tar",
side_effect=UpdaterError,
)
with pytest.raises(errors.ImageInstallationException):
provider.install()
with pytest.raises(UpdaterError):
provider.install(should_upgrade=False)
def test_install_raises_if_still_not_installed(
self, provider: Container, fp: FakeProcess, runtime_path: str
def test_install_raise_if_local_image_cant_be_verified(
self,
provider: Container,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
) -> None:
"""When an image keep being not installed, it should return False"""
fp.register_subprocess(
[runtime_path, "version", "-f", "{{.Client.Version}}"],
stdout="4.0.0",
"""In case an image has been installed but its signature cannot be verified, an exception should be raised"""
mocker.patch(
"dangerzone.isolation_provider.container.container_utils.list_image_tags",
return_value=["a-tag"],
)
mocker.patch(
"dangerzone.isolation_provider.container.verify_local_image",
side_effect=SignatureError,
)
fp.register_subprocess(
[runtime_path, "image", "ls"],
with pytest.raises(SignatureError):
provider.install(should_upgrade=False)
def test_install_raise_if_local_image_install_works_on_second_try(
self,
provider: Container,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
) -> None:
"""In case an image has been installed but its signature cannot be verified, an exception should be raised"""
mocker.patch(
"dangerzone.isolation_provider.container.container_utils.list_image_tags",
return_value=["a-tag"],
)
mocker.patch(
"dangerzone.isolation_provider.container.verify_local_image",
side_effect=[SignatureError, True],
)
# First check should return nothing.
fp.register_subprocess(
[
runtime_path,
"image",
"list",
"--format",
"{{ .Tag }}",
"dangerzone.rocks/dangerzone",
],
occurrences=2,
provider.install(should_upgrade=False)
def test_install_upgrades_if_available(
self,
provider: Container,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
) -> None:
"""In case an image has been installed but its signature cannot be verified, an exception should be raised"""
mocker.patch(
"dangerzone.isolation_provider.container.container_utils.list_image_tags",
return_value=["a-tag"],
)
mocker.patch(
"dangerzone.isolation_provider.container.is_update_available",
return_value=(True, "digest"),
)
upgrade = mocker.patch(
"dangerzone.isolation_provider.container.upgrade_container_image",
)
mocker.patch(
"dangerzone.isolation_provider.container.verify_local_image",
)
fp.register_subprocess(
[
runtime_path,
"load",
"-i",
get_resource_path("container.tar").absolute(),
],
)
with pytest.raises(errors.ImageNotPresentException):
provider.install()
provider.install(should_upgrade=True)
upgrade.assert_called()
@pytest.mark.skipif(
platform.system() not in ("Windows", "Darwin"),

View file

@ -202,7 +202,12 @@ class TestCliConversion(TestCliBasic):
result.assert_success()
@for_each_doc
def test_formats(self, doc: Path, tmp_path_factory: pytest.TempPathFactory) -> None:
def test_formats(
self,
doc: Path,
tmp_path_factory: pytest.TempPathFactory,
skip_image_verification: pytest.FixtureRequest,
) -> None:
reference = (doc.parent / "reference" / doc.stem).with_suffix(".pdf")
destination = tmp_path_factory.mktemp(doc.stem).with_suffix(".pdf")