Wrap dangerzone.py back into a class to keep track of percentage

This commit is contained in:
Guthrie McAfee Armstrong 2022-06-05 18:23:10 -04:00 committed by deeplow
parent eaa08c9c3d
commit 17939cb70c
No known key found for this signature in database
GPG key ID: 577982871529A52A

View file

@ -18,10 +18,7 @@ import os
import shutil import shutil
import subprocess import subprocess
import sys import sys
<<<<<<< HEAD
from typing import Dict, Optional from typing import Dict, Optional
=======
>>>>>>> d990cfb (refactor dangerzone.py, raise exceptions instead of returning int)
import magic import magic
from PIL import Image from PIL import Image
@ -55,334 +52,318 @@ def run_command(
raise TimeoutError(timeout_message) from e raise TimeoutError(timeout_message) from e
def output(self, error: bool, text: str, percentage: float) -> None: class ConversionJob(object):
print(json.dumps({"error": error, "text": text, "percentage": int(percentage)})) def __init__(self) -> None:
sys.stdout.flush() self.percentage: float = 0.0 # TODO Optional[float], but this default value will be overwritten immediately
def document_to_pixels(self) -> None:
self.percentage: float = 0.0
def document_to_pixels() -> None: conversions: Dict[str, Dict[str, Optional[str]]] = {
percentage: float = 0.0 # .pdf
"application/pdf": {"type": None},
# .docx
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": {
"type": "libreoffice",
"libreoffice_output_filter": "writer_pdf_Export",
},
# .doc
"application/msword": {
"type": "libreoffice",
"libreoffice_output_filter": "writer_pdf_Export",
},
# .docm
"application/vnd.ms-word.document.macroEnabled.12": {
"type": "libreoffice",
"libreoffice_output_filter": "writer_pdf_Export",
},
# .xlsx
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": {
"type": "libreoffice",
"libreoffice_output_filter": "calc_pdf_Export",
},
# .xls
"application/vnd.ms-excel": {
"type": "libreoffice",
"libreoffice_output_filter": "calc_pdf_Export",
},
# .pptx
"application/vnd.openxmlformats-officedocument.presentationml.presentation": {
"type": "libreoffice",
"libreoffice_output_filter": "impress_pdf_Export",
},
# .ppt
"application/vnd.ms-powerpoint": {
"type": "libreoffice",
"libreoffice_output_filter": "impress_pdf_Export",
},
# .odt
"application/vnd.oasis.opendocument.text": {
"type": "libreoffice",
"libreoffice_output_filter": "writer_pdf_Export",
},
# .odg
"application/vnd.oasis.opendocument.graphics": {
"type": "libreoffice",
"libreoffice_output_filter": "impress_pdf_Export",
},
# .odp
"application/vnd.oasis.opendocument.presentation": {
"type": "libreoffice",
"libreoffice_output_filter": "impress_pdf_Export",
},
# .ods
"application/vnd.oasis.opendocument.spreadsheet": {
"type": "libreoffice",
"libreoffice_output_filter": "calc_pdf_Export",
},
# .jpg
"image/jpeg": {"type": "convert"},
# .gif
"image/gif": {"type": "convert"},
# .png
"image/png": {"type": "convert"},
# .tif
"image/tiff": {"type": "convert"},
"image/x-tiff": {"type": "convert"},
}
conversions: Dict[str, Dict[str, Optional[str]]] = { # Detect MIME type
# .pdf mime = magic.Magic(mime=True)
"application/pdf": {"type": None}, mime_type = mime.from_file("/tmp/input_file")
# .docx
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": {
"type": "libreoffice",
"libreoffice_output_filter": "writer_pdf_Export",
},
# .doc
"application/msword": {
"type": "libreoffice",
"libreoffice_output_filter": "writer_pdf_Export",
},
# .docm
"application/vnd.ms-word.document.macroEnabled.12": {
"type": "libreoffice",
"libreoffice_output_filter": "writer_pdf_Export",
},
# .xlsx
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": {
"type": "libreoffice",
"libreoffice_output_filter": "calc_pdf_Export",
},
# .xls
"application/vnd.ms-excel": {
"type": "libreoffice",
"libreoffice_output_filter": "calc_pdf_Export",
},
# .pptx
"application/vnd.openxmlformats-officedocument.presentationml.presentation": {
"type": "libreoffice",
"libreoffice_output_filter": "impress_pdf_Export",
},
# .ppt
"application/vnd.ms-powerpoint": {
"type": "libreoffice",
"libreoffice_output_filter": "impress_pdf_Export",
},
# .odt
"application/vnd.oasis.opendocument.text": {
"type": "libreoffice",
"libreoffice_output_filter": "writer_pdf_Export",
},
# .odg
"application/vnd.oasis.opendocument.graphics": {
"type": "libreoffice",
"libreoffice_output_filter": "impress_pdf_Export",
},
# .odp
"application/vnd.oasis.opendocument.presentation": {
"type": "libreoffice",
"libreoffice_output_filter": "impress_pdf_Export",
},
# .ods
"application/vnd.oasis.opendocument.spreadsheet": {
"type": "libreoffice",
"libreoffice_output_filter": "calc_pdf_Export",
},
# .jpg
"image/jpeg": {"type": "convert"},
# .gif
"image/gif": {"type": "convert"},
# .png
"image/png": {"type": "convert"},
# .tif
"image/tiff": {"type": "convert"},
"image/x-tiff": {"type": "convert"},
}
# Detect MIME type # Validate MIME type
mime = magic.Magic(mime=True) if mime_type not in conversions:
mime_type = mime.from_file("/tmp/input_file") raise ValueError(f"Document format ${mime_type} is not supported")
# Validate MIME type # Convert input document to PDF
if mime_type not in conversions: conversion = conversions[mime_type]
raise ValueError(f"Document format ${mime_type} is not supported") if conversion["type"] is None:
pdf_filename = "/tmp/input_file"
# Convert input document to PDF elif conversion["type"] == "libreoffice":
conversion = conversions[mime_type] self.update_progress("Converting to PDF using LibreOffice")
if conversion["type"] is None: args = [
pdf_filename = "/tmp/input_file" "libreoffice",
elif conversion["type"] == "libreoffice": "--headless",
output(False, "Converting to PDF using LibreOffice", percentage) "--convert-to",
args = [ f"pdf:{conversion['libreoffice_output_filter']}",
"libreoffice", "--outdir",
"--headless", "/tmp",
"--convert-to", "/tmp/input_file",
f"pdf:{conversion['libreoffice_output_filter']}", ]
"--outdir", run_command(
"/tmp", args,
"/tmp/input_file", error_message="Conversion to PDF with LibreOffice failed",
] timeout_message=f"Error converting document to PDF, LibreOffice timed out after {DEFAULT_TIMEOUT} seconds",
run_command( )
args, pdf_filename = "/tmp/input_file.pdf"
error_message="Conversion to PDF with LibreOffice failed", elif conversion["type"] == "convert":
timeout_message=f"Error converting document to PDF, LibreOffice timed out after {DEFAULT_TIMEOUT} seconds", self.update_progress("Converting to PDF using GraphicsMagick")
) args = [
pdf_filename = "/tmp/input_file.pdf"
elif conversion["type"] == "convert":
output(False, "Converting to PDF using GraphicsMagick", percentage)
args = [
"gm",
"convert",
"/tmp/input_file",
"/tmp/input_file.pdf",
]
run_command(
args,
error_message="Conversion to PDF with GraphicsMagick failed",
timeout_message=f"Error converting document to PDF, GraphicsMagick timed out after {DEFAULT_TIMEOUT} seconds",
)
pdf_filename = "/tmp/input_file.pdf"
else:
raise ValueError(
f"Invalid conversion type {conversion['type']} for MIME type {mime_type}"
)
percentage += 3
# Separate PDF into pages
output(
False,
"Separating document into pages",
percentage,
)
args = ["pdftk", pdf_filename, "burst", "output", "/tmp/page-%d.pdf"]
run_command(
args,
error_message="Separating document into pages failed",
timeout_message=f"Error separating document into pages, pdfseparate timed out after {DEFAULT_TIMEOUT} seconds",
)
page_filenames = glob.glob("/tmp/page-*.pdf")
percentage += 2
# Convert to RGB pixel data
percentage_per_page = 45.0 / len(page_filenames)
for page in range(1, len(page_filenames) + 1):
pdf_filename = f"/tmp/page-{page}.pdf"
png_filename = f"/tmp/page-{page}.png"
rgb_filename = f"/tmp/page-{page}.rgb"
width_filename = f"/tmp/page-{page}.width"
height_filename = f"/tmp/page-{page}.height"
filename_base = f"/tmp/page-{page}"
output(
False,
f"Converting page {page}/{len(page_filenames)} to pixels",
percentage,
)
# Convert to png
run_command(
["pdftocairo", pdf_filename, "-png", "-singlefile", filename_base],
error_message="Conversion from PDF to PNG failed",
timeout_message=f"Error converting from PDF to PNG, pdftocairo timed out after {DEFAULT_TIMEOUT} seconds",
)
# Save the width and height
with Image.open(png_filename, "r") as im:
width, height = im.size
with open(width_filename, "w") as f:
f.write(str(width))
with open(height_filename, "w") as f:
f.write(str(height))
# Convert to RGB pixels
run_command(
[
"gm", "gm",
"convert", "convert",
png_filename, "/tmp/input_file",
"-depth", "/tmp/input_file.pdf",
"8", ]
f"rgb:{rgb_filename}", run_command(
], args,
error_message="Conversion from PNG to RGB failed", error_message="Conversion to PDF with GraphicsMagick failed",
timeout_message=f"Error converting from PNG to pixels, convert timed out after {DEFAULT_TIMEOUT} seconds", timeout_message=f"Error converting document to PDF, GraphicsMagick timed out after {DEFAULT_TIMEOUT} seconds",
)
pdf_filename = "/tmp/input_file.pdf"
else:
raise ValueError(
f"Invalid conversion type {conversion['type']} for MIME type {mime_type}"
)
self.percentage += 3
# Separate PDF into pages
self.update_progress("Separating document into pages"),
args = ["pdftk", pdf_filename, "burst", "output", "/tmp/page-%d.pdf"]
run_command(
args,
error_message="Separating document into pages failed",
timeout_message=f"Error separating document into pages, pdfseparate timed out after {DEFAULT_TIMEOUT} seconds",
) )
# Delete the png page_filenames = glob.glob("/tmp/page-*.pdf")
os.remove(png_filename)
percentage += percentage_per_page
# END OF FOR LOOP self.percentage += 2
output( # Convert to RGB pixel data
False, percentage_per_page = 45.0 / len(page_filenames)
"Converted document to pixels", for page in range(1, len(page_filenames) + 1):
percentage, pdf_filename = f"/tmp/page-{page}.pdf"
) png_filename = f"/tmp/page-{page}.png"
rgb_filename = f"/tmp/page-{page}.rgb"
width_filename = f"/tmp/page-{page}.width"
height_filename = f"/tmp/page-{page}.height"
filename_base = f"/tmp/page-{page}"
# Move converted files into /dangerzone self.update_progress(
for filename in ( f"Converting page {page}/{len(page_filenames)} to pixels"
glob.glob("/tmp/page-*.rgb")
+ glob.glob("/tmp/page-*.width")
+ glob.glob("/tmp/page-*.height")
):
shutil.move(filename, "/dangerzone")
def pixels_to_pdf() -> None:
percentage = 50.0
num_pages = len(glob.glob("/dangerzone/page-*.rgb"))
# Convert RGB files to PDF files
percentage_per_page = 45.0 / num_pages
for page in range(1, num_pages + 1):
filename_base = f"/dangerzone/page-{page}"
rgb_filename = f"{filename_base}.rgb"
width_filename = f"{filename_base}.width"
height_filename = f"{filename_base}.height"
png_filename = f"/tmp/page-{page}.png"
ocr_filename = f"/tmp/page-{page}"
pdf_filename = f"/tmp/page-{page}.pdf"
with open(width_filename) as f:
width = f.read().strip()
with open(height_filename) as f:
height = f.read().strip()
if os.environ.get("OCR") == "1": # OCR the document
output(
False,
f"Converting page {page}/{num_pages} from pixels to searchable PDF",
percentage,
) )
# Convert to png
run_command(
["pdftocairo", pdf_filename, "-png", "-singlefile", filename_base],
error_message="Conversion from PDF to PNG failed",
timeout_message=f"Error converting from PDF to PNG, pdftocairo timed out after {DEFAULT_TIMEOUT} seconds",
)
# Save the width and height
with Image.open(png_filename, "r") as im:
width, height = im.size
with open(width_filename, "w") as f:
f.write(str(width))
with open(height_filename, "w") as f:
f.write(str(height))
# Convert to RGB pixels
run_command( run_command(
[ [
"gm", "gm",
"convert", "convert",
"-size",
f"{width}x{height}",
"-depth",
"8",
f"rgb:{rgb_filename}",
f"png:{png_filename}",
],
error_message=f"Page {page}/{num_pages} conversion to PNG failed",
timeout_message=f"Error converting pixels to PNG, convert timed out after {DEFAULT_TIMEOUT} seconds",
)
run_command(
[
"tesseract",
png_filename, png_filename,
ocr_filename,
"-l",
os.environ.get("OCR_LANGUAGE"), # type: ignore
"--dpi",
"70",
"pdf",
],
error_message=f"Page {page}/{num_pages} OCR failed",
timeout_message=f"Error converting PNG to searchable PDF, tesseract timed out after {DEFAULT_TIMEOUT} seconds",
)
else: # Don't OCR
output(
False,
f"Converting page {page}/{num_pages} from pixels to PDF",
percentage,
)
run_command(
[
"gm",
"convert",
"-size",
f"{width}x{height}",
"-depth", "-depth",
"8", "8",
f"rgb:{rgb_filename}", f"rgb:{rgb_filename}",
f"pdf:{pdf_filename}",
], ],
error_message=f"Page {page}/{num_pages} conversion to PDF failed", error_message="Conversion from PNG to RGB failed",
timeout_message=f"Error converting RGB to PDF, convert timed out after {DEFAULT_TIMEOUT} seconds", timeout_message=f"Error converting from PNG to pixels, convert timed out after {DEFAULT_TIMEOUT} seconds",
) )
percentage += percentage_per_page # Delete the png
os.remove(png_filename)
self.percentage += percentage_per_page
# END OF FOR LOOP # END OF FOR LOOP
# Merge pages into a single PDF self.update_progress("Converted document to pixels")
output(
False,
f"Merging {num_pages} pages into a single PDF",
percentage,
)
args = ["pdfunite"]
for page in range(1, num_pages + 1):
args.append(f"/tmp/page-{page}.pdf")
args.append(f"/tmp/safe-output.pdf")
run_command(
args,
error_message="Merging pages into a single PDF failed",
timeout_message=f"Error merging pages into a single PDF, pdfunite timed out after {DEFAULT_TIMEOUT} seconds",
)
percentage += 2 # Move converted files into /dangerzone
for filename in (
glob.glob("/tmp/page-*.rgb")
+ glob.glob("/tmp/page-*.width")
+ glob.glob("/tmp/page-*.height")
):
shutil.move(filename, "/dangerzone")
# Compress def pixels_to_pdf(self) -> None:
output( self.percentage = 50.0
False,
f"Compressing PDF",
percentage,
)
compress_timeout = num_pages * 3
run_command(
["ps2pdf", "/tmp/safe-output.pdf", "/tmp/safe-output-compressed.pdf"],
timeout_message=f"Error compressing PDF, ps2pdf timed out after {compress_timeout} seconds",
error_message="Compressing PDF failed",
timeout=compress_timeout,
)
percentage = 100.0 num_pages = len(glob.glob("/dangerzone/page-*.rgb"))
output(False, "Safe PDF created", percentage)
# Move converted files into /safezone # Convert RGB files to PDF files
shutil.move("/tmp/safe-output.pdf", "/safezone") percentage_per_page = 45.0 / num_pages
shutil.move("/tmp/safe-output-compressed.pdf", "/safezone") for page in range(1, num_pages + 1):
filename_base = f"/dangerzone/page-{page}"
rgb_filename = f"{filename_base}.rgb"
width_filename = f"{filename_base}.width"
height_filename = f"{filename_base}.height"
png_filename = f"/tmp/page-{page}.png"
ocr_filename = f"/tmp/page-{page}"
pdf_filename = f"/tmp/page-{page}.pdf"
with open(width_filename) as f:
width = f.read().strip()
with open(height_filename) as f:
height = f.read().strip()
if os.environ.get("OCR") == "1": # OCR the document
self.update_progress(
f"Converting page {page}/{num_pages} from pixels to searchable PDF"
)
run_command(
[
"gm",
"convert",
"-size",
f"{width}x{height}",
"-depth",
"8",
f"rgb:{rgb_filename}",
f"png:{png_filename}",
],
error_message=f"Page {page}/{num_pages} conversion to PNG failed",
timeout_message=f"Error converting pixels to PNG, convert timed out after {DEFAULT_TIMEOUT} seconds",
)
run_command(
[
"tesseract",
png_filename,
ocr_filename,
"-l",
os.environ.get("OCR_LANGUAGE"), # type: ignore
"--dpi",
"70",
"pdf",
],
error_message=f"Page {page}/{num_pages} OCR failed",
timeout_message=f"Error converting PNG to searchable PDF, tesseract timed out after {DEFAULT_TIMEOUT} seconds",
)
else: # Don't OCR
self.update_progress(
f"Converting page {page}/{num_pages} from pixels to PDF"
)
run_command(
[
"gm",
"convert",
"-size",
f"{width}x{height}",
"-depth",
"8",
f"rgb:{rgb_filename}",
f"pdf:{pdf_filename}",
],
error_message=f"Page {page}/{num_pages} conversion to PDF failed",
timeout_message=f"Error converting RGB to PDF, convert timed out after {DEFAULT_TIMEOUT} seconds",
)
self.percentage += percentage_per_page
# END OF FOR LOOP
# Merge pages into a single PDF
self.update_progress(f"Merging {num_pages} pages into a single PDF")
args = ["pdfunite"]
for page in range(1, num_pages + 1):
args.append(f"/tmp/page-{page}.pdf")
args.append(f"/tmp/safe-output.pdf")
run_command(
args,
error_message="Merging pages into a single PDF failed",
timeout_message=f"Error merging pages into a single PDF, pdfunite timed out after {DEFAULT_TIMEOUT} seconds",
)
self.percentage += 2
# Compress
self.update_progress(f"Compressing PDF")
compress_timeout = num_pages * 3
run_command(
["ps2pdf", "/tmp/safe-output.pdf", "/tmp/safe-output-compressed.pdf"],
timeout_message=f"Error compressing PDF, ps2pdf timed out after {compress_timeout} seconds",
error_message="Compressing PDF failed",
timeout=compress_timeout,
)
self.percentage = 100.0
self.update_progress("Safe PDF created")
# Move converted files into /safezone
shutil.move("/tmp/safe-output.pdf", "/safezone")
shutil.move("/tmp/safe-output-compressed.pdf", "/safezone")
def update_progress(self, text: str, *, error: bool = False) -> None:
    """Print one progress record as a single JSON line on stdout.

    The record is a JSON object with exactly three keys: ``error``,
    ``text`` and ``percentage``. ``percentage`` is the current value of
    ``self.percentage`` truncated to an integer with ``int()``.

    Args:
        text: Status or error message to report.
        error: Keyword-only flag; pass ``True`` when ``text`` describes a
            failure. Defaults to ``False``.
    """
    record = {"error": error, "text": text, "percentage": int(self.percentage)}
    # Flush immediately so the line does not sit in the stdout buffer.
    print(json.dumps(record), flush=True)
def main() -> int: def main() -> int:
@ -390,21 +371,25 @@ def main() -> int:
print(f"Usage: {sys.argv[0]} [document-to-pixels]|[pixels-to-pdf]") print(f"Usage: {sys.argv[0]} [document-to-pixels]|[pixels-to-pdf]")
return -1 return -1
job = ConversionJob()
if sys.argv[1] == "document-to-pixels": if sys.argv[1] == "document-to-pixels":
try: try:
document_to_pixels() job.document_to_pixels()
except: except (RuntimeError, TimeoutError) as e:
job.update_progress(str(e), error=True)
return 1 return 1
else: else:
return 0 return 0 # Success!
if sys.argv[1] == "pixels-to-pdf": elif sys.argv[1] == "pixels-to-pdf":
try: try:
pixels_to_pdf() job.pixels_to_pdf()
except: except (RuntimeError, TimeoutError) as e:
job.update_progress(str(e), error=True)
return 1 return 1
else: else:
return 0 return 0 # Success!
return -1 return -1