Compare commits

..

1 commit

Author SHA1 Message Date
d43bbf68fc
Merge 6c3316089f into 83be5fb151 2025-04-16 13:25:28 +00:00
11 changed files with 81 additions and 164 deletions

View file

@ -71,8 +71,8 @@ def cli_main(
) -> None:
setup_logging()
display_banner()
settings = Settings()
if set_container_runtime:
settings = Settings()
if set_container_runtime == "default":
settings.unset_custom_runtime()
click.echo(
@ -117,8 +117,7 @@ def cli_main(
sys.exit(1)
# Ensure container is installed
should_upgrade = bool(settings.get("updater_check_all"))
dangerzone.isolation_provider.install(should_upgrade)
dangerzone.isolation_provider.install()
# Convert the document
print_header("Converting document to safe PDF")

View file

@ -450,9 +450,9 @@ class InstallContainerThread(QtCore.QThread):
def run(self) -> None:
error = None
try:
should_upgrade = bool(self.dangerzone.settings.get("updater_check_all"))
should_upgrade = self.dangerzone.settings.get("updater_check_all")
installed = self.dangerzone.isolation_provider.install(
should_upgrade=should_upgrade, callback=self.process_stdout.emit
should_upgrade=bool(should_upgrade), callback=self.process_stdout.emit
)
except Exception as e:
log.error("Container installation problem")

View file

@ -3,20 +3,11 @@ import os
import platform
import shlex
import subprocess
import sys
from typing import Callable, List, Optional, Tuple
from typing import Callable, List, Tuple
from .. import container_utils, errors
from ..container_utils import CONTAINER_NAME, Runtime
from .. import container_utils, errors, updater
from ..container_utils import Runtime
from ..document import Document
from ..updater import (
DEFAULT_PUBKEY_LOCATION,
UpdaterError,
install_local_container_tar,
is_update_available,
upgrade_container_image,
verify_local_image,
)
from ..util import get_resource_path, get_subprocess_startupinfo
from .base import IsolationProvider, terminate_process_group
@ -104,49 +95,31 @@ class Container(IsolationProvider):
@staticmethod
def install(
should_upgrade: bool,
callback: Optional[Callable] = sys.stdout.write,
last_try: bool = False,
should_upgrade: bool, callback: Callable, last_try: bool = False
) -> bool:
"""
Install a (local or remote) container image.
Use the local `container.tar` image if:
- No image is currently installed and `should_upgrade` is set to False
- No image is currently installed and no upgrades are available
Upgrade to the last remote container image if:
- An upgrade is available and `should_upgrade` is set to True
"""
installed_tags = container_utils.list_image_tags()
"""Check if an update is available and install it if necessary."""
if not should_upgrade:
log.debug("Skipping container upgrade check as requested by the settings")
if not installed_tags:
install_local_container_tar()
else:
update_available, image_digest = is_update_available(
CONTAINER_NAME,
DEFAULT_PUBKEY_LOCATION,
update_available, image_digest = updater.is_update_available(
container_utils.CONTAINER_NAME,
updater.DEFAULT_PUBKEY_LOCATION,
)
if update_available and image_digest:
log.debug("Upgrading container image to %s", image_digest)
upgrade_container_image(
CONTAINER_NAME,
updater.upgrade_container_image(
container_utils.CONTAINER_NAME,
image_digest,
DEFAULT_PUBKEY_LOCATION,
updater.DEFAULT_PUBKEY_LOCATION,
callback=callback,
)
else:
log.debug("No update available for the container.")
if not installed_tags:
install_local_container_tar()
log.debug("No update available for the container")
try:
verify_local_image(CONTAINER_NAME)
except UpdaterError:
# delete_image()
updater.verify_local_image(
container_utils.CONTAINER_NAME, updater.DEFAULT_PUBKEY_LOCATION
)
except errors.ImageNotPresentException:
if last_try:
raise
log.debug("Container image not found, trying to install it.")
@ -237,8 +210,13 @@ class Container(IsolationProvider):
) -> subprocess.Popen:
runtime = Runtime()
image_digest = container_utils.get_local_image_digest(CONTAINER_NAME)
verify_local_image(CONTAINER_NAME)
image_digest = container_utils.get_local_image_digest(
container_utils.CONTAINER_NAME
)
updater.verify_local_image(
container_utils.CONTAINER_NAME,
updater.DEFAULT_PUBKEY_LOCATION,
)
security_args = self.get_runtime_security_args()
debug_args = []
if self.debug:
@ -247,7 +225,7 @@ class Container(IsolationProvider):
enable_stdin = ["-i"]
set_name = ["--name", name]
prevent_leakage_args = ["--rm"]
image_name = [CONTAINER_NAME + "@sha256:" + image_digest]
image_name = [container_utils.CONTAINER_NAME + "@sha256:" + image_digest]
args = (
["run"]
+ security_args

View file

@ -36,7 +36,7 @@ class Dummy(IsolationProvider):
)
super().__init__()
def install(self, *args, **kwargs) -> bool:
def install(self) -> bool:
return True
@staticmethod

View file

@ -18,7 +18,7 @@ log = logging.getLogger(__name__)
class Qubes(IsolationProvider):
"""Uses a disposable qube for performing the conversion"""
def install(self, *args, **kwargs) -> bool:
def install(self) -> bool:
return True
@staticmethod

View file

@ -1,12 +1,3 @@
import logging
log = logging.getLogger(__name__)
from .errors import SignatureError, UpdaterError
from .signatures import (
DEFAULT_PUBKEY_LOCATION,
install_local_container_tar,
is_update_available,
upgrade_container_image,
verify_local_image,
)

View file

@ -423,7 +423,7 @@ def store_signatures(
write_log_index(get_log_index_from_signatures(signatures))
def verify_local_image(image: str, pubkey: str = DEFAULT_PUBKEY_LOCATION) -> bool:
def verify_local_image(image: str, pubkey: str) -> bool:
"""
Verifies that a local image has a valid signature
"""
@ -498,11 +498,3 @@ def upgrade_container_image(
# Store the signatures just now to avoid storing them unverified
store_signatures(signatures, manifest_digest, pubkey)
return manifest_digest
def install_local_container_tar(
pubkey: Optional[str] = DEFAULT_PUBKEY_LOCATION,
) -> None:
tarball_path = get_resource_path("container.tar")
log.debug("Installing container image %s", tarball_path)
upgrade_container_image_airgapped(tarball_path, pubkey)

View file

@ -9,11 +9,11 @@ import pytest
from dangerzone.document import SAFE_EXTENSION
from dangerzone.gui import Application
from dangerzone.isolation_provider import container
sys.dangerzone_dev = True # type: ignore[attr-defined]
# Use this fixture to make `pytest-qt` invoke our custom QApplication.
# See https://pytest-qt.readthedocs.io/en/latest/qapplication.html#testing-custom-qapplications
@pytest.fixture(scope="session")
@ -112,14 +112,6 @@ def sample_pdf() -> str:
return str(test_docs_dir.joinpath(BASIC_SAMPLE_PDF))
@pytest.fixture
def skip_image_verification(monkeypatch):
def noop(*args, **kwargs):
return True
monkeypatch.setattr(container, "verify_local_image", noop)
SAMPLE_DIRECTORY = "test_docs"
BASIC_SAMPLE_PDF = "sample-pdf.pdf"
BASIC_SAMPLE_DOC = "sample-doc.doc"
@ -142,6 +134,7 @@ for_each_doc = pytest.mark.parametrize(
)
# External Docs - base64 docs encoded for externally sourced documents
# XXX to reduce the chance of accidentally opening them
test_docs_external_dir = Path(__file__).parent.joinpath(SAMPLE_EXTERNAL_DIRECTORY)

View file

@ -283,13 +283,13 @@ def test_update_errors(
) -> None:
"""Test update check errors."""
settings = updater.dangerzone.settings
# Always assume that we can perform multiple update checks in a row.
monkeypatch.setattr(releases, "_should_postpone_update_check", lambda _: False)
# Mock requests.get().
mocker.patch("dangerzone.updater.releases.requests.get")
requests_mock = releases.requests.get
# Always assume that we can perform multiple update checks in a row.
monkeypatch.setattr(releases, "_should_postpone_update_check", lambda: False)
# Test 1 - Check that request exceptions are being detected as errors.
requests_mock.side_effect = Exception("bad url") # type: ignore [attr-defined]
report = releases.check_for_updates(settings)

View file

@ -6,10 +6,9 @@ from pytest_mock import MockerFixture
from pytest_subprocess import FakeProcess
from dangerzone import errors
from dangerzone.container_utils import CONTAINER_NAME, Runtime
from dangerzone.container_utils import Runtime
from dangerzone.isolation_provider.container import Container
from dangerzone.isolation_provider.qubes import is_qubes_native_conversion
from dangerzone.updater import SignatureError, UpdaterError
from dangerzone.util import get_resource_path
from .base import IsolationProviderTermination, IsolationProviderTest
@ -58,13 +57,8 @@ class TestContainer(IsolationProviderTest):
)
provider.is_available()
def test_install_raise_if_local_image_cant_be_installed(
self,
provider: Container,
fp: FakeProcess,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
def test_install_raise_if_image_cant_be_installed(
self, provider: Container, fp: FakeProcess, runtime_path: str
) -> None:
"""When an image installation fails, an exception should be raised"""
@ -80,85 +74,60 @@ class TestContainer(IsolationProviderTest):
"list",
"--format",
"{{ .Tag }}",
CONTAINER_NAME,
"dangerzone.rocks/dangerzone",
],
occurrences=2,
)
mocker.patch(
"dangerzone.isolation_provider.container.install_local_container_tar",
side_effect=UpdaterError,
fp.register_subprocess(
[
runtime_path,
"load",
"-i",
get_resource_path("container.tar").absolute(),
],
returncode=-1,
)
with pytest.raises(UpdaterError):
provider.install(should_upgrade=False)
with pytest.raises(errors.ImageInstallationException):
provider.install()
def test_install_raise_if_local_image_cant_be_verified(
self,
provider: Container,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
def test_install_raises_if_still_not_installed(
self, provider: Container, fp: FakeProcess, runtime_path: str
) -> None:
"""In case an image has been installed but its signature cannot be verified, an exception should be raised"""
mocker.patch(
"dangerzone.isolation_provider.container.container_utils.list_image_tags",
return_value=["a-tag"],
)
mocker.patch(
"dangerzone.isolation_provider.container.verify_local_image",
side_effect=SignatureError,
"""When an image keep being not installed, it should return False"""
fp.register_subprocess(
[runtime_path, "version", "-f", "{{.Client.Version}}"],
stdout="4.0.0",
)
with pytest.raises(SignatureError):
provider.install(should_upgrade=False)
def test_install_raise_if_local_image_install_works_on_second_try(
self,
provider: Container,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
) -> None:
"""In case an image has been installed but its signature cannot be verified, an exception should be raised"""
mocker.patch(
"dangerzone.isolation_provider.container.container_utils.list_image_tags",
return_value=["a-tag"],
)
mocker.patch(
"dangerzone.isolation_provider.container.verify_local_image",
side_effect=[SignatureError, True],
fp.register_subprocess(
[runtime_path, "image", "ls"],
)
provider.install(should_upgrade=False)
def test_install_upgrades_if_available(
self,
provider: Container,
runtime_path: str,
skip_image_verification,
mocker: MockerFixture,
) -> None:
"""In case an image has been installed but its signature cannot be verified, an exception should be raised"""
mocker.patch(
"dangerzone.isolation_provider.container.container_utils.list_image_tags",
return_value=["a-tag"],
)
mocker.patch(
"dangerzone.isolation_provider.container.is_update_available",
return_value=(True, "digest"),
)
upgrade = mocker.patch(
"dangerzone.isolation_provider.container.upgrade_container_image",
)
mocker.patch(
"dangerzone.isolation_provider.container.verify_local_image",
# First check should return nothing.
fp.register_subprocess(
[
runtime_path,
"image",
"list",
"--format",
"{{ .Tag }}",
"dangerzone.rocks/dangerzone",
],
occurrences=2,
)
provider.install(should_upgrade=True)
upgrade.assert_called()
fp.register_subprocess(
[
runtime_path,
"load",
"-i",
get_resource_path("container.tar").absolute(),
],
)
with pytest.raises(errors.ImageNotPresentException):
provider.install()
@pytest.mark.skipif(
platform.system() not in ("Windows", "Darwin"),

View file

@ -202,12 +202,7 @@ class TestCliConversion(TestCliBasic):
result.assert_success()
@for_each_doc
def test_formats(
self,
doc: Path,
tmp_path_factory: pytest.TempPathFactory,
skip_image_verification: pytest.FixtureRequest,
) -> None:
def test_formats(self, doc: Path, tmp_path_factory: pytest.TempPathFactory) -> None:
reference = (doc.parent / "reference" / doc.stem).with_suffix(".pdf")
destination = tmp_path_factory.mktemp(doc.stem).with_suffix(".pdf")