Split isolation providers into their own .py files

Provides clearer code organization by placing each provider in its own
Python file rather than keeping them all in a single one.
This commit is contained in:
deeplow 2023-01-18 13:59:58 +00:00
parent 7ed1fd6b59
commit 538df18709
No known key found for this signature in database
GPG key ID: 577982871529A52A
9 changed files with 228 additions and 119 deletions

View file

@ -5,8 +5,10 @@ from typing import Any, Callable, List, Optional, TypeVar
import click
from colorama import Back, Fore, Style
from . import args, errors, isolation_provider
from . import args, errors
from .document import ARCHIVE_SUBDIR, SAFE_EXTENSION
from .isolation_provider.container import Container
from .isolation_provider.dummy import Dummy
from .logic import DangerzoneCore
from .util import get_version

View file

@ -13,6 +13,8 @@ from PySide2 import QtCore, QtGui, QtWidgets
from .. import args, errors
from ..document import Document
from ..isolation_provider.container import Container
from ..isolation_provider.dummy import Dummy
from ..util import get_resource_path, get_version
from .logic import DangerzoneGui
from .main_window import MainWindow

View file

@ -13,6 +13,7 @@ from PySide2 import QtCore, QtGui, QtWidgets
if platform.system() == "Linux":
from xdg.DesktopEntry import DesktopEntry
from ..isolation_provider.base import IsolationProvider
from ..logic import DangerzoneCore
from ..settings import Settings
from ..util import get_resource_path

View file

@ -11,8 +11,10 @@ from typing import List, Optional
from colorama import Fore, Style
from PySide2 import QtCore, QtGui, QtWidgets
from .. import errors, isolation_provider
from .. import errors
from ..document import SAFE_EXTENSION, Document
from ..isolation_provider.container import Container, NoContainerTechException
from ..isolation_provider.dummy import Dummy
from ..util import get_resource_path, get_subprocess_startupinfo, get_version
from .logic import Alert, DangerzoneGui
@ -167,8 +169,11 @@ class WaitingWidget(QtWidgets.QWidget):
state: Optional[str] = None
try:
if isinstance( # Sanity check
self.dangerzone.isolation_provider, Container
):
container_runtime = self.dangerzone.isolation_provider.get_runtime()
except isolation_provider.NoContainerTechException as e:
except NoContainerTechException as e:
log.error(str(e))
state = "not_installed"

View file

@ -0,0 +1,118 @@
import logging
import subprocess
from abc import ABC, abstractmethod
from typing import Callable, Optional
from ..document import Document
log = logging.getLogger(__name__)
class IsolationProvider(ABC):
    """
    Common interface for the back ends that perform document conversions.

    Subclasses supply `install`, `_convert` and `get_max_parallel_conversions`;
    the shared `convert` wrapper drives the document's state transitions
    around the subclass-specific `_convert`.
    """

    @abstractmethod
    def install(self) -> bool:
        """Install the provider; the meaning of the returned flag is defined
        by the implementation (presumably True on success - confirm)."""

    def convert(
        self,
        document: Document,
        ocr_lang: Optional[str],
        stdout_callback: Optional[Callable] = None,
    ) -> None:
        """
        Convert `document` and record the outcome on the document itself.

        The document is flagged as converting before `_convert` runs. A falsy
        result flags it as failed; a truthy result flags it as safe and, when
        `document.archive_after_conversion` is set, archives it.
        """
        document.mark_as_converting()

        # Failure path first, so the success path reads top to bottom.
        if not self._convert(document, ocr_lang, stdout_callback):
            document.mark_as_failed()
            return

        document.mark_as_safe()
        if document.archive_after_conversion:
            document.archive()

    @abstractmethod
    def _convert(
        self,
        document: Document,
        ocr_lang: Optional[str],
        stdout_callback: Optional[Callable] = None,
    ) -> bool:
        """Run the provider-specific conversion; a truthy return value makes
        `convert` mark the document as safe, a falsy one as failed."""

    @abstractmethod
    def get_max_parallel_conversions(self) -> int:
        """Return the provider's limit on simultaneous conversions."""
