Merge size checks onto common isolation provider

An overdue dead code removal was finally done, combined with the merging
of part of the containers and Qubes isolation provider regarding size
(and width/height) checks.
This commit is contained in:
deeplow 2023-10-31 15:13:15 +00:00
parent c15cbd65c6
commit ebfed4ecda
No known key found for this signature in database
GPG key ID: 577982871529A52A
7 changed files with 64 additions and 93 deletions

View file

@ -118,6 +118,11 @@ class InterruptedConversion(ConversionException):
) )
class PixelFilesMismatch(ConversionException):
error_code = ERROR_SHIFT + 70
error_message = "The number of pages received is different than expected"
class UnexpectedConversionError(PDFtoPPMException): class UnexpectedConversionError(PDFtoPPMException):
error_code = ERROR_SHIFT + 100 error_code = ERROR_SHIFT + 100
error_message = "Some unexpected error occurred while converting the document" error_message = "Some unexpected error occurred while converting the document"

View file

@ -110,83 +110,20 @@ class IsolationProvider(ABC):
""" """
Reconstruct PPM files and save as PNG to save space Reconstruct PPM files and save as PNG to save space
""" """
if not (1 <= width <= errors.MAX_PAGE_WIDTH):
raise errors.MaxPageWidthException()
if not (1 <= height <= errors.MAX_PAGE_HEIGHT):
raise errors.MaxPageHeightException()
ppm_header = f"P6\n{width} {height}\n255\n".encode() ppm_header = f"P6\n{width} {height}\n255\n".encode()
ppm_data = io.BytesIO(ppm_header + rgb_data) ppm_data = io.BytesIO(ppm_header + rgb_data)
png_path = Path(tempdir) / f"page-{page}.png" png_path = Path(tempdir) / f"page-{page}.png"
# Verify the exact data was received
if len(rgb_data) != width * height * 3:
raise errors.InterruptedConversion()
try: try:
Image.open(ppm_data).save(png_path, "PNG") Image.open(ppm_data).save(png_path, "PNG")
except UnidentifiedImageError as e: except UnidentifiedImageError as e:
raise errors.PPMtoPNGError() from e raise errors.PPMtoPNGError() from e
# From global_common:
# def validate_convert_to_pixel_output(self, common, output):
# """
# Take the output from the convert to pixels tasks and validate it. Returns
# a tuple like: (success (boolean), error_message (str))
# """
# max_image_width = 10000
# max_image_height = 10000
# # Did we hit an error?
# for line in output.split("\n"):
# if (
# "failed:" in line
# or "The document format is not supported" in line
# or "Error" in line
# ):
# return False, output
# # How many pages was that?
# num_pages = None
# for line in output.split("\n"):
# if line.startswith("Document has "):
# num_pages = line.split(" ")[2]
# break
# if not num_pages or not num_pages.isdigit() or int(num_pages) <= 0:
# return False, "Invalid number of pages returned"
# num_pages = int(num_pages)
# # Make sure we have the files we expect
# expected_filenames = []
# for i in range(1, num_pages + 1):
# expected_filenames += [
# f"page-{i}.rgb",
# f"page-{i}.width",
# f"page-{i}.height",
# ]
# expected_filenames.sort()
# actual_filenames = os.listdir(common.pixel_dir.name)
# actual_filenames.sort()
# if expected_filenames != actual_filenames:
# return (
# False,
# f"We expected these files:\n{expected_filenames}\n\nBut we got these files:\n{actual_filenames}",
# )
# # Make sure the files are the correct sizes
# for i in range(1, num_pages + 1):
# with open(f"{common.pixel_dir.name}/page-{i}.width") as f:
# w_str = f.read().strip()
# with open(f"{common.pixel_dir.name}/page-{i}.height") as f:
# h_str = f.read().strip()
# w = int(w_str)
# h = int(h_str)
# if (
# not w_str.isdigit()
# or not h_str.isdigit()
# or w <= 0
# or w > max_image_width
# or h <= 0
# or h > max_image_height
# ):
# return False, f"Page {i} has invalid geometry"
# # Make sure the RGB file is the correct size
# if os.path.getsize(f"{common.pixel_dir.name}/page-{i}.rgb") != w * h * 3:
# return False, f"Page {i} has an invalid RGB file size"
# return True, True

View file

