mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-04-29 02:12:36 +02:00
container: Run commands asynchronously
Convert the Dangerzone script that runs in the container to run commands asynchronously, via the asyncio module. The main advantage of this approach is that it's fast, easy, and safe to consume the command's streams while the command is running in the background. Previously, we had implemented an approach that used non-blocking sockets, but those are easy to get wrong. For instance, timeouts were not exact and capturing output was brittle. Fixes #325
This commit is contained in:
parent
24975fabd5
commit
aeeed411a0
1 changed files with 82 additions and 78 deletions
|
@ -12,6 +12,7 @@ pixels_to_pdf:
|
||||||
- 95%-100%: Compress the final PDF
|
- 95%-100%: Compress the final PDF
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import glob
|
import glob
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
@ -20,7 +21,7 @@ import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
from typing import Callable, Dict, List, Optional, Union
|
from typing import Callable, Dict, List, Optional, Tuple, Union
|
||||||
|
|
||||||
import magic
|
import magic
|
||||||
|
|
||||||
|
@ -31,7 +32,30 @@ DEFAULT_TIMEOUT: float = 120
|
||||||
COMPRESSION_TIMEOUT: float = 10
|
COMPRESSION_TIMEOUT: float = 10
|
||||||
|
|
||||||
|
|
||||||
def run_command(
|
async def read_stream(
    sr: asyncio.StreamReader, callback: Optional[Callable] = None
) -> bytes:
    """Consume a byte stream line-by-line.

    Read all lines in a stream until EOF. If a user has passed a callback, call it for
    each line.

    Note that the lines are in bytes, since we can't assume that all command output will
    be UTF-8 encoded. Higher level commands are advised to decode the output to Unicode,
    if they know its encoding.

    :param sr: the stream to read from
    :param callback: optional function called with each line (as bytes, including its
        trailing newline when the stream provided one)
    :returns: everything that was read from the stream, concatenated
    """
    chunks = []
    while True:
        line = await sr.readline()
        # readline() returns an empty bytes object only once EOF has been reached and
        # nothing is left in the buffer. Checking sr.at_eof() after the read instead
        # would discard the final line whenever the writer has already closed the
        # stream, because at_eof() becomes true as soon as the buffer is drained.
        if not line:
            break
        if callback is not None:
            callback(line)
        # TODO: This would be a good place to log the received line, mostly for debug
        # logging.
        chunks.append(line)
    return b"".join(chunks)
|
||||||
|
|
||||||
|
|
||||||
|
async def run_command(
|
||||||
args: List[str],
|
args: List[str],
|
||||||
*,
|
*,
|
||||||
error_message: str,
|
error_message: str,
|
||||||
|
@ -39,68 +63,52 @@ def run_command(
|
||||||
timeout: float = DEFAULT_TIMEOUT,
|
timeout: float = DEFAULT_TIMEOUT,
|
||||||
stdout_callback: Callable = None,
|
stdout_callback: Callable = None,
|
||||||
stderr_callback: Callable = None,
|
stderr_callback: Callable = None,
|
||||||
) -> None:
|
) -> Tuple[bytes, bytes]:
|
||||||
"""
|
"""Run a command and get its output.
|
||||||
Runs a command and returns the result.
|
|
||||||
|
Run a command using asyncio.subprocess, consume its standard streams, and return its
|
||||||
|
output in bytes.
|
||||||
|
|
||||||
:raises RuntimeError: if the process returns a non-zero exit status
|
:raises RuntimeError: if the process returns a non-zero exit status
|
||||||
:raises TimeoutError: if the process times out
|
:raises TimeoutError: if the process times out
|
||||||
"""
|
"""
|
||||||
if stdout_callback is None and stderr_callback is None:
|
# Start the provided command, and return a handle. The command will run in the
|
||||||
try:
|
# background.
|
||||||
subprocess.run(args, timeout=timeout, check=True)
|
proc = await asyncio.subprocess.create_subprocess_exec(
|
||||||
except subprocess.CalledProcessError as e:
|
*args,
|
||||||
raise RuntimeError(error_message) from e
|
stdout=asyncio.subprocess.PIPE,
|
||||||
except subprocess.TimeoutExpired as e:
|
stderr=asyncio.subprocess.PIPE,
|
||||||
raise TimeoutError(timeout_message) from e
|
|
||||||
|
|
||||||
else:
|
|
||||||
p = subprocess.Popen(
|
|
||||||
args,
|
|
||||||
stdout=subprocess.PIPE,
|
|
||||||
stderr=subprocess.PIPE,
|
|
||||||
universal_newlines=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Progress callback requires a manually implemented timeout
|
assert proc.stdout is not None
|
||||||
start_time = time.time()
|
assert proc.stderr is not None
|
||||||
|
|
||||||
# Make reading from stdout or stderr non-blocking
|
# Create asynchronous tasks that will consume the standard streams of the command,
|
||||||
if p.stdout:
|
# and call callbacks if necessary.
|
||||||
os.set_blocking(p.stdout.fileno(), False)
|
stdout_task = asyncio.create_task(read_stream(proc.stdout, stdout_callback))
|
||||||
if p.stderr:
|
stderr_task = asyncio.create_task(read_stream(proc.stderr, stderr_callback))
|
||||||
os.set_blocking(p.stderr.fileno(), False)
|
|
||||||
|
|
||||||
while True:
|
# Wait until the command has finished, for a specific timeout. Then, verify that the
|
||||||
# Processes hasn't finished
|
# command has completed successfully. In any other case, raise an exception.
|
||||||
if p.poll() is not None:
|
try:
|
||||||
if p.returncode != 0:
|
ret = await asyncio.wait_for(proc.wait(), timeout=timeout)
|
||||||
raise RuntimeError(error_message)
|
except asyncio.exceptions.TimeoutError:
|
||||||
break
|
|
||||||
|
|
||||||
# Check if timeout hasn't expired
|
|
||||||
if time.time() - start_time > timeout:
|
|
||||||
p.kill()
|
|
||||||
raise TimeoutError(timeout_message)
|
raise TimeoutError(timeout_message)
|
||||||
|
if ret != 0:
|
||||||
|
raise RuntimeError(error_message)
|
||||||
|
|
||||||
if p.stdout and stdout_callback is not None:
|
# Wait until the tasks that consume the command's standard streams have exited as
|
||||||
line = p.stdout.readline()
|
# well, and return their output.
|
||||||
if len(line) > 0:
|
stdout = await stdout_task
|
||||||
line = line.rstrip() # strip trailing "\n"
|
stderr = await stderr_task
|
||||||
stdout_callback(line)
|
return (stdout, stderr)
|
||||||
|
|
||||||
if p.stderr and stderr_callback is not None:
|
|
||||||
line = p.stderr.readline()
|
|
||||||
if len(line) > 0:
|
|
||||||
line = line.rstrip() # strip trailing "\n"
|
|
||||||
stderr_callback(line)
|
|
||||||
|
|
||||||
|
|
||||||
class DangerzoneConverter:
|
class DangerzoneConverter:
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self.percentage: float = 0.0
|
self.percentage: float = 0.0
|
||||||
|
|
||||||
def document_to_pixels(self) -> None:
|
async def document_to_pixels(self) -> None:
|
||||||
|
|
||||||
conversions: Dict[str, Dict[str, Optional[str]]] = {
|
conversions: Dict[str, Dict[str, Optional[str]]] = {
|
||||||
# .pdf
|
# .pdf
|
||||||
|
@ -194,7 +202,7 @@ class DangerzoneConverter:
|
||||||
"/tmp",
|
"/tmp",
|
||||||
"/tmp/input_file",
|
"/tmp/input_file",
|
||||||
]
|
]
|
||||||
run_command(
|
await run_command(
|
||||||
args,
|
args,
|
||||||
error_message="Conversion to PDF with LibreOffice failed",
|
error_message="Conversion to PDF with LibreOffice failed",
|
||||||
timeout_message=f"Error converting document to PDF, LibreOffice timed out after {DEFAULT_TIMEOUT} seconds",
|
timeout_message=f"Error converting document to PDF, LibreOffice timed out after {DEFAULT_TIMEOUT} seconds",
|
||||||
|
@ -208,7 +216,7 @@ class DangerzoneConverter:
|
||||||
"/tmp/input_file",
|
"/tmp/input_file",
|
||||||
"/tmp/input_file.pdf",
|
"/tmp/input_file.pdf",
|
||||||
]
|
]
|
||||||
run_command(
|
await run_command(
|
||||||
args,
|
args,
|
||||||
error_message="Conversion to PDF with GraphicsMagick failed",
|
error_message="Conversion to PDF with GraphicsMagick failed",
|
||||||
timeout_message=f"Error converting document to PDF, GraphicsMagick timed out after {DEFAULT_TIMEOUT} seconds",
|
timeout_message=f"Error converting document to PDF, GraphicsMagick timed out after {DEFAULT_TIMEOUT} seconds",
|
||||||
|
@ -222,24 +230,20 @@ class DangerzoneConverter:
|
||||||
|
|
||||||
# Obtain number of pages
|
# Obtain number of pages
|
||||||
self.update_progress("Calculating number of pages")
|
self.update_progress("Calculating number of pages")
|
||||||
self.num_pages: Union[None, int] = None
|
stdout, _ = await run_command(
|
||||||
|
|
||||||
def get_num_pages(line: str) -> None:
|
|
||||||
search = re.search(r"^Pages: (\d+)", line)
|
|
||||||
if search is not None:
|
|
||||||
self.num_pages = int(search.group(1))
|
|
||||||
|
|
||||||
run_command(
|
|
||||||
["pdfinfo", pdf_filename],
|
["pdfinfo", pdf_filename],
|
||||||
error_message="PDF file is corrupted",
|
error_message="PDF file is corrupted",
|
||||||
timeout_message=f"Extracting metadata from PDF timed out after 1 second",
|
timeout_message=f"Extracting metadata from PDF timed out after 1 second",
|
||||||
timeout=1,
|
timeout=1,
|
||||||
stdout_callback=get_num_pages,
|
|
||||||
)
|
)
|
||||||
if self.num_pages == None:
|
|
||||||
raise ValueError("Number of pages could not be extraced from PDF")
|
|
||||||
|
|
||||||
def pdftoppm_progress_callback(line: str) -> None:
|
search = re.search(r"Pages:\s*(\d+)\s*\n", stdout.decode())
|
||||||
|
if search is not None:
|
||||||
|
self.num_pages: int = int(search.group(1))
|
||||||
|
else:
|
||||||
|
raise ValueError("Number of pages could not be extracted from PDF")
|
||||||
|
|
||||||
|
def pdftoppm_progress_callback(line: bytes) -> None:
|
||||||
"""Function called for every line the 'pdftoppm' command outputs
|
"""Function called for every line the 'pdftoppm' command outputs
|
||||||
|
|
||||||
Sample pdftoppm output:
|
Sample pdftoppm output:
|
||||||
|
@ -253,7 +257,7 @@ class DangerzoneConverter:
|
||||||
Each successful line is in the format "{page} {page_num} {ppm_filename}"
|
Each successful line is in the format "{page} {page_num} {ppm_filename}"
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
(page_str, num_pages_str, _) = line.split()
|
(page_str, num_pages_str, _) = line.decode().split()
|
||||||
num_pages = int(num_pages_str)
|
num_pages = int(num_pages_str)
|
||||||
page = int(page_str)
|
page = int(page_str)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
|
@ -305,8 +309,8 @@ class DangerzoneConverter:
|
||||||
page_base = "/tmp/page"
|
page_base = "/tmp/page"
|
||||||
|
|
||||||
# Convert to PPM, which is essentially an RGB format
|
# Convert to PPM, which is essentially an RGB format
|
||||||
pdftoppm_timeout = 1.0 * self.num_pages # type: ignore [operator]
|
pdftoppm_timeout = 1.0 * self.num_pages
|
||||||
run_command(
|
await run_command(
|
||||||
[
|
[
|
||||||
"pdftoppm",
|
"pdftoppm",
|
||||||
pdf_filename,
|
pdf_filename,
|
||||||
|
@ -329,7 +333,7 @@ class DangerzoneConverter:
|
||||||
):
|
):
|
||||||
shutil.move(filename, "/dangerzone")
|
shutil.move(filename, "/dangerzone")
|
||||||
|
|
||||||
def pixels_to_pdf(self) -> None:
|
async def pixels_to_pdf(self) -> None:
|
||||||
self.percentage = 50.0
|
self.percentage = 50.0
|
||||||
|
|
||||||
num_pages = len(glob.glob("/dangerzone/page-*.rgb"))
|
num_pages = len(glob.glob("/dangerzone/page-*.rgb"))
|
||||||
|
@ -354,7 +358,7 @@ class DangerzoneConverter:
|
||||||
self.update_progress(
|
self.update_progress(
|
||||||
f"Converting page {page}/{num_pages} from pixels to searchable PDF"
|
f"Converting page {page}/{num_pages} from pixels to searchable PDF"
|
||||||
)
|
)
|
||||||
run_command(
|
await run_command(
|
||||||
[
|
[
|
||||||
"gm",
|
"gm",
|
||||||
"convert",
|
"convert",
|
||||||
|
@ -368,7 +372,7 @@ class DangerzoneConverter:
|
||||||
error_message=f"Page {page}/{num_pages} conversion to PNG failed",
|
error_message=f"Page {page}/{num_pages} conversion to PNG failed",
|
||||||
timeout_message=f"Error converting pixels to PNG, convert timed out after {DEFAULT_TIMEOUT} seconds",
|
timeout_message=f"Error converting pixels to PNG, convert timed out after {DEFAULT_TIMEOUT} seconds",
|
||||||
)
|
)
|
||||||
run_command(
|
await run_command(
|
||||||
[
|
[
|
||||||
"tesseract",
|
"tesseract",
|
||||||
png_filename,
|
png_filename,
|
||||||
|
@ -387,7 +391,7 @@ class DangerzoneConverter:
|
||||||
self.update_progress(
|
self.update_progress(
|
||||||
f"Converting page {page}/{num_pages} from pixels to PDF"
|
f"Converting page {page}/{num_pages} from pixels to PDF"
|
||||||
)
|
)
|
||||||
run_command(
|
await run_command(
|
||||||
[
|
[
|
||||||
"gm",
|
"gm",
|
||||||
"convert",
|
"convert",
|
||||||
|
@ -410,7 +414,7 @@ class DangerzoneConverter:
|
||||||
for page in range(1, num_pages + 1):
|
for page in range(1, num_pages + 1):
|
||||||
args.append(f"/tmp/page-{page}.pdf")
|
args.append(f"/tmp/page-{page}.pdf")
|
||||||
args.append(f"/tmp/safe-output.pdf")
|
args.append(f"/tmp/safe-output.pdf")
|
||||||
run_command(
|
await run_command(
|
||||||
args,
|
args,
|
||||||
error_message="Merging pages into a single PDF failed",
|
error_message="Merging pages into a single PDF failed",
|
||||||
timeout_message=f"Error merging pages into a single PDF, pdfunite timed out after {DEFAULT_TIMEOUT} seconds",
|
timeout_message=f"Error merging pages into a single PDF, pdfunite timed out after {DEFAULT_TIMEOUT} seconds",
|
||||||
|
@ -421,7 +425,7 @@ class DangerzoneConverter:
|
||||||
# Compress
|
# Compress
|
||||||
self.update_progress("Compressing PDF")
|
self.update_progress("Compressing PDF")
|
||||||
compress_timeout = num_pages * COMPRESSION_TIMEOUT
|
compress_timeout = num_pages * COMPRESSION_TIMEOUT
|
||||||
run_command(
|
await run_command(
|
||||||
["ps2pdf", "/tmp/safe-output.pdf", "/tmp/safe-output-compressed.pdf"],
|
["ps2pdf", "/tmp/safe-output.pdf", "/tmp/safe-output-compressed.pdf"],
|
||||||
timeout_message=f"Error compressing PDF, ps2pdf timed out after {compress_timeout} seconds",
|
timeout_message=f"Error compressing PDF, ps2pdf timed out after {compress_timeout} seconds",
|
||||||
error_message="Compressing PDF failed",
|
error_message="Compressing PDF failed",
|
||||||
|
@ -444,7 +448,7 @@ class DangerzoneConverter:
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
|
|
||||||
|
|
||||||
def main() -> int:
|
async def main() -> int:
|
||||||
if len(sys.argv) != 2:
|
if len(sys.argv) != 2:
|
||||||
print(f"Usage: {sys.argv[0]} [document-to-pixels]|[pixels-to-pdf]")
|
print(f"Usage: {sys.argv[0]} [document-to-pixels]|[pixels-to-pdf]")
|
||||||
return -1
|
return -1
|
||||||
|
@ -453,9 +457,9 @@ def main() -> int:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if sys.argv[1] == "document-to-pixels":
|
if sys.argv[1] == "document-to-pixels":
|
||||||
converter.document_to_pixels()
|
await converter.document_to_pixels()
|
||||||
elif sys.argv[1] == "pixels-to-pdf":
|
elif sys.argv[1] == "pixels-to-pdf":
|
||||||
converter.pixels_to_pdf()
|
await converter.pixels_to_pdf()
|
||||||
except (RuntimeError, TimeoutError, ValueError) as e:
|
except (RuntimeError, TimeoutError, ValueError) as e:
|
||||||
converter.update_progress(str(e), error=True)
|
converter.update_progress(str(e), error=True)
|
||||||
return 1
|
return 1
|
||||||
|
@ -464,4 +468,4 @@ def main() -> int:
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
sys.exit(main())
|
sys.exit(asyncio.run(main()))
|
||||||
|
|
Loading…
Reference in a new issue