dangerzone/dangerzone/conversion/common.py
deeplow f7190e3876
PDFunite: fix too many open files
In large (1200+ page) PDFs, the PDFunite command would fail on some systems
(e.g. Qubes) because it would be invoked with 1024+ files, exceeding the
open-file limit (`ulimit -n`).

The fix splits the merge into batches, accumulating the results in a single
PDF and then merging it with the next batch.
2023-11-02 15:11:50 +00:00
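
A minimal sketch of the batched-merge idea described above, assuming a
hypothetical merge_pdfs() helper, one PDF file per page, and poppler's
pdfunite on the PATH (the names and batch size here are illustrative, not
taken from the file below):

import shutil
import subprocess
from typing import List

BATCH_SIZE = 50  # hypothetical; stays well below typical `ulimit -n` values

def merge_pdfs(page_pdfs: List[str], out_pdf: str) -> None:
    """Merge per-page PDFs into out_pdf one batch at a time."""
    accumulator = None
    for i in range(0, len(page_pdfs), BATCH_SIZE):
        batch = page_pdfs[i : i + BATCH_SIZE]
        inputs = ([accumulator] if accumulator else []) + batch
        # Merge into a scratch file first: pdfunite should not read and
        # write the same path in a single invocation.
        scratch = out_pdf + ".part"
        subprocess.run(["pdfunite", *inputs, scratch], check=True)
        shutil.move(scratch, out_pdf)
        accumulator = out_pdf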


#!/usr/bin/env python3
import asyncio
import glob
import json
import os
import re
import shutil
import subprocess
import sys
import time
from abc import abstractmethod
from typing import Callable, Dict, Generator, List, Optional, Tuple, Union

TIMEOUT_PER_PAGE: float = 30  # (seconds)
TIMEOUT_PER_MB: float = 30  # (seconds)
TIMEOUT_MIN: float = 60  # (seconds)
PAGE_BATCH_SIZE = 50  # number of pages to be processed simultaneously


def running_on_qubes() -> bool:
    # https://www.qubes-os.org/faq/#what-is-the-canonical-way-to-detect-qubes-vm
    return os.path.exists("/usr/share/qubes/marker-vm")


def calculate_timeout(size: float, pages: Optional[float] = None) -> float:
    """Calculate the timeout for a command.

    The timeout calculation takes two factors into account:

    1. The size (in MiB) of the dataset (document, multiple pages).
    2. The number of pages in the dataset.

    It then calculates proportional timeout values based on the above, and keeps the
    larger one. This way, we can handle several corner cases:

    * Documents with lots of pages, but small file size.
    * Single images with large file size.
    """
    # Do not allow timeouts lower than TIMEOUT_MIN seconds, if the file size is
    # small, since we need to take into account the program's startup time as well.
    timeout = max(TIMEOUT_PER_MB * size, TIMEOUT_MIN)
    if pages:
        timeout = max(timeout, TIMEOUT_PER_PAGE * pages)
    return timeout
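
# Worked example (hypothetical numbers, not from the original module): a 2 MiB,
# 100-page document gets max(30 * 2, 60) = 60s from its size alone, and the page
# count then raises that to max(60, 30 * 100) = 3000s.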


def batch_iterator(num_pages: int) -> Generator[Tuple[int, int], None, None]:
    """Iterates over batches of PAGE_BATCH_SIZE pages"""
    for first_page in range(1, num_pages + 1, PAGE_BATCH_SIZE):
        if first_page + PAGE_BATCH_SIZE >= num_pages:  # Last batch
            last_page = num_pages
        else:
            last_page = first_page + PAGE_BATCH_SIZE - 1
        yield (first_page, last_page)
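
# Illustration (hypothetical page count): with PAGE_BATCH_SIZE = 50, a 120-page
# document yields the inclusive, 1-indexed ranges (1, 50), (51, 100), (101, 120).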


def get_batch_timeout(timeout: Optional[float], num_pages: int) -> Optional[float]:
    if timeout is None:
        return None
    else:
        # Ceiling division, so that documents shorter than PAGE_BATCH_SIZE pages
        # count as one batch instead of causing a division by zero.
        num_batches = (num_pages + PAGE_BATCH_SIZE - 1) // PAGE_BATCH_SIZE
        return timeout / num_batches
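
# Example (hypothetical values): a 600s total timeout for 120 pages is spread
# over ceil(120 / 50) = 3 batches, i.e. 200s per batch.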


class DangerzoneConverter:
    def __init__(self, progress_callback: Optional[Callable] = None) -> None:
        self.percentage: float = 0.0
        self.progress_callback = progress_callback
        self.captured_output: bytes = b""

    async def read_stream(
        self, sr: asyncio.StreamReader, callback: Optional[Callable] = None
    ) -> bytes:
        """Consume a byte stream line-by-line.

        Read all lines in a stream until EOF. If a user has passed a callback, call
        it for each line.

        Note that the lines are in bytes, since we can't assume that all command
        output will be UTF-8 encoded. Higher-level commands are advised to decode
        the output to Unicode, if they know its encoding.
        """
        buf = b""
        while not sr.at_eof():
            line = await sr.readline()
            self.captured_output += line
            if callback is not None:
                await callback(line)
            buf += line
        return buf
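
    # Illustrative behavior (not from the original module): for a stream holding
    # b"page 1\npage 2\n", read_stream() returns the whole buffer, and a callback,
    # if given, is awaited once per line, trailing newline included, as bytes.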

    async def run_command(
        self,
        args: List[str],
        *,
        error_message: str,
        timeout_message: str,
        timeout: Optional[float],
        stdout_callback: Optional[Callable] = None,
        stderr_callback: Optional[Callable] = None,
    ) -> Tuple[bytes, bytes]:
        """Run a command and get its output.

        Run a command using asyncio.subprocess, consume its standard streams, and
        return its output in bytes.

        :raises RuntimeError: if the process returns a non-zero exit status
        :raises TimeoutError: if the process times out
        """
        # Start the provided command, and return a handle. The command will run in
        # the background.
        proc = await asyncio.subprocess.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        # Log the command to the debug log, so that we can trace back which errors
        # came from which command.
        self.captured_output += f"[COMMAND] {' '.join(args)}\n".encode()

        assert proc.stdout is not None
        assert proc.stderr is not None

        # Create asynchronous tasks that will consume the standard streams of the
        # command, and call callbacks if necessary.
        stdout_task = asyncio.create_task(
            self.read_stream(proc.stdout, stdout_callback)
        )
        stderr_task = asyncio.create_task(
            self.read_stream(proc.stderr, stderr_callback)
        )

        # Wait until the command has finished, for a specific timeout. Then, verify
        # that the command has completed successfully. In any other case, raise an
        # exception.
        try:
            ret = await asyncio.wait_for(proc.wait(), timeout=timeout)
        except asyncio.exceptions.TimeoutError:
            raise TimeoutError(timeout_message)
        if ret != 0:
            raise RuntimeError(error_message)

        # Wait until the tasks that consume the command's standard streams have
        # exited as well, and return their output.
        stdout = await stdout_task
        stderr = await stderr_task
        return (stdout, stderr)
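
    # Usage sketch (hypothetical command and messages, for illustration only):
    #
    #     stdout, stderr = await self.run_command(
    #         ["pdfinfo", "/tmp/input.pdf"],
    #         error_message="pdfinfo failed",
    #         timeout_message="pdfinfo timed out",
    #         timeout=self.calculate_timeout(size=1),
    #     )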

    def calculate_timeout(
        self, size: float, pages: Optional[float] = None
    ) -> Optional[float]:
        """Calculate the timeout for a command, unless timeouts are disabled."""
        if not int(os.environ.get("ENABLE_TIMEOUTS", 1)):
            return None
        return calculate_timeout(size, pages)

    @abstractmethod
    async def convert(self) -> None:
        pass

    def update_progress(self, text: str, *, error: bool = False) -> None:
        if running_on_qubes():
            if self.progress_callback:
                self.progress_callback(error, text, int(self.percentage))
        else:
            print(
                json.dumps(
                    {"error": error, "text": text, "percentage": int(self.percentage)}
                )
            )
            sys.stdout.flush()
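
# On non-Qubes platforms, update_progress() emits one JSON object per line on
# stdout, e.g. (illustrative values only):
#
#     {"error": false, "text": "Converting page 3/10", "percentage": 30}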