mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-04-28 18:02:38 +02:00

This is done to avoid forgetting Windows-specific arguments when calling `subprocess.run`.
385 lines
13 KiB
Python
385 lines
13 KiB
Python
import json
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from pytest_subprocess import FakeProcess
|
|
|
|
from dangerzone import errors as dzerrors
|
|
from dangerzone.updater import errors
|
|
from dangerzone.updater.signatures import (
|
|
Signature,
|
|
get_last_log_index,
|
|
get_log_index_from_signatures,
|
|
get_remote_signatures,
|
|
is_update_available,
|
|
load_and_verify_signatures,
|
|
prepare_airgapped_archive,
|
|
store_signatures,
|
|
upgrade_container_image,
|
|
verify_local_image,
|
|
verify_signature,
|
|
verify_signatures,
|
|
)
|
|
|
|
# Directory, next to this module, that holds the fixture files for these tests.
ASSETS_PATH = Path(__file__).parent.joinpath("assets")
# Public key handed to the signature helpers exercised below.
TEST_PUBKEY_PATH = ASSETS_PATH.joinpath("test.pub.key")
# One fixture directory per signature scenario.
INVALID_SIGNATURES_PATH = ASSETS_PATH.joinpath("signatures", "invalid")
VALID_SIGNATURES_PATH = ASSETS_PATH.joinpath("signatures", "valid")
TEMPERED_SIGNATURES_PATH = ASSETS_PATH.joinpath("signatures", "tempered")

# Arbitrary digest used as the return value of a mocked registry lookup.
RANDOM_DIGEST = "aacc9b586648bbe3040f2822153b1d5ead2779af45ff750fd6f04daf4a9f64b4"
|
|
|
|
|
|
@pytest.fixture
def valid_signature():
    """Return the last entry of the first JSON file found under VALID_SIGNATURES_PATH."""
    first_match = next(VALID_SIGNATURES_PATH.glob("**/*.json"))
    with open(first_match, "r") as handle:
        entries = json.load(handle)
    return entries.pop()
|
|
|
|
|
|
@pytest.fixture
def tempered_signature():
    """Return the last entry of the first JSON file found under TEMPERED_SIGNATURES_PATH."""
    first_match = next(TEMPERED_SIGNATURES_PATH.glob("**/*.json"))
    with open(first_match, "r") as handle:
        entries = json.load(handle)
    return entries.pop()
|
|
|
|
|
|
@pytest.fixture
def signature_other_digest(valid_signature):
    """Return a copy of ``valid_signature`` with its bundle payload digest overridden.

    The overridden key lives in nested dictionaries, so a deep copy is
    required: a shallow ``dict.copy()`` shares those nested dictionaries with
    ``valid_signature`` and would silently modify that fixture as well.
    """
    import copy

    signature = copy.deepcopy(valid_signature)
    signature["Bundle"]["Payload"]["digest"] = "sha256:123456"
    return signature
|
|
|
|
|
|
def test_load_valid_signatures(mocker):
    """Every valid signature fixture loads and verifies into a non-empty list."""
    mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", VALID_SIGNATURES_PATH)
    fixture_files = list(VALID_SIGNATURES_PATH.glob("**/*.json"))
    # Without fixtures the loop below would pass vacuously.
    assert len(fixture_files) > 0
    for fixture_file in fixture_files:
        loaded = load_and_verify_signatures(fixture_file.stem, TEST_PUBKEY_PATH)
        assert isinstance(loaded, list)
        assert len(loaded) > 0
|
|
|
|
|
|
def test_load_invalid_signatures(mocker):
    """Loading any invalid signature fixture raises SignatureError."""
    mocker.patch(
        "dangerzone.updater.signatures.SIGNATURES_PATH", INVALID_SIGNATURES_PATH
    )
    fixture_files = list(INVALID_SIGNATURES_PATH.glob("**/*.json"))
    # Without fixtures the loop below would pass vacuously.
    assert len(fixture_files) > 0
    for fixture_file in fixture_files:
        with pytest.raises(errors.SignatureError):
            load_and_verify_signatures(fixture_file.stem, TEST_PUBKEY_PATH)
|
|
|
|
|
|
def test_load_tempered_signatures(mocker):
    """Loading any tempered signature fixture raises SignatureError."""
    mocker.patch(
        "dangerzone.updater.signatures.SIGNATURES_PATH", TEMPERED_SIGNATURES_PATH
    )
    fixture_files = list(TEMPERED_SIGNATURES_PATH.glob("**/*.json"))
    # Without fixtures the loop below would pass vacuously.
    assert len(fixture_files) > 0
    for fixture_file in fixture_files:
        with pytest.raises(errors.SignatureError):
            load_and_verify_signatures(fixture_file.stem, TEST_PUBKEY_PATH)
|
|
|
|
|
|
def test_get_log_index_from_signatures():
    """A signature carrying ``logIndex`` 1 in its bundle payload yields 1."""
    log_index = get_log_index_from_signatures(
        [{"Bundle": {"Payload": {"logIndex": 1}}}]
    )
    assert log_index == 1
