Parallel CLI bulk conversions via threading

Initial parallel document conversion: creates a pool of N threads
defined by the setting 'parallel_conversions'. Each thread calls
convert() on a document.
deeplow 2022-09-22 15:10:51 +01:00
parent e17912888a
commit 2d587f4082
2 changed files with 29 additions and 5 deletions
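
For orientation, the pattern the commit message describes boils down to mapping a per-document conversion function over a fixed-size thread pool. The sketch below is illustrative only: fake_convert() and the document list are placeholders, not Dangerzone code (the real implementation is in the diff that follows).

```python
# Minimal sketch of the bulk-conversion pattern described in the commit
# message. fake_convert() and docs are placeholders, not Dangerzone APIs.
import concurrent.futures


def fake_convert(doc: str) -> bool:
    # Stand-in for the real per-document convert() call.
    print(f"converting {doc}")
    return True


docs = ["report.pdf", "invoice.docx", "scan.png"]

# A pool of N worker threads; each thread converts one document at a time.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fake_convert, docs))

print(f"{sum(results)}/{len(docs)} conversions succeeded")
```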


@@ -254,6 +254,27 @@ def convert(
     return success
 
 
+def get_max_parallel_conversions() -> int:
+    n_cpu = 1
+    if platform.system() == "Linux":
+        # if on linux containers run natively
+        cpu_count = os.cpu_count()
+        if cpu_count is not None:
+            n_cpu = cpu_count
+
+    elif get_runtime_name() == "docker":
+        # For Windows and MacOS containers run in VM
+        # So we obtain the CPU count for the VM
+        n_cpu_str = subprocess.check_output(
+            [get_runtime(), "info", "--format", "{{.NCPU}}"],
+            text=True,
+            startupinfo=get_subprocess_startupinfo(),
+        )
+        n_cpu = int(n_cpu_str.strip())
+
+    return 2 * n_cpu + 1
+
+
 # From global_common:
 # def validate_convert_to_pixel_output(self, common, output):
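
A note on the sizing heuristic: 2 * n_cpu + 1 deliberately oversubscribes the CPUs, which is reasonable here because each job mostly waits on an external container process rather than burning CPU in Python. On Windows and macOS the containers run inside a VM, so the host's CPU count is not the relevant number; the code asks the runtime itself via docker info --format '{{.NCPU}}'. The snippet below is a standalone approximation of that probe, assuming the docker CLI is on PATH and skipping the project's get_runtime()/get_subprocess_startupinfo() helpers.

```python
# Standalone approximation of the CPU-count probe above (illustrative only).
# Assumes the `docker` CLI is available; Dangerzone's helper functions are
# replaced with plain platform/subprocess calls.
import os
import platform
import subprocess


def max_parallel_conversions_sketch() -> int:
    n_cpu = 1
    if platform.system() == "Linux":
        # Containers run natively, so the host CPU count is what matters.
        n_cpu = os.cpu_count() or 1
    else:
        # Windows/macOS: containers run inside a VM, so ask Docker how many
        # CPUs that VM sees.
        out = subprocess.check_output(
            ["docker", "info", "--format", "{{.NCPU}}"], text=True
        )
        n_cpu = int(out.strip())
    return 2 * n_cpu + 1


if __name__ == "__main__":
    print(max_parallel_conversions_sketch())
```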


@@ -1,3 +1,4 @@
+import concurrent.futures
 import gzip
 import json
 import logging
@@ -11,7 +12,7 @@ from typing import Callable, List, Optional
 import appdirs
 import colorama
 
-from .container import convert
+from . import container
 from .document import Document
 from .settings import Settings
 from .util import get_resource_path
@@ -49,10 +50,8 @@ class DangerzoneCore(object):
     def convert_documents(
         self, ocr_lang: Optional[str], stdout_callback: Callable[[str], None]
     ) -> None:
-        all_successful = True
-
-        for document in self.documents:
-            success = convert(
+        def convert_doc(document: Document) -> None:
+            success = container.convert(
                 document.input_filename,
                 document.output_filename,
                 ocr_lang,
@@ -63,6 +62,10 @@ class DangerzoneCore(object):
             else:
                 document.mark_as_failed()
 
+        max_jobs = container.get_max_parallel_conversions()
+        with concurrent.futures.ThreadPoolExecutor(max_workers=max_jobs) as executor:
+            executor.map(convert_doc, self.documents)
+
     def get_safe_documents(self) -> List[Document]:
         return [doc for doc in self.documents if doc.is_safe()]
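
One behavioral detail worth flagging: executor.map() returns a lazy iterator, and an exception raised inside convert_doc only re-raises when that iterator is consumed; leaving the with block still waits for every thread, but errors can pass silently. If per-document errors need to be surfaced, a submit()/as_completed() loop is one option; the sketch below uses a placeholder convert_doc, not the inner function from the diff.

```python
# Sketch: surfacing per-document errors from a thread pool.
# convert_doc() here is a placeholder for the real conversion callable.
import concurrent.futures
from typing import List


def convert_doc(doc: str) -> None:
    if doc.endswith(".bad"):
        raise RuntimeError(f"conversion failed for {doc}")


documents: List[str] = ["a.pdf", "b.bad", "c.pdf"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(convert_doc, doc): doc for doc in documents}
    for future in concurrent.futures.as_completed(futures):
        doc = futures[future]
        try:
            future.result()
        except Exception as exc:
            # The error is reported instead of being dropped by map().
            print(f"{doc}: {exc}")
```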