container: Improve parsing of progress reports

Improve the `parse_progress()` method of the container isolation
provider in the following ways:

1. Make sure that the fields of the progress report have the expected
   type.
2. In case of a JSON parsing error, sanitize the invalid string so that
   it doesn't contain escape sequences, or the user considers it as
   trusted.
This commit is contained in:
deeplow 2023-07-28 18:52:13 +03:00 committed by Alex Pyrgiotis
parent 9410b68c1d
commit 72536a05ac
No known key found for this signature in database
GPG key ID: B6C15EBA0357C9AA
3 changed files with 110 additions and 12 deletions

View file

@ -8,10 +8,15 @@ import shlex
import shutil
import subprocess
import tempfile
from typing import Callable, List, Optional, Tuple
from typing import Any, Callable, List, Optional, Tuple
from ..document import Document
from ..util import get_resource_path, get_subprocess_startupinfo, get_tmp_dir
from ..util import (
get_resource_path,
get_subprocess_startupinfo,
get_tmp_dir,
replace_control_chars,
)
from .base import IsolationProvider
# Define startupinfo for subprocesses
@ -133,19 +138,37 @@ class Container(IsolationProvider):
return installed
def parse_progress(self, document: Document, line: str) -> None:
def assert_field_type(self, val: Any, _type: object) -> None:
# XXX: Use a stricter check than isinstance because `bool` is a subclass of
# `int`.
#
# See https://stackoverflow.com/a/37888668
if not type(val) == _type:
raise ValueError("Status field has incorrect type")
def parse_progress(self, document: Document, untrusted_line: str) -> None:
"""
Parses a line returned by the container.
"""
try:
status = json.loads(line)
self.print_progress(
document, status["error"], status["text"], status["percentage"]
untrusted_status = json.loads(untrusted_line)
text = untrusted_status["text"]
self.assert_field_type(text, str)
error = untrusted_status["error"]
self.assert_field_type(error, bool)
percentage = untrusted_status["percentage"]
self.assert_field_type(percentage, int)
self.print_progress(document, error, text, percentage)
except Exception:
line = replace_control_chars(untrusted_line)
error_message = (
f"Invalid JSON returned from container:\n\n\tUNTRUSTED> {line}"
)
except:
error_message = f"Invalid JSON returned from container:\n\n\t {line}"
log.error(error_message)
self.print_progress(document, True, error_message, -1)
self.print_progress_trusted(document, True, error_message, -1)
def exec(
self,
@ -165,8 +188,8 @@ class Container(IsolationProvider):
startupinfo=startupinfo,
) as p:
if p.stdout is not None:
for line in p.stdout:
self.parse_progress(document, line)
for untrusted_line in p.stdout:
self.parse_progress(document, untrusted_line)
p.communicate()
return p.returncode

View file

View file

@ -0,0 +1,75 @@
import itertools
import json
from typing import Any, Dict
from pytest_mock import MockerFixture
from dangerzone.document import Document
from dangerzone.isolation_provider.container import Container
from .. import sanitized_text, uncommon_text
def test_parse_progress(
uncommon_text: str, sanitized_text: str, mocker: MockerFixture
) -> None:
"""Test that the `parse_progress()` function handles escape sequences properly."""
container = Container(enable_timeouts=False)
container.progress_callback = mocker.MagicMock()
print_progress_mock = mocker.patch.object(container, "_print_progress")
d = Document()
# Test 1 - Check that normal JSON values are printed as is.
simple_json = json.dumps({"text": "text", "error": False, "percentage": 0})
container.parse_progress(d, simple_json)
print_progress_mock.assert_called_with(d, False, "UNTRUSTED> text", 0)
# Test 2 - Check that a totally invalid string is reported as a failure. If this
# string contains escape characters, they should be sanitized as well.
def assert_invalid_json(text: str) -> None:
print_progress_mock.assert_called_with(
d, True, f"Invalid JSON returned from container:\n\n\tUNTRUSTED> {text}", -1
)
# Try first with a trivially invalid string.
invalid_json = "()"
container.parse_progress(d, invalid_json)
assert_invalid_json(invalid_json)
# Try next with an invalid string that contains uncommon text.
container.parse_progress(d, uncommon_text)
assert_invalid_json(sanitized_text)
# Test 3 - Check that a structurally valid JSON value with escape characters in it
# is sanitized.
valid_json = json.dumps({"text": uncommon_text, "error": False, "percentage": 0})
sanitized_json = json.dumps(
{"text": sanitized_text, "error": False, "percentage": 0}
)
container.parse_progress(d, valid_json)
print_progress_mock.assert_called_with(d, False, "UNTRUSTED> " + sanitized_text, 0)
# Test 4 - Check that a structurally valid JSON, that otherwise does not have the
# expected keys, or the expected value types, is reported as an error, and any
# escape sequences are sanitized.
keys = ["text", "error", "percentage", uncommon_text]
values = [uncommon_text, False, 0, None]
possible_kvs = list(itertools.product(keys, values, repeat=1))
# Based on the above keys and values, create any combination possible, from 0 to 3
# elements. Take extra care to:
#
# * Remove combinations with non-unique keys.
# * Ignore the sole valid combination (see `valid_json`), since we have already
# tested it above.
for i in range(len(keys)):
for combo in itertools.combinations(possible_kvs, i):
dict_combo: Dict[str, Any] = dict(combo) # type: ignore [arg-type]
if len(combo) == len(dict_combo.keys()):
bad_json = json.dumps(dict_combo)
sanitized_json = bad_json.replace(uncommon_text, sanitized_text)
if bad_json == valid_json:
continue
container.parse_progress(d, bad_json)
assert_invalid_json(sanitized_json)