|
|
|
|
|
|
def test_get_log_index_from_signatures_empty():
    """An empty list of signatures yields a log index of 0."""
    log_index = get_log_index_from_signatures([])
    assert log_index == 0
|
|
|
|
|
|
def test_get_log_index_from_malformed_signatures():
    """A non-numeric ``logIndex`` value yields a log index of 0."""
    log_index = get_log_index_from_signatures(
        [{"Bundle": {"Payload": {"logIndex": "foo"}}}]
    )
    assert log_index == 0
|
|
|
|
|
|
def test_get_log_index_from_missing_log_index():
    """A bundle payload without ``logIndex`` yields a log index of 0."""
    log_index = get_log_index_from_signatures([{"Bundle": {"Payload": {}}}])
    assert log_index == 0
|
|
|
|
|
|
def test_upgrade_container_image_if_already_up_to_date(mocker):
    """Upgrading raises ImageAlreadyUpToDate when no update is reported."""
    mocker.patch(
        "dangerzone.updater.signatures.is_update_available",
        return_value=(False, None),
    )
    upgrade_args = (
        "ghcr.io/freedomofpress/dangerzone/dangerzone",
        "sha256:123456",
        "test.pub",
    )
    with pytest.raises(errors.ImageAlreadyUpToDate):
        upgrade_container_image(*upgrade_args)
|
|
|
|
|
|
def test_upgrade_container_without_signatures(mocker):
    """Upgrading raises SignatureVerificationError when no remote signatures are returned."""
    # An update is reported as available...
    mocker.patch(
        "dangerzone.updater.signatures.is_update_available",
        return_value=(True, "sha256:123456"),
    )
    # ...but the remote yields an empty list of signatures.
    mocker.patch("dangerzone.updater.signatures.get_remote_signatures", return_value=[])
    upgrade_args = (
        "ghcr.io/freedomofpress/dangerzone/dangerzone",
        "sha256:123456",
        "test.pub",
    )
    with pytest.raises(errors.SignatureVerificationError):
        upgrade_container_image(*upgrade_args)
|
|
|
|
|
|
def test_upgrade_container_lower_log_index(mocker):
    """Upgrading raises InvalidLogIndex when the last known log index exceeds the incoming one."""
    image_digest = "4da441235e84e93518778827a5c5745d532d7a4079886e1647924bee7ef1c14d"
    # Load the fixture signatures before any patching takes place.
    incoming_signatures = load_and_verify_signatures(
        image_digest,
        TEST_PUBKEY_PATH,
        bypass_verification=True,
        signatures_path=VALID_SIGNATURES_PATH,
    )
    update_status = (True, image_digest)
    mocker.patch(
        "dangerzone.updater.signatures.is_update_available",
        return_value=update_status,
    )
    mocker.patch(
        "dangerzone.updater.signatures.get_remote_signatures",
        return_value=incoming_signatures,
    )
    # Mock to avoid losing time on test failures
    mocker.patch("dangerzone.container_utils.container_pull")
    # The log index of the incoming signatures is 168652066
    mocker.patch(
        "dangerzone.updater.signatures.get_last_log_index",
        return_value=168652067,
    )

    with pytest.raises(errors.InvalidLogIndex):
        upgrade_container_image(
            "ghcr.io/freedomofpress/dangerzone/dangerzone",
            image_digest,
            TEST_PUBKEY_PATH,
        )
|
|
|
|
|
|
def test_prepare_airgapped_archive_requires_digest():
    """An image name without a digest raises AirgappedImageDownloadError."""
    image_without_digest = "ghcr.io/freedomofpress/dangerzone/dangerzone"
    with pytest.raises(errors.AirgappedImageDownloadError):
        prepare_airgapped_archive(image_without_digest, "test.tar")
|
|
|
|
|
|
def test_get_remote_signatures_error(fp: FakeProcess, mocker):
    """A failing ``cosign download signature`` call raises NoRemoteSignatures."""
    image = "ghcr.io/freedomofpress/dangerzone/dangerzone"
    digest = "123456"
    mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True)
    cosign_command = ["cosign", "download", "signature", f"{image}@sha256:{digest}"]
    # Fake a non-zero exit status for the cosign invocation.
    fp.register_subprocess(cosign_command, returncode=1)
    with pytest.raises(errors.NoRemoteSignatures):
        get_remote_signatures(image, digest)
|
|
|
|
|
|
def test_get_remote_signatures_empty(fp: FakeProcess, mocker):
    """A cosign call that prints an empty JSON object raises NoRemoteSignatures."""
    image = "ghcr.io/freedomofpress/dangerzone/dangerzone"
    digest = "123456"
    mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True)
    cosign_command = ["cosign", "download", "signature", f"{image}@sha256:{digest}"]
    # Fake a cosign invocation whose output is "{}".
    fp.register_subprocess(cosign_command, stdout=json.dumps({}))
    with pytest.raises(errors.NoRemoteSignatures):
        get_remote_signatures(image, digest)