# From global_common:
# def validate_convert_to_pixel_output(self, common, output):
# """
# Take the output from the convert to pixels tasks and validate it. Returns
# a tuple like: (success (boolean), error_message (str))
# """
# max_image_width = 10000
# max_image_height = 10000
# # Did we hit an error?
# for line in output.split("\n"):
# if (
# "failed:" in line
# or "The document format is not supported" in line
# or "Error" in line
# ):
# return False, output
# # How many pages was that?
# num_pages = None
# for line in output.split("\n"):
# if line.startswith("Document has "):
# num_pages = line.split(" ")[2]
# break
# if not num_pages or not num_pages.isdigit() or int(num_pages) <= 0:
# return False, "Invalid number of pages returned"
# num_pages = int(num_pages)
# # Make sure we have the files we expect
# expected_filenames = []
# for i in range(1, num_pages + 1):
# expected_filenames += [
# f"page-{i}.rgb",
# f"page-{i}.width",
# f"page-{i}.height",
# ]
# expected_filenames.sort()
# actual_filenames = os.listdir(common.pixel_dir.name)
# actual_filenames.sort()
# if expected_filenames != actual_filenames:
# return (
# False,
# f"We expected these files:\n{expected_filenames}\n\nBut we got these files:\n{actual_filenames}",
# )
# # Make sure the files are the correct sizes
# for i in range(1, num_pages + 1):
# with open(f"{common.pixel_dir.name}/page-{i}.width") as f:
# w_str = f.read().strip()
# with open(f"{common.pixel_dir.name}/page-{i}.height") as f:
# h_str = f.read().strip()
# w = int(w_str)
# h = int(h_str)
# if (
# not w_str.isdigit()
# or not h_str.isdigit()
# or w <= 0
# or w > max_image_width
# or h <= 0
# or h > max_image_height
# ):
# return False, f"Page {i} has invalid geometry"
# # Make sure the RGB file is the correct size
# if os.path.getsize(f"{common.pixel_dir.name}/page-{i}.rgb") != w * h * 3:
# return False, f"Page {i} has an invalid RGB file size"
# return True, True

View file

@ -7,14 +7,14 @@ import platform
import shutil
import subprocess
import tempfile
from abc import ABC, abstractmethod
from typing import Callable, List, Optional, Tuple
import appdirs
from colorama import Fore, Style
from .document import Document
from .util import get_resource_path, get_subprocess_startupinfo
from ..document import Document
from ..util import get_resource_path, get_subprocess_startupinfo
from .base import IsolationProvider
# Define startupinfo for subprocesses
if platform.system() == "Windows":
@ -23,6 +23,7 @@ if platform.system() == "Windows":
else:
startupinfo = None
log = logging.getLogger(__name__)
@ -31,45 +32,7 @@ class NoContainerTechException(Exception):
super().__init__(f"{container_tech} is not installed")
class AbstractIsolationProvider(ABC):
    """
    Common interface for the back ends that perform document conversions.

    Subclasses supply `install`, `_convert` and `get_max_parallel_conversions`;
    the shared `convert` wrapper drives the document's state transitions
    around the subclass-specific `_convert`.
    """

    @abstractmethod
    def install(self) -> bool:
        """Install the provider; the meaning of the returned flag is defined
        by the implementation (presumably True on success - confirm)."""

    def convert(
        self,
        document: Document,
        ocr_lang: Optional[str],
        stdout_callback: Optional[Callable] = None,
    ) -> None:
        """
        Convert `document` and record the outcome on the document itself.

        The document is flagged as converting before `_convert` runs. A falsy
        result flags it as failed; a truthy result flags it as safe and, when
        `document.archive_after_conversion` is set, archives it.
        """
        document.mark_as_converting()

        # Failure path first, so the success path reads top to bottom.
        if not self._convert(document, ocr_lang, stdout_callback):
            document.mark_as_failed()
            return

        document.mark_as_safe()
        if document.archive_after_conversion:
            document.archive()

    @abstractmethod
    def _convert(
        self,
        document: Document,
        ocr_lang: Optional[str],
        stdout_callback: Optional[Callable] = None,
    ) -> bool:
        """Run the provider-specific conversion; a truthy return value makes
        `convert` mark the document as safe, a falsy one as failed."""

    @abstractmethod
    def get_max_parallel_conversions(self) -> int:
        """Return the provider's limit on simultaneous conversions."""
class Container(AbstractIsolationProvider):
class Container(IsolationProvider):
# Name of the dangerzone container
CONTAINER_NAME = "dangerzone.rocks/dangerzone"
@ -355,75 +318,3 @@ class Container(AbstractIsolationProvider):
n_cpu = int(n_cpu_str.strip())
return 2 * n_cpu + 1
# From global_common:
# def validate_convert_to_pixel_output(self, common, output):
# """
# Take the output from the convert to pixels tasks and validate it. Returns
# a tuple like: (success (boolean), error_message (str))
# """
# max_image_width = 10000
# max_image_height = 10000
# # Did we hit an error?
# for line in output.split("\n"):
# if (
# "failed:" in line
# or "The document format is not supported" in line
# or "Error" in line
# ):
# return False, output
# # How many pages was that?
# num_pages = None
# for line in output.split("\n"):
# if line.startswith("Document has "):
# num_pages = line.split(" ")[2]
# break
# if not num_pages or not num_pages.isdigit() or int(num_pages) <= 0:
# return False, "Invalid number of pages returned"
# num_pages = int(num_pages)
# # Make sure we have the files we expect
# expected_filenames = []
# for i in range(1, num_pages + 1):
# expected_filenames += [
# f"page-{i}.rgb",
# f"page-{i}.width",
# f"page-{i}.height",
# ]
# expected_filenames.sort()
# actual_filenames = os.listdir(common.pixel_dir.name)
# actual_filenames.sort()
# if expected_filenames != actual_filenames:
# return (
# False,
# f"We expected these files:\n{expected_filenames}\n\nBut we got these files:\n{actual_filenames}",
# )
# # Make sure the files are the correct sizes
# for i in range(1, num_pages + 1):
# with open(f"{common.pixel_dir.name}/page-{i}.width") as f:
# w_str = f.read().strip()
# with open(f"{common.pixel_dir.name}/page-{i}.height") as f:
# h_str = f.read().strip()
# w = int(w_str)
# h = int(h_str)
# if (
# not w_str.isdigit()
# or not h_str.isdigit()
# or w <= 0
# or w > max_image_width
# or h <= 0
# or h > max_image_height
# ):
# return False, f"Page {i} has invalid geometry"
# # Make sure the RGB file is the correct size
# if os.path.getsize(f"{common.pixel_dir.name}/page-{i}.rgb") != w * h * 3:
# return False, f"Page {i} has an invalid RGB file size"
# return True, True

