async def read_stream(
    sr: asyncio.StreamReader, callback: Optional[Callable] = None
) -> bytes:
    """Consume a byte stream line-by-line.

    Read all lines in a stream until EOF. If a user has passed a callback, call it for
    each line, in the order the lines were received.

    Note that the lines are in bytes, since we can't assume that all command output will
    be UTF-8 encoded. Higher level commands are advised to decode the output to Unicode,
    if they know its encoding.

    :param sr: the stream to drain
    :param callback: optional callable invoked with every line read (as bytes, with its
        trailing newline when the stream provided one)
    :returns: the concatenation of every line that was read
    """
    lines: List[bytes] = []
    while True:
        line = await sr.readline()
        # An empty read is the only reliable end-of-stream signal. Checking
        # `sr.at_eof()` right after the read is not: it already reports True once the
        # final line has been handed out and the writer has closed the pipe, which
        # would make us throw that final line away.
        if not line:
            break
        if callback is not None:
            callback(line)
        # TODO: This would be a good place to log the received line, mostly for debug
        # logging.
        lines.append(line)
    # Join once at the end instead of growing an immutable bytes object per line.
    return b"".join(lines)
async def _discard_readers(*tasks: "asyncio.Task") -> None:
    """Cancel the given stream-reader tasks and wait until they have unwound."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True swallows the resulting CancelledError of each task, so
    # that the caller can go on and raise its own, more meaningful exception.
    await asyncio.gather(*tasks, return_exceptions=True)


async def run_command(
    args: List[str],
    *,
    error_message: str,
    timeout_message: str,
    timeout: float = DEFAULT_TIMEOUT,
    stdout_callback: Optional[Callable] = None,
    stderr_callback: Optional[Callable] = None,
) -> Tuple[bytes, bytes]:
    """Run a command and get its output.

    Run a command using asyncio.subprocess, consume its standard streams, and return its
    output in bytes.

    :param args: the command and its arguments; executed directly, without a shell
    :param error_message: message of the RuntimeError raised on a non-zero exit status
    :param timeout_message: message of the TimeoutError raised when the command times out
    :param timeout: seconds to wait for the command to finish
    :param stdout_callback: optional callable invoked with each line (bytes) of stdout
    :param stderr_callback: optional callable invoked with each line (bytes) of stderr
    :returns: a (stdout, stderr) tuple with the complete output of the command
    :raises RuntimeError: if the process returns a non-zero exit status
    :raises TimeoutError: if the process times out
    """
    # Start the provided command, and return a handle. The command will run in the
    # background.
    proc = await asyncio.subprocess.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    assert proc.stdout is not None
    assert proc.stderr is not None

    # Create asynchronous tasks that will consume the standard streams of the command,
    # and call callbacks if necessary.
    stdout_task = asyncio.create_task(read_stream(proc.stdout, stdout_callback))
    stderr_task = asyncio.create_task(read_stream(proc.stderr, stderr_callback))

    # Wait until the command has finished, for a specific timeout. Then, verify that the
    # command has completed successfully. In any other case, raise an exception.
    try:
        ret = await asyncio.wait_for(proc.wait(), timeout=timeout)
    except asyncio.TimeoutError as e:
        # The timeout only cancels our wait; the child itself keeps running unless we
        # stop it explicitly.
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited between the timeout firing and the kill.
            pass
        await proc.wait()
        # Cancel the readers instead of draining them: a descendant of the killed
        # process may still hold the pipes open, in which case EOF would never come.
        await _discard_readers(stdout_task, stderr_task)
        raise TimeoutError(timeout_message) from e

    if ret != 0:
        # The output is not reported on failure, so don't leave the readers pending.
        await _discard_readers(stdout_task, stderr_task)
        raise RuntimeError(error_message)

    # Wait until the tasks that consume the command's standard streams have exited as
    # well, and return their output.
    stdout = await stdout_task
    stderr = await stderr_task
    return (stdout, stderr)
get_num_pages(line: str) -> None: - search = re.search(r"^Pages: (\d+)", line) - if search is not None: - self.num_pages = int(search.group(1)) - - run_command( + stdout, _ = await run_command( ["pdfinfo", pdf_filename], error_message="PDF file is corrupted", timeout_message=f"Extracting metadata from PDF timed out after 1 second", timeout=1, - stdout_callback=get_num_pages, ) - if self.num_pages == None: - raise ValueError("Number of pages could not be extraced from PDF") - def pdftoppm_progress_callback(line: str) -> None: - """Function called for every line the 'pdftoppm'command outputs + search = re.search(r"Pages:\s*(\d+)\s*\n", stdout.decode()) + if search is not None: + self.num_pages: int = int(search.group(1)) + else: + raise ValueError("Number of pages could not be extracted from PDF") + + def pdftoppm_progress_callback(line: bytes) -> None: + """Function called for every line the 'pdftoppm' command outputs Sample pdftoppm output: @@ -253,7 +257,7 @@ class DangerzoneConverter: Each successful line is in the format "{page} {page_num} {ppm_filename}" """ try: - (page_str, num_pages_str, _) = line.split() + (page_str, num_pages_str, _) = line.decode().split() num_pages = int(num_pages_str) page = int(page_str) except ValueError as e: @@ -305,8 +309,8 @@ class DangerzoneConverter: page_base = "/tmp/page" # Convert to PPM, which is essentially an RGB format - pdftoppm_timeout = 1.0 * self.num_pages # type: ignore [operator] - run_command( + pdftoppm_timeout = 1.0 * self.num_pages + await run_command( [ "pdftoppm", pdf_filename, @@ -329,7 +333,7 @@ class DangerzoneConverter: ): shutil.move(filename, "/dangerzone") - def pixels_to_pdf(self) -> None: + async def pixels_to_pdf(self) -> None: self.percentage = 50.0 num_pages = len(glob.glob("/dangerzone/page-*.rgb")) @@ -354,7 +358,7 @@ class DangerzoneConverter: self.update_progress( f"Converting page {page}/{num_pages} from pixels to searchable PDF" ) - run_command( + await run_command( [ "gm", "convert", @@ 
-368,7 +372,7 @@ class DangerzoneConverter: error_message=f"Page {page}/{num_pages} conversion to PNG failed", timeout_message=f"Error converting pixels to PNG, convert timed out after {DEFAULT_TIMEOUT} seconds", ) - run_command( + await run_command( [ "tesseract", png_filename, @@ -387,7 +391,7 @@ class DangerzoneConverter: self.update_progress( f"Converting page {page}/{num_pages} from pixels to PDF" ) - run_command( + await run_command( [ "gm", "convert", @@ -410,7 +414,7 @@ class DangerzoneConverter: for page in range(1, num_pages + 1): args.append(f"/tmp/page-{page}.pdf") args.append(f"/tmp/safe-output.pdf") - run_command( + await run_command( args, error_message="Merging pages into a single PDF failed", timeout_message=f"Error merging pages into a single PDF, pdfunite timed out after {DEFAULT_TIMEOUT} seconds", @@ -421,7 +425,7 @@ class DangerzoneConverter: # Compress self.update_progress("Compressing PDF") compress_timeout = num_pages * COMPRESSION_TIMEOUT - run_command( + await run_command( ["ps2pdf", "/tmp/safe-output.pdf", "/tmp/safe-output-compressed.pdf"], timeout_message=f"Error compressing PDF, ps2pdf timed out after {compress_timeout} seconds", error_message="Compressing PDF failed", @@ -444,7 +448,7 @@ class DangerzoneConverter: sys.stdout.flush() -def main() -> int: +async def main() -> int: if len(sys.argv) != 2: print(f"Usage: {sys.argv[0]} [document-to-pixels]|[pixels-to-pdf]") return -1 @@ -453,9 +457,9 @@ def main() -> int: try: if sys.argv[1] == "document-to-pixels": - converter.document_to_pixels() + await converter.document_to_pixels() elif sys.argv[1] == "pixels-to-pdf": - converter.pixels_to_pdf() + await converter.pixels_to_pdf() except (RuntimeError, TimeoutError, ValueError) as e: converter.update_progress(str(e), error=True) return 1 @@ -464,4 +468,4 @@ def main() -> int: if __name__ == "__main__": - sys.exit(main()) + sys.exit(asyncio.run(main()))