diff --git a/dangerzone/isolation_provider/container.py b/dangerzone/isolation_provider/container.py index 8d26e48..8e5fb80 100644 --- a/dangerzone/isolation_provider/container.py +++ b/dangerzone/isolation_provider/container.py @@ -8,10 +8,15 @@ import shlex import shutil import subprocess import tempfile -from typing import Callable, List, Optional, Tuple +from typing import Any, Callable, List, Optional, Tuple from ..document import Document -from ..util import get_resource_path, get_subprocess_startupinfo, get_tmp_dir +from ..util import ( + get_resource_path, + get_subprocess_startupinfo, + get_tmp_dir, + replace_control_chars, +) from .base import IsolationProvider # Define startupinfo for subprocesses @@ -133,19 +138,37 @@ class Container(IsolationProvider): return installed - def parse_progress(self, document: Document, line: str) -> None: + def assert_field_type(self, val: Any, _type: object) -> None: + # XXX: Use a stricter check than isinstance because `bool` is a subclass of + # `int`. + # + # See https://stackoverflow.com/a/37888668 + if not type(val) == _type: + raise ValueError("Status field has incorrect type") + + def parse_progress(self, document: Document, untrusted_line: str) -> None: """ Parses a line returned by the container. """ try: - status = json.loads(line) - self.print_progress( - document, status["error"], status["text"], status["percentage"] + untrusted_status = json.loads(untrusted_line) + + text = untrusted_status["text"] + self.assert_field_type(text, str) + + error = untrusted_status["error"] + self.assert_field_type(error, bool) + + percentage = untrusted_status["percentage"] + self.assert_field_type(percentage, int) + + self.print_progress(document, error, text, percentage) + except Exception: + line = replace_control_chars(untrusted_line) + error_message = ( + f"Invalid JSON returned from container:\n\n\tUNTRUSTED> {line}" ) - except: - error_message = f"Invalid JSON returned from container:\n\n\t {line}" - log.error(error_message) - self.print_progress(document, True, error_message, -1) + self.print_progress_trusted(document, True, error_message, -1) def exec( self, @@ -165,8 +188,8 @@ class Container(IsolationProvider): startupinfo=startupinfo, ) as p: if p.stdout is not None: - for line in p.stdout: - self.parse_progress(document, line) + for untrusted_line in p.stdout: + self.parse_progress(document, untrusted_line) p.communicate() return p.returncode diff --git a/tests/isolation_provider/__init__.py b/tests/isolation_provider/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/isolation_provider/test_container.py b/tests/isolation_provider/test_container.py new file mode 100644 index 0000000..66f010d --- /dev/null +++ b/tests/isolation_provider/test_container.py @@ -0,0 +1,75 @@ +import itertools +import json +from typing import Any, Dict + +from pytest_mock import MockerFixture + +from dangerzone.document import Document +from dangerzone.isolation_provider.container import Container + +from .. import sanitized_text, uncommon_text + + +def test_parse_progress( + uncommon_text: str, sanitized_text: str, mocker: MockerFixture +) -> None: + """Test that the `parse_progress()` function handles escape sequences properly.""" + container = Container(enable_timeouts=False) + container.progress_callback = mocker.MagicMock() + print_progress_mock = mocker.patch.object(container, "_print_progress") + d = Document() + + # Test 1 - Check that normal JSON values are printed as is. + simple_json = json.dumps({"text": "text", "error": False, "percentage": 0}) + container.parse_progress(d, simple_json) + print_progress_mock.assert_called_with(d, False, "UNTRUSTED> text", 0) + + # Test 2 - Check that a totally invalid string is reported as a failure. If this + # string contains escape characters, they should be sanitized as well. + def assert_invalid_json(text: str) -> None: + print_progress_mock.assert_called_with( + d, True, f"Invalid JSON returned from container:\n\n\tUNTRUSTED> {text}", -1 + ) + + # Try first with a trivially invalid string. + invalid_json = "()" + container.parse_progress(d, invalid_json) + assert_invalid_json(invalid_json) + + # Try next with an invalid string that contains uncommon text. + container.parse_progress(d, uncommon_text) + assert_invalid_json(sanitized_text) + + # Test 3 - Check that a structurally valid JSON value with escape characters in it + # is sanitized. + valid_json = json.dumps({"text": uncommon_text, "error": False, "percentage": 0}) + sanitized_json = json.dumps( + {"text": sanitized_text, "error": False, "percentage": 0} + ) + container.parse_progress(d, valid_json) + print_progress_mock.assert_called_with(d, False, "UNTRUSTED> " + sanitized_text, 0) + + # Test 4 - Check that a structurally valid JSON, that otherwise does not have the + # expected keys, or the expected value types, is reported as an error, and any + # escape sequences are sanitized. + + keys = ["text", "error", "percentage", uncommon_text] + values = [uncommon_text, False, 0, None] + possible_kvs = list(itertools.product(keys, values, repeat=1)) + + # Based on the above keys and values, create any combination possible, from 0 to 3 + # elements. Take extra care to: + # + # * Remove combinations with non-unique keys. + # * Ignore the sole valid combination (see `valid_json`), since we have already + # tested it above. + for i in range(len(keys)): + for combo in itertools.combinations(possible_kvs, i): + dict_combo: Dict[str, Any] = dict(combo) # type: ignore [arg-type] + if len(combo) == len(dict_combo.keys()): + bad_json = json.dumps(dict_combo) + sanitized_json = bad_json.replace(uncommon_text, sanitized_text) + if bad_json == valid_json: + continue + container.parse_progress(d, bad_json) + assert_invalid_json(sanitized_json)