From a9fec44837dacabe9ff9d5877f8962fef2f7511a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexis=20M=C3=A9taireau?= Date: Wed, 26 Feb 2025 16:09:58 +0100 Subject: [PATCH] Introduce a `subprocess_run` utility function This is done to avoid forgetting windows specific arguments when calling `subprocess.run`. --- dangerzone/container_utils.py | 32 +-- dangerzone/updater/cli.py | 9 +- dangerzone/updater/signatures.py | 37 +-- tests/test_signatures.py | 385 +++++++++++++++++++++++++++++++ 4 files changed, 429 insertions(+), 34 deletions(-) create mode 100644 tests/test_signatures.py diff --git a/dangerzone/container_utils.py b/dangerzone/container_utils.py index f2d1e19..162901e 100644 --- a/dangerzone/container_utils.py +++ b/dangerzone/container_utils.py @@ -56,6 +56,11 @@ class Runtime(object): return "podman" if platform.system() == "Linux" else "docker" +def subprocess_run(*args, **kwargs) -> subprocess.CompletedProcess: + """subprocess.run with the correct startupinfo for Windows.""" + return subprocess.run(*args, startupinfo=get_subprocess_startupinfo(), **kwargs) + + def get_runtime_version(runtime: Optional[Runtime] = None) -> Tuple[int, int]: """Get the major/minor parts of the Docker/Podman version. @@ -75,9 +80,8 @@ def get_runtime_version(runtime: Optional[Runtime] = None) -> Tuple[int, int]: cmd = [str(runtime.path), "version", "-f", query] try: - version = subprocess.run( + version = subprocess_run( cmd, - startupinfo=get_subprocess_startupinfo(), capture_output=True, check=True, ).stdout.decode() @@ -193,8 +197,6 @@ def load_image_tarball() -> None: add_image_tag(bad_tag, good_tag) delete_image_tag(bad_tag) - log.info("Successfully installed container image") - def tag_image_by_digest(digest: str, tag: str) -> None: """Tag a container image by digest. @@ -204,7 +206,7 @@ def tag_image_by_digest(digest: str, tag: str) -> None: image_id = get_image_id_by_digest(digest) cmd = [str(runtime.path), "tag", image_id, tag] log.debug(" ".join(cmd)) - subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True) + subprocess_run(cmd, check=True) def get_image_id_by_digest(digest: str) -> str: @@ -221,9 +223,7 @@ def get_image_id_by_digest(digest: str) -> str: "{{.Id}}", ] log.debug(" ".join(cmd)) - process = subprocess.run( - cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True - ) + process = subprocess_run(cmd, check=True, capture_output=True) # In case we have multiple lines, we only want the first one. return process.stdout.decode().strip().split("\n")[0] @@ -232,10 +232,12 @@ def container_pull(image: str, manifest_digest: str): """Pull a container image from a registry.""" runtime = Runtime() cmd = [str(runtime.path), "pull", f"{image}@sha256:{manifest_digest}"] - process = subprocess.Popen(cmd, stdout=subprocess.PIPE) - process.communicate() - if process.returncode != 0: - raise errors.ContainerPullException(f"Could not pull the container image: {e}") + try: + subprocess_run(cmd, check=True) + except subprocess.CalledProcessError as e: + raise errors.ContainerPullException( + f"Could not pull the container image: {e}" + ) from e def get_local_image_digest(image: str) -> str: @@ -249,7 +251,11 @@ def get_local_image_digest(image: str) -> str: cmd = [str(runtime.path), "images", image, "--format", "{{.Digest}}"] log.debug(" ".join(cmd)) try: - result = subprocess.run(cmd, capture_output=True, check=True) + result = subprocess_run( + cmd, + capture_output=True, + check=True, + ) lines = result.stdout.decode().strip().split("\n") if len(lines) != 1: raise errors.MultipleImagesFoundException( diff --git a/dangerzone/updater/cli.py b/dangerzone/updater/cli.py index e496aaf..9363d51 100644 --- a/dangerzone/updater/cli.py +++ b/dangerzone/updater/cli.py @@ -29,11 +29,10 @@ def upgrade(image: str, pubkey: str) -> None: """Upgrade the image to the latest signed version.""" manifest_digest = registry.get_manifest_digest(image) try: - is_upgraded = signatures.upgrade_container_image(image, manifest_digest, pubkey) - if is_upgraded: - click.echo(f"✅ The local image {image} has been upgraded") - click.echo(f"✅ The image has been signed with {pubkey}") - click.echo(f"✅ Signatures has been verified and stored locally") + signatures.upgrade_container_image(image, manifest_digest, pubkey) + click.echo(f"✅ The local image {image} has been upgraded") + click.echo(f"✅ The image has been signed with {pubkey}") + click.echo(f"✅ Signatures has been verified and stored locally") except errors.ImageAlreadyUpToDate as e: click.echo(f"✅ {e}") diff --git a/dangerzone/updater/signatures.py b/dangerzone/updater/signatures.py index 27cd55d..46f382a 100644 --- a/dangerzone/updater/signatures.py +++ b/dangerzone/updater/signatures.py @@ -22,13 +22,17 @@ except ImportError: import appdirs as platformdirs # type: ignore[no-redef] -def get_config_dir() -> Path: - return Path(platformdirs.user_config_dir("dangerzone")) +def appdata_dir() -> Path: + return Path(platformdirs.user_data_dir("dangerzone")) +# RELEASE: Bump this value to the log index of the latest signature +# to ensures the software can't upgrade to container images that predates it. +DEFAULT_LOG_INDEX = 0 + # XXX Store this somewhere else. DEFAULT_PUBKEY_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key") -SIGNATURES_PATH = get_config_dir() / "signatures" +SIGNATURES_PATH = appdata_dir() / "signatures" LAST_LOG_INDEX = SIGNATURES_PATH / "last_log_index" __all__ = [ @@ -61,9 +65,14 @@ def signature_to_bundle(sig: Dict) -> Dict: } -def verify_signature(signature: dict, image_digest: str, pubkey: str) -> bool: - """Verify a signature against a given public key""" - # XXX - Also verfy the identity/docker-reference field against the expected value +def verify_signature(signature: dict, image_digest: str, pubkey: str | Path) -> None: + """ + Verifies that: + + - the signature has been signed by the given public key + - the signature matches the given image digest + """ + # XXX - Also verify the identity/docker-reference field against the expected value # e.g. ghcr.io/freedomofpress/dangerzone/dangerzone cosign.ensure_installed() @@ -75,7 +84,8 @@ def verify_signature(signature: dict, image_digest: str, pubkey: str) -> bool: ] if payload_digest != f"sha256:{image_digest}": raise errors.SignatureMismatch( - f"The signature does not match the image digest ({payload_digest}, {image_digest})" + "The given signature does not match the expected image digest " + f"({payload_digest}, {image_digest})" ) with ( @@ -99,14 +109,10 @@ def verify_signature(signature: dict, image_digest: str, pubkey: str) -> bool: ] log.debug(" ".join(cmd)) result = subprocess.run(cmd, capture_output=True) - if result.returncode != 0: - # XXX Raise instead? + if result.returncode != 0 or result.stderr != b"Verified OK\n": log.debug("Failed to verify signature", result.stderr) raise errors.SignatureVerificationError("Failed to verify signature") - if result.stderr == b"Verified OK\n": - log.debug("Signature verified") - return True - return False + log.debug("Signature verified") class Signature: @@ -144,15 +150,14 @@ def verify_signatures( pubkey: str, ) -> bool: for signature in signatures: - if not verify_signature(signature, image_digest, pubkey): - raise errors.SignatureVerificationError() + verify_signature(signature, image_digest, pubkey) return True def get_last_log_index() -> int: SIGNATURES_PATH.mkdir(parents=True, exist_ok=True) if not LAST_LOG_INDEX.exists(): - return 0 + return DEFAULT_LOG_INDEX with open(LAST_LOG_INDEX) as f: return int(f.read()) diff --git a/tests/test_signatures.py b/tests/test_signatures.py new file mode 100644 index 0000000..5f7a846 --- /dev/null +++ b/tests/test_signatures.py @@ -0,0 +1,385 @@ +import json +import unittest +from pathlib import Path + +import pytest +from pytest_subprocess import FakeProcess + +from dangerzone import errors as dzerrors +from dangerzone.updater import errors +from dangerzone.updater.signatures import ( + Signature, + get_last_log_index, + get_log_index_from_signatures, + get_remote_signatures, + is_update_available, + load_and_verify_signatures, + prepare_airgapped_archive, + store_signatures, + upgrade_container_image, + verify_local_image, + verify_signature, + verify_signatures, +) + +ASSETS_PATH = Path(__file__).parent / "assets" +TEST_PUBKEY_PATH = ASSETS_PATH / "test.pub.key" +INVALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "invalid" +VALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "valid" +TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered" + +RANDOM_DIGEST = "aacc9b586648bbe3040f2822153b1d5ead2779af45ff750fd6f04daf4a9f64b4" + + +@pytest.fixture +def valid_signature(): + signature_file = next(VALID_SIGNATURES_PATH.glob("**/*.json")) + with open(signature_file, "r") as signature_file: + signatures = json.load(signature_file) + return signatures.pop() + + +@pytest.fixture +def tempered_signature(): + signature_file = next(TEMPERED_SIGNATURES_PATH.glob("**/*.json")) + with open(signature_file, "r") as signature_file: + signatures = json.load(signature_file) + return signatures.pop() + + +@pytest.fixture +def signature_other_digest(valid_signature): + signature = valid_signature.copy() + signature["Bundle"]["Payload"]["digest"] = "sha256:123456" + return signature + + +def test_load_valid_signatures(mocker): + mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", VALID_SIGNATURES_PATH) + valid_signatures = list(VALID_SIGNATURES_PATH.glob("**/*.json")) + assert len(valid_signatures) > 0 + for file in valid_signatures: + signatures = load_and_verify_signatures(file.stem, TEST_PUBKEY_PATH) + assert isinstance(signatures, list) + assert len(signatures) > 0 + + +def test_load_invalid_signatures(mocker): + mocker.patch( + "dangerzone.updater.signatures.SIGNATURES_PATH", INVALID_SIGNATURES_PATH + ) + invalid_signatures = list(INVALID_SIGNATURES_PATH.glob("**/*.json")) + assert len(invalid_signatures) > 0 + for file in invalid_signatures: + with pytest.raises(errors.SignatureError): + load_and_verify_signatures(file.stem, TEST_PUBKEY_PATH) + + +def test_load_tempered_signatures(mocker): + mocker.patch( + "dangerzone.updater.signatures.SIGNATURES_PATH", TEMPERED_SIGNATURES_PATH + ) + tempered_signatures = list(TEMPERED_SIGNATURES_PATH.glob("**/*.json")) + assert len(tempered_signatures) > 0 + for file in tempered_signatures: + with pytest.raises(errors.SignatureError): + load_and_verify_signatures(file.stem, TEST_PUBKEY_PATH) + + +def test_get_log_index_from_signatures(): + signatures = [{"Bundle": {"Payload": {"logIndex": 1}}}] + assert get_log_index_from_signatures(signatures) == 1 + + +def test_get_log_index_from_signatures_empty(): + signatures = [] + assert get_log_index_from_signatures(signatures) == 0 + + +def test_get_log_index_from_malformed_signatures(): + signatures = [{"Bundle": {"Payload": {"logIndex": "foo"}}}] + assert get_log_index_from_signatures(signatures) == 0 + + +def test_get_log_index_from_missing_log_index(): + signatures = [{"Bundle": {"Payload": {}}}] + assert get_log_index_from_signatures(signatures) == 0 + + +def test_upgrade_container_image_if_already_up_to_date(mocker): + mocker.patch( + "dangerzone.updater.signatures.is_update_available", return_value=(False, None) + ) + with pytest.raises(errors.ImageAlreadyUpToDate): + upgrade_container_image( + "ghcr.io/freedomofpress/dangerzone/dangerzone", "sha256:123456", "test.pub" + ) + + +def test_upgrade_container_without_signatures(mocker): + mocker.patch( + "dangerzone.updater.signatures.is_update_available", + return_value=(True, "sha256:123456"), + ) + mocker.patch("dangerzone.updater.signatures.get_remote_signatures", return_value=[]) + with pytest.raises(errors.SignatureVerificationError): + upgrade_container_image( + "ghcr.io/freedomofpress/dangerzone/dangerzone", + "sha256:123456", + "test.pub", + ) + + +def test_upgrade_container_lower_log_index(mocker): + image_digest = "4da441235e84e93518778827a5c5745d532d7a4079886e1647924bee7ef1c14d" + signatures = load_and_verify_signatures( + image_digest, + TEST_PUBKEY_PATH, + bypass_verification=True, + signatures_path=VALID_SIGNATURES_PATH, + ) + mocker.patch( + "dangerzone.updater.signatures.is_update_available", + return_value=( + True, + image_digest, + ), + ) + mocker.patch( + "dangerzone.updater.signatures.get_remote_signatures", + return_value=signatures, + ) + # Mock to avoid loosing time on test failures + mocker.patch("dangerzone.container_utils.container_pull") + # The log index of the incoming signatures is 168652066 + mocker.patch( + "dangerzone.updater.signatures.get_last_log_index", + return_value=168652067, + ) + + with pytest.raises(errors.InvalidLogIndex): + upgrade_container_image( + "ghcr.io/freedomofpress/dangerzone/dangerzone", + image_digest, + TEST_PUBKEY_PATH, + ) + + +def test_prepare_airgapped_archive_requires_digest(): + with pytest.raises(errors.AirgappedImageDownloadError): + prepare_airgapped_archive( + "ghcr.io/freedomofpress/dangerzone/dangerzone", "test.tar" + ) + + +def test_get_remote_signatures_error(fp: FakeProcess, mocker): + image = "ghcr.io/freedomofpress/dangerzone/dangerzone" + digest = "123456" + mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True) + fp.register_subprocess( + ["cosign", "download", "signature", f"{image}@sha256:{digest}"], returncode=1 + ) + with pytest.raises(errors.NoRemoteSignatures): + get_remote_signatures(image, digest) + + +def test_get_remote_signatures_empty(fp: FakeProcess, mocker): + image = "ghcr.io/freedomofpress/dangerzone/dangerzone" + digest = "123456" + mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True) + fp.register_subprocess( + ["cosign", "download", "signature", f"{image}@sha256:{digest}"], + stdout=json.dumps({}), + ) + with pytest.raises(errors.NoRemoteSignatures): + get_remote_signatures(image, digest) + + +def test_get_remote_signatures_cosign_error(mocker, fp: FakeProcess): + image = "ghcr.io/freedomofpress/dangerzone/dangerzone" + digest = "123456" + mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True) + fp.register_subprocess( + ["cosign", "download", "signature", f"{image}@sha256:{digest}"], + returncode=1, + stderr="Error: no signatures associated", + ) + with pytest.raises(errors.NoRemoteSignatures): + get_remote_signatures(image, digest) + + +def test_store_signatures_with_different_digests( + valid_signature, signature_other_digest, mocker, tmp_path +): + """Test that store_signatures raises an error when a signature's digest doesn't match.""" + signatures = [valid_signature, signature_other_digest] + image_digest = "sha256:123456" + + # Mock the signatures path + signatures_path = tmp_path / "signatures" + signatures_path.mkdir() + mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path) + + # Mock get_log_index_from_signatures + mocker.patch( + "dangerzone.updater.signatures.get_log_index_from_signatures", + return_value=100, + ) + + # Mock get_last_log_index + mocker.patch( + "dangerzone.updater.signatures.get_last_log_index", + return_value=50, + ) + + # Call store_signatures + with pytest.raises(errors.SignatureMismatch): + store_signatures(signatures, image_digest, TEST_PUBKEY_PATH) + + # Verify that the signatures file was not created + assert not (signatures_path / f"{image_digest}.json").exists() + + # Verify that the log index file was not updated + assert not (signatures_path / "last_log_index").exists() + + +def test_stores_signatures_updates_last_log_index(valid_signature, mocker, tmp_path): + """Test that store_signatures updates the last log index file.""" + signatures = [valid_signature] + # Extract the digest from the signature + image_digest = Signature(valid_signature).manifest_digest + + # Mock the signatures path + signatures_path = tmp_path / "signatures" + signatures_path.mkdir() + mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path) + + # Create an existing last_log_index file with a lower value + with open(signatures_path / "last_log_index", "w") as f: + f.write("50") + + # Mock get_log_index_from_signatures to return a higher value + mocker.patch( + "dangerzone.updater.signatures.get_log_index_from_signatures", + return_value=100, + ) + + # Call store_signatures + with pytest.raises(errors.SignatureMismatch): + store_signatures(signatures, image_digest, TEST_PUBKEY_PATH) + ("dangerzone.updater.signatures.get_last_log_index",) + # Verify that the signatures file was not created + assert not (signatures_path / f"{image_digest}.json").exists() + + # Verify that the log index file was not updated + assert not (signatures_path / "last_log_index").exists() + + +def test_stores_signatures_updates_last_log_index(valid_signature, mocker, tmp_path): + """Test that store_signatures updates the last log index file.""" + signatures = [valid_signature] + # Extract the digest from the signature + image_digest = Signature(valid_signature).manifest_digest + signatures = [valid_signature, signature_other_digest] + breakpoint() + valid_signature, signature_other_digest, mocker, tmp_path + + """Test that store_signatures raises an error when a signature's digest doesn't match.""" + + image_digest = "sha256:123456" + + # Mock the signatures path + signatures_path = tmp_path / "signatures" + signatures_path.mkdir() + mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path) + + # Mock get_log_index_from_signatures + mocker.patch( + "dangerzone.updater.signatures.get_log_index_from_signatures", + return_value=100, + ) + + # Mock get_last_log_index + mocker.patch( + "dangerzone.updater.signatures.get_last_log_index", + return_value=50, + ) + + +def test_stores_signatures_updates_last_log_index(): + pass + + +def test_get_file_digest(): + # Mock the signatures path + signatures_path = tmp_path / "signatures" + signatures_path.mkdir() + mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path) + + # Create an existing last_log_index file with a lower value + with open(signatures_path / "last_log_index", "w") as f: + f.write("50") + + # Mock get_log_index_from_signatures to return a higher value + mocker.patch( + "dangerzone.updater.signatures.get_log_index_from_signatures", + return_value=100, + ) + + # Call store_signatures + store_signatures(signatures, image_digest, TEST_PUBKEY_PATH) + + # Verify that the log index file was updated + assert (signatures_path / "last_log_index").exists() + with open(signatures_path / "last_log_index", "r") as f: + assert f.read() == "100" + + +def test_is_update_available_when_no_local_image(mocker): + """ + Test that is_update_available returns True when no local image is + currently present. + """ + # Mock container_image_exists to return False + mocker.patch( + "dangerzone.container_utils.get_local_image_digest", + side_effect=dzerrors.ImageNotPresentException, + ) + + # Mock get_manifest_digest to return a digest + mocker.patch( + "dangerzone.updater.registry.get_manifest_digest", + return_value=RANDOM_DIGEST, + ) + + # Call is_update_available + update_available, digest = is_update_available("ghcr.io/freedomofpress/dangerzone") + + # Verify the result + assert update_available is True + assert digest == RANDOM_DIGEST + + +def test_verify_signature(valid_signature): + """Test that verify_signature raises an error when the payload digest doesn't match.""" + verify_signature( + valid_signature, + Signature(valid_signature).manifest_digest, + TEST_PUBKEY_PATH, + ) + + +def test_verify_signature_tempered(tempered_signature): + """Test that verify_signature raises an error when the payload digest doesn't match.""" + # Call verify_signature and expect an error + with pytest.raises(errors.SignatureError): + verify_signature( + tempered_signature, + Signature(tempered_signature).manifest_digest, + TEST_PUBKEY_PATH, + ) + + +def test_verify_signatures_empty_list(): + with pytest.raises(errors.SignatureVerificationError): + verify_signatures([], "1234", TEST_PUBKEY_PATH)