@ -306,6 +306,8 @@ class Container(IsolationProvider):
) )
num_pages = len(glob.glob(f"{pixel_dir}/page-*.rgb")) num_pages = len(glob.glob(f"{pixel_dir}/page-*.rgb"))
self.verify_received_pixel_files(pixel_dir, num_pages)
for page in range(1, num_pages + 1): for page in range(1, num_pages + 1):
filename_base = f"{pixel_dir}/page-{page}" filename_base = f"{pixel_dir}/page-{page}"
rgb_filename = f"{filename_base}.rgb" rgb_filename = f"{filename_base}.rgb"
@ -379,6 +381,25 @@ class Container(IsolationProvider):
return success return success
def verify_received_pixel_files(
self, pixel_dir: pathlib.Path, num_pages: int
) -> None:
"""Make sure we have the files we expect"""
expected_filenames = ["captured_output.txt"]
for i in range(1, num_pages + 1):
expected_filenames += [
f"page-{i}.rgb",
f"page-{i}.width",
f"page-{i}.height",
]
expected_filenames.sort()
actual_filenames = os.listdir(pixel_dir)
actual_filenames.sort()
if expected_filenames != actual_filenames:
raise errors.PixelFilesMismatch()
def get_max_parallel_conversions(self) -> int: def get_max_parallel_conversions(self) -> int:
# FIXME hardcoded 1 until timeouts are more limited and better handled # FIXME hardcoded 1 until timeouts are more limited and better handled
# https://github.com/freedomofpress/dangerzone/issues/257 # https://github.com/freedomofpress/dangerzone/issues/257

View file

@ -109,10 +109,6 @@ class Qubes(IsolationProvider):
width = read_int(self.proc.stdout, timeout=sw.remaining) width = read_int(self.proc.stdout, timeout=sw.remaining)
height = read_int(self.proc.stdout, timeout=sw.remaining) height = read_int(self.proc.stdout, timeout=sw.remaining)
if not (1 <= width <= errors.MAX_PAGE_WIDTH):
raise errors.MaxPageWidthException()
if not (1 <= height <= errors.MAX_PAGE_HEIGHT):
raise errors.MaxPageHeightException()
num_pixels = width * height * 3 # three color channels num_pixels = width * height * 3 # three color channels
untrusted_pixels = read_bytes( untrusted_pixels = read_bytes(

View file

@ -9,7 +9,13 @@ from dangerzone.document import Document
from dangerzone.isolation_provider import base from dangerzone.isolation_provider import base
from dangerzone.isolation_provider.qubes import running_on_qubes from dangerzone.isolation_provider.qubes import running_on_qubes
from .. import pdf_11k_pages, sanitized_text, uncommon_text from .. import (
pdf_11k_pages,
sample_bad_height,
sample_bad_width,
sanitized_text,
uncommon_text,
)
@pytest.mark.skipif( @pytest.mark.skipif(
@ -68,3 +74,18 @@ class IsolationProviderTest:
with pytest.raises(errors.MaxPagesException): with pytest.raises(errors.MaxPagesException):
success = provider._convert(doc, ocr_lang=None) success = provider._convert(doc, ocr_lang=None)
assert not success assert not success
def test_max_dimensions(
self,
sample_bad_width: str,
sample_bad_height: str,
provider: base.IsolationProvider,
mocker: MockerFixture,
) -> None:
provider.progress_callback = mocker.MagicMock()
with pytest.raises(errors.MaxPageWidthException):
success = provider._convert(Document(sample_bad_width), ocr_lang=None)
assert not success
with pytest.raises(errors.MaxPageHeightException):
success = provider._convert(Document(sample_bad_height), ocr_lang=None)
assert not success

View file

@ -9,7 +9,13 @@ from dangerzone.document import Document
from dangerzone.isolation_provider.container import Container from dangerzone.isolation_provider.container import Container
# XXX Fixtures used in abstract Test class need to be imported regardless # XXX Fixtures used in abstract Test class need to be imported regardless
from .. import pdf_11k_pages, sanitized_text, uncommon_text from .. import (
pdf_11k_pages,
sample_bad_height,
sample_bad_width,
sanitized_text,
uncommon_text,
)
from .base import IsolationProviderTest from .base import IsolationProviderTest

View file

@ -45,21 +45,6 @@ class TestQubes(IsolationProviderTest):
success = provider._convert(doc, ocr_lang=None) success = provider._convert(doc, ocr_lang=None)
assert not success assert not success
def test_max_dimensions(
self,
sample_bad_width: str,
sample_bad_height: str,
provider: Qubes,
mocker: MockerFixture,
) -> None:
provider.progress_callback = mocker.MagicMock()
with pytest.raises(errors.MaxPageWidthException):
success = provider._convert(Document(sample_bad_width), ocr_lang=None)
assert not success
with pytest.raises(errors.MaxPageHeightException):
success = provider._convert(Document(sample_bad_height), ocr_lang=None)
assert not success
def test_out_of_ram( def test_out_of_ram(
self, self,
provider: Qubes, provider: Qubes,