Qubes: strategy for capturing conversion logs

Use qrexec stdout to send conversion data (pixels) and stderr to send
conversion progress at the end of the conversion. This happens
regardless of whether or not the conversion is in developer mode or not.

It's the client that decides if it reads the debug data from stderr or
not. In this case, it only reads it if developer mode is enabled.
This commit is contained in:
deeplow 2023-06-22 12:14:04 +01:00
parent 00adf223a5
commit 874b8865e2
No known key found for this signature in database
GPG key ID: 577982871529A52A
5 changed files with 109 additions and 86 deletions

View file

@ -22,84 +22,87 @@ def running_on_qubes() -> bool:
return os.path.exists("/usr/share/qubes/marker-vm")
async def read_stream(
sr: asyncio.StreamReader, callback: Optional[Callable] = None
) -> bytes:
"""Consume a byte stream line-by-line.
Read all lines in a stream until EOF. If a user has passed a callback, call it for
each line.
Note that the lines are in bytes, since we can't assume that all command output will
be UTF-8 encoded. Higher level commands are advised to decode the output to Unicode,
if they know its encoding.
"""
buf = b""
while True:
line = await sr.readline()
if sr.at_eof():
break
if callback is not None:
callback(line)
# TODO: This would be a good place to log the received line, mostly for debug
# logging.
buf += line
return buf
async def run_command(
args: List[str],
*,
error_message: str,
timeout_message: str,
timeout: Optional[float],
stdout_callback: Optional[Callable] = None,
stderr_callback: Optional[Callable] = None,
) -> Tuple[bytes, bytes]:
"""Run a command and get its output.
Run a command using asyncio.subprocess, consume its standard streams, and return its
output in bytes.
:raises RuntimeError: if the process returns a non-zero exit status
:raises TimeoutError: if the process times out
"""
# Start the provided command, and return a handle. The command will run in the
# background.
proc = await asyncio.subprocess.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
assert proc.stdout is not None
assert proc.stderr is not None
# Create asynchronous tasks that will consume the standard streams of the command,
# and call callbacks if necessary.
stdout_task = asyncio.create_task(read_stream(proc.stdout, stdout_callback))
stderr_task = asyncio.create_task(read_stream(proc.stderr, stderr_callback))
# Wait until the command has finished, for a specific timeout. Then, verify that the
# command has completed successfully. In any other case, raise an exception.
try:
ret = await asyncio.wait_for(proc.wait(), timeout=timeout)
except asyncio.exceptions.TimeoutError:
raise TimeoutError(timeout_message)
if ret != 0:
raise RuntimeError(error_message)
# Wait until the tasks that consume the command's standard streams have exited as
# well, and return their output.
stdout = await stdout_task
stderr = await stderr_task
return (stdout, stderr)
class DangerzoneConverter:
def __init__(self, progress_callback: Optional[Callable] = None) -> None:
self.percentage: float = 0.0
self.progress_callback = progress_callback
self.captured_output: bytes = b""
async def read_stream(
self, sr: asyncio.StreamReader, callback: Optional[Callable] = None
) -> bytes:
"""Consume a byte stream line-by-line.
Read all lines in a stream until EOF. If a user has passed a callback, call it for
each line.
Note that the lines are in bytes, since we can't assume that all command output will
be UTF-8 encoded. Higher level commands are advised to decode the output to Unicode,
if they know its encoding.
"""
buf = b""
while True:
line = await sr.readline()
if sr.at_eof():
break
self.captured_output += line
if callback is not None:
callback(line)
buf += line
return buf
async def run_command(
self,
args: List[str],
*,
error_message: str,
timeout_message: str,
timeout: Optional[float],
stdout_callback: Optional[Callable] = None,
stderr_callback: Optional[Callable] = None,
) -> Tuple[bytes, bytes]:
"""Run a command and get its output.
Run a command using asyncio.subprocess, consume its standard streams, and return its
output in bytes.
:raises RuntimeError: if the process returns a non-zero exit status
:raises TimeoutError: if the process times out
"""
# Start the provided command, and return a handle. The command will run in the
# background.
proc = await asyncio.subprocess.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
assert proc.stdout is not None
assert proc.stderr is not None
# Create asynchronous tasks that will consume the standard streams of the command,
# and call callbacks if necessary.
stdout_task = asyncio.create_task(
self.read_stream(proc.stdout, stdout_callback)
)
stderr_task = asyncio.create_task(
self.read_stream(proc.stderr, stderr_callback)
)
# Wait until the command has finished, for a specific timeout. Then, verify that the
# command has completed successfully. In any other case, raise an exception.
try:
ret = await asyncio.wait_for(proc.wait(), timeout=timeout)
except asyncio.exceptions.TimeoutError:
raise TimeoutError(timeout_message)
if ret != 0:
raise RuntimeError(error_message)
# Wait until the tasks that consume the command's standard streams have exited as
# well, and return their output.
stdout = await stdout_task
stderr = await stderr_task
return (stdout, stderr)
def calculate_timeout(
self, size: float, pages: Optional[float] = None

View file

@ -18,7 +18,7 @@ from typing import Dict, Optional
import magic
from .common import DangerzoneConverter, run_command, running_on_qubes
from .common import DangerzoneConverter, running_on_qubes
class DocumentToPixels(DangerzoneConverter):
@ -189,7 +189,7 @@ class DocumentToPixels(DangerzoneConverter):
"/tmp",
"/tmp/input_file",
]
await run_command(
await self.run_command(
args,
error_message="Conversion to PDF with LibreOffice failed",
timeout_message=(
@ -213,7 +213,7 @@ class DocumentToPixels(DangerzoneConverter):
"/tmp/input_file",
"/tmp/input_file.pdf",
]
await run_command(
await self.run_command(
args,
error_message="Conversion to PDF with GraphicsMagick failed",
timeout_message=(
@ -231,7 +231,7 @@ class DocumentToPixels(DangerzoneConverter):
# Obtain number of pages
self.update_progress("Calculating number of pages")
stdout, _ = await run_command(
stdout, _ = await self.run_command(
["pdfinfo", pdf_filename],
error_message="PDF file is corrupted",
timeout_message=(
@ -317,7 +317,7 @@ class DocumentToPixels(DangerzoneConverter):
page_base = "/tmp/page"
await run_command(
await self.run_command(
[
"pdftoppm",
pdf_filename,
@ -351,7 +351,7 @@ class DocumentToPixels(DangerzoneConverter):
f"/usr/lib/libreoffice/share/extensions/{libreoffice_ext}/",
f"/libreoffice_ext/{libreoffice_ext}",
]
await run_command(
await self.run_command(
unzip_args,
error_message="LibreOffice extension installation failed (unzipping)",
timeout_message="unzipping LibreOffice extension timed out 5 seconds",

View file

@ -84,6 +84,9 @@ async def main() -> None:
rgb_data = rgb_file.read()
await write_bytes(rgb_data)
# Write debug information
await write_bytes(converter.captured_output, file=sys.stderr)
if __name__ == "__main__":
sys.exit(asyncio.run(main()))

View file

@ -12,7 +12,7 @@ import os
import shutil
import sys
from .common import DangerzoneConverter, run_command, running_on_qubes
from .common import DangerzoneConverter, running_on_qubes
class PixelsToPDF(DangerzoneConverter):
@ -47,7 +47,7 @@ class PixelsToPDF(DangerzoneConverter):
self.update_progress(
f"Converting page {page}/{num_pages} from pixels to searchable PDF"
)
await run_command(
await self.run_command(
[
"gm",
"convert",
@ -65,7 +65,7 @@ class PixelsToPDF(DangerzoneConverter):
),
timeout=timeout,
)
await run_command(
await self.run_command(
[
"tesseract",
png_filename,
@ -88,7 +88,7 @@ class PixelsToPDF(DangerzoneConverter):
self.update_progress(
f"Converting page {page}/{num_pages} from pixels to PDF"
)
await run_command(
await self.run_command(
[
"gm",
"convert",
@ -119,7 +119,7 @@ class PixelsToPDF(DangerzoneConverter):
for page in range(1, num_pages + 1):
args.append(f"/tmp/page-{page}.pdf")
args.append(f"/tmp/safe-output.pdf")
await run_command(
await self.run_command(
args,
error_message="Merging pages into a single PDF failed",
timeout_message=(
@ -133,7 +133,7 @@ class PixelsToPDF(DangerzoneConverter):
# Compress
self.update_progress("Compressing PDF")
await run_command(
await self.run_command(
["ps2pdf", "/tmp/safe-output.pdf", "/tmp/safe-output-compressed.pdf"],
error_message="Compressing PDF failed",
timeout_message=(

View file

@ -28,6 +28,8 @@ CONVERTED_FILE_PATH = (
"/tmp/safe-output-compressed.pdf"
)
MAX_CONVERSION_LOG_CHARS = 150 * 50 # up to ~150 lines of 50 characters
def read_bytes(p: subprocess.Popen, buff_size: int) -> bytes:
"""Read bytes from stdout."""
@ -40,6 +42,15 @@ def read_int(p: subprocess.Popen) -> int:
return int.from_bytes(untrusted_int, signed=False)
def read_debug_text(p: subprocess.Popen) -> str:
"""Read arbitrarily long text (for debug purposes)"""
if p.stderr:
untrusted_text = p.stderr.read(MAX_CONVERSION_LOG_CHARS)
return untrusted_text.decode("ascii", errors="replace")
else:
return ""
class Qubes(IsolationProvider):
"""Uses a disposable qube for performing the conversion"""
@ -75,6 +86,7 @@ class Qubes(IsolationProvider):
["/usr/bin/qrexec-client-vm", "@dispvm:dz-dvm", "dz.ConvertDev"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
assert p.stdin is not None
@ -89,6 +101,7 @@ class Qubes(IsolationProvider):
["/usr/bin/qrexec-client-vm", "@dispvm:dz-dvm", "dz.Convert"],
stdin=f,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
)
n_pages = read_int(p)
@ -126,6 +139,10 @@ class Qubes(IsolationProvider):
text = "Converted document to pixels"
self.print_progress_trusted(document, False, text, percentage)
if getattr(sys, "dangerzone_dev", False):
text = f"Conversion output (doc to pixels):\n{read_debug_text(p)}"
log.info(text)
# FIXME pass OCR stuff properly (see #455)
old_environ = dict(os.environ)
if ocr_lang: