mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-04-28 18:02:38 +02:00
348 lines
12 KiB
Python
348 lines
12 KiB
Python
import contextlib
|
|
import logging
|
|
import os
|
|
import platform
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
from abc import ABC, abstractmethod
|
|
from pathlib import Path
|
|
from typing import IO, Callable, Iterator, Optional
|
|
|
|
import fitz
|
|
from colorama import Fore, Style
|
|
|
|
from ..conversion import errors
|
|
from ..conversion.common import DEFAULT_DPI, INT_BYTES
|
|
from ..document import Document
|
|
from ..util import get_tessdata_dir, replace_control_chars
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Upper bound on how much debug output of the conversion process we read back:
# roughly 150 lines of 50 characters each.
MAX_CONVERSION_LOG_CHARS: int = 150 * 50
# Markers that delimit the debug output of the conversion process in our logs.
DOC_TO_PIXELS_LOG_START: str = "----- DOC TO PIXELS LOG START -----"
DOC_TO_PIXELS_LOG_END: str = "----- DOC TO PIXELS LOG END -----"

# Default timeouts, in seconds, for the conversion process:
# * how long to wait for its exit code once an I/O error has been detected,
TIMEOUT_EXCEPTION: int = 15
# * how long to wait for it to exit after asking it to terminate gracefully,
TIMEOUT_GRACE: int = 15
# * how long to wait for it to exit after killing it forcefully.
TIMEOUT_FORCE: int = 5
|
|
|
|
|
|
def _signal_process_group(p: subprocess.Popen, signo: int) -> None:
|
|
"""Send a signal to a process group."""
|
|
try:
|
|
os.killpg(os.getpgid(p.pid), signo)
|
|
except (ProcessLookupError, PermissionError):
|
|
# If the process no longer exists, we may encounter the above errors, either
|
|
# when looking for the process group (ProcessLookupError), or when trying to
|
|
# kill a process group that no longer exists (PermissionError)
|
|
return
|
|
except Exception:
|
|
log.exception(
|
|
f"Unexpected error while sending signal {signo} to the"
|
|
f"document-to-pixels process group (PID: {p.pid})"
|
|
)
|
|
|
|
|
|
def terminate_process_group(p: subprocess.Popen) -> None:
    """Request the termination of a process and, where possible, its group.

    On Windows, this calls ``Popen.terminate()`` on the process itself. On every
    other platform, SIGTERM is sent to the process group of the process.
    """
    on_windows = platform.system() == "Windows"
    if not on_windows:
        _signal_process_group(p, signal.SIGTERM)
        return
    p.terminate()
|
|
|
|
|
|
def kill_process_group(p: subprocess.Popen) -> None:
    """Forcefully kill a process and, where possible, its group.

    On Windows, this calls ``Popen.kill()`` on the process itself. On every other
    platform, SIGKILL is sent to the process group of the process.
    """
    on_windows = platform.system() == "Windows"
    if not on_windows:
        _signal_process_group(p, signal.SIGKILL)
        return
    p.kill()
|
|
|
|
|
|
def read_bytes(f: IO[bytes], size: int, exact: bool = True) -> bytes:
    """Read `size` bytes from a binary file-like object.

    Args:
        f: The stream to read from.
        size: The number of bytes to request from the stream.
        exact: If true (the default), a read that returns a different number of
            bytes than requested is treated as an error.

    Returns:
        The bytes that were read.

    Raises:
        errors.ConverterProcException: If `exact` is set and the stream returned
            a number of bytes other than `size`.
    """
    data = f.read(size)
    if not exact or len(data) == size:
        return data
    raise errors.ConverterProcException()
|
|
|
|
|
|
def read_int(f: IO[bytes]) -> int:
    """Read `INT_BYTES` bytes from a file-like object, and decode them as int.

    The bytes are interpreted as an unsigned, big-endian integer.

    Raises:
        errors.ConverterProcException: If the stream returned a number of bytes
            other than `INT_BYTES`.
    """
    untrusted_raw = f.read(INT_BYTES)
    if len(untrusted_raw) == INT_BYTES:
        return int.from_bytes(untrusted_raw, byteorder="big", signed=False)
    raise errors.ConverterProcException()
