Compare commits

..

1 commit

Author SHA1 Message Date
21e726e901
Merge 760948b5b5 into a6aa66f925 2025-02-25 17:20:31 +01:00
7 changed files with 86 additions and 128 deletions

View file

@ -14,11 +14,6 @@ CONTAINER_NAME = "ghcr.io/freedomofpress/dangerzone/dangerzone"
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def subprocess_run(*args, **kwargs) -> subprocess.CompletedProcess:
"""subprocess.run with the correct startupinfo for Windows."""
return subprocess.run(*args, startupinfo=get_subprocess_startupinfo(), **kwargs)
def get_runtime_name() -> str: def get_runtime_name() -> str:
if platform.system() == "Linux": if platform.system() == "Linux":
return "podman" return "podman"
@ -44,8 +39,9 @@ def get_runtime_version() -> Tuple[int, int]:
cmd = [runtime, "version", "-f", query] cmd = [runtime, "version", "-f", query]
try: try:
version = subprocess_run( version = subprocess.run(
cmd, cmd,
startupinfo=get_subprocess_startupinfo(),
capture_output=True, capture_output=True,
check=True, check=True,
).stdout.decode() ).stdout.decode()
@ -148,7 +144,8 @@ def load_image_tarball_from_gzip() -> None:
def load_image_tarball_from_tar(tarball_path: str) -> None: def load_image_tarball_from_tar(tarball_path: str) -> None:
cmd = [get_runtime(), "load", "-i", tarball_path] cmd = [get_runtime(), "load", "-i", tarball_path]
subprocess_run(cmd, check=True) subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
log.info("Successfully installed container image from %s", tarball_path) log.info("Successfully installed container image from %s", tarball_path)
@ -159,7 +156,7 @@ def tag_image_by_digest(digest: str, tag: str) -> None:
image_id = get_image_id_by_digest(digest) image_id = get_image_id_by_digest(digest)
cmd = [get_runtime(), "tag", image_id, tag] cmd = [get_runtime(), "tag", image_id, tag]
log.debug(" ".join(cmd)) log.debug(" ".join(cmd))
subprocess_run(cmd, check=True) subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
def get_image_id_by_digest(digest: str) -> str: def get_image_id_by_digest(digest: str) -> str:
@ -175,37 +172,31 @@ def get_image_id_by_digest(digest: str) -> str:
"{{.Id}}", "{{.Id}}",
] ]
log.debug(" ".join(cmd)) log.debug(" ".join(cmd))
process = subprocess_run(cmd, check=True, capture_output=True) process = subprocess.run(
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
)
# In case we have multiple lines, we only want the first one. # In case we have multiple lines, we only want the first one.
return process.stdout.decode().strip().split("\n")[0] return process.stdout.decode().strip().split("\n")[0]
def container_pull(image: str, manifest_digest: str): def container_pull(image: str) -> bool:
"""Pull a container image from a registry.""" """Pull a container image from a registry."""
cmd = [get_runtime_name(), "pull", f"{image}@sha256:{manifest_digest}"] cmd = [get_runtime_name(), "pull", f"{image}"]
try: process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
subprocess_run(cmd, check=True) process.communicate()
except subprocess.CalledProcessError as e: return process.returncode == 0
raise errors.ContainerPullException(
f"Could not pull the container image: {e}"
) from e
def get_local_image_digest(image: str) -> str: def get_local_image_digest(image: str) -> str:
""" """
Returns a image hash from a local image name Returns a image hash from a local image name
""" """
# Get the image hash from the "podman images" command. # Get the image hash from the podman images command, as
# It's not possible to use "podman inspect" here as it # podman inspect returns a the digest of the architecture-bound image
# returns the digest of the architecture-bound image
cmd = [get_runtime_name(), "images", image, "--format", "{{.Digest}}"] cmd = [get_runtime_name(), "images", image, "--format", "{{.Digest}}"]
log.debug(" ".join(cmd)) log.debug(" ".join(cmd))
try: try:
result = subprocess_run( result = subprocess.run(cmd, capture_output=True, check=True)
cmd,
capture_output=True,
check=True,
)
lines = result.stdout.decode().strip().split("\n") lines = result.stdout.decode().strip().split("\n")
if len(lines) != 1: if len(lines) != 1:
raise errors.MultipleImagesFoundException( raise errors.MultipleImagesFoundException(

View file

@ -122,33 +122,25 @@ def handle_document_errors(func: F) -> F:
#### Container-related errors #### Container-related errors
class ContainerException(Exception): class ImageNotPresentException(Exception):
pass pass
class ImageNotPresentException(ContainerException): class MultipleImagesFoundException(Exception):
pass pass
class MultipleImagesFoundException(ContainerException): class ImageInstallationException(Exception):
pass pass
class ImageInstallationException(ContainerException): class NoContainerTechException(Exception):
pass
class NoContainerTechException(ContainerException):
def __init__(self, container_tech: str) -> None: def __init__(self, container_tech: str) -> None:
super().__init__(f"{container_tech} is not installed") super().__init__(f"{container_tech} is not installed")
class NotAvailableContainerTechException(ContainerException): class NotAvailableContainerTechException(Exception):
def __init__(self, container_tech: str, error: str) -> None: def __init__(self, container_tech: str, error: str) -> None:
self.error = error self.error = error
self.container_tech = container_tech self.container_tech = container_tech
super().__init__(f"{container_tech} is not available") super().__init__(f"{container_tech} is not available")
class ContainerPullException(ContainerException):
pass

View file

@ -29,17 +29,15 @@ def upgrade(image: str, pubkey: str) -> None:
"""Upgrade the image to the latest signed version.""" """Upgrade the image to the latest signed version."""
manifest_digest = registry.get_manifest_digest(image) manifest_digest = registry.get_manifest_digest(image)
try: try:
signatures.upgrade_container_image(image, manifest_digest, pubkey) is_upgraded = signatures.upgrade_container_image(image, manifest_digest, pubkey)
click.echo(f"✅ The local image {image} has been upgraded") if is_upgraded:
click.echo(f"✅ The image has been signed with {pubkey}") click.echo(f"✅ The local image {image} has been upgraded")
click.echo(f"✅ Signatures has been verified and stored locally") click.echo(f"✅ The image has been signed with {pubkey}")
click.echo(f"✅ Signatures has been verified and stored locally")
except errors.ImageAlreadyUpToDate as e: except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}") click.echo(f"{e}")
raise click.Abort() raise click.Abort()
except Exception as e:
click.echo(f"{e}")
raise click.Abort()
@main.command() @main.command()

View file

@ -5,8 +5,6 @@ from typing import Dict, Optional, Tuple
import requests import requests
from .. import container_utils as runtime
from .. import errors as dzerrors
from . import errors, log from . import errors, log
__all__ = [ __all__ = [
@ -116,24 +114,3 @@ def get_manifest_digest(
tag_manifest_content = get_manifest(image_str).content tag_manifest_content = get_manifest(image_str).content
return sha256(tag_manifest_content).hexdigest() return sha256(tag_manifest_content).hexdigest()
def is_new_remote_image_available(image_str: str) -> Tuple[bool, str]:
"""
Check if a new remote image is available on the registry.
"""
remote_digest = get_manifest_digest(image_str)
image = parse_image_location(image_str)
if image.digest:
local_digest = image.digest
else:
try:
local_digest = runtime.get_local_image_digest(image_str)
except dzerrors.ImageNotPresentException:
log.debug("No local image found")
return True, remote_digest
log.debug("Remote digest: %s", remote_digest)
log.debug("Local digest: %s", local_digest)
return (remote_digest != local_digest, remote_digest)

View file

@ -22,17 +22,13 @@ except ImportError:
import appdirs as platformdirs # type: ignore[no-redef] import appdirs as platformdirs # type: ignore[no-redef]
def appdata_dir() -> Path: def get_config_dir() -> Path:
return Path(platformdirs.user_data_dir("dangerzone")) return Path(platformdirs.user_config_dir("dangerzone"))
# RELEASE: Bump this value to the log index of the latest signature
# to ensures the software can't upgrade to container images that predates it.
DEFAULT_LOG_INDEX = 0
# XXX Store this somewhere else. # XXX Store this somewhere else.
DEFAULT_PUBKEY_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key") DEFAULT_PUBKEY_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
SIGNATURES_PATH = appdata_dir() / "signatures" SIGNATURES_PATH = get_config_dir() / "signatures"
LAST_LOG_INDEX = SIGNATURES_PATH / "last_log_index" LAST_LOG_INDEX = SIGNATURES_PATH / "last_log_index"
__all__ = [ __all__ = [
@ -65,14 +61,9 @@ def signature_to_bundle(sig: Dict) -> Dict:
} }
def verify_signature(signature: dict, image_digest: str, pubkey: str | Path) -> None: def verify_signature(signature: dict, image_digest: str, pubkey: str | Path) -> bool:
""" """Verify a signature against a given public key"""
Verifies that: # XXX - Also verfy the identity/docker-reference field against the expected value
- the signature has been signed by the given public key
- the signature matches the given image digest
"""
# XXX - Also verify the identity/docker-reference field against the expected value
# e.g. ghcr.io/freedomofpress/dangerzone/dangerzone # e.g. ghcr.io/freedomofpress/dangerzone/dangerzone
cosign.ensure_installed() cosign.ensure_installed()
@ -88,8 +79,7 @@ def verify_signature(signature: dict, image_digest: str, pubkey: str | Path) ->
) )
if payload_digest != f"sha256:{image_digest}": if payload_digest != f"sha256:{image_digest}":
raise errors.SignatureMismatch( raise errors.SignatureMismatch(
"The given signature does not match the expected image digest " f"The signature does not match the image digest ({payload_digest}, {image_digest})"
f"({payload_digest}, {image_digest})"
) )
with ( with (
@ -116,10 +106,14 @@ def verify_signature(signature: dict, image_digest: str, pubkey: str | Path) ->
] ]
log.debug(" ".join(cmd)) log.debug(" ".join(cmd))
result = subprocess.run(cmd, capture_output=True) result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0 or result.stderr != b"Verified OK\n": if result.returncode != 0:
# XXX Raise instead?
log.debug("Failed to verify signature", result.stderr) log.debug("Failed to verify signature", result.stderr)
raise errors.SignatureVerificationError("Failed to verify signature") raise errors.SignatureVerificationError("Failed to verify signature")
log.debug("Signature verified") if result.stderr == b"Verified OK\n":
log.debug("Signature verified")
return True
return False
class Signature: class Signature:
@ -136,39 +130,19 @@ class Signature:
return full_digest.replace("sha256:", "") return full_digest.replace("sha256:", "")
def is_update_available(image_str: str, pubkey: str) -> Tuple[bool, Optional[str]]: def is_update_available(image: str) -> Tuple[bool, Optional[str]]:
""" remote_digest = registry.get_manifest_digest(image)
Check if a new image is available, doing all the necessary checks ensuring it
would be safe to upgrade.
"""
new_image_available, remote_digest = registry.is_new_remote_image_available(
image_str
)
if not new_image_available:
return False, None
try: try:
check_signatures_and_logindex(image_str, remote_digest, pubkey) local_digest = runtime.get_local_image_digest(image)
except dzerrors.ImageNotPresentException:
log.debug("No local image found")
return True, remote_digest return True, remote_digest
except errors.InvalidLogIndex: log.debug("Remote digest: %s", remote_digest)
return False, None log.debug("Local digest: %s", local_digest)
has_update = remote_digest != local_digest
if has_update:
def check_signatures_and_logindex( return True, remote_digest
image_str: str, remote_digest: str, pubkey: str return False, None
) -> list[Dict]:
signatures = get_remote_signatures(image_str, remote_digest)
verify_signatures(signatures, remote_digest, pubkey)
incoming_log_index = get_log_index_from_signatures(signatures)
last_log_index = get_last_log_index()
if incoming_log_index < last_log_index:
raise errors.InvalidLogIndex(
f"The incoming log index ({incoming_log_index}) is "
f"lower than the last known log index ({last_log_index})"
)
return signatures
def verify_signatures( def verify_signatures(
@ -180,14 +154,17 @@ def verify_signatures(
raise errors.SignatureVerificationError("No signatures found") raise errors.SignatureVerificationError("No signatures found")
for signature in signatures: for signature in signatures:
verify_signature(signature, image_digest, pubkey) if not verify_signature(signature, image_digest, pubkey):
msg = f"Unable to verify signature for {image_digest} with pubkey {pubkey}"
raise errors.SignatureVerificationError(msg)
return True return True
def get_last_log_index() -> int: def get_last_log_index() -> int:
SIGNATURES_PATH.mkdir(parents=True, exist_ok=True) SIGNATURES_PATH.mkdir(parents=True, exist_ok=True)
if not LAST_LOG_INDEX.exists(): if not LAST_LOG_INDEX.exists():
return DEFAULT_LOG_INDEX return 0
with open(LAST_LOG_INDEX) as f: with open(LAST_LOG_INDEX) as f:
return int(f.read()) return int(f.read())
@ -387,8 +364,6 @@ def store_signatures(signatures: list[Dict], image_digest: str, pubkey: str) ->
It can be converted to the one expected by cosign verify --bundle with It can be converted to the one expected by cosign verify --bundle with
the `signature_to_bundle()` function. the `signature_to_bundle()` function.
This function must be used only if the provided signatures have been verified.
""" """
def _get_digest(sig: Dict) -> str: def _get_digest(sig: Dict) -> str:
@ -478,15 +453,29 @@ def prepare_airgapped_archive(image_name: str, destination: str) -> None:
archive.add(tmpdir, arcname=".") archive.add(tmpdir, arcname=".")
def upgrade_container_image(image: str, manifest_digest: str, pubkey: str) -> str: def upgrade_container_image(image: str, manifest_digest: str, pubkey: str) -> bool:
"""Verify and upgrade the image to the latest, if signed.""" """Verify and upgrade the image to the latest, if signed."""
update_available, remote_digest = registry.is_new_remote_image_available(image) update_available, _ = is_update_available(image)
if not update_available: if not update_available:
raise errors.ImageAlreadyUpToDate("The image is already up to date") raise errors.ImageAlreadyUpToDate("The image is already up to date")
signatures = check_signatures_and_logindex(image, remote_digest, pubkey) signatures = get_remote_signatures(image, manifest_digest)
runtime.container_pull(image, manifest_digest) verify_signatures(signatures, manifest_digest, pubkey)
# Store the signatures just now to avoid storing them unverified # Ensure that we only upgrade if the log index is higher than the last known one
incoming_log_index = get_log_index_from_signatures(signatures)
last_log_index = get_last_log_index()
if incoming_log_index < last_log_index:
raise errors.InvalidLogIndex(
"The log index is not higher than the last known one"
)
# let's upgrade the image
# XXX Use the image digest here to avoid race conditions
upgraded = runtime.container_pull(image)
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
store_signatures(signatures, manifest_digest, pubkey) store_signatures(signatures, manifest_digest, pubkey)
return manifest_digest return upgraded

View file

@ -13,6 +13,12 @@ from dangerzone.gui import Application
sys.dangerzone_dev = True # type: ignore[attr-defined] sys.dangerzone_dev = True # type: ignore[attr-defined]
ASSETS_PATH = Path(__file__).parent / "assets"
TEST_PUBKEY_PATH = ASSETS_PATH / "test.pub.key"
INVALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "invalid"
VALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "valid"
TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered"
# Use this fixture to make `pytest-qt` invoke our custom QApplication. # Use this fixture to make `pytest-qt` invoke our custom QApplication.
# See https://pytest-qt.readthedocs.io/en/latest/qapplication.html#testing-custom-qapplications # See https://pytest-qt.readthedocs.io/en/latest/qapplication.html#testing-custom-qapplications
@ -134,6 +140,10 @@ for_each_doc = pytest.mark.parametrize(
) )
@pytest.fixture
def signature():
return {}
# External Docs - base64 docs encoded for externally sourced documents # External Docs - base64 docs encoded for externally sourced documents
# XXX to reduce the chance of accidentally opening them # XXX to reduce the chance of accidentally opening them

View file

@ -9,6 +9,7 @@ from dangerzone import errors as dzerrors
from dangerzone.updater import errors from dangerzone.updater import errors
from dangerzone.updater.signatures import ( from dangerzone.updater.signatures import (
Signature, Signature,
get_config_dir,
get_last_log_index, get_last_log_index,
get_log_index_from_signatures, get_log_index_from_signatures,
get_remote_signatures, get_remote_signatures,