Use a Runtime class to get information about container runtimes

This is useful to avoid parsing too many times the settings.
This commit is contained in:
Alexis Métaireau 2025-03-25 12:20:10 +01:00
parent 20cd9cfc5c
commit 8e99764952
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
5 changed files with 111 additions and 73 deletions

View file

@ -3,7 +3,8 @@ import os
import platform import platform
import shutil import shutil
import subprocess import subprocess
from typing import List, Tuple from pathlib import Path
from typing import List, Optional, Tuple
from . import errors from . import errors
from .settings import Settings from .settings import Settings
@ -14,29 +15,26 @@ CONTAINER_NAME = "dangerzone.rocks/dangerzone"
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def get_runtime_name() -> str: class Runtime(object):
def __init__(self) -> None:
settings = Settings() settings = Settings()
try:
runtime_name = settings.get("container_runtime") if settings.custom_runtime_specified():
except KeyError: self.path = Path(settings.get("container_runtime"))
self.name = self.path.stem
else:
self.name = self.get_default_runtime_name()
binary_path = shutil.which(self.name)
if binary_path is None or not os.path.exists(binary_path):
raise errors.NoContainerTechException(self.name)
self.path = Path(binary_path)
@staticmethod
def get_default_runtime_name() -> str:
return "podman" if platform.system() == "Linux" else "docker" return "podman" if platform.system() == "Linux" else "docker"
return runtime_name
def get_runtime() -> str: def get_runtime_version(runtime: Optional[Runtime] = None) -> Tuple[int, int]:
container_tech = get_runtime_name()
runtime = shutil.which(container_tech)
if runtime is None:
# Fallback to the container runtime path from the settings
settings = Settings()
runtime_path = settings.get("container_runtime_path")
if os.path.exists(runtime_path):
return runtime_path
raise errors.NoContainerTechException(container_tech)
return runtime
def get_runtime_version() -> Tuple[int, int]:
"""Get the major/minor parts of the Docker/Podman version. """Get the major/minor parts of the Docker/Podman version.
Some of the operations we perform in this module rely on some Podman features Some of the operations we perform in this module rely on some Podman features
@ -45,15 +43,15 @@ def get_runtime_version() -> Tuple[int, int]:
just knowing the major and minor version, since writing/installing a full-blown just knowing the major and minor version, since writing/installing a full-blown
semver parser is an overkill. semver parser is an overkill.
""" """
# Get the Docker/Podman version, using a Go template. runtime = runtime or Runtime()
runtime_name = get_runtime_name()
if runtime_name == "podman": # Get the Docker/Podman version, using a Go template.
if runtime.name == "podman":
query = "{{.Client.Version}}" query = "{{.Client.Version}}"
else: else:
query = "{{.Server.Version}}" query = "{{.Server.Version}}"
cmd = [get_runtime(), "version", "-f", query] cmd = [str(runtime.path), "version", "-f", query]
try: try:
version = subprocess.run( version = subprocess.run(
cmd, cmd,
@ -72,7 +70,7 @@ def get_runtime_version() -> Tuple[int, int]:
return (int(major), int(minor)) return (int(major), int(minor))
except Exception as e: except Exception as e:
msg = ( msg = (
f"Could not parse the version of the {runtime_name.capitalize()} tool" f"Could not parse the version of the {runtime.name.capitalize()} tool"
f" (found: '{version}') due to the following error: {e}" f" (found: '{version}') due to the following error: {e}"
) )
raise RuntimeError(msg) raise RuntimeError(msg)
@ -85,10 +83,11 @@ def list_image_tags() -> List[str]:
images. This can be useful when we want to find which are the local image tags, images. This can be useful when we want to find which are the local image tags,
and which image ID does the "latest" tag point to. and which image ID does the "latest" tag point to.
""" """
runtime = Runtime()
return ( return (
subprocess.check_output( subprocess.check_output(
[ [
get_runtime(), str(runtime.path),
"image", "image",
"list", "list",
"--format", "--format",
@ -105,19 +104,21 @@ def list_image_tags() -> List[str]:
def add_image_tag(image_id: str, new_tag: str) -> None: def add_image_tag(image_id: str, new_tag: str) -> None:
"""Add a tag to the Dangerzone image.""" """Add a tag to the Dangerzone image."""
runtime = Runtime()
log.debug(f"Adding tag '{new_tag}' to image '{image_id}'") log.debug(f"Adding tag '{new_tag}' to image '{image_id}'")
subprocess.check_output( subprocess.check_output(
[get_runtime(), "tag", image_id, new_tag], [str(runtime.path), "tag", image_id, new_tag],
startupinfo=get_subprocess_startupinfo(), startupinfo=get_subprocess_startupinfo(),
) )
def delete_image_tag(tag: str) -> None: def delete_image_tag(tag: str) -> None:
"""Delete a Dangerzone image tag.""" """Delete a Dangerzone image tag."""
runtime = Runtime()
log.warning(f"Deleting old container image: {tag}") log.warning(f"Deleting old container image: {tag}")
try: try:
subprocess.check_output( subprocess.check_output(
[get_runtime(), "rmi", "--force", tag], [str(runtime.name), "rmi", "--force", tag],
startupinfo=get_subprocess_startupinfo(), startupinfo=get_subprocess_startupinfo(),
) )
except Exception as e: except Exception as e:
@ -134,11 +135,12 @@ def get_expected_tag() -> str:
def load_image_tarball() -> None: def load_image_tarball() -> None:
runtime = Runtime()
log.info("Installing Dangerzone container image...") log.info("Installing Dangerzone container image...")
tarball_path = get_resource_path("container.tar") tarball_path = get_resource_path("container.tar")
try: try:
res = subprocess.run( res = subprocess.run(
[get_runtime(), "load", "-i", str(tarball_path)], [str(runtime.path), "load", "-i", str(tarball_path)],
startupinfo=get_subprocess_startupinfo(), startupinfo=get_subprocess_startupinfo(),
capture_output=True, capture_output=True,
check=True, check=True,
@ -163,7 +165,7 @@ def load_image_tarball() -> None:
# `share/image-id.txt` and delete the incorrect tag. # `share/image-id.txt` and delete the incorrect tag.
# #
# [1] https://github.com/containers/podman/issues/16490 # [1] https://github.com/containers/podman/issues/16490
if get_runtime_name() == "podman" and get_runtime_version() == (3, 4): if runtime.name == "podman" and get_runtime_version(runtime) == (3, 4):
expected_tag = get_expected_tag() expected_tag = get_expected_tag()
bad_tag = f"localhost/{expected_tag}:latest" bad_tag = f"localhost/{expected_tag}:latest"
good_tag = f"{CONTAINER_NAME}:{expected_tag}" good_tag = f"{CONTAINER_NAME}:{expected_tag}"

View file

@ -221,11 +221,14 @@ class MainWindow(QtWidgets.QMainWindow):
self.setProperty("OSColorMode", self.dangerzone.app.os_color_mode.value) self.setProperty("OSColorMode", self.dangerzone.app.os_color_mode.value)
if hasattr(self.dangerzone.isolation_provider, "check_docker_desktop_version"): if hasattr(self.dangerzone.isolation_provider, "check_docker_desktop_version"):
try:
is_version_valid, version = ( is_version_valid, version = (
self.dangerzone.isolation_provider.check_docker_desktop_version() self.dangerzone.isolation_provider.check_docker_desktop_version()
) )
if not is_version_valid: if not is_version_valid:
self.handle_docker_desktop_version_check(is_version_valid, version) self.handle_docker_desktop_version_check(is_version_valid, version)
except errors.UnsupportedContainerRuntime as e:
pass # It's catched later in the flow.
self.show() self.show()

View file

@ -6,6 +6,7 @@ import subprocess
from typing import List, Tuple from typing import List, Tuple
from .. import container_utils, errors from .. import container_utils, errors
from ..container_utils import Runtime
from ..document import Document from ..document import Document
from ..util import get_resource_path, get_subprocess_startupinfo from ..util import get_resource_path, get_subprocess_startupinfo
from .base import IsolationProvider, terminate_process_group from .base import IsolationProvider, terminate_process_group
@ -50,7 +51,8 @@ class Container(IsolationProvider):
* Do not map the host user to the container, with `--userns nomap` (available * Do not map the host user to the container, with `--userns nomap` (available
from Podman 4.1 onwards) from Podman 4.1 onwards)
""" """
if container_utils.get_runtime_name() == "podman": runtime = Runtime()
if runtime.name == "podman":
security_args = ["--log-driver", "none"] security_args = ["--log-driver", "none"]
security_args += ["--security-opt", "no-new-privileges"] security_args += ["--security-opt", "no-new-privileges"]
if container_utils.get_runtime_version() >= (4, 1): if container_utils.get_runtime_version() >= (4, 1):
@ -123,12 +125,11 @@ class Container(IsolationProvider):
@staticmethod @staticmethod
def is_available() -> bool: def is_available() -> bool:
container_runtime = container_utils.get_runtime() runtime = Runtime()
runtime_name = container_utils.get_runtime_name()
# Can we run `docker/podman image ls` without an error # Can we run `docker/podman image ls` without an error
with subprocess.Popen( with subprocess.Popen(
[container_runtime, "image", "ls"], [str(runtime.path), "image", "ls"],
stdout=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
startupinfo=get_subprocess_startupinfo(), startupinfo=get_subprocess_startupinfo(),
@ -136,14 +137,15 @@ class Container(IsolationProvider):
_, stderr = p.communicate() _, stderr = p.communicate()
if p.returncode != 0: if p.returncode != 0:
raise errors.NotAvailableContainerTechException( raise errors.NotAvailableContainerTechException(
runtime_name, stderr.decode() runtime.name, stderr.decode()
) )
return True return True
def check_docker_desktop_version(self) -> Tuple[bool, str]: def check_docker_desktop_version(self) -> Tuple[bool, str]:
# On windows and darwin, check that the minimum version is met # On windows and darwin, check that the minimum version is met
version = "" version = ""
runtime_is_docker = container_utils.get_runtime_name() == "docker" runtime = Runtime()
runtime_is_docker = runtime.name == "docker"
platform_is_not_linux = platform.system() != "Linux" platform_is_not_linux = platform.system() != "Linux"
if runtime_is_docker and platform_is_not_linux: if runtime_is_docker and platform_is_not_linux:
@ -196,7 +198,7 @@ class Container(IsolationProvider):
command: List[str], command: List[str],
name: str, name: str,
) -> subprocess.Popen: ) -> subprocess.Popen:
container_runtime = container_utils.get_runtime() runtime = Runtime()
security_args = self.get_runtime_security_args() security_args = self.get_runtime_security_args()
debug_args = [] debug_args = []
if self.debug: if self.debug:
@ -218,7 +220,7 @@ class Container(IsolationProvider):
+ image_name + image_name
+ command + command
) )
return self.exec([container_runtime] + args) return self.exec([str(runtime.path)] + args)
def kill_container(self, name: str) -> None: def kill_container(self, name: str) -> None:
"""Terminate a spawned container. """Terminate a spawned container.
@ -230,8 +232,8 @@ class Container(IsolationProvider):
connected to the Docker daemon, and killing it will just close the associated connected to the Docker daemon, and killing it will just close the associated
standard streams. standard streams.
""" """
container_runtime = container_utils.get_runtime() runtime = Runtime()
cmd = [container_runtime, "kill", name] cmd = [str(runtime.path), "kill", name]
try: try:
# We do not check the exit code of the process here, since the container may # We do not check the exit code of the process here, since the container may
# have stopped right before invoking this command. In that case, the # have stopped right before invoking this command. In that case, the
@ -287,10 +289,10 @@ class Container(IsolationProvider):
# after a podman kill / docker kill invocation, this will likely be the case, # after a podman kill / docker kill invocation, this will likely be the case,
# else the container runtime (Docker/Podman) has experienced a problem, and we # else the container runtime (Docker/Podman) has experienced a problem, and we
# should report it. # should report it.
container_runtime = container_utils.get_runtime() runtime = Runtime()
name = self.doc_to_pixels_container_name(document) name = self.doc_to_pixels_container_name(document)
all_containers = subprocess.run( all_containers = subprocess.run(
[container_runtime, "ps", "-a"], [str(runtime.path), "ps", "-a"],
capture_output=True, capture_output=True,
startupinfo=get_subprocess_startupinfo(), startupinfo=get_subprocess_startupinfo(),
) )
@ -301,19 +303,20 @@ class Container(IsolationProvider):
# FIXME hardcoded 1 until length conversions are better handled # FIXME hardcoded 1 until length conversions are better handled
# https://github.com/freedomofpress/dangerzone/issues/257 # https://github.com/freedomofpress/dangerzone/issues/257
return 1 return 1
runtime = Runtime() # type: ignore [unreachable]
n_cpu = 1 # type: ignore [unreachable] n_cpu = 1
if platform.system() == "Linux": if platform.system() == "Linux":
# if on linux containers run natively # if on linux containers run natively
cpu_count = os.cpu_count() cpu_count = os.cpu_count()
if cpu_count is not None: if cpu_count is not None:
n_cpu = cpu_count n_cpu = cpu_count
elif container_utils.get_runtime_name() == "docker": elif runtime.name == "docker":
# For Windows and MacOS containers run in VM # For Windows and MacOS containers run in VM
# So we obtain the CPU count for the VM # So we obtain the CPU count for the VM
n_cpu_str = subprocess.check_output( n_cpu_str = subprocess.check_output(
[container_utils.get_runtime(), "info", "--format", "{{.NCPU}}"], [str(runtime.path), "info", "--format", "{{.NCPU}}"],
text=True, text=True,
startupinfo=get_subprocess_startupinfo(), startupinfo=get_subprocess_startupinfo(),
) )

View file

@ -5,7 +5,8 @@ import pytest
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from pytest_subprocess import FakeProcess from pytest_subprocess import FakeProcess
from dangerzone import container_utils, errors from dangerzone import errors
from dangerzone.container_utils import Runtime
from dangerzone.isolation_provider.container import Container from dangerzone.isolation_provider.container import Container
from dangerzone.isolation_provider.qubes import is_qubes_native_conversion from dangerzone.isolation_provider.qubes import is_qubes_native_conversion
from dangerzone.util import get_resource_path from dangerzone.util import get_resource_path
@ -24,42 +25,51 @@ def provider() -> Container:
return Container() return Container()
@pytest.fixture
def runtime_path() -> str:
return str(Runtime().path)
class TestContainer(IsolationProviderTest): class TestContainer(IsolationProviderTest):
def test_is_available_raises(self, provider: Container, fp: FakeProcess) -> None: def test_is_available_raises(
self, provider: Container, fp: FakeProcess, runtime_path: str
) -> None:
""" """
NotAvailableContainerTechException should be raised when NotAvailableContainerTechException should be raised when
the "podman image ls" command fails. the "podman image ls" command fails.
""" """
fp.register_subprocess( fp.register_subprocess(
[container_utils.get_runtime(), "image", "ls"], [runtime_path, "image", "ls"],
returncode=-1, returncode=-1,
stderr="podman image ls logs", stderr="podman image ls logs",
) )
with pytest.raises(errors.NotAvailableContainerTechException): with pytest.raises(errors.NotAvailableContainerTechException):
provider.is_available() provider.is_available()
def test_is_available_works(self, provider: Container, fp: FakeProcess) -> None: def test_is_available_works(
self, provider: Container, fp: FakeProcess, runtime_path: str
) -> None:
""" """
No exception should be raised when the "podman image ls" can return properly. No exception should be raised when the "podman image ls" can return properly.
""" """
fp.register_subprocess( fp.register_subprocess(
[container_utils.get_runtime(), "image", "ls"], [runtime_path, "image", "ls"],
) )
provider.is_available() provider.is_available()
def test_install_raise_if_image_cant_be_installed( def test_install_raise_if_image_cant_be_installed(
self, provider: Container, fp: FakeProcess self, provider: Container, fp: FakeProcess, runtime_path: str
) -> None: ) -> None:
"""When an image installation fails, an exception should be raised""" """When an image installation fails, an exception should be raised"""
fp.register_subprocess( fp.register_subprocess(
[container_utils.get_runtime(), "image", "ls"], [runtime_path, "image", "ls"],
) )
# First check should return nothing. # First check should return nothing.
fp.register_subprocess( fp.register_subprocess(
[ [
container_utils.get_runtime(), runtime_path,
"image", "image",
"list", "list",
"--format", "--format",
@ -71,7 +81,7 @@ class TestContainer(IsolationProviderTest):
fp.register_subprocess( fp.register_subprocess(
[ [
container_utils.get_runtime(), runtime_path,
"load", "load",
"-i", "-i",
get_resource_path("container.tar").absolute(), get_resource_path("container.tar").absolute(),
@ -83,22 +93,22 @@ class TestContainer(IsolationProviderTest):
provider.install() provider.install()
def test_install_raises_if_still_not_installed( def test_install_raises_if_still_not_installed(
self, provider: Container, fp: FakeProcess self, provider: Container, fp: FakeProcess, runtime_path: str
) -> None: ) -> None:
"""When an image keep being not installed, it should return False""" """When an image keep being not installed, it should return False"""
fp.register_subprocess( fp.register_subprocess(
[container_utils.get_runtime(), "version", "-f", "{{.Client.Version}}"], [runtime_path, "version", "-f", "{{.Client.Version}}"],
stdout="4.0.0", stdout="4.0.0",
) )
fp.register_subprocess( fp.register_subprocess(
[container_utils.get_runtime(), "image", "ls"], [runtime_path, "image", "ls"],
) )
# First check should return nothing. # First check should return nothing.
fp.register_subprocess( fp.register_subprocess(
[ [
container_utils.get_runtime(), runtime_path,
"image", "image",
"list", "list",
"--format", "--format",
@ -110,7 +120,7 @@ class TestContainer(IsolationProviderTest):
fp.register_subprocess( fp.register_subprocess(
[ [
container_utils.get_runtime(), runtime_path,
"load", "load",
"-i", "-i",
get_resource_path("container.tar").absolute(), get_resource_path("container.tar").absolute(),

View file

@ -2,7 +2,7 @@ from pathlib import Path
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from dangerzone.container_utils import get_runtime_name from dangerzone.container_utils import Runtime
from dangerzone.settings import Settings from dangerzone.settings import Settings
@ -10,19 +10,39 @@ def test_get_runtime_name_from_settings(mocker: MockerFixture, tmp_path: Path) -
mocker.patch("dangerzone.settings.get_config_dir", return_value=tmp_path) mocker.patch("dangerzone.settings.get_config_dir", return_value=tmp_path)
settings = Settings() settings = Settings()
settings.set("container_runtime", "new-kid-on-the-block", autosave=True) settings.set(
"container_runtime", "/opt/somewhere/new-kid-on-the-block", autosave=True
)
assert get_runtime_name() == "new-kid-on-the-block" assert Runtime().name == "new-kid-on-the-block"
def test_get_runtime_name_linux(mocker: MockerFixture) -> None: def test_get_runtime_name_linux(mocker: MockerFixture, tmp_path: Path) -> None:
mocker.patch("dangerzone.settings.get_config_dir", return_value=tmp_path)
mocker.patch("platform.system", return_value="Linux") mocker.patch("platform.system", return_value="Linux")
assert get_runtime_name() == "podman" mocker.patch(
"dangerzone.container_utils.shutil.which", return_value="/usr/bin/podman"
)
mocker.patch("dangerzone.container_utils.os.path.exists", return_value=True)
runtime = Runtime()
assert runtime.name == "podman"
assert runtime.path == Path("/usr/bin/podman")
def test_get_runtime_name_non_linux(mocker: MockerFixture) -> None: def test_get_runtime_name_non_linux(mocker: MockerFixture, tmp_path: Path) -> None:
mocker.patch("platform.system", return_value="Windows") mocker.patch("platform.system", return_value="Windows")
assert get_runtime_name() == "docker" mocker.patch("dangerzone.settings.get_config_dir", return_value=tmp_path)
mocker.patch(
"dangerzone.container_utils.shutil.which", return_value="/usr/bin/docker"
)
mocker.patch("dangerzone.container_utils.os.path.exists", return_value=True)
runtime = Runtime()
assert runtime.name == "docker"
assert runtime.path == Path("/usr/bin/docker")
mocker.patch("platform.system", return_value="Something else") mocker.patch("platform.system", return_value="Something else")
assert get_runtime_name() == "docker"
runtime = Runtime()
assert runtime.name == "docker"
assert runtime.path == Path("/usr/bin/docker")
assert Runtime().name == "docker"