|
|
|
|
|
|
def test_get_remote_signatures_cosign_error(mocker, fp: FakeProcess):
    """A cosign failure reporting "no signatures associated" raises NoRemoteSignatures."""
    image = "ghcr.io/freedomofpress/dangerzone/dangerzone"
    digest = "123456"
    mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True)
    cosign_command = ["cosign", "download", "signature", f"{image}@sha256:{digest}"]
    # Fake a failing cosign invocation with an error message on stderr.
    fp.register_subprocess(
        cosign_command,
        returncode=1,
        stderr="Error: no signatures associated",
    )
    with pytest.raises(errors.NoRemoteSignatures):
        get_remote_signatures(image, digest)
|
|
|
|
|
|
def test_store_signatures_with_different_digests(
    valid_signature, signature_other_digest, mocker, tmp_path
):
    """Test that store_signatures raises an error when a signature's digest doesn't match."""
    image_digest = "sha256:123456"
    signatures = [valid_signature, signature_other_digest]

    # Redirect signature storage to a scratch directory.
    signatures_path = tmp_path / "signatures"
    signatures_path.mkdir()
    mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path)

    # Pin both log-index helpers: incoming index 100, last stored index 50.
    mocker.patch(
        "dangerzone.updater.signatures.get_log_index_from_signatures",
        return_value=100,
    )
    mocker.patch(
        "dangerzone.updater.signatures.get_last_log_index",
        return_value=50,
    )

    # Storing must be rejected...
    with pytest.raises(errors.SignatureMismatch):
        store_signatures(signatures, image_digest, TEST_PUBKEY_PATH)

    # ...and must leave neither a signatures file nor a log index file behind.
    stored_signatures_file = signatures_path / f"{image_digest}.json"
    assert not stored_signatures_file.exists()
    log_index_file = signatures_path / "last_log_index"
    assert not log_index_file.exists()
|
|
|
|
|
|
def test_stores_signatures_updates_last_log_index(valid_signature, mocker, tmp_path):
    """Test that store_signatures updates the last log index file.

    The original span defined this test three times (so only the last, empty
    definition would have been collected), contained a stray ``breakpoint()``
    and referenced names that were never defined. This is the single coherent
    test those fragments describe.
    """
    signatures = [valid_signature]
    # Extract the digest from the signature
    image_digest = Signature(valid_signature).manifest_digest

    # Mock the signatures path
    signatures_path = tmp_path / "signatures"
    signatures_path.mkdir()
    mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path)

    # Create an existing last_log_index file with a lower value
    last_log_index_file = signatures_path / "last_log_index"
    with open(last_log_index_file, "w") as f:
        f.write("50")

    # Mock get_log_index_from_signatures to return a higher value
    mocker.patch(
        "dangerzone.updater.signatures.get_log_index_from_signatures",
        return_value=100,
    )

    # Call store_signatures
    store_signatures(signatures, image_digest, TEST_PUBKEY_PATH)

    # Verify that the log index file was updated
    assert last_log_index_file.exists()
    with open(last_log_index_file, "r") as f:
        assert f.read() == "100"


def test_get_file_digest():
    # TODO(review): placeholder. The statements that followed this name in the
    # original span used fixtures it never requested and belong to the test
    # above; confirm what this test is meant to cover before filling it in.
    pass
|
|
|
|
|
|
def test_is_update_available_when_no_local_image(mocker):
    """
    Test that is_update_available reports an update, along with the remote
    digest, when no local image is currently present.
    """
    # Make the local digest lookup fail as if no image were present.
    mocker.patch(
        "dangerzone.container_utils.get_local_image_digest",
        side_effect=dzerrors.ImageNotPresentException,
    )

    # Have the registry lookup return a known digest.
    mocker.patch(
        "dangerzone.updater.registry.get_manifest_digest",
        return_value=RANDOM_DIGEST,
    )

    result = is_update_available("ghcr.io/freedomofpress/dangerzone")
    has_update, remote_digest = result

    # An update is reported, carrying the digest from the registry.
    assert has_update is True
    assert remote_digest == RANDOM_DIGEST
|
|
|
|
|
|
def test_verify_signature(valid_signature):
    """Test that verify_signature accepts a valid signature for its own manifest digest."""
    manifest_digest = Signature(valid_signature).manifest_digest
    # No exception is expected here; the call raising would fail the test.
    verify_signature(valid_signature, manifest_digest, TEST_PUBKEY_PATH)
|
|
|
|
|
|
def test_verify_signature_tempered(tempered_signature):
    """Test that verify_signature raises SignatureError for a tempered signature."""
    # The digest lookup sits inside the block, as in the original, so an
    # error raised while reading it is also accepted.
    with pytest.raises(errors.SignatureError):
        manifest_digest = Signature(tempered_signature).manifest_digest
        verify_signature(tempered_signature, manifest_digest, TEST_PUBKEY_PATH)
|
|
|
|
|
|
def test_verify_signatures_empty_list():
    """Verifying an empty list of signatures raises SignatureVerificationError."""
    no_signatures = []
    with pytest.raises(errors.SignatureVerificationError):
        verify_signatures(no_signatures, "1234", TEST_PUBKEY_PATH)
|