dangerzone/tests/isolation_provider/test_container.py
Alex Pyrgiotis d6410652cb
Kill the process group when conversion terminates
Instead of killing just the invoked Podman/Docker/qrexec process, kill
the whole process group, to make sure that other components that have
been spawned die as well. In the case of Podman, conmon is one of the
processes that lingers, so that's one way to kill it.
2024-10-07 17:37:39 +03:00

102 lines
3.7 KiB
Python

import os
import subprocess
import time
import pytest
from pytest_mock import MockerFixture
from dangerzone.document import Document
from dangerzone.isolation_provider import base
from dangerzone.isolation_provider.container import Container
from dangerzone.isolation_provider.qubes import is_qubes_native_conversion
from .base import IsolationProviderTermination, IsolationProviderTest
# Everything below needs to spawn real containers, so skip the whole module when
# conversions are not backed by a container runtime. A module-level
# `pytest.skip()` raises, so the second check only runs if the first did not skip.
if is_qubes_native_conversion():
    pytest.skip("Qubes native conversion is enabled", allow_module_level=True)
if os.environ.get("DUMMY_CONVERSION"):
    pytest.skip("Dummy conversion is enabled", allow_module_level=True)
@pytest.fixture
def provider() -> Container:
    """Provide a newly constructed ``Container`` isolation provider."""
    container_provider = Container()
    return container_provider
class ContainerWait(Container):
    """Container provider whose `exec_container` waits for the container to appear.

    After delegating to `Container.exec_container`, the runtime's `ps` output is
    polled until the requested container name is listed.
    """

    def exec_container(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Start the container and return its process once `ps` lists it.

        The container name is read from the mandatory `name` keyword argument.

        Raises:
            RuntimeError: If the name does not show up after 50 polls, 100ms apart.
        """
        name = kwargs["name"]
        container_runtime = self.get_runtime()
        proc = super().exec_container(*args, **kwargs)

        # Poll every 100ms until a container with the expected name shows up.
        # Otherwise, closing the file descriptors may not work.
        for _ in range(50):
            ps_result = subprocess.run([container_runtime, "ps"], capture_output=True)
            listing = ps_result.stdout.decode()
            if name in listing:
                return proc
            time.sleep(0.1)

        raise RuntimeError(f"Container {name} did not start within 5 seconds")
@pytest.fixture
def provider_wait() -> ContainerWait:
    """Provide a newly constructed ``ContainerWait`` isolation provider."""
    waiting_provider = ContainerWait()
    return waiting_provider
class TestContainer(IsolationProviderTest):
    """Inherit every test from `IsolationProviderTest`; none are added here."""
class TestContainerTermination(IsolationProviderTermination):
    """Termination tests from the base class, plus a container-specific case."""

    def test_linger_runtime_kill(
        self,
        provider_wait: base.IsolationProvider,
        mocker: MockerFixture,
    ) -> None:
        """Check that a stuck `docker|podman kill` ends in a forceful termination.

        `subprocess.run` is patched so that the runtime's `kill` command is
        replaced by `wait` with a zero timeout, which is expected to time out.
        The spies then verify that the provider called
        `terminate_doc_to_pixels_proc` and `base.kill_process_group`, never called
        `get_proc_exception`, and that the conversion process has exited.
        """
        doc = Document()
        provider_wait.progress_callback = mocker.MagicMock()
        get_proc_exception_spy = mocker.spy(provider_wait, "get_proc_exception")
        terminate_proc_spy = mocker.spy(provider_wait, "terminate_doc_to_pixels_proc")
        kill_proc_spy = mocker.spy(base, "kill_process_group")

        # Switch the subprocess.run() function with a patched function that
        # intercepts the `kill` command and switches it with `wait` instead. This
        # way, we emulate a `docker|podman kill` command that has hung.
        orig_subprocess_run = subprocess.run

        def patched_subprocess_run(*args, **kwargs):  # type: ignore [no-untyped-def]
            # The command list is expected to be the only positional argument.
            assert len(args) == 1
            cmd = args[0]
            if cmd[1] == "kill":
                # Switch the `kill` command with `wait`, thereby triggering a
                # timeout.
                cmd[1] = "wait"

                # Make sure that a timeout has been specified, and make it 0, so
                # that the test ends as quickly as possible. The key must be the
                # string "timeout"; a bare `timeout` name is undefined here.
                assert "timeout" in kwargs
                kwargs["timeout"] = 0

                # Make sure that the modified command times out. The exception is
                # consumed here, so the caller gets `None` back for this command.
                with pytest.raises(subprocess.TimeoutExpired):
                    orig_subprocess_run(cmd, **kwargs)
            else:
                return orig_subprocess_run(*args, **kwargs)

        mocker.patch("subprocess.run", patched_subprocess_run)

        with provider_wait.doc_to_pixels_proc(doc, timeout_grace=0) as proc:
            # We purposefully do nothing here, so that the process remains running.
            pass

        get_proc_exception_spy.assert_not_called()
        terminate_proc_spy.assert_called()
        kill_proc_spy.assert_called()
        assert proc.poll() is not None