Fix types and use pathlib.Path for files

This commit is contained in:
Alexis Métaireau 2025-04-29 12:46:30 +02:00
parent bcbcf150fc
commit e5d091d268
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
9 changed files with 73 additions and 55 deletions

View file

@ -1,3 +1,4 @@
import functools
import logging
import os
import platform
@ -55,9 +56,11 @@ class Runtime(object):
return "podman" if platform.system() == "Linux" else "docker"
def subprocess_run(*args, **kwargs) -> subprocess.CompletedProcess:
"""subprocess.run with the correct startupinfo for Windows."""
return subprocess.run(*args, startupinfo=get_subprocess_startupinfo(), **kwargs)
# subprocess.run with the correct startupinfo for Windows.
# We use a partial here to better profit from type checking
subprocess_run = functools.partial(
subprocess.run, startupinfo=get_subprocess_startupinfo()
)
def get_runtime_version(runtime: Optional[Runtime] = None) -> Tuple[int, int]:
@ -230,14 +233,14 @@ def get_image_id_by_digest(digest: str) -> str:
return process.stdout.decode().strip().split("\n")[0]
def expected_image_name():
def expected_image_name() -> str:
image_name_path = get_resource_path("image-name.txt")
return image_name_path.read_text().strip("\n")
def container_pull(
image: str, manifest_digest: str, callback: Optional[Callable] = None
):
) -> None:
"""Pull a container image from a registry."""
runtime = Runtime()
cmd = [str(runtime.path), "pull", f"{image}@sha256:{manifest_digest}"]
@ -264,12 +267,12 @@ def get_local_image_digest(image: Optional[str] = None) -> str:
"""
Returns a image hash from a local image name
"""
image = image or expected_image_name()
expected_image = image or expected_image_name()
# Get the image hash from the "podman images" command.
# It's not possible to use "podman inspect" here as it
# returns the digest of the architecture-bound image
runtime = Runtime()
cmd = [str(runtime.path), "images", image, "--format", "{{.Digest}}"]
cmd = [str(runtime.path), "images", expected_image, "--format", "{{.Digest}}"]
log.debug(" ".join(cmd))
try:
result = subprocess_run(
@ -285,10 +288,10 @@ def get_local_image_digest(image: Optional[str] = None) -> str:
image_digest = lines[0].replace("sha256:", "")
if not image_digest:
raise errors.ImageNotPresentException(
f"The image {image} does not exist locally"
f"The image {expected_image} does not exist locally"
)
return image_digest
except subprocess.CalledProcessError as e:
raise errors.ImageNotPresentException(
f"The image {image} does not exist locally"
f"The image {expected_image} does not exist locally"
)

View file

@ -495,7 +495,7 @@ class TracebackWidget(QTextEdit):
self.setPlainText(error)
self.setVisible(True)
def process_output(self, line):
def process_output(self, line: str) -> None:
self.current_output += line
self.setText(self.current_output)
cursor = self.textCursor()

View file

@ -95,7 +95,9 @@ class IsolationProvider(ABC):
return self.debug or getattr(sys, "dangerzone_dev", False)
@abstractmethod
def install(self, should_upgrade: bool, callback: Callable) -> bool:
def install(
self, should_upgrade: bool, callback: Optional[Callable] = None
) -> bool:
pass
def convert(

View file

@ -1,6 +1,7 @@
import logging
import subprocess
import sys
from typing import Callable, Optional
from ..conversion.common import DangerzoneConverter
from ..document import Document
@ -36,7 +37,9 @@ class Dummy(IsolationProvider):
)
super().__init__()
def install(self, *args, **kwargs) -> bool:
def install(
self, should_upgrade: bool, callback: Optional[Callable] = None
) -> bool:
return True
@staticmethod

View file

@ -5,7 +5,7 @@ import subprocess
import sys
import zipfile
from pathlib import Path
from typing import IO
from typing import IO, Callable, Optional
from ..conversion.common import running_on_qubes
from ..document import Document
@ -18,7 +18,9 @@ log = logging.getLogger(__name__)
class Qubes(IsolationProvider):
"""Uses a disposable qube for performing the conversion"""
def install(self, *args, **kwargs) -> bool:
def install(
self, should_upgrade: bool, callback: Optional[Callable] = None
) -> bool:
return True
@staticmethod

View file

@ -2,11 +2,12 @@
import functools
import logging
from pathlib import Path
import click
from .. import container_utils
from ..container_utils import get_runtime_name
from ..container_utils import Runtime
from . import attestations, errors, log, registry, signatures
DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
@ -16,7 +17,6 @@ DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone/dangerzone"
@click.group()
@click.option("--debug", is_flag=True)
@click.option("--runtime", default=get_runtime_name())
def main(debug: bool, runtime: str) -> None:
if debug:
click.echo("Debug mode enabled")
@ -25,15 +25,13 @@ def main(debug: bool, runtime: str) -> None:
level = logging.INFO
logging.basicConfig(level=level)
if runtime != get_runtime_name():
click.echo(f"Using container runtime: {runtime}")
container_utils.RUNTIME_NAME = runtime
@main.command()
@click.argument("image", default=DEFAULT_IMAGE_NAME)
@click.option("--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION)
def upgrade(image: str, pubkey: str) -> None:
@click.option(
"--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION, type=click.Path(exists=True)
)
def upgrade(image: str, pubkey: Path) -> None:
"""Upgrade the image to the latest signed version."""
manifest_digest = registry.get_manifest_digest(image)
@ -54,8 +52,10 @@ def upgrade(image: str, pubkey: str) -> None:
@main.command()
@click.argument("image", default=DEFAULT_IMAGE_NAME)
@click.option("--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION)
def store_signatures(image: str, pubkey: str) -> None:
@click.option(
"--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION, type=click.Path(exists=True)
)
def store_signatures(image: str, pubkey: Path) -> None:
manifest_digest = registry.get_manifest_digest(image)
sigs = signatures.get_remote_signatures(image, manifest_digest)
signatures.verify_signatures(sigs, manifest_digest, pubkey)
@ -64,17 +64,19 @@ def store_signatures(image: str, pubkey: str) -> None:
@main.command()
@click.argument("image_filename")
@click.option("--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION)
@click.argument("image_filename", type=click.Path(exists=True))
@click.option(
"--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION, type=click.Path(exists=True)
)
@click.option("--force", is_flag=True)
def load_archive(image_filename: str, pubkey: str, force: bool) -> None:
def load_archive(image_filename: Path, pubkey: Path, force: bool) -> None:
"""Upgrade the local image to the one in the archive."""
try:
loaded_image = signatures.upgrade_container_image_airgapped(
image_filename, pubkey, bypass_logindex=force
)
click.echo(
f"✅ Installed image {image_filename} on the system as {loaded_image}"
f"✅ Installed image {str(image_filename)} on the system as {loaded_image}"
)
except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}")
@ -97,8 +99,10 @@ def prepare_archive(image: str, output: str) -> None:
@main.command()
@click.argument("image", default=DEFAULT_IMAGE_NAME)
@click.option("--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION)
def verify_local(image: str, pubkey: str) -> None:
@click.option(
"--pubkey", default=signatures.DEFAULT_PUBKEY_LOCATION, type=click.Path(exists=True)
)
def verify_local(image: str, pubkey: Path) -> None:
"""
Verify the local image signature against a public key and the stored signatures.
"""

View file

@ -1,4 +1,5 @@
import subprocess
from pathlib import Path
from . import errors, log
@ -10,7 +11,7 @@ def ensure_installed() -> None:
raise errors.CosignNotInstalledError()
def verify_local_image(oci_image_folder: str, pubkey: str) -> bool:
def verify_local_image(oci_image_folder: str, pubkey: Path) -> bool:
"""Verify the given path against the given public key"""
ensure_installed()
@ -18,7 +19,7 @@ def verify_local_image(oci_image_folder: str, pubkey: str) -> bool:
"cosign",
"verify",
"--key",
pubkey,
str(pubkey),
"--offline",
"--local-image",
oci_image_folder,

View file

@ -42,7 +42,7 @@ def _get_now_timestamp() -> int:
return int(time.time())
def _should_postpone_update_check(settings) -> bool:
def _should_postpone_update_check(settings: Settings) -> bool:
"""Consult and update cooldown timer.
If the previous check happened before the cooldown period expires, do not check
@ -141,7 +141,7 @@ def should_check_for_releases(settings: Settings) -> bool:
return True
def check_for_updates(settings) -> UpdateReport:
def check_for_updates(settings: Settings) -> UpdateReport:
"""Check for updates locally and remotely.
Check for updates (locally and remotely) and return a report with the findings:

View file

@ -30,8 +30,8 @@ def appdata_dir() -> Path:
# to ensures the software can't upgrade to container images that predates it.
DEFAULT_LOG_INDEX = 0
# XXX Store this somewhere else.
DEFAULT_PUBKEY_LOCATION = str(get_resource_path("freedomofpress-dangerzone-pub.key"))
# FIXME Store this somewhere else.
DEFAULT_PUBKEY_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
SIGNATURES_PATH = appdata_dir() / "signatures"
LAST_LOG_INDEX = SIGNATURES_PATH / "last_log_index"
@ -65,7 +65,7 @@ def signature_to_bundle(sig: Dict) -> Dict:
}
def verify_signature(signature: dict, image_digest: str, pubkey: str | Path) -> None:
def verify_signature(signature: dict, image_digest: str, pubkey: Path) -> None:
"""
Verifies that:
@ -102,9 +102,6 @@ def verify_signature(signature: dict, image_digest: str, pubkey: str | Path) ->
payload_file.write(payload_bytes)
payload_file.flush()
if isinstance(pubkey, str):
pubkey = Path(pubkey)
cmd = [
"cosign",
"verify-blob",
@ -136,7 +133,7 @@ class Signature:
return full_digest.replace("sha256:", "")
def is_update_available(image_str: str, pubkey: str) -> Tuple[bool, Optional[str]]:
def is_update_available(image_str: str, pubkey: Path) -> Tuple[bool, Optional[str]]:
"""
Check if a new image is available, doing all the necessary checks ensuring it
would be safe to upgrade.
@ -155,7 +152,7 @@ def is_update_available(image_str: str, pubkey: str) -> Tuple[bool, Optional[str
def check_signatures_and_logindex(
image_str: str, remote_digest: str, pubkey: str
image_str: str, remote_digest: str, pubkey: Path
) -> list[Dict]:
signatures = get_remote_signatures(image_str, remote_digest)
verify_signatures(signatures, remote_digest, pubkey)
@ -174,7 +171,7 @@ def check_signatures_and_logindex(
def verify_signatures(
signatures: List[Dict],
image_digest: str,
pubkey: str,
pubkey: Path,
) -> bool:
if len(signatures) < 1:
raise errors.SignatureVerificationError("No signatures found")
@ -217,7 +214,7 @@ def _get_blob(tmpdir: str, digest: str) -> Path:
def upgrade_container_image_airgapped(
container_tar: str, pubkey: str, bypass_logindex: bool = False
container_tar: Path, pubkey: Path, bypass_logindex: bool = False
) -> str:
"""
Verify the given archive against its self-contained signatures, then
@ -287,7 +284,7 @@ def upgrade_container_image_airgapped(
archive.add(Path(tmpdir) / "oci-layout", arcname="oci-layout")
archive.add(Path(tmpdir) / "blobs", arcname="blobs")
runtime.load_image_tarball(temporary_tar.name)
runtime.load_image_tarball(Path(temporary_tar.name))
runtime.tag_image_by_digest(image_digest, image_name)
store_signatures(signatures, image_digest, pubkey)
@ -329,12 +326,14 @@ def convert_oci_images_signatures(
return image_name, signatures
def get_file_digest(file: Optional[str] = None, content: Optional[bytes] = None) -> str:
def get_file_digest(
path: Optional[Path] = None, content: Optional[bytes] = None
) -> str:
"""Get the sha256 digest of a file or content"""
if not file and not content:
if not path and not content:
raise errors.UpdaterError("No file or content provided")
if file:
with open(file, "rb") as f:
if path:
with path.open("rb") as f:
content = f.read()
if content:
return sha256(content).hexdigest()
@ -343,7 +342,7 @@ def get_file_digest(file: Optional[str] = None, content: Optional[bytes] = None)
def load_and_verify_signatures(
image_digest: str,
pubkey: str,
pubkey: Path,
bypass_verification: bool = False,
signatures_path: Optional[Path] = None,
) -> List[Dict]:
@ -383,7 +382,10 @@ def load_and_verify_signatures(
def store_signatures(
signatures: list[Dict], image_digest: str, pubkey: str, update_logindex: bool = True
signatures: list[Dict],
image_digest: str,
pubkey: Path,
update_logindex: bool = True,
) -> None:
"""
Store signatures locally in the SIGNATURE_PATH folder, like this:
@ -434,12 +436,13 @@ def store_signatures(
def verify_local_image(
image: Optional[str] = None, pubkey: str = DEFAULT_PUBKEY_LOCATION
image: Optional[str] = None, pubkey: Path = DEFAULT_PUBKEY_LOCATION
) -> bool:
"""
Verifies that a local image has a valid signature
"""
image = image or runtime.expected_image_name()
if image is None:
image = runtime.expected_image_name()
log.info(f"Verifying local image {image} against pubkey {pubkey}")
try:
image_digest = runtime.get_local_image_digest(image)
@ -500,7 +503,7 @@ def prepare_airgapped_archive(image_name: str, destination: str) -> None:
def upgrade_container_image(
manifest_digest: str,
image: Optional[str] = None,
pubkey: Optional[str] = DEFAULT_PUBKEY_LOCATION,
pubkey: Path = DEFAULT_PUBKEY_LOCATION,
callback: Optional[Callable] = None,
) -> str:
"""Verify and upgrade the image to the latest, if signed."""
@ -518,7 +521,7 @@ def upgrade_container_image(
def install_local_container_tar(
pubkey: Optional[str] = DEFAULT_PUBKEY_LOCATION,
pubkey: Path = DEFAULT_PUBKEY_LOCATION,
) -> None:
tarball_path = get_resource_path("container.tar")
log.debug("Installing container image %s", tarball_path)