import json import unittest from pathlib import Path import pytest from pytest_subprocess import FakeProcess from dangerzone import errors as dzerrors from dangerzone.updater import errors from dangerzone.updater.signatures import ( Signature, get_last_log_index, get_log_index_from_signatures, get_remote_signatures, is_update_available, load_and_verify_signatures, prepare_airgapped_archive, store_signatures, upgrade_container_image, verify_local_image, verify_signature, verify_signatures, ) ASSETS_PATH = Path(__file__).parent / "assets" TEST_PUBKEY_PATH = ASSETS_PATH / "test.pub.key" INVALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "invalid" VALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "valid" TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered" RANDOM_DIGEST = "aacc9b586648bbe3040f2822153b1d5ead2779af45ff750fd6f04daf4a9f64b4" @pytest.fixture def valid_signature(): signature_file = next(VALID_SIGNATURES_PATH.glob("**/*.json")) with open(signature_file, "r") as signature_file: signatures = json.load(signature_file) return signatures.pop() @pytest.fixture def tempered_signature(): signature_file = next(TEMPERED_SIGNATURES_PATH.glob("**/*.json")) with open(signature_file, "r") as signature_file: signatures = json.load(signature_file) return signatures.pop() @pytest.fixture def signature_other_digest(valid_signature): signature = valid_signature.copy() signature["Bundle"]["Payload"]["digest"] = "sha256:123456" return signature def test_load_valid_signatures(mocker): mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", VALID_SIGNATURES_PATH) valid_signatures = list(VALID_SIGNATURES_PATH.glob("**/*.json")) assert len(valid_signatures) > 0 for file in valid_signatures: signatures = load_and_verify_signatures(file.stem, TEST_PUBKEY_PATH) assert isinstance(signatures, list) assert len(signatures) > 0 def test_load_invalid_signatures(mocker): mocker.patch( "dangerzone.updater.signatures.SIGNATURES_PATH", INVALID_SIGNATURES_PATH ) invalid_signatures = list(INVALID_SIGNATURES_PATH.glob("**/*.json")) assert len(invalid_signatures) > 0 for file in invalid_signatures: with pytest.raises(errors.SignatureError): load_and_verify_signatures(file.stem, TEST_PUBKEY_PATH) def test_load_tempered_signatures(mocker): mocker.patch( "dangerzone.updater.signatures.SIGNATURES_PATH", TEMPERED_SIGNATURES_PATH ) tempered_signatures = list(TEMPERED_SIGNATURES_PATH.glob("**/*.json")) assert len(tempered_signatures) > 0 for file in tempered_signatures: with pytest.raises(errors.SignatureError): load_and_verify_signatures(file.stem, TEST_PUBKEY_PATH) def test_get_log_index_from_signatures(): signatures = [{"Bundle": {"Payload": {"logIndex": 1}}}] assert get_log_index_from_signatures(signatures) == 1 def test_get_log_index_from_signatures_empty(): signatures = [] assert get_log_index_from_signatures(signatures) == 0 def test_get_log_index_from_malformed_signatures(): signatures = [{"Bundle": {"Payload": {"logIndex": "foo"}}}] assert get_log_index_from_signatures(signatures) == 0 def test_get_log_index_from_missing_log_index(): signatures = [{"Bundle": {"Payload": {}}}] assert get_log_index_from_signatures(signatures) == 0 def test_upgrade_container_image_if_already_up_to_date(mocker): mocker.patch( "dangerzone.updater.registry.is_new_remote_image_available", return_value=(False, None), ) with pytest.raises(errors.ImageAlreadyUpToDate): upgrade_container_image( "ghcr.io/freedomofpress/dangerzone/dangerzone", "sha256:123456", "test.pub" ) def test_upgrade_container_without_signatures(mocker): mocker.patch( "dangerzone.updater.registry.is_new_remote_image_available", return_value=(True, "sha256:123456"), ) mocker.patch("dangerzone.updater.signatures.get_remote_signatures", return_value=[]) with pytest.raises(errors.SignatureVerificationError): upgrade_container_image( "ghcr.io/freedomofpress/dangerzone/dangerzone", "sha256:123456", "test.pub", ) def test_upgrade_container_lower_log_index(mocker): image_digest = "4da441235e84e93518778827a5c5745d532d7a4079886e1647924bee7ef1c14d" signatures = load_and_verify_signatures( image_digest, TEST_PUBKEY_PATH, bypass_verification=True, signatures_path=VALID_SIGNATURES_PATH, ) mocker.patch( "dangerzone.updater.registry.is_new_remote_image_available", return_value=( True, image_digest, ), ) mocker.patch( "dangerzone.updater.signatures.get_remote_signatures", return_value=signatures, ) # Mock to avoid loosing time on test failures mocker.patch("dangerzone.container_utils.container_pull") # The log index of the incoming signatures is 168652066 mocker.patch( "dangerzone.updater.signatures.get_last_log_index", return_value=168652067, ) with pytest.raises(errors.InvalidLogIndex): upgrade_container_image( "ghcr.io/freedomofpress/dangerzone/dangerzone", image_digest, TEST_PUBKEY_PATH, ) def test_prepare_airgapped_archive_requires_digest(): with pytest.raises(errors.AirgappedImageDownloadError): prepare_airgapped_archive( "ghcr.io/freedomofpress/dangerzone/dangerzone", "test.tar" ) def test_get_remote_signatures_error(fp: FakeProcess, mocker): image = "ghcr.io/freedomofpress/dangerzone/dangerzone" digest = "123456" mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True) fp.register_subprocess( ["cosign", "download", "signature", f"{image}@sha256:{digest}"], returncode=1 ) with pytest.raises(errors.NoRemoteSignatures): get_remote_signatures(image, digest) def test_get_remote_signatures_empty(fp: FakeProcess, mocker): image = "ghcr.io/freedomofpress/dangerzone/dangerzone" digest = "123456" mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True) fp.register_subprocess( ["cosign", "download", "signature", f"{image}@sha256:{digest}"], stdout=json.dumps({}), ) with pytest.raises(errors.NoRemoteSignatures): get_remote_signatures(image, digest) def test_get_remote_signatures_cosign_error(mocker, fp: FakeProcess): image = "ghcr.io/freedomofpress/dangerzone/dangerzone" digest = "123456" mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True) fp.register_subprocess( ["cosign", "download", "signature", f"{image}@sha256:{digest}"], returncode=1, stderr="Error: no signatures associated", ) with pytest.raises(errors.NoRemoteSignatures): get_remote_signatures(image, digest) def test_get_remote_signatures_cosign_error(mocker, fp: FakeProcess): image = "ghcr.io/freedomofpress/dangerzone/dangerzone" digest = "123456" mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True) fp.register_subprocess( ["cosign", "download", "signature", f"{image}@sha256:{digest}"], returncode=1, stderr="Error: no signatures associated", ) with pytest.raises(errors.NoRemoteSignatures): get_remote_signatures(image, digest) def test_store_signatures_with_different_digests( valid_signature, signature_other_digest, mocker, tmp_path ): """Test that store_signatures raises an error when a signature's digest doesn't match.""" signatures = [valid_signature, signature_other_digest] image_digest = "sha256:123456" # Mock the signatures path signatures_path = tmp_path / "signatures" signatures_path.mkdir() mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path) # Mock get_log_index_from_signatures mocker.patch( "dangerzone.updater.signatures.get_log_index_from_signatures", return_value=100, ) # Mock get_last_log_index mocker.patch( "dangerzone.updater.signatures.get_last_log_index", return_value=50, ) # Call store_signatures with pytest.raises(errors.SignatureMismatch): store_signatures(signatures, image_digest, TEST_PUBKEY_PATH) # Verify that the signatures file was not created assert not (signatures_path / f"{image_digest}.json").exists() # Verify that the log index file was not created assert not (signatures_path / "last_log_index").exists() def test_stores_signatures_updates_last_log_index(valid_signature, mocker, tmp_path): """Test that store_signatures updates the last log index file.""" signatures = [valid_signature] # Extract the digest from the signature image_digest = Signature(valid_signature).manifest_digest # Mock the signatures path signatures_path = tmp_path / "signatures" signatures_path.mkdir() mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path) # Create an existing last_log_index file with a lower value with open(signatures_path / "last_log_index", "w") as f: f.write("50") # Mock get_log_index_from_signatures to return a higher value mocker.patch( "dangerzone.updater.signatures.get_log_index_from_signatures", return_value=100, ) # Call store_signatures with pytest.raises(errors.SignatureMismatch): store_signatures(signatures, image_digest, TEST_PUBKEY_PATH) ("dangerzone.updater.signatures.get_last_log_index",) # Verify that the signatures file was not created assert not (signatures_path / f"{image_digest}.json").exists() # Verify that the log index file was not updated assert not (signatures_path / "last_log_index").exists() def test_stores_signatures_updates_last_log_index(valid_signature, mocker, tmp_path): """Test that store_signatures updates the last log index file.""" signatures = [valid_signature] # Extract the digest from the signature image_digest = Signature(valid_signature).manifest_digest signatures = [valid_signature, signature_other_digest] breakpoint() valid_signature, signature_other_digest, mocker, tmp_path """Test that store_signatures raises an error when a signature's digest doesn't match.""" image_digest = "sha256:123456" # Mock the signatures path signatures_path = tmp_path / "signatures" signatures_path.mkdir() mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path) # Mock get_log_index_from_signatures mocker.patch( "dangerzone.updater.signatures.get_log_index_from_signatures", return_value=100, ) # Mock get_last_log_index mocker.patch( "dangerzone.updater.signatures.get_last_log_index", return_value=50, ) def test_stores_signatures_updates_last_log_index(): pass # Mock the signatures path signatures_path = tmp_path / "signatures" signatures_path.mkdir() mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path) # Mock get_log_index_from_signatures mocker.patch( "dangerzone.updater.signatures.get_log_index_from_signatures", return_value=100, ) # Mock get_last_log_index mocker.patch( "dangerzone.updater.signatures.get_last_log_index", return_value=50, ) def test_get_file_digest(): # Mock the signatures path signatures_path = tmp_path / "signatures" signatures_path.mkdir() mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path) # Create an existing last_log_index file with a lower value with open(signatures_path / "last_log_index", "w") as f: f.write("50") # Mock get_log_index_from_signatures to return a higher value mocker.patch( "dangerzone.updater.signatures.get_log_index_from_signatures", return_value=100, ) # Call store_signatures store_signatures(signatures, image_digest, TEST_PUBKEY_PATH) # Verify that the log index file was updated assert (signatures_path / "last_log_index").exists() with open(signatures_path / "last_log_index", "r") as f: assert f.read() == "100" def test_is_update_available_when_remote_image_available(mocker): """ Test that is_update_available returns True when a new image is available and all checks pass """ # Mock is_new_remote_image_available to return True and digest mocker.patch( "dangerzone.updater.registry.is_new_remote_image_available", return_value=(True, RANDOM_DIGEST), ) # Mock check_signatures_and_logindex to not raise any exceptions mocker.patch( "dangerzone.updater.signatures.check_signatures_and_logindex", return_value=[{"some": "signature"}], ) # Call is_update_available update_available, digest = is_update_available( "ghcr.io/freedomofpress/dangerzone", "test.pub" ) # Verify the result assert update_available is True assert digest == RANDOM_DIGEST def test_is_update_available_when_no_remote_image(mocker): """ Test that is_update_available returns False when no remote image is available """ # Mock is_new_remote_image_available to return False mocker.patch( "dangerzone.updater.registry.is_new_remote_image_available", return_value=(False, None), ) # Call is_update_available update_available, digest = is_update_available( "ghcr.io/freedomofpress/dangerzone", "test.pub" ) # Verify the result assert update_available is False assert digest is None def test_is_update_available_with_invalid_log_index(mocker): """ Test that is_update_available returns False when the log index is invalid """ # Mock is_new_remote_image_available to return True mocker.patch( "dangerzone.updater.registry.is_new_remote_image_available", return_value=(True, RANDOM_DIGEST), ) # Mock check_signatures_and_logindex to raise InvalidLogIndex mocker.patch( "dangerzone.updater.signatures.check_signatures_and_logindex", side_effect=errors.InvalidLogIndex("Invalid log index"), ) # Call is_update_available update_available, digest = is_update_available( "ghcr.io/freedomofpress/dangerzone", "test.pub" ) # Verify the result assert update_available is False assert digest is None def test_verify_signature(valid_signature): """Test that verify_signature raises an error when the payload digest doesn't match.""" verify_signature( valid_signature, Signature(valid_signature).manifest_digest, TEST_PUBKEY_PATH, ) def test_verify_signature_tempered(tempered_signature): """Test that verify_signature raises an error when the payload digest doesn't match.""" # Call verify_signature and expect an error with pytest.raises(errors.SignatureError): verify_signature( tempered_signature, Signature(tempered_signature).manifest_digest, TEST_PUBKEY_PATH, ) def test_verify_signatures_empty_list(): with pytest.raises(errors.SignatureVerificationError): verify_signatures([], "1234", TEST_PUBKEY_PATH) def test_verify_signatures_not_0(): pass