mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-04-28 18:02:38 +02:00
372 lines
12 KiB
Python
372 lines
12 KiB
Python
import json
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from pytest_subprocess import FakeProcess
|
|
|
|
from dangerzone import errors as dzerrors
|
|
from dangerzone.updater import errors
|
|
from dangerzone.updater.signatures import (
|
|
Signature,
|
|
get_last_log_index,
|
|
get_log_index_from_signatures,
|
|
get_remote_signatures,
|
|
is_update_available,
|
|
load_and_verify_signatures,
|
|
prepare_airgapped_archive,
|
|
store_signatures,
|
|
upgrade_container_image,
|
|
verify_local_image,
|
|
verify_signature,
|
|
verify_signatures,
|
|
)
|
|
|
|
ASSETS_PATH = Path(__file__).parent / "assets"
|
|
TEST_PUBKEY_PATH = ASSETS_PATH / "test.pub.key"
|
|
INVALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "invalid"
|
|
VALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "valid"
|
|
TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered"
|
|
|
|
RANDOM_DIGEST = "aacc9b586648bbe3040f2822153b1d5ead2779af45ff750fd6f04daf4a9f64b4"
|
|
|
|
|
|
@pytest.fixture
|
|
def valid_signature():
|
|
signature_file = next(VALID_SIGNATURES_PATH.glob("**/*.json"))
|
|
with open(signature_file, "r") as signature_file:
|
|
signatures = json.load(signature_file)
|
|
return signatures.pop()
|
|
|
|
|
|
@pytest.fixture
|
|
def tempered_signature():
|
|
signature_file = next(TEMPERED_SIGNATURES_PATH.glob("**/*.json"))
|
|
with open(signature_file, "r") as signature_file:
|
|
signatures = json.load(signature_file)
|
|
return signatures.pop()
|
|
|
|
|
|
@pytest.fixture
|
|
def signature_other_digest(valid_signature):
|
|
signature = valid_signature.copy()
|
|
signature["Bundle"]["Payload"]["digest"] = "sha256:123456"
|
|
return signature
|
|
|
|
|
|
def test_load_valid_signatures(mocker):
|
|
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", VALID_SIGNATURES_PATH)
|
|
valid_signatures = list(VALID_SIGNATURES_PATH.glob("**/*.json"))
|
|
assert len(valid_signatures) > 0
|
|
for file in valid_signatures:
|
|
signatures = load_and_verify_signatures(file.stem, TEST_PUBKEY_PATH)
|
|
assert isinstance(signatures, list)
|
|
assert len(signatures) > 0
|
|
|
|
|
|
def test_load_invalid_signatures(mocker):
|
|
mocker.patch(
|
|
"dangerzone.updater.signatures.SIGNATURES_PATH", INVALID_SIGNATURES_PATH
|
|
)
|
|
invalid_signatures = list(INVALID_SIGNATURES_PATH.glob("**/*.json"))
|
|
assert len(invalid_signatures) > 0
|
|
for file in invalid_signatures:
|
|
with pytest.raises(errors.SignatureError):
|
|
load_and_verify_signatures(file.stem, TEST_PUBKEY_PATH)
|
|
|
|
|
|
def test_load_tempered_signatures(mocker):
|
|
mocker.patch(
|
|
"dangerzone.updater.signatures.SIGNATURES_PATH", TEMPERED_SIGNATURES_PATH
|
|
)
|
|
tempered_signatures = list(TEMPERED_SIGNATURES_PATH.glob("**/*.json"))
|
|
assert len(tempered_signatures) > 0
|
|
for file in tempered_signatures:
|
|
with pytest.raises(errors.SignatureError):
|
|
load_and_verify_signatures(file.stem, TEST_PUBKEY_PATH)
|
|
|
|
|
|
def test_get_log_index_from_signatures():
|
|
signatures = [{"Bundle": {"Payload": {"logIndex": 1}}}]
|
|
assert get_log_index_from_signatures(signatures) == 1
|
|
|
|
|
|
def test_get_log_index_from_signatures_empty():
|
|
signatures = []
|
|
assert get_log_index_from_signatures(signatures) == 0
|
|
|
|
|
|
def test_get_log_index_from_malformed_signatures():
|
|
signatures = [{"Bundle": {"Payload": {"logIndex": "foo"}}}]
|
|
assert get_log_index_from_signatures(signatures) == 0
|
|
|
|
|
|
def test_get_log_index_from_missing_log_index():
|
|
signatures = [{"Bundle": {"Payload": {}}}]
|
|
assert get_log_index_from_signatures(signatures) == 0
|
|
|
|
|
|
def test_upgrade_container_image_if_already_up_to_date(mocker):
|
|
mocker.patch(
|
|
"dangerzone.updater.registry.is_new_remote_image_available",
|
|
return_value=(False, None),
|
|
)
|
|
with pytest.raises(errors.ImageAlreadyUpToDate):
|
|
upgrade_container_image(
|
|
"ghcr.io/freedomofpress/dangerzone/dangerzone", "sha256:123456", "test.pub"
|
|
)
|
|
|
|
|
|
def test_upgrade_container_without_signatures(mocker):
|
|
mocker.patch(
|
|
"dangerzone.updater.registry.is_new_remote_image_available",
|
|
return_value=(True, "sha256:123456"),
|
|
)
|
|
mocker.patch("dangerzone.updater.signatures.get_remote_signatures", return_value=[])
|
|
with pytest.raises(errors.SignatureVerificationError):
|
|
upgrade_container_image(
|
|
"ghcr.io/freedomofpress/dangerzone/dangerzone",
|
|
"sha256:123456",
|
|
"test.pub",
|
|
)
|
|
|
|
|
|
def test_upgrade_container_lower_log_index(mocker):
|
|
image_digest = "4da441235e84e93518778827a5c5745d532d7a4079886e1647924bee7ef1c14d"
|
|
signatures = load_and_verify_signatures(
|
|
image_digest,
|
|
TEST_PUBKEY_PATH,
|
|
bypass_verification=True,
|
|
signatures_path=VALID_SIGNATURES_PATH,
|
|
)
|
|
mocker.patch(
|
|
"dangerzone.updater.registry.is_new_remote_image_available",
|
|
return_value=(
|
|
True,
|
|
image_digest,
|
|
),
|
|
)
|
|
mocker.patch(
|
|
"dangerzone.updater.signatures.get_remote_signatures",
|
|
return_value=signatures,
|
|
)
|
|
# Mock to avoid loosing time on test failures
|
|
mocker.patch("dangerzone.container_utils.container_pull")
|
|
# The log index of the incoming signatures is 168652066
|
|
mocker.patch(
|
|
"dangerzone.updater.signatures.get_last_log_index",
|
|
return_value=168652067,
|
|
)
|
|
|
|
with pytest.raises(errors.InvalidLogIndex):
|
|
upgrade_container_image(
|
|
"ghcr.io/freedomofpress/dangerzone/dangerzone",
|
|
image_digest,
|
|
TEST_PUBKEY_PATH,
|
|
)
|
|
|
|
|
|
def test_prepare_airgapped_archive_requires_digest():
|
|
with pytest.raises(errors.AirgappedImageDownloadError):
|
|
prepare_airgapped_archive(
|
|
"ghcr.io/freedomofpress/dangerzone/dangerzone", "test.tar"
|
|
)
|
|
|
|
|
|
def test_get_remote_signatures_error(fp: FakeProcess, mocker):
|
|
image = "ghcr.io/freedomofpress/dangerzone/dangerzone"
|
|
digest = "123456"
|
|
mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True)
|
|
fp.register_subprocess(
|
|
["cosign", "download", "signature", f"{image}@sha256:{digest}"], returncode=1
|
|
)
|
|
with pytest.raises(errors.NoRemoteSignatures):
|
|
get_remote_signatures(image, digest)
|
|
|
|
|
|
def test_get_remote_signatures_empty(fp: FakeProcess, mocker):
|
|
image = "ghcr.io/freedomofpress/dangerzone/dangerzone"
|
|
digest = "123456"
|
|
mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True)
|
|
fp.register_subprocess(
|
|
["cosign", "download", "signature", f"{image}@sha256:{digest}"],
|
|
stdout=json.dumps({}),
|
|
)
|
|
with pytest.raises(errors.NoRemoteSignatures):
|
|
get_remote_signatures(image, digest)
|
|
|
|
|
|
def test_get_remote_signatures_cosign_error(mocker, fp: FakeProcess):
|
|
image = "ghcr.io/freedomofpress/dangerzone/dangerzone"
|
|
digest = "123456"
|
|
mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True)
|
|
fp.register_subprocess(
|
|
["cosign", "download", "signature", f"{image}@sha256:{digest}"],
|
|
returncode=1,
|
|
stderr="Error: no signatures associated",
|
|
)
|
|
with pytest.raises(errors.NoRemoteSignatures):
|
|
get_remote_signatures(image, digest)
|
|
|
|
|
|
def test_store_signatures_with_different_digests(
|
|
valid_signature, signature_other_digest, mocker, tmp_path
|
|
):
|
|
"""Test that store_signatures raises an error when a signature's digest doesn't match."""
|
|
signatures = [valid_signature, signature_other_digest]
|
|
image_digest = "sha256:123456"
|
|
|
|
# Mock the signatures path
|
|
signatures_path = tmp_path / "signatures"
|
|
signatures_path.mkdir()
|
|
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path)
|
|
|
|
# Mock get_log_index_from_signatures
|
|
mocker.patch(
|
|
"dangerzone.updater.signatures.get_log_index_from_signatures",
|
|
return_value=100,
|
|
)
|
|
|
|
# Mock get_last_log_index
|
|
mocker.patch(
|
|
"dangerzone.updater.signatures.get_last_log_index",
|
|
return_value=50,
|
|
)
|
|
|
|
# Call store_signatures
|
|
with pytest.raises(errors.SignatureMismatch):
|
|
store_signatures(signatures, image_digest, TEST_PUBKEY_PATH)
|
|
|
|
# Verify that the signatures file was not created
|
|
assert not (signatures_path / f"{image_digest}.json").exists()
|
|
|
|
# Verify that the log index file was not updated
|
|
assert not (signatures_path / "last_log_index").exists()
|
|
|
|
|
|
def test_stores_signatures_updates_last_log_index(valid_signature, mocker, tmp_path):
|
|
"""Test that store_signatures updates the last log index file."""
|
|
signatures = [valid_signature]
|
|
# Extract the digest from the signature
|
|
image_digest = Signature(valid_signature).manifest_digest
|
|
|
|
# Mock the signatures path
|
|
signatures_path = tmp_path / "signatures"
|
|
signatures_path.mkdir()
|
|
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path)
|
|
|
|
# Create an existing last_log_index file with a lower value
|
|
with open(signatures_path / "last_log_index", "w") as f:
|
|
f.write("50")
|
|
|
|
# Mock get_log_index_from_signatures to return a higher value
|
|
mocker.patch(
|
|
"dangerzone.updater.signatures.get_log_index_from_signatures",
|
|
return_value=100,
|
|
)
|
|
|
|
# Call store_signatures
|
|
store_signatures(signatures, image_digest, TEST_PUBKEY_PATH)
|
|
|
|
# Verify that the log index file was updated
|
|
assert (signatures_path / "last_log_index").exists()
|
|
with open(signatures_path / "last_log_index", "r") as f:
|
|
assert f.read() == "100"
|
|
|
|
|
|
def test_is_update_available_when_remote_image_available(mocker):
|
|
"""
|
|
Test that is_update_available returns True when a new image is available
|
|
and all checks pass
|
|
"""
|
|
# Mock is_new_remote_image_available to return True and digest
|
|
mocker.patch(
|
|
"dangerzone.updater.registry.is_new_remote_image_available",
|
|
return_value=(True, RANDOM_DIGEST),
|
|
)
|
|
|
|
# Mock check_signatures_and_logindex to not raise any exceptions
|
|
mocker.patch(
|
|
"dangerzone.updater.signatures.check_signatures_and_logindex",
|
|
return_value=[{"some": "signature"}],
|
|
)
|
|
|
|
# Call is_update_available
|
|
update_available, digest = is_update_available(
|
|
"ghcr.io/freedomofpress/dangerzone", "test.pub"
|
|
)
|
|
|
|
# Verify the result
|
|
assert update_available is True
|
|
assert digest == RANDOM_DIGEST
|
|
|
|
|
|
def test_is_update_available_when_no_remote_image(mocker):
|
|
"""
|
|
Test that is_update_available returns False when no remote image is available
|
|
"""
|
|
# Mock is_new_remote_image_available to return False
|
|
mocker.patch(
|
|
"dangerzone.updater.registry.is_new_remote_image_available",
|
|
return_value=(False, None),
|
|
)
|
|
|
|
# Call is_update_available
|
|
update_available, digest = is_update_available(
|
|
"ghcr.io/freedomofpress/dangerzone", "test.pub"
|
|
)
|
|
|
|
# Verify the result
|
|
assert update_available is False
|
|
assert digest is None
|
|
|
|
|
|
def test_is_update_available_with_invalid_log_index(mocker):
|
|
"""
|
|
Test that is_update_available returns False when the log index is invalid
|
|
"""
|
|
# Mock is_new_remote_image_available to return True
|
|
mocker.patch(
|
|
"dangerzone.updater.registry.is_new_remote_image_available",
|
|
return_value=(True, RANDOM_DIGEST),
|
|
)
|
|
|
|
# Mock check_signatures_and_logindex to raise InvalidLogIndex
|
|
mocker.patch(
|
|
"dangerzone.updater.signatures.check_signatures_and_logindex",
|
|
side_effect=errors.InvalidLogIndex("Invalid log index"),
|
|
)
|
|
|
|
# Call is_update_available
|
|
update_available, digest = is_update_available(
|
|
"ghcr.io/freedomofpress/dangerzone", "test.pub"
|
|
)
|
|
|
|
# Verify the result
|
|
assert update_available is False
|
|
assert digest is None
|
|
|
|
|
|
def test_verify_signature(valid_signature):
|
|
"""Test that verify_signature raises an error when the payload digest doesn't match."""
|
|
verify_signature(
|
|
valid_signature,
|
|
Signature(valid_signature).manifest_digest,
|
|
TEST_PUBKEY_PATH,
|
|
)
|
|
|
|
|
|
def test_verify_signature_tempered(tempered_signature):
|
|
"""Test that verify_signature raises an error when the payload digest doesn't match."""
|
|
# Call verify_signature and expect an error
|
|
with pytest.raises(errors.SignatureError):
|
|
verify_signature(
|
|
tempered_signature,
|
|
Signature(tempered_signature).manifest_digest,
|
|
TEST_PUBKEY_PATH,
|
|
)
|
|
|
|
|
|
def test_verify_signatures_empty_list():
|
|
with pytest.raises(errors.SignatureVerificationError):
|
|
verify_signatures([], "1234", TEST_PUBKEY_PATH)
|