diff --git a/dangerzone/container_utils.py b/dangerzone/container_utils.py index 8882ac5..10ee68d 100644 --- a/dangerzone/container_utils.py +++ b/dangerzone/container_utils.py @@ -1,3 +1,4 @@ +import functools import logging import os import platform @@ -55,9 +56,11 @@ class Runtime(object): return "podman" if platform.system() == "Linux" else "docker" -def subprocess_run(*args, **kwargs) -> subprocess.CompletedProcess: - """subprocess.run with the correct startupinfo for Windows.""" - return subprocess.run(*args, startupinfo=get_subprocess_startupinfo(), **kwargs) +# subprocess.run with the correct startupinfo for Windows. +# We use a partial here to better profit from type checking +subprocess_run = functools.partial( + subprocess.run, startupinfo=get_subprocess_startupinfo() +) def get_runtime_version(runtime: Optional[Runtime] = None) -> Tuple[int, int]: @@ -230,14 +233,14 @@ def get_image_id_by_digest(digest: str) -> str: return process.stdout.decode().strip().split("\n")[0] -def expected_image_name(): +def expected_image_name() -> str: image_name_path = get_resource_path("image-name.txt") return image_name_path.read_text().strip("\n") def container_pull( image: str, manifest_digest: str, callback: Optional[Callable] = None -): +) -> None: """Pull a container image from a registry.""" runtime = Runtime() cmd = [str(runtime.path), "pull", f"{image}@sha256:{manifest_digest}"] @@ -264,12 +267,12 @@ def get_local_image_digest(image: Optional[str] = None) -> str: """ Returns a image hash from a local image name """ - image = image or expected_image_name() + expected_image = image or expected_image_name() # Get the image hash from the "podman images" command. # It's not possible to use "podman inspect" here as it # returns the digest of the architecture-bound image runtime = Runtime() - cmd = [str(runtime.path), "images", image, "--format", "{{.Digest}}"] + cmd = [str(runtime.path), "images", expected_image, "--format", "{{.Digest}}"] log.debug(" ".join(cmd)) try: result = subprocess_run( @@ -285,10 +288,10 @@ def get_local_image_digest(image: Optional[str] = None) -> str: image_digest = lines[0].replace("sha256:", "") if not image_digest: raise errors.ImageNotPresentException( - f"The image {image} does not exist locally" + f"The image {expected_image} does not exist locally" ) return image_digest except subprocess.CalledProcessError as e: raise errors.ImageNotPresentException( - f"The image {image} does not exist locally" + f"The image {expected_image} does not exist locally" ) diff --git a/dangerzone/gui/main_window.py b/dangerzone/gui/main_window.py index f78b6ad..1470efe 100644 --- a/dangerzone/gui/main_window.py +++ b/dangerzone/gui/main_window.py @@ -495,7 +495,7 @@ class TracebackWidget(QTextEdit): self.setPlainText(error) self.setVisible(True) - def process_output(self, line): + def process_output(self, line: str) -> None: self.current_output += line self.setText(self.current_output) cursor = self.textCursor() diff --git a/dangerzone/isolation_provider/base.py b/dangerzone/isolation_provider/base.py index 27d1c09..2d75ebf 100644 --- a/dangerzone/isolation_provider/base.py +++ b/dangerzone/isolation_provider/base.py @@ -95,7 +95,9 @@ class IsolationProvider(ABC): return self.debug or getattr(sys, "dangerzone_dev", False) @abstractmethod - def install(self, should_upgrade: bool, callback: Callable) -> bool: + def install( + self, should_upgrade: bool, callback: Optional[Callable] = None + ) -> bool: pass def convert( diff --git a/dangerzone/isolation_provider/dummy.py b/dangerzone/isolation_provider/dummy.py index a70a4ef..ccae099 100644 --- a/dangerzone/isolation_provider/dummy.py +++ b/dangerzone/isolation_provider/dummy.py @@ -1,6 +1,7 @@ import logging import subprocess import sys +from typing import Callable, Optional from ..conversion.common import DangerzoneConverter from ..document import Document @@ -36,7 +37,9 @@ class Dummy(IsolationProvider): ) super().__init__() - def install(self, *args, **kwargs) -> bool: + def install( + self, should_upgrade: bool, callback: Optional[Callable] = None + ) -> bool: return True @staticmethod diff --git a/dangerzone/isolation_provider/qubes.py b/dangerzone/isolation_provider/qubes.py index defc4e8..7bb191e 100644 --- a/dangerzone/isolation_provider/qubes.py +++ b/dangerzone/isolation_provider/qubes.py @@ -5,7 +5,7 @@ import subprocess import sys import zipfile from pathlib import Path -from typing import IO +from typing import IO, Callable, Optional from ..conversion.common import running_on_qubes from ..document import Document @@ -18,7 +18,9 @@ log = logging.getLogger(__name__) class Qubes(IsolationProvider): """Uses a disposable qube for performing the conversion""" - def install(self, *args, **kwargs) -> bool: + def install( + self, should_upgrade: bool, callback: Optional[Callable] = None + ) -> bool: return True @staticmethod diff --git a/dangerzone/updater/cli.py b/dangerzone/updater/cli.py index 463722e..6818b47 100644 --- a/dangerzone/updater/cli.py +++ b/dangerzone/updater/cli.py @@ -2,11 +2,12 @@ import functools import logging +from pathlib import Path import click from .. import container_utils -from ..container_utils import get_runtime_name +from ..container_utils import Runtime from . import attestations, errors, log, registry, signatures DEFAULT_REPOSITORY = "freedomofpress/dangerzone" @@ -16,7 +17,6 @@ DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone/dangerzone" @click.group() @click.option("--debug", is_flag=True) -@click.option("--runtime", default=get_runtime_name()) def main(debug: bool, runtime: str) -> None: if debug: click.echo("Debug mode enabled") @@ -25,15 +25,13 @@ def main(debug: bool, runtime: str) -> None: level = logging.INFO logging.basicConfig(level=level) - if runtime != get_runtime_name(): - click.echo(f"Using container runtime: {runtime}") - container_utils.RUNTIME_NAME = runtime - @main.command() @click.argument("image", default=DEFAULT_IMAGE_NAME) -@click.option("--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION) -def upgrade(image: str, pubkey: str) -> None: +@click.option( + "--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION, type=click.Path(exists=True) +) +def upgrade(image: str, pubkey: Path) -> None: """Upgrade the image to the latest signed version.""" manifest_digest = registry.get_manifest_digest(image) @@ -54,8 +52,10 @@ def upgrade(image: str, pubkey: str) -> None: @main.command() @click.argument("image", default=DEFAULT_IMAGE_NAME) -@click.option("--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION) -def store_signatures(image: str, pubkey: str) -> None: +@click.option( + "--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION, type=click.Path(exists=True) +) +def store_signatures(image: str, pubkey: Path) -> None: manifest_digest = registry.get_manifest_digest(image) sigs = signatures.get_remote_signatures(image, manifest_digest) signatures.verify_signatures(sigs, manifest_digest, pubkey) @@ -64,17 +64,19 @@ def store_signatures(image: str, pubkey: str) -> None: @main.command() -@click.argument("image_filename") -@click.option("--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION) +@click.argument("image_filename", type=click.Path(exists=True)) +@click.option( + "--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION, type=click.Path(exists=True) +) @click.option("--force", is_flag=True) -def load_archive(image_filename: str, pubkey: str, force: bool) -> None: +def load_archive(image_filename: Path, pubkey: Path, force: bool) -> None: """Upgrade the local image to the one in the archive.""" try: loaded_image = signatures.upgrade_container_image_airgapped( image_filename, pubkey, bypass_logindex=force ) click.echo( - f"✅ Installed image {image_filename} on the system as {loaded_image}" + f"✅ Installed image {str(image_filename)} on the system as {loaded_image}" ) except errors.ImageAlreadyUpToDate as e: click.echo(f"✅ {e}") @@ -97,8 +99,10 @@ def prepare_archive(image: str, output: str) -> None: @main.command() @click.argument("image", default=DEFAULT_IMAGE_NAME) -@click.option("--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION) -def verify_local(image: str, pubkey: str) -> None: +@click.option( + "--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION, type=click.Path(exists=True) +) +def verify_local(image: str, pubkey: Path) -> None: """ Verify the local image signature against a public key and the stored signatures. """ diff --git a/dangerzone/updater/cosign.py b/dangerzone/updater/cosign.py index 9abcc84..2f7c4f3 100644 --- a/dangerzone/updater/cosign.py +++ b/dangerzone/updater/cosign.py @@ -1,4 +1,5 @@ import subprocess +from pathlib import Path from . import errors, log @@ -10,7 +11,7 @@ def ensure_installed() -> None: raise errors.CosignNotInstalledError() -def verify_local_image(oci_image_folder: str, pubkey: str) -> bool: +def verify_local_image(oci_image_folder: str, pubkey: Path) -> bool: """Verify the given path against the given public key""" ensure_installed() @@ -18,7 +19,7 @@ def verify_local_image(oci_image_folder: str, pubkey: str) -> bool: "cosign", "verify", "--key", - pubkey, + str(pubkey), "--offline", "--local-image", oci_image_folder, diff --git a/dangerzone/updater/releases.py b/dangerzone/updater/releases.py index 7fc842b..2c9c773 100644 --- a/dangerzone/updater/releases.py +++ b/dangerzone/updater/releases.py @@ -42,7 +42,7 @@ def _get_now_timestamp() -> int: return int(time.time()) -def _should_postpone_update_check(settings) -> bool: +def _should_postpone_update_check(settings: Settings) -> bool: """Consult and update cooldown timer. If the previous check happened before the cooldown period expires, do not check @@ -141,7 +141,7 @@ def should_check_for_releases(settings: Settings) -> bool: return True -def check_for_updates(settings) -> UpdateReport: +def check_for_updates(settings: Settings) -> UpdateReport: """Check for updates locally and remotely. Check for updates (locally and remotely) and return a report with the findings: diff --git a/dangerzone/updater/signatures.py b/dangerzone/updater/signatures.py index ab0d900..3fad8a4 100644 --- a/dangerzone/updater/signatures.py +++ b/dangerzone/updater/signatures.py @@ -30,8 +30,8 @@ def appdata_dir() -> Path: # to ensures the software can't upgrade to container images that predates it. DEFAULT_LOG_INDEX = 0 -# XXX Store this somewhere else. -DEFAULT_PUBKEY_LOCATION = str(get_resource_path("freedomofpress-dangerzone-pub.key")) +# FIXME Store this somewhere else. +DEFAULT_PUBKEY_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key") SIGNATURES_PATH = appdata_dir() / "signatures" LAST_LOG_INDEX = SIGNATURES_PATH / "last_log_index" @@ -65,7 +65,7 @@ def signature_to_bundle(sig: Dict) -> Dict: } -def verify_signature(signature: dict, image_digest: str, pubkey: str | Path) -> None: +def verify_signature(signature: dict, image_digest: str, pubkey: Path) -> None: """ Verifies that: @@ -102,9 +102,6 @@ def verify_signature(signature: dict, image_digest: str, pubkey: str | Path) -> payload_file.write(payload_bytes) payload_file.flush() - if isinstance(pubkey, str): - pubkey = Path(pubkey) - cmd = [ "cosign", "verify-blob", @@ -136,7 +133,7 @@ class Signature: return full_digest.replace("sha256:", "") -def is_update_available(image_str: str, pubkey: str) -> Tuple[bool, Optional[str]]: +def is_update_available(image_str: str, pubkey: Path) -> Tuple[bool, Optional[str]]: """ Check if a new image is available, doing all the necessary checks ensuring it would be safe to upgrade. @@ -155,7 +152,7 @@ def is_update_available(image_str: str, pubkey: str) -> Tuple[bool, Optional[str def check_signatures_and_logindex( - image_str: str, remote_digest: str, pubkey: str + image_str: str, remote_digest: str, pubkey: Path ) -> list[Dict]: signatures = get_remote_signatures(image_str, remote_digest) verify_signatures(signatures, remote_digest, pubkey) @@ -174,7 +171,7 @@ def check_signatures_and_logindex( def verify_signatures( signatures: List[Dict], image_digest: str, - pubkey: str, + pubkey: Path, ) -> bool: if len(signatures) < 1: raise errors.SignatureVerificationError("No signatures found") @@ -217,7 +214,7 @@ def _get_blob(tmpdir: str, digest: str) -> Path: def upgrade_container_image_airgapped( - container_tar: str, pubkey: str, bypass_logindex: bool = False + container_tar: Path, pubkey: Path, bypass_logindex: bool = False ) -> str: """ Verify the given archive against its self-contained signatures, then @@ -287,7 +284,7 @@ def upgrade_container_image_airgapped( archive.add(Path(tmpdir) / "oci-layout", arcname="oci-layout") archive.add(Path(tmpdir) / "blobs", arcname="blobs") - runtime.load_image_tarball(temporary_tar.name) + runtime.load_image_tarball(Path(temporary_tar.name)) runtime.tag_image_by_digest(image_digest, image_name) store_signatures(signatures, image_digest, pubkey) @@ -329,12 +326,14 @@ def convert_oci_images_signatures( return image_name, signatures -def get_file_digest(file: Optional[str] = None, content: Optional[bytes] = None) -> str: +def get_file_digest( + path: Optional[Path] = None, content: Optional[bytes] = None +) -> str: """Get the sha256 digest of a file or content""" - if not file and not content: + if not path and not content: raise errors.UpdaterError("No file or content provided") - if file: - with open(file, "rb") as f: + if path: + with path.open("rb") as f: content = f.read() if content: return sha256(content).hexdigest() @@ -343,7 +342,7 @@ def get_file_digest(file: Optional[str] = None, content: Optional[bytes] = None) def load_and_verify_signatures( image_digest: str, - pubkey: str, + pubkey: Path, bypass_verification: bool = False, signatures_path: Optional[Path] = None, ) -> List[Dict]: @@ -383,7 +382,10 @@ def load_and_verify_signatures( def store_signatures( - signatures: list[Dict], image_digest: str, pubkey: str, update_logindex: bool = True + signatures: list[Dict], + image_digest: str, + pubkey: Path, + update_logindex: bool = True, ) -> None: """ Store signatures locally in the SIGNATURE_PATH folder, like this: @@ -434,12 +436,13 @@ def store_signatures( def verify_local_image( - image: Optional[str] = None, pubkey: str = DEFAULT_PUBKEY_LOCATION + image: Optional[str] = None, pubkey: Path = DEFAULT_PUBKEY_LOCATION ) -> bool: """ Verifies that a local image has a valid signature """ - image = image or runtime.expected_image_name() + if image is None: + image = runtime.expected_image_name() log.info(f"Verifying local image {image} against pubkey {pubkey}") try: image_digest = runtime.get_local_image_digest(image) @@ -500,7 +503,7 @@ def prepare_airgapped_archive(image_name: str, destination: str) -> None: def upgrade_container_image( manifest_digest: str, image: Optional[str] = None, - pubkey: Optional[str] = DEFAULT_PUBKEY_LOCATION, + pubkey: Path = DEFAULT_PUBKEY_LOCATION, callback: Optional[Callable] = None, ) -> str: """Verify and upgrade the image to the latest, if signed.""" @@ -518,7 +521,7 @@ def upgrade_container_image( def install_local_container_tar( - pubkey: Optional[str] = DEFAULT_PUBKEY_LOCATION, + pubkey: Path = DEFAULT_PUBKEY_LOCATION, ) -> None: tarball_path = get_resource_path("container.tar") log.debug("Installing container image %s", tarball_path)