refactor dangerzone.py, raise exceptions instead of returning int

Standardize calls to subprocess.run to shrink file by about 100 lines
This commit is contained in:
Guthrie McAfee Armstrong 2022-06-05 15:14:38 -04:00 committed by deeplow
parent 7a84b89410
commit eaa08c9c3d
No known key found for this signature in database
GPG key ID: 577982871529A52A

View file

@ -18,16 +18,41 @@ import os
import shutil import shutil
import subprocess import subprocess
import sys import sys
<<<<<<< HEAD
from typing import Dict, Optional from typing import Dict, Optional
=======
>>>>>>> d990cfb (refactor dangerzone.py, raise exceptions instead of returning int)
import magic import magic
from PIL import Image from PIL import Image
# timeout in seconds for any single subprocess # timeout in seconds for any single subprocess
# FIXME https://github.com/freedomofpress/dangerzone/issues/146 # FIXME https://github.com/freedomofpress/dangerzone/issues/146
# FIXME https://github.com/freedomofpress/dangerzone/issues/149 # FIXME https://github.com/freedomofpress/dangerzone/issues/149
TIMEOUT_SECONDS = 60 DEFAULT_TIMEOUT: float = 60
def run_command(
    args, *, error_message: str, timeout_message: str, timeout: float = DEFAULT_TIMEOUT
) -> subprocess.CompletedProcess:
    """
    Execute ``args`` as a subprocess, discarding everything it prints.

    Both stdout and stderr of the child are sent to ``subprocess.DEVNULL``.
    Failures reported by ``subprocess.run`` are translated into built-in
    exceptions carrying the caller-supplied messages, with the original
    subprocess exception kept as the cause.

    :param args: the command to execute, handed unchanged to ``subprocess.run``
    :param error_message: message of the RuntimeError raised on a non-zero exit
    :param timeout_message: message of the TimeoutError raised on a timeout
    :param timeout: how many seconds the process may run before it is abandoned
    :returns: the ``subprocess.CompletedProcess`` of the successful run
    :raises RuntimeError: if the process returns a non-zero exit status
    :raises TimeoutError: if the process times out
    """
    try:
        completed = subprocess.run(
            args,
            check=True,
            timeout=timeout,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except subprocess.TimeoutExpired as timed_out:
        raise TimeoutError(timeout_message) from timed_out
    except subprocess.CalledProcessError as failed:
        raise RuntimeError(error_message) from failed
    else:
        return completed
def output(self, error: bool, text: str, percentage: float) -> None: def output(self, error: bool, text: str, percentage: float) -> None:
@ -35,7 +60,7 @@ def output(self, error: bool, text: str, percentage: float) -> None:
sys.stdout.flush() sys.stdout.flush()
def document_to_pixels() -> int: def document_to_pixels() -> None:
percentage: float = 0.0 percentage: float = 0.0
conversions: Dict[str, Dict[str, Optional[str]]] = { conversions: Dict[str, Dict[str, Optional[str]]] = {
@ -130,18 +155,11 @@ def document_to_pixels() -> int:
"/tmp", "/tmp",
"/tmp/input_file", "/tmp/input_file",
] ]
try: run_command(
subprocess.run(
args, args,
stdout=subprocess.DEVNULL, error_message="Conversion to PDF with LibreOffice failed",
stderr=subprocess.DEVNULL, timeout_message=f"Error converting document to PDF, LibreOffice timed out after {DEFAULT_TIMEOUT} seconds",
timeout=TIMEOUT_SECONDS,
check=True,
) )
except subprocess.TimeoutExpired:
raise TimeoutError(f"Error converting document to PDF, LibreOffice timed out after {TIMEOUT_SECONDS} seconds")
except subprocess.CalledProcessError:
raise RuntimeError("Conversion to PDF with LibreOffice failed")
pdf_filename = "/tmp/input_file.pdf" pdf_filename = "/tmp/input_file.pdf"
elif conversion["type"] == "convert": elif conversion["type"] == "convert":
output(False, "Converting to PDF using GraphicsMagick", percentage) output(False, "Converting to PDF using GraphicsMagick", percentage)
@ -151,21 +169,16 @@ def document_to_pixels() -> int:
"/tmp/input_file", "/tmp/input_file",
"/tmp/input_file.pdf", "/tmp/input_file.pdf",
] ]
try: run_command(
subprocess.run(
args, args,
stdout=subprocess.DEVNULL, error_message="Conversion to PDF with GraphicsMagick failed",
stderr=subprocess.DEVNULL, timeout_message=f"Error converting document to PDF, GraphicsMagick timed out after {DEFAULT_TIMEOUT} seconds",
timeout=TIMEOUT_SECONDS,
check=True,
) )
except subprocess.TimeoutExpired:
raise TimeoutError(f"Error converting document to PDF, GraphicsMagick timed out after {TIMEOUT_SECONDS} seconds")
except subprocess.CalledProcessError:
raise RuntimeError("Conversion to PDF with GraphicsMagick failed")
pdf_filename = "/tmp/input_file.pdf" pdf_filename = "/tmp/input_file.pdf"
else: else:
raise ValueError(f"Invalid conversion type {conversion['type']} for MIME type {mime_type}") raise ValueError(
f"Invalid conversion type {conversion['type']} for MIME type {mime_type}"
)
percentage += 3 percentage += 3
# Separate PDF into pages # Separate PDF into pages
@ -175,18 +188,11 @@ def document_to_pixels() -> int:
percentage, percentage,
) )
args = ["pdftk", pdf_filename, "burst", "output", "/tmp/page-%d.pdf"] args = ["pdftk", pdf_filename, "burst", "output", "/tmp/page-%d.pdf"]
try: run_command(
subprocess.run(
args, args,
stdout=subprocess.DEVNULL, error_message="Separating document into pages failed",
stderr=subprocess.DEVNULL, timeout_message=f"Error separating document into pages, pdfseparate timed out after {DEFAULT_TIMEOUT} seconds",
timeout=TIMEOUT_SECONDS,
check=True,
) )
except subprocess.TimeoutExpired:
raise TimeoutError(f"Error separating document into pages, pdfseparate timed out after {TIMEOUT_SECONDS} seconds")
except subprocess.CalledProcessError:
raise RuntimeError(f"Separating document into pages failed")
page_filenames = glob.glob("/tmp/page-*.pdf") page_filenames = glob.glob("/tmp/page-*.pdf")
@ -209,18 +215,11 @@ def document_to_pixels() -> int:
) )
# Convert to png # Convert to png
try: run_command(
subprocess.run(
["pdftocairo", pdf_filename, "-png", "-singlefile", filename_base], ["pdftocairo", pdf_filename, "-png", "-singlefile", filename_base],
stdout=subprocess.DEVNULL, error_message="Conversion from PDF to PNG failed",
stderr=subprocess.DEVNULL, timeout_message=f"Error converting from PDF to PNG, pdftocairo timed out after {DEFAULT_TIMEOUT} seconds",
timeout=TIMEOUT_SECONDS,
check=True,
) )
except subprocess.TimeoutExpired:
raise TimeoutError(f"Error converting from PDF to PNG, pdftocairo timed out after {TIMEOUT_SECONDS} seconds")
except subprocess.CalledProcessError:
raise RuntimeError("Conversion from PDF to PNG failed")
# Save the width and height # Save the width and height
with Image.open(png_filename, "r") as im: with Image.open(png_filename, "r") as im:
@ -231,8 +230,7 @@ def document_to_pixels() -> int:
f.write(str(height)) f.write(str(height))
# Convert to RGB pixels # Convert to RGB pixels
try: run_command(
p = subprocess.run(
[ [
"gm", "gm",
"convert", "convert",
@ -241,24 +239,16 @@ def document_to_pixels() -> int:
"8", "8",
f"rgb:{rgb_filename}", f"rgb:{rgb_filename}",
], ],
timeout=TIMEOUT_SECONDS, error_message="Conversion from PNG to RGB failed",
check=True, timeout_message=f"Error converting from PNG to pixels, convert timed out after {DEFAULT_TIMEOUT} seconds",
) )
except subprocess.TimeoutExpired:
raise TimeoutError(f"Error converting from PNG to pixels, convert timed out after {TIMEOUT_SECONDS} seconds")
if p.returncode != 0:
output(
True,
"Conversion from PNG to RGB failed",
percentage,
)
return 1
# Delete the png # Delete the png
os.remove(png_filename) os.remove(png_filename)
percentage += percentage_per_page percentage += percentage_per_page
# END OF FOR LOOP
output( output(
False, False,
"Converted document to pixels", "Converted document to pixels",
@ -273,11 +263,9 @@ def document_to_pixels() -> int:
): ):
shutil.move(filename, "/dangerzone") shutil.move(filename, "/dangerzone")
return 0
def pixels_to_pdf() -> None:
def pixels_to_pdf() -> int: percentage = 50.0
percentage: float = 50.0
num_pages = len(glob.glob("/dangerzone/page-*.rgb")) num_pages = len(glob.glob("/dangerzone/page-*.rgb"))
@ -297,15 +285,14 @@ def pixels_to_pdf() -> int:
with open(height_filename) as f: with open(height_filename) as f:
height = f.read().strip() height = f.read().strip()
if os.environ.get("OCR") == "1": if os.environ.get("OCR") == "1": # OCR the document
# OCR the document
output( output(
False, False,
f"Converting page {page}/{num_pages} from pixels to searchable PDF", f"Converting page {page}/{num_pages} from pixels to searchable PDF",
percentage, percentage,
) )
run_command(
args = [ [
"gm", "gm",
"convert", "convert",
"-size", "-size",
@ -314,30 +301,12 @@ def pixels_to_pdf() -> int:
"8", "8",
f"rgb:{rgb_filename}", f"rgb:{rgb_filename}",
f"png:{png_filename}", f"png:{png_filename}",
] ],
try: error_message=f"Page {page}/{num_pages} conversion to PNG failed",
p = subprocess.run( timeout_message=f"Error converting pixels to PNG, convert timed out after {DEFAULT_TIMEOUT} seconds",
args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=60,
) )
except subprocess.TimeoutExpired: run_command(
output( [
True,
"Error converting pixels to PNG, convert timed out after 60 seconds",
percentage,
)
return 1
if p.returncode != 0:
output(
True,
f"Page {page}/{num_pages} conversion to PNG failed",
percentage,
)
return 1
args = [
"tesseract", "tesseract",
png_filename, png_filename,
ocr_filename, ocr_filename,
@ -346,38 +315,19 @@ def pixels_to_pdf() -> int:
"--dpi", "--dpi",
"70", "70",
"pdf", "pdf",
] ],
try: error_message=f"Page {page}/{num_pages} OCR failed",
p = subprocess.run( timeout_message=f"Error converting PNG to searchable PDF, tesseract timed out after {DEFAULT_TIMEOUT} seconds",
args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=60,
) )
except subprocess.TimeoutExpired:
output(
True,
"Error converting PNG to searchable PDF, tesseract timed out after 60 seconds",
percentage,
)
return 1
if p.returncode != 0:
output(
True,
f"Page {page}/{num_pages} OCR failed",
percentage,
)
return 1
else: else: # Don't OCR
# Don't OCR
output( output(
False, False,
f"Converting page {page}/{num_pages} from pixels to PDF", f"Converting page {page}/{num_pages} from pixels to PDF",
percentage, percentage,
) )
run_command(
args = [ [
"gm", "gm",
"convert", "convert",
"-size", "-size",
@ -386,31 +336,15 @@ def pixels_to_pdf() -> int:
"8", "8",
f"rgb:{rgb_filename}", f"rgb:{rgb_filename}",
f"pdf:{pdf_filename}", f"pdf:{pdf_filename}",
] ],
try: error_message=f"Page {page}/{num_pages} conversion to PDF failed",
p = subprocess.run( timeout_message=f"Error converting RGB to PDF, convert timed out after {DEFAULT_TIMEOUT} seconds",
args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=60,
) )
except subprocess.TimeoutExpired:
output(
True,
"Error converting RGB to PDF, convert timed out after 60 seconds",
percentage,
)
return 1
if p.returncode != 0:
output(
True,
f"Page {page}/{num_pages} conversion to PDF failed",
percentage,
)
return 1
percentage += percentage_per_page percentage += percentage_per_page
# END OF FOR LOOP
# Merge pages into a single PDF # Merge pages into a single PDF
output( output(
False, False,
@ -421,24 +355,11 @@ def pixels_to_pdf() -> int:
for page in range(1, num_pages + 1): for page in range(1, num_pages + 1):
args.append(f"/tmp/page-{page}.pdf") args.append(f"/tmp/page-{page}.pdf")
args.append(f"/tmp/safe-output.pdf") args.append(f"/tmp/safe-output.pdf")
try: run_command(
p = subprocess.run( args,
args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=60 error_message="Merging pages into a single PDF failed",
timeout_message=f"Error merging pages into a single PDF, pdfunite timed out after {DEFAULT_TIMEOUT} seconds",
) )
except subprocess.TimeoutExpired:
output(
True,
"Error merging pages into a single PDF, pdfunite timed out after 60 seconds",
percentage,
)
return 1
if p.returncode != 0:
output(
True,
"Merging pages into a single PDF failed",
percentage,
)
return 1
percentage += 2 percentage += 2
@ -449,27 +370,12 @@ def pixels_to_pdf() -> int:
percentage, percentage,
) )
compress_timeout = num_pages * 3 compress_timeout = num_pages * 3
try: run_command(
p = subprocess.run(
["ps2pdf", "/tmp/safe-output.pdf", "/tmp/safe-output-compressed.pdf"], ["ps2pdf", "/tmp/safe-output.pdf", "/tmp/safe-output-compressed.pdf"],
stdout=subprocess.DEVNULL, timeout_message=f"Error compressing PDF, ps2pdf timed out after {compress_timeout} seconds",
stderr=subprocess.DEVNULL, error_message="Compressing PDF failed",
timeout=compress_timeout, timeout=compress_timeout,
) )
except subprocess.TimeoutExpired:
output(
True,
f"Error compressing PDF, ps2pdf timed out after {compress_timeout} seconds",
percentage,
)
return 1
if p.returncode != 0:
output(
True,
f"Compressing PDF failed",
percentage,
)
return 1
percentage = 100.0 percentage = 100.0
output(False, "Safe PDF created", percentage) output(False, "Safe PDF created", percentage)
@ -478,8 +384,6 @@ def pixels_to_pdf() -> int:
shutil.move("/tmp/safe-output.pdf", "/safezone") shutil.move("/tmp/safe-output.pdf", "/safezone")
shutil.move("/tmp/safe-output-compressed.pdf", "/safezone") shutil.move("/tmp/safe-output-compressed.pdf", "/safezone")
return 0
def main() -> int: def main() -> int:
if len(sys.argv) != 2: if len(sys.argv) != 2: