Sequential bulk document support in cli

Basic implementation of bulk document support in dangerzone-cli.

Usage: dangerzone-cli [OPTIONS] doc1.pdf doc2.pdf
This commit is contained in:
deeplow 2022-09-20 15:20:47 +01:00
parent 1147698287
commit 981716ccff
No known key found for this signature in database
GPG key ID: 577982871529A52A
6 changed files with 137 additions and 32 deletions

View file

@ -1,4 +1,4 @@
from typing import Optional from typing import List, Optional, Tuple
import click import click
@ -17,6 +17,18 @@ def _validate_input_filename(
return filename return filename
@errors.handle_document_errors
def _validate_input_filenames(
    ctx: click.Context, param: click.Parameter, value: Tuple[str, ...]
) -> List[str]:
    """Normalize and validate each input filename, preserving their order.

    Args:
        ctx: The click context of the running command (unused here).
        param: The click parameter being processed (unused here).
        value: The raw filenames; a variable-length tuple, hence
            ``Tuple[str, ...]`` rather than the one-element ``Tuple[str]``.

    Returns:
        The normalized filenames, in the order they were given.
    """
    normalized_filenames = []
    for filename in value:
        # Normalize first so that validation runs on the same path that is
        # handed back to the caller.
        filename = Document.normalize_filename(filename)
        # NOTE(review): presumably raises for an unusable file, with the
        # decorator handling the error -- confirm against Document/errors.
        Document.validate_input_filename(filename)
        normalized_filenames.append(filename)
    return normalized_filenames
@errors.handle_document_errors @errors.handle_document_errors
def _validate_output_filename( def _validate_output_filename(
ctx: click.Context, param: str, value: Optional[str] ctx: click.Context, param: str, value: Optional[str]
@ -42,6 +54,12 @@ def validate_input_filename(
return _validate_input_filename(ctx, param, value) return _validate_input_filename(ctx, param, value)
def validate_input_filenames(
    ctx: click.Context, param: click.Parameter, value: Tuple[str, ...]
) -> List[str]:
    """Public entry point for validating a group of input filenames.

    Delegates unchanged to ``_validate_input_filenames``.

    Args:
        ctx: The click context of the running command.
        param: The click parameter being processed.
        value: The raw filenames as a variable-length tuple.

    Returns:
        Whatever ``_validate_input_filenames`` returns for the same arguments.
    """
    return _validate_input_filenames(ctx, param, value)
def validate_output_filename( def validate_output_filename(
ctx: click.Context, param: str, value: Optional[str] ctx: click.Context, param: str, value: Optional[str]
) -> Optional[str]: ) -> Optional[str]:

View file

@ -2,7 +2,7 @@ import json
import logging import logging
import os import os
import sys import sys
from typing import Optional from typing import List, Optional
import click import click
from colorama import Back, Fore, Style from colorama import Back, Fore, Style
@ -26,23 +26,29 @@ def print_header(s: str) -> None:
help=f"Default is filename ending with {SAFE_EXTENSION}", help=f"Default is filename ending with {SAFE_EXTENSION}",
) )
@click.option("--ocr-lang", help="Language to OCR, defaults to none") @click.option("--ocr-lang", help="Language to OCR, defaults to none")
@click.argument("filename", required=True, callback=args.validate_input_filename) @click.argument(
"filenames",
required=True,
nargs=-1,
type=click.UNPROCESSED,
callback=args.validate_input_filenames,
)
@errors.handle_document_errors @errors.handle_document_errors
def cli_main( def cli_main(
output_filename: Optional[str], ocr_lang: Optional[str], filename: str output_filename: Optional[str], ocr_lang: Optional[str], filenames: List[str]
) -> None: ) -> None:
setup_logging() setup_logging()
dangerzone = DangerzoneCore() dangerzone = DangerzoneCore()
display_banner() display_banner()
if len(filenames) == 1 and output_filename:
document = Document(filename) dangerzone.add_document(filenames[0], output_filename)
elif len(filenames) > 1 and output_filename:
# Set PDF output filename click.echo("--output-filename can only be used with one input file.")
if output_filename: exit(1)
document.output_filename = output_filename
else: else:
document.set_default_output_filename() for filename in filenames:
dangerzone.add_document(filename)
# Validate OCR language # Validate OCR language
if ocr_lang: if ocr_lang:
@ -75,18 +81,21 @@ def cli_main(
except: except:
click.echo(f"Invalid JSON returned from container: {line}") click.echo(f"Invalid JSON returned from container: {line}")
if convert( dangerzone.convert_documents(ocr_lang, stdout_callback)
document.input_filename, documents_safe = dangerzone.get_safe_documents()
document.output_filename, documents_failed = dangerzone.get_failed_documents()
ocr_lang,
stdout_callback, if documents_safe != []:
): print_header("Safe PDF(s) created successfully")
print_header("Safe PDF created successfully") for document in documents_safe:
click.echo(document.output_filename) click.echo(document.output_filename)
exit(0) if documents_failed != []:
else: print_header("Failed to convert document(s)")
print_header("Failed to convert document") for document in documents_failed:
click.echo(document.input_filename)
exit(1) exit(1)
else:
exit(0)
def setup_logging() -> None: def setup_logging() -> None:

View file

@ -1,3 +1,4 @@
import enum
import os import os
import platform import platform
import stat import stat
@ -18,13 +19,23 @@ class Document:
document, and validating its info. document, and validating its info.
""" """
def __init__(self, input_filename: str = None) -> None: # document conversion state
STATE_UNCONVERTED = enum.auto()
STATE_SAFE = enum.auto()
STATE_FAILED = enum.auto()
def __init__(self, input_filename: str = None, output_filename: str = None) -> None:
self._input_filename: Optional[str] = None self._input_filename: Optional[str] = None
self._output_filename: Optional[str] = None self._output_filename: Optional[str] = None
if input_filename: if input_filename:
self.input_filename = input_filename self.input_filename = input_filename
if output_filename:
self.output_filename = output_filename
self.state = Document.STATE_UNCONVERTED
@staticmethod @staticmethod
def normalize_filename(filename: str) -> str: def normalize_filename(filename: str) -> str:
return os.path.abspath(filename) return os.path.abspath(filename)
@ -68,7 +79,10 @@ class Document:
@property @property
def output_filename(self) -> str: def output_filename(self) -> str:
if self._output_filename is None: if self._output_filename is None:
raise DocumentFilenameException("Output filename has not been set yet.") if self._input_filename is not None:
return self.default_output_filename
else:
raise DocumentFilenameException("Output filename has not been set yet.")
else: else:
return self._output_filename return self._output_filename
@ -78,7 +92,21 @@ class Document:
self.validate_output_filename(filename) self.validate_output_filename(filename)
self._output_filename = filename self._output_filename = filename
def set_default_output_filename(self) -> None: @property
self.output_filename = ( def default_output_filename(self) -> str:
f"{os.path.splitext(self.input_filename)[0]}{SAFE_EXTENSION}" return f"{os.path.splitext(self.input_filename)[0]}{SAFE_EXTENSION}"
)
def is_unconverted(self) -> bool:
    """Tell whether the document is still in the unconverted state."""
    unconverted = Document.STATE_UNCONVERTED
    return self.state is unconverted
def is_failed(self) -> bool:
    """Tell whether the document has been marked as failed."""
    failed = Document.STATE_FAILED
    return self.state is failed
def is_safe(self) -> bool:
    """Tell whether the document has been marked as safe."""
    safe = Document.STATE_SAFE
    return self.state is safe
def mark_as_failed(self) -> None:
    """Record that this document is in the failed state."""
    failed = Document.STATE_FAILED
    self.state = failed
def mark_as_safe(self) -> None:
    """Record that this document is in the safe state."""
    safe = Document.STATE_SAFE
    self.state = safe

View file

@ -433,9 +433,6 @@ class SettingsWidget(QtWidgets.QWidget):
self.dangerous_doc_label.setText( self.dangerous_doc_label.setText(
f"Suspicious: {os.path.basename(self.document.input_filename)}" f"Suspicious: {os.path.basename(self.document.input_filename)}"
) )
# Set the default save location
self.document.set_default_output_filename()
self.save_lineedit.setText(os.path.basename(self.document.output_filename)) self.save_lineedit.setText(os.path.basename(self.document.output_filename))
def save_browse_button_clicked(self) -> None: def save_browse_button_clicked(self) -> None:

View file

@ -6,12 +6,13 @@ import platform
import shutil import shutil
import subprocess import subprocess
import sys import sys
from typing import Optional from typing import Callable, List, Optional
import appdirs import appdirs
import colorama import colorama
from .container import convert from .container import convert
from .document import Document
from .settings import Settings from .settings import Settings
from .util import get_resource_path from .util import get_resource_path
@ -36,3 +37,34 @@ class DangerzoneCore(object):
# Load settings # Load settings
self.settings = Settings(self) self.settings = Settings(self)
self.documents: List[Document] = []
def add_document(
    self, input_filename: str, output_filename: Optional[str] = None
) -> None:
    """Queue a new document for conversion.

    Args:
        input_filename: Path of the document to convert.
        output_filename: Optional path for the converted document; passed
            through to ``Document`` as-is, including when it is ``None``.
    """
    self.documents.append(Document(input_filename, output_filename))
def convert_documents(
    self, ocr_lang: Optional[str], stdout_callback: Callable[[str], None]
) -> None:
    """Convert every queued document in turn and record each outcome.

    Documents are processed sequentially in the order they were added. A
    truthy result from ``convert`` marks the document as safe; any other
    result marks it as failed. A failure does not stop the remaining
    documents from being attempted.

    Args:
        ocr_lang: OCR language forwarded to ``convert``, or ``None``.
        stdout_callback: Callable forwarded to ``convert`` for each document.
    """
    for document in self.documents:
        success = convert(
            document.input_filename,
            document.output_filename,
            ocr_lang,
            stdout_callback,
        )
        if success:
            document.mark_as_safe()
        else:
            document.mark_as_failed()
def get_safe_documents(self) -> List[Document]:
    """Return the queued documents that report themselves as safe, in order."""
    safe_documents = []
    for candidate in self.documents:
        if candidate.is_safe():
            safe_documents.append(candidate)
    return safe_documents
def get_failed_documents(self) -> List[Document]:
    """Return the queued documents that report themselves as failed, in order."""
    failed_documents = []
    for candidate in self.documents:
        if candidate.is_failed():
            failed_documents.append(candidate)
    return failed_documents

View file

@ -78,3 +78,24 @@ def test_output_file_not_pdf(tmp_path: Path) -> None:
assert "Safe PDF filename must end in '.pdf'" in str(e.value) assert "Safe PDF filename must end in '.pdf'" in str(e.value)
assert not os.path.exists(docx_file) assert not os.path.exists(docx_file)
def test_is_unconverted_by_default(sample_doc: str) -> None:
    """A freshly constructed Document starts in the unconverted state."""
    # The fixture value is passed to Document as the input filename, so it
    # is annotated as str, like the neighbouring state tests.
    d = Document(sample_doc)
    assert d.is_unconverted()
def test_mark_as_safe(sample_doc: str) -> None:
    """Marking a document as safe makes it safe and nothing else."""
    document = Document(sample_doc)

    document.mark_as_safe()

    assert document.is_safe()
    assert not document.is_failed()
    assert not document.is_unconverted()
def test_mark_as_failed(sample_doc: str) -> None:
    """Marking a document as failed makes it failed and nothing else."""
    document = Document(sample_doc)

    document.mark_as_failed()

    assert document.is_failed()
    assert not document.is_safe()
    assert not document.is_unconverted()