Compare commits

..

1 commit

Author SHA1 Message Date
d43bbf68fc
Merge 6c3316089f into 83be5fb151 2025-04-16 13:25:28 +00:00
11 changed files with 81 additions and 164 deletions

View file

@ -71,8 +71,8 @@ def cli_main(
) -> None: ) -> None:
setup_logging() setup_logging()
display_banner() display_banner()
settings = Settings()
if set_container_runtime: if set_container_runtime:
settings = Settings()
if set_container_runtime == "default": if set_container_runtime == "default":
settings.unset_custom_runtime() settings.unset_custom_runtime()
click.echo( click.echo(
@ -117,8 +117,7 @@ def cli_main(
sys.exit(1) sys.exit(1)
# Ensure container is installed # Ensure container is installed
should_upgrade = bool(settings.get("updater_check_all")) dangerzone.isolation_provider.install()
dangerzone.isolation_provider.install(should_upgrade)
# Convert the document # Convert the document
print_header("Converting document to safe PDF") print_header("Converting document to safe PDF")

View file

@ -450,9 +450,9 @@ class InstallContainerThread(QtCore.QThread):
def run(self) -> None: def run(self) -> None:
error = None error = None
try: try:
should_upgrade = bool(self.dangerzone.settings.get("updater_check_all")) should_upgrade = self.dangerzone.settings.get("updater_check_all")
installed = self.dangerzone.isolation_provider.install( installed = self.dangerzone.isolation_provider.install(
should_upgrade=should_upgrade, callback=self.process_stdout.emit should_upgrade=bool(should_upgrade), callback=self.process_stdout.emit
) )
except Exception as e: except Exception as e:
log.error("Container installation problem") log.error("Container installation problem")

View file

@ -3,20 +3,11 @@ import os
import platform import platform
import shlex import shlex
import subprocess import subprocess
import sys from typing import Callable, List, Tuple
from typing import Callable, List, Optional, Tuple
from .. import container_utils, errors from .. import container_utils, errors, updater
from ..container_utils import CONTAINER_NAME, Runtime from ..container_utils import Runtime
from ..document import Document from ..document import Document
from ..updater import (
DEFAULT_PUBKEY_LOCATION,
UpdaterError,
install_local_container_tar,
is_update_available,
upgrade_container_image,
verify_local_image,
)
from ..util import get_resource_path, get_subprocess_startupinfo from ..util import get_resource_path, get_subprocess_startupinfo
from .base import IsolationProvider, terminate_process_group from .base import IsolationProvider, terminate_process_group
@ -104,49 +95,31 @@ class Container(IsolationProvider):
@staticmethod @staticmethod
def install( def install(
should_upgrade: bool, should_upgrade: bool, callback: Callable, last_try: bool = False
callback: Optional[Callable] = sys.stdout.write,
last_try: bool = False,
) -> bool: ) -> bool:
""" """Check if an update is available and install it if necessary."""
Install a (local or remote) container image.
Use the local `container.tar` image if:
- No image is currently installed and `should_upgrade` is set to False
- No image is currently installed and no upgrades are available
Upgrade to the last remote container image if:
- An upgrade is available and `should_upgrade` is set to True
"""
installed_tags = container_utils.list_image_tags()
if not should_upgrade: if not should_upgrade:
log.debug("Skipping container upgrade check as requested by the settings") log.debug("Skipping container upgrade check as requested by the settings")
if not installed_tags:
install_local_container_tar()
else: else:
update_available, image_digest = is_update_available( update_available, image_digest = updater.is_update_available(
CONTAINER_NAME, container_utils.CONTAINER_NAME,
DEFAULT_PUBKEY_LOCATION, updater.DEFAULT_PUBKEY_LOCATION,
) )
if update_available and image_digest: if update_available and image_digest:
log.debug("Upgrading container image to %s", image_digest) log.debug("Upgrading container image to %s", image_digest)
upgrade_container_image( updater.upgrade_container_image(
CONTAINER_NAME, container_utils.CONTAINER_NAME,
image_digest, image_digest,
DEFAULT_PUBKEY_LOCATION, updater.DEFAULT_PUBKEY_LOCATION,
callback=callback, callback=callback,
) )
else: else:
log.debug("No update available for the container.") log.debug("No update available for the container")
if not installed_tags:
install_local_container_tar()
try: try:
verify_local_image(CONTAINER_NAME) updater.verify_local_image(
except UpdaterError: container_utils.CONTAINER_NAME, updater.DEFAULT_PUBKEY_LOCATION
# delete_image() )
except errors.ImageNotPresentException:
if last_try: if last_try:
raise raise
log.debug("Container image not found, trying to install it.") log.debug("Container image not found, trying to install it.")
@ -237,8 +210,13 @@ class Container(IsolationProvider):
) -> subprocess.Popen: ) -> subprocess.Popen:
runtime = Runtime() runtime = Runtime()
image_digest = container_utils.get_local_image_digest(CONTAINER_NAME) image_digest = container_utils.get_local_image_digest(
verify_local_image(CONTAINER_NAME) container_utils.CONTAINER_NAME
)
updater.verify_local_image(
container_utils.CONTAINER_NAME,
updater.DEFAULT_PUBKEY_LOCATION,
)
security_args = self.get_runtime_security_args() security_args = self.get_runtime_security_args()
debug_args = [] debug_args = []
if self.debug: if self.debug:
@ -247,7 +225,7 @@ class Container(IsolationProvider):
enable_stdin = ["-i"] enable_stdin = ["-i"]
set_name = ["--name", name] set_name = ["--name", name]
prevent_leakage_args = ["--rm"] prevent_leakage_args = ["--rm"]
image_name = [CONTAINER_NAME + "@sha256:" + image_digest] image_name = [container_utils.CONTAINER_NAME + "@sha256:" + image_digest]
args = ( args = (
["run"] ["run"]
+ security_args + security_args

View file

@ -36,7 +36,7 @@ class Dummy(IsolationProvider):
) )
super().__init__() super().__init__()
def install(self, *args, **kwargs) -> bool: def install(self) -> bool:
return True return True
@staticmethod @staticmethod

View file

@ -18,7 +18,7 @@ log = logging.getLogger(__name__)
class Qubes(IsolationProvider): class Qubes(IsolationProvider):
"""Uses a disposable qube for performing the conversion""" """Uses a disposable qube for performing the conversion"""
def install(self, *args, **kwargs) -> bool: def install(self) -> bool:
return True return True
@staticmethod @staticmethod

View file

@ -1,12 +1,3 @@
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
from .errors import SignatureError, UpdaterError
from .signatures import (
DEFAULT_PUBKEY_LOCATION,
install_local_container_tar,
is_update_available,
upgrade_container_image,
verify_local_image,
)

View file

@ -423,7 +423,7 @@ def store_signatures(
write_log_index(get_log_index_from_signatures(signatures)) write_log_index(get_log_index_from_signatures(signatures))
def verify_local_image(image: str, pubkey: str = DEFAULT_PUBKEY_LOCATION) -> bool: def verify_local_image(image: str, pubkey: str) -> bool:
""" """
Verifies that a local image has a valid signature Verifies that a local image has a valid signature
""" """
@ -498,11 +498,3 @@ def upgrade_container_image(
# Store the signatures just now to avoid storing them unverified # Store the signatures just now to avoid storing them unverified
store_signatures(signatures, manifest_digest, pubkey) store_signatures(signatures, manifest_digest, pubkey)
return manifest_digest return manifest_digest
def install_local_container_tar(
pubkey: Optional[str] = DEFAULT_PUBKEY_LOCATION,
) -> None:
tarball_path = get_resource_path("container.tar")
log.debug("Installing container image %s", tarball_path)
upgrade_container_image_airgapped(tarball_path, pubkey)

View file

@ -9,11 +9,11 @@ import pytest
from dangerzone.document import SAFE_EXTENSION from dangerzone.document import SAFE_EXTENSION
from dangerzone.gui import Application from dangerzone.gui import Application
from dangerzone.isolation_provider import container
sys.dangerzone_dev = True # type: ignore[attr-defined] sys.dangerzone_dev = True # type: ignore[attr-defined]
# Use this fixture to make `pytest-qt` invoke our custom QApplication. # Use this fixture to make `pytest-qt` invoke our custom QApplication.
# See https://pytest-qt.readthedocs.io/en/latest/qapplication.html#testing-custom-qapplications # See https://pytest-qt.readthedocs.io/en/latest/qapplication.html#testing-custom-qapplications
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
@ -112,14 +112,6 @@ def sample_pdf() -> str:
return str(test_docs_dir.joinpath(BASIC_SAMPLE_PDF)) return str(test_docs_dir.joinpath(BASIC_SAMPLE_PDF))
@pytest.fixture
def skip_image_verification(monkeypatch):
def noop(*args, **kwargs):
return True
monkeypatch.setattr(container, "verify_local_image", noop)
SAMPLE_DIRECTORY = "test_docs" SAMPLE_DIRECTORY = "test_docs"
BASIC_SAMPLE_PDF = "sample-pdf.pdf" BASIC_SAMPLE_PDF = "sample-pdf.pdf"
BASIC_SAMPLE_DOC = "sample-doc.doc" BASIC_SAMPLE_DOC = "sample-doc.doc"
@ -142,6 +134,7 @@ for_each_doc = pytest.mark.parametrize(
) )
# External Docs - base64 docs encoded for externally sourced documents # External Docs - base64 docs encoded for externally sourced documents
# XXX to reduce the chance of accidentally opening them # XXX to reduce the chance of accidentally opening them
test_docs_external_dir = Path(__file__).parent.joinpath(SAMPLE_EXTERNAL_DIRECTORY) test_docs_external_dir = Path(__file__).parent.joinpath(SAMPLE_EXTERNAL_DIRECTORY)

View file

@ -283,13 +283,13 @@ def test_update_errors(
) -> None: ) -> None:
"""Test update check errors.""" """Test update check errors."""
settings = updater.dangerzone.settings settings = updater.dangerzone.settings
# Always assume that we can perform multiple update checks in a row.
monkeypatch.setattr(releases, "_should_postpone_update_check", lambda _: False)
# Mock requests.get(). # Mock requests.get().
mocker.patch("dangerzone.updater.releases.requests.get") mocker.patch("dangerzone.updater.releases.requests.get")
requests_mock = releases.requests.get requests_mock = releases.requests.get
# Always assume that we can perform multiple update checks in a row.
monkeypatch.setattr(releases, "_should_postpone_update_check", lambda: False)
# Test 1 - Check that request exceptions are being detected as errors. # Test 1 - Check that request exceptions are being detected as errors.
requests_mock.side_effect = Exception("bad url") # type: ignore [attr-defined] requests_mock.side_effect = Exception("bad url") # type: ignore [attr-defined]
report = releases.check_for_updates(settings) report = releases.check_for_updates(settings)

View file

@ -6,10 +6,9 @@ from pytest_mock import MockerFixture
from pytest_subprocess import FakeProcess from pytest_subprocess import FakeProcess
from dangerzone import errors from dangerzone import errors
from dangerzone.container_utils import CONTAINER_NAME, Runtime from dangerzone.container_utils import Runtime
from dangerzone.isolation_provider.container import Container from dangerzone.isolation_provider.container import Container
from dangerzone.isolation_provider.qubes import is_qubes_native_conversion from dangerzone.isolation_provider.qubes import is_qubes_native_conversion
from dangerzone.updater import SignatureError, UpdaterError
from dangerzone.util import get_resource_path from dangerzone.util import get_resource_path
from .base import IsolationProviderTermination, IsolationProviderTest from .base import IsolationProviderTermination, IsolationProviderTest
@ -58,13 +57,8 @@ class TestContainer(IsolationProviderTest):
) )
provider.is_available() provider.is_available()
def test_install_raise_if_local_image_cant_be_installed( def test_install_raise_if_image_cant_be_installed(
self, self, provider: Container, fp: FakeProcess, runtime_path: str
provider: Container,
fp: FakeProcess,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
) -> None: ) -> None:
"""When an image installation fails, an exception should be raised""" """When an image installation fails, an exception should be raised"""
@ -80,85 +74,60 @@ class TestContainer(IsolationProviderTest):
"list", "list",
"--format", "--format",
"{{ .Tag }}", "{{ .Tag }}",
CONTAINER_NAME, "dangerzone.rocks/dangerzone",
], ],
occurrences=2, occurrences=2,
) )
mocker.patch(
"dangerzone.isolation_provider.container.install_local_container_tar", fp.register_subprocess(
side_effect=UpdaterError, [
runtime_path,
"load",
"-i",
get_resource_path("container.tar").absolute(),
],
returncode=-1,
) )
with pytest.raises(UpdaterError): with pytest.raises(errors.ImageInstallationException):
provider.install(should_upgrade=False) provider.install()
def test_install_raise_if_local_image_cant_be_verified( def test_install_raises_if_still_not_installed(
self, self, provider: Container, fp: FakeProcess, runtime_path: str
provider: Container,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
) -> None: ) -> None:
"""In case an image has been installed but its signature cannot be verified, an exception should be raised""" """When an image keep being not installed, it should return False"""
fp.register_subprocess(
mocker.patch( [runtime_path, "version", "-f", "{{.Client.Version}}"],
"dangerzone.isolation_provider.container.container_utils.list_image_tags", stdout="4.0.0",
return_value=["a-tag"],
)
mocker.patch(
"dangerzone.isolation_provider.container.verify_local_image",
side_effect=SignatureError,
) )
with pytest.raises(SignatureError): fp.register_subprocess(
provider.install(should_upgrade=False) [runtime_path, "image", "ls"],
def test_install_raise_if_local_image_install_works_on_second_try(
self,
provider: Container,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
) -> None:
"""In case an image has been installed but its signature cannot be verified, an exception should be raised"""
mocker.patch(
"dangerzone.isolation_provider.container.container_utils.list_image_tags",
return_value=["a-tag"],
)
mocker.patch(
"dangerzone.isolation_provider.container.verify_local_image",
side_effect=[SignatureError, True],
) )
provider.install(should_upgrade=False) # First check should return nothing.
fp.register_subprocess(
def test_install_upgrades_if_available( [
self, runtime_path,
provider: Container, "image",
runtime_path: str, "list",
skip_image_verification, "--format",
mocker: MockerFixture, "{{ .Tag }}",
) -> None: "dangerzone.rocks/dangerzone",
"""In case an image has been installed but its signature cannot be verified, an exception should be raised""" ],
occurrences=2,
mocker.patch(
"dangerzone.isolation_provider.container.container_utils.list_image_tags",
return_value=["a-tag"],
)
mocker.patch(
"dangerzone.isolation_provider.container.is_update_available",
return_value=(True, "digest"),
)
upgrade = mocker.patch(
"dangerzone.isolation_provider.container.upgrade_container_image",
)
mocker.patch(
"dangerzone.isolation_provider.container.verify_local_image",
) )
provider.install(should_upgrade=True) fp.register_subprocess(
upgrade.assert_called() [
runtime_path,
"load",
"-i",
get_resource_path("container.tar").absolute(),
],
)
with pytest.raises(errors.ImageNotPresentException):
provider.install()
@pytest.mark.skipif( @pytest.mark.skipif(
platform.system() not in ("Windows", "Darwin"), platform.system() not in ("Windows", "Darwin"),

View file

@ -202,12 +202,7 @@ class TestCliConversion(TestCliBasic):
result.assert_success() result.assert_success()
@for_each_doc @for_each_doc
def test_formats( def test_formats(self, doc: Path, tmp_path_factory: pytest.TempPathFactory) -> None:
self,
doc: Path,
tmp_path_factory: pytest.TempPathFactory,
skip_image_verification: pytest.FixtureRequest,
) -> None:
reference = (doc.parent / "reference" / doc.stem).with_suffix(".pdf") reference = (doc.parent / "reference" / doc.stem).with_suffix(".pdf")
destination = tmp_path_factory.mktemp(doc.stem).with_suffix(".pdf") destination = tmp_path_factory.mktemp(doc.stem).with_suffix(".pdf")