View file

@ -0,0 +1,89 @@
import logging
import os
import shutil
import sys
import time
from typing import Callable, Optional
from colorama import Fore, Style
from ..document import Document
from ..util import get_resource_path
from .base import IsolationProvider
log = logging.getLogger(__name__)
class Dummy(IsolationProvider):
    """Dummy Isolation Provider (FOR TESTING ONLY)

    "Do-nothing" converter - no real conversion takes place. The "sanitized"
    output is a copy of the bundled `dummy_document.pdf` resource, written to
    the document's output filename.

    Useful for testing without the need to use docker.
    """

    def __init__(self) -> None:
        """Refuse to instantiate unless `sys.dangerzone_dev` is truthy.

        Raises:
            Exception: when the `dangerzone_dev` attribute is missing from
                `sys` or is falsy.
        """
        # Sanity check
        if not getattr(sys, "dangerzone_dev", False):
            raise Exception(
                "Dummy isolation provider is UNSAFE and should never be "
                "called in a non-testing system."
            )

    def install(self) -> bool:
        """Report a successful installation; this provider installs nothing.

        Returns:
            Always True, honoring the `-> bool` contract of the base class
            (falling through would return None, which reads as a failure).
        """
        return True

    def _convert(
        self,
        document: Document,
        ocr_lang: Optional[str],
        stdout_callback: Optional[Callable] = None,
    ) -> bool:
        """Simulate a conversion by replaying a fixed list of progress steps.

        Each step is logged and, when `stdout_callback` is given, passed to it
        as `(error, text, percentage)`. If no step is flagged as an error, the
        bundled dummy PDF is copied to `document.output_filename`.

        Args:
            document: Document whose conversion is simulated.
            ocr_lang: OCR language; only logged here.
            stdout_callback: Optional callable receiving every progress step.

        Returns:
            True when no step reported an error, False otherwise.
        """
        log.debug("Dummy converter started:")
        log.debug(
            f" - document: {os.path.basename(document.input_filename)} ({document.id})"
        )
        log.debug(f" - ocr : {ocr_lang}")
        log.debug("\n(simulating conversion)")

        success = True

        # (error, text, percentage) triples; tuples keep the element types
        # distinct so no type-ignore is needed when unpacking them below.
        progress = [
            (False, "Converting to PDF using GraphicsMagick", 0.0),
            (False, "Separating document into pages", 3.0),
            (False, "Converting page 1/1 to pixels", 5.0),
            (False, "Converted document to pixels", 50.0),
            (False, "Converting page 1/1 from pixels to PDF", 50.0),
            (False, "Merging 1 pages into a single PDF", 95.0),
            (False, "Compressing PDF", 97.0),
            (False, "Safe PDF created", 100.0),
        ]

        for error, text, percentage in progress:
            self._print_progress(document, error, text, percentage)
            if stdout_callback:
                stdout_callback(error, text, percentage)
            if error:
                success = False
            # Pause between steps so the simulated progress is observable.
            time.sleep(0.2)

        if success:
            shutil.copy(
                get_resource_path("dummy_document.pdf"), document.output_filename
            )

        return success

    def _print_progress(
        self, document: Document, error: bool, text: str, percentage: float
    ) -> None:
        """Log one colorized progress line for `document`.

        The line is logged at error level when `error` is set and at info
        level otherwise.
        """
        s = Style.BRIGHT + Fore.YELLOW + f"[doc {document.id}] "
        s += Fore.CYAN + f"{percentage}% "
        if error:
            s += Style.RESET_ALL + Fore.RED + text
            log.error(s)
        else:
            s += Style.RESET_ALL + text
            log.info(s)

    def get_max_parallel_conversions(self) -> int:
        """Return 1: the dummy provider reports a single conversion slot."""
        return 1

View file

@ -14,6 +14,7 @@ import colorama
from . import errors, isolation_provider
from .document import Document
from .isolation_provider.container import Container
from .settings import Settings
from .util import get_resource_path
@ -41,7 +42,7 @@ class DangerzoneCore(object):
self.documents: List[Document] = []
self.isolation_provider = isolation_provider.Container()
self.isolation_provider = Container()
def add_document_from_filename(
self,