|
|
|
|
|
|
def read_debug_text(f: IO[bytes], size: int) -> str:
    """Read text for debug purposes from a file-like object, and sanitize it.

    A single read of `size` bytes is requested from the stream. The result is
    decoded as ASCII (bytes that cannot be decoded are replaced), and then passed
    through `replace_control_chars`, keeping newlines.
    """
    untrusted_bytes = f.read(size)
    untrusted_text = untrusted_bytes.decode("ascii", errors="replace")
    sanitized_text = replace_control_chars(untrusted_text, keep_newlines=True)
    return sanitized_text
|
|
|
|
|
|
class IsolationProvider(ABC):
    """
    Abstracts an isolation provider

    Subclasses provide the process that converts a document to pixels (how it is
    started and how it is terminated). This base class drives the conversion
    around that process: it sends the document to the process, reads back the
    bitmap of every page, builds a PDF out of those bitmaps, reports progress, and
    makes sure that the process is stopped in the end.
    """

    def __init__(self) -> None:
        # The callback that `print_progress()` invokes. It is set for each
        # conversion by `convert()`; it is initialized here so that
        # `print_progress()` does not fail with an AttributeError when it is called
        # on an instance that has not gone through `convert()` yet.
        self.progress_callback: Optional[Callable] = None

        # Capture the stderr of the conversion process only in dev mode.
        if getattr(sys, "dangerzone_dev", False) is True:
            self.proc_stderr = subprocess.PIPE
        else:
            self.proc_stderr = subprocess.DEVNULL

    @abstractmethod
    def install(self) -> bool:
        """Install the isolation provider. Implemented by subclasses."""
        pass

    def convert(
        self,
        document: Document,
        ocr_lang: Optional[str],
        progress_callback: Optional[Callable] = None,
    ) -> None:
        """Convert a document, and mark it as safe or as failed.

        Exceptions raised during the conversion are not propagated to the caller.
        They are reported via `print_progress()` and the document is marked as
        failed.

        Args:
            document: The document to convert.
            ocr_lang: The language to use for OCR, or a falsy value to skip OCR.
            progress_callback: Optional callable that is invoked as
                ``progress_callback(error, text, percentage)`` on every progress
                update.
        """
        self.progress_callback = progress_callback
        document.mark_as_converting()
        try:
            with self.doc_to_pixels_proc(document) as conversion_proc:
                self.convert_with_proc(document, ocr_lang, conversion_proc)
            document.mark_as_safe()
            if document.archive_after_conversion:
                document.archive()
        except errors.ConversionException as e:
            self.print_progress(document, True, str(e), 0)
            document.mark_as_failed()
        except Exception as e:
            # Unexpected errors are additionally logged with their traceback.
            log.exception(
                f"An exception occurred while converting document '{document.id}'"
            )
            self.print_progress(document, True, str(e), 0)
            document.mark_as_failed()

    def ocr_page(self, pixmap: fitz.Pixmap, ocr_lang: str) -> bytes:
        """Get a single page as pixels, OCR it, and return a PDF as bytes."""
        return pixmap.pdfocr_tobytes(
            compress=True,
            language=ocr_lang,
            tessdata=str(get_tessdata_dir()),
        )

    def pixels_to_pdf_page(
        self,
        untrusted_data: bytes,
        untrusted_width: int,
        untrusted_height: int,
        ocr_lang: Optional[str],
    ) -> fitz.Document:
        """Convert a byte array of RGB pixels into a PDF page, optionally with OCR."""
        pixmap = fitz.Pixmap(
            fitz.Colorspace(fitz.CS_RGB),
            untrusted_width,
            untrusted_height,
            untrusted_data,
            False,
        )
        pixmap.set_dpi(DEFAULT_DPI, DEFAULT_DPI)

        if ocr_lang:  # OCR the document
            page_pdf_bytes = self.ocr_page(pixmap, ocr_lang)
        else:  # Don't OCR
            page_doc = fitz.Document()
            page_doc.insert_file(pixmap)
            page_pdf_bytes = page_doc.tobytes(deflate_images=True)

        return fitz.open("pdf", page_pdf_bytes)

    def convert_with_proc(
        self,
        document: Document,
        ocr_lang: Optional[str],
        p: subprocess.Popen,
    ) -> None:
        """Convert a document using an already started conversion process.

        The document is written to the stdin of the process. Then, the number of
        pages is read from its stdout, followed by the width, the height and the
        RGB pixels of every page. Each page is turned into a PDF page, and the
        resulting PDF is saved to the output filename of the document.

        Args:
            document: The document to convert.
            ocr_lang: The language to use for OCR, or a falsy value to skip OCR.
            p: The conversion process, with its stdin and stdout available.

        Raises:
            errors.ConverterProcException: If the process closed its stdin early,
                or if its stdout returned fewer bytes than expected.
            errors.MaxPagesException: If the reported number of pages is zero or
                exceeds `errors.MAX_PAGES`.
            errors.MaxPageWidthException: If a page width is out of bounds.
            errors.MaxPageHeightException: If a page height is out of bounds.
        """
        percentage = 0.0
        with open(document.input_filename, "rb") as f:
            try:
                assert p.stdin is not None
                p.stdin.write(f.read())
                p.stdin.close()
            except BrokenPipeError as e:
                raise errors.ConverterProcException() from e

        assert p.stdout
        n_pages = read_int(p.stdout)
        if n_pages == 0 or n_pages > errors.MAX_PAGES:
            raise errors.MaxPagesException()
        step = 100 / n_pages

        safe_doc = fitz.Document()

        # This part of the progress message is the same for every page.
        searchable = "searchable " if ocr_lang else ""
        for page in range(1, n_pages + 1):
            text = (
                f"Converting page {page}/{n_pages} from pixels to {searchable}PDF"
            )
            self.print_progress(document, False, text, percentage)

            width = read_int(p.stdout)
            height = read_int(p.stdout)
            if not (1 <= width <= errors.MAX_PAGE_WIDTH):
                raise errors.MaxPageWidthException()
            if not (1 <= height <= errors.MAX_PAGE_HEIGHT):
                raise errors.MaxPageHeightException()

            num_pixels = width * height * 3  # three color channels
            untrusted_pixels = read_bytes(
                p.stdout,
                num_pixels,
            )

            page_pdf = self.pixels_to_pdf_page(
                untrusted_pixels,
                width,
                height,
                ocr_lang,
            )
            safe_doc.insert_pdf(page_pdf)

            percentage += step

        # Ensure nothing else is read after all bitmaps are obtained
        p.stdout.close()

        # Saving it with a different name first, because PyMuPDF cannot handle
        # non-Unicode chars.
        safe_doc.save(document.sanitized_output_filename)
        os.replace(document.sanitized_output_filename, document.output_filename)

        # TODO handle leftover code input
        text = "Successfully converted document"
        self.print_progress(document, False, text, 100)

    def print_progress(
        self, document: Document, error: bool, text: str, percentage: float
    ) -> None:
        """Log a progress update, and pass it to the progress callback, if any.

        Args:
            document: The document that the update refers to.
            error: Whether the update reports an error. Errors are logged at the
                error level, the rest at the info level.
            text: The message of the update.
            percentage: The progress of the conversion. The log message shows it
                truncated to an integer; the callback receives it as is.
        """
        s = Style.BRIGHT + Fore.YELLOW + f"[doc {document.id}] "
        s += Fore.CYAN + f"{int(percentage)}% " + Style.RESET_ALL
        if error:
            s += Fore.RED + text + Style.RESET_ALL
            log.error(s)
        else:
            s += text
            log.info(s)

        if self.progress_callback:
            self.progress_callback(error, text, percentage)

    def get_proc_exception(
        self, p: subprocess.Popen, timeout: int = TIMEOUT_EXCEPTION
    ) -> Exception:
        """Returns an exception associated with a process exit code"""
        try:
            error_code = p.wait(timeout)
        except subprocess.TimeoutExpired:
            return errors.UnexpectedConversionError(
                "Encountered an I/O error during document to pixels conversion,"
                f" but the conversion process is still running after {timeout} seconds"
                f" (PID: {p.pid})"
            )
        except Exception:
            return errors.UnexpectedConversionError(
                "Encountered an I/O error during document to pixels conversion,"
                f" but the status of the conversion process is unknown (PID: {p.pid})"
            )
        return errors.exception_from_error_code(error_code)

    @abstractmethod
    def get_max_parallel_conversions(self) -> int:
        """Return the maximum number of parallel conversions. Implemented by subclasses."""
        pass

    @abstractmethod
    def start_doc_to_pixels_proc(self, document: Document) -> subprocess.Popen:
        """Start the process for the doc-to-pixels phase, and return it."""
        pass

    @abstractmethod
    def terminate_doc_to_pixels_proc(
        self, document: Document, p: subprocess.Popen
    ) -> None:
        """Terminate gracefully the process started for the doc-to-pixels phase."""
        pass

    def ensure_stop_doc_to_pixels_proc(
        self,
        document: Document,
        p: subprocess.Popen,
        timeout_grace: int = TIMEOUT_GRACE,
        timeout_force: int = TIMEOUT_FORCE,
    ) -> None:
        """Stop the conversion process, or ensure it has exited.

        This method should be called when we want to verify that the doc-to-pixels
        process has exited, or terminate it ourselves. The termination should happen as
        gracefully as possible, and we should not block indefinitely until the process
        has exited.
        """
        # Check if the process completed.
        ret = p.poll()
        if ret is not None:
            return

        # At this point, the process is still running. This may be benign, as we haven't
        # waited for it yet. Terminate it gracefully.
        self.terminate_doc_to_pixels_proc(document, p)
        try:
            p.wait(timeout_grace)
        except subprocess.TimeoutExpired:
            log.warning(
                f"Conversion process did not terminate gracefully after {timeout_grace}"
                " seconds. Killing it forcefully..."
            )

            # Forcefully kill the running process.
            kill_process_group(p)
            try:
                p.wait(timeout_force)
            except subprocess.TimeoutExpired:
                log.warning(
                    "Conversion process did not terminate forcefully after"
                    f" {timeout_force} seconds. Resources may linger..."
                )

    @contextlib.contextmanager
    def doc_to_pixels_proc(
        self,
        document: Document,
        timeout_exception: int = TIMEOUT_EXCEPTION,
        timeout_grace: int = TIMEOUT_GRACE,
        timeout_force: int = TIMEOUT_FORCE,
    ) -> Iterator[subprocess.Popen]:
        """Start a conversion process, pass it to the caller, and then clean it up."""
        p = self.start_doc_to_pixels_proc(document)
        if platform.system() != "Windows":
            assert os.getpgid(p.pid) != os.getpgid(
                os.getpid()
            ), "Parent shares same PGID with child"

        try:
            yield p
        except errors.ConverterProcException as e:
            # An I/O error with the process was detected. Replace it with the
            # exception that corresponds to the exit code of the process.
            exception = self.get_proc_exception(p, timeout_exception)
            raise exception from e
        finally:
            self.ensure_stop_doc_to_pixels_proc(
                document, p, timeout_grace=timeout_grace, timeout_force=timeout_force
            )

            # Read the stderr of the process only if:
            # * Dev mode is enabled.
            # * The process has exited (else we risk hanging).
            # * The stderr of the process has actually been captured. Checking this
            #   explicitly (instead of asserting it) ensures that this cleanup step
            #   never raises an error that would mask a conversion error.
            if (
                getattr(sys, "dangerzone_dev", False)
                and p.poll() is not None
                and p.stderr is not None
            ):
                debug_log = read_debug_text(p.stderr, MAX_CONVERSION_LOG_CHARS)
                log.info(
                    "Conversion output (doc to pixels)\n"
                    f"{DOC_TO_PIXELS_LOG_START}\n"
                    f"{debug_log}"  # no need for an extra newline here
                    f"{DOC_TO_PIXELS_LOG_END}"
                )
|