diff --git a/dangerzone/updater/signatures.py b/dangerzone/updater/signatures.py index cc006ad..7113fc5 100644 --- a/dangerzone/updater/signatures.py +++ b/dangerzone/updater/signatures.py @@ -424,8 +424,7 @@ def get_remote_signatures(image: str, digest: str) -> List[Dict]: # Remove the last return, split on newlines, convert from JSON signatures_raw = process.stdout.decode("utf-8").strip().split("\n") - signatures = list(map(json.loads, signatures_raw)) - breakpoint() + signatures = list(filter(bool, map(json.loads, signatures_raw))) if len(signatures) < 1: raise errors.NoRemoteSignatures("No signatures found for the image") return signatures diff --git a/tests/conftest.py b/tests/conftest.py index 4a80f17..5afbb58 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,6 +13,13 @@ from dangerzone.gui import Application sys.dangerzone_dev = True # type: ignore[attr-defined] +ASSETS_PATH = Path(__file__).parent / "assets" +TEST_PUBKEY_PATH = ASSETS_PATH / "test.pub.key" +INVALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "invalid" +VALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "valid" +TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered" + + # Use this fixture to make `pytest-qt` invoke our custom QApplication. # See https://pytest-qt.readthedocs.io/en/latest/qapplication.html#testing-custom-qapplications @pytest.fixture(scope="session") @@ -133,6 +140,11 @@ for_each_doc = pytest.mark.parametrize( ) +@pytest.fixture +def signature(): + return {} + + # External Docs - base64 docs encoded for externally sourced documents # XXX to reduce the chance of accidentally opening them test_docs_external_dir = Path(__file__).parent.joinpath(SAMPLE_EXTERNAL_DIRECTORY) diff --git a/tests/test_registry.py b/tests/test_registry.py new file mode 100644 index 0000000..efbf576 --- /dev/null +++ b/tests/test_registry.py @@ -0,0 +1,238 @@ +import hashlib + +import pytest +import requests +from pytest_mock import MockerFixture + +from dangerzone.updater.registry import ( + Image, + _get_auth_header, + _url, + get_manifest, + get_manifest_digest, + list_tags, + parse_image_location, +) + + +def test_parse_image_location_no_tag(): + """Test that parse_image_location correctly handles an image location without a tag.""" + image_str = "ghcr.io/freedomofpress/dangerzone" + image = parse_image_location(image_str) + + assert isinstance(image, Image) + assert image.registry == "ghcr.io" + assert image.namespace == "freedomofpress" + assert image.image_name == "dangerzone" + assert image.tag == "latest" # Default tag should be "latest" + assert image.digest is None + + +def test_parse_image_location_with_tag(): + """Test that parse_image_location correctly handles an image location with a tag.""" + image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2" + image = parse_image_location(image_str) + + assert isinstance(image, Image) + assert image.registry == "ghcr.io" + assert image.namespace == "freedomofpress" + assert image.image_name == "dangerzone" + assert image.tag == "v0.4.2" + + +def test_parse_image_location_tag_plus_digest(): + """Test that parse_image_location handles an image location with a tag that includes a digest.""" + image_str = ( + "ghcr.io/freedomofpress/dangerzone" + ":20250205-0.8.0-148-ge67fbc1" + "@sha256:19e8eacd75879d05f6621c2ea8dd955e68ee3e07b41b9d53f4c8cc9929a68a67" + ) + + image = parse_image_location(image_str) + + assert isinstance(image, Image) + assert image.registry == "ghcr.io" + assert image.namespace == "freedomofpress" + assert image.image_name == "dangerzone" + assert image.tag == "20250205-0.8.0-148-ge67fbc1" + assert ( + image.digest + == "sha256:19e8eacd75879d05f6621c2ea8dd955e68ee3e07b41b9d53f4c8cc9929a68a67" + ) + + +def test_parse_invalid_image_location(): + """Test that parse_image_location raises an error for invalid image locations.""" + invalid_image_locations = [ + "ghcr.io/dangerzone", # Missing namespace + "ghcr.io/freedomofpress/dangerzone:", # Empty tag + "freedomofpress/dangerzone", # Missing registry + "ghcr.io:freedomofpress/dangerzone", # Invalid format + "", # Empty string + ] + + for invalid_image in invalid_image_locations: + with pytest.raises(ValueError, match="Malformed image location"): + parse_image_location(invalid_image) + + +def test_list_tags(mocker: MockerFixture): + """Test that list_tags correctly retrieves tags from the registry.""" + # Mock the authentication response + image_str = "ghcr.io/freedomofpress/dangerzone" + + # Mock requests.get to return appropriate values for both calls + mock_response_auth = mocker.Mock() + mock_response_auth.json.return_value = {"token": "dummy_token"} + mock_response_auth.raise_for_status.return_value = None + + mock_response_tags = mocker.Mock() + mock_response_tags.json.return_value = { + "tags": ["v0.4.0", "v0.4.1", "v0.4.2", "latest"] + } + mock_response_tags.raise_for_status.return_value = None + + # Setup the mock to return different responses for each URL + def mock_get(url, **kwargs): + if "token" in url: + return mock_response_auth + else: + return mock_response_tags + + mocker.patch("requests.get", side_effect=mock_get) + + # Call the function + tags = list_tags(image_str) + + # Verify the result + assert tags == ["v0.4.0", "v0.4.1", "v0.4.2", "latest"] + + +def test_list_tags_auth_error(mocker: MockerFixture): + """Test that list_tags handles authentication errors correctly.""" + image_str = "ghcr.io/freedomofpress/dangerzone" + + # Mock requests.get to raise an HTTPError + mock_response = mocker.Mock() + mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError( + "401 Client Error: Unauthorized" + ) + + mocker.patch("requests.get", return_value=mock_response) + + # Call the function and expect an error + with pytest.raises(requests.exceptions.HTTPError): + list_tags(image_str) + + +def test_list_tags_registry_error(mocker: MockerFixture): + """Test that list_tags handles registry errors correctly.""" + image_str = "ghcr.io/freedomofpress/dangerzone" + + # Mock requests.get to return success for auth but error for tags + mock_response_auth = mocker.Mock() + mock_response_auth.json.return_value = {"token": "dummy_token"} + mock_response_auth.raise_for_status.return_value = None + + mock_response_tags = mocker.Mock() + mock_response_tags.raise_for_status.side_effect = requests.exceptions.HTTPError( + "404 Client Error: Not Found" + ) + + # Setup the mock to return different responses for each URL + def mock_get(url, **kwargs): + if "token" in url: + return mock_response_auth + else: + return mock_response_tags + + mocker.patch("requests.get", side_effect=mock_get) + + # Call the function and expect an error + with pytest.raises(requests.exceptions.HTTPError): + list_tags(image_str) + + +def test_get_manifest(mocker: MockerFixture): + """Test that get_manifest correctly retrieves manifests from the registry.""" + image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2" + + # Mock the responses + manifest_content = { + "schemaVersion": 2, + "mediaType": "application/vnd.docker.distribution.manifest.v2+json", + "config": { + "mediaType": "application/vnd.docker.container.image.v1+json", + "size": 1234, + "digest": "sha256:abc123def456", + }, + "layers": [ + { + "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip", + "size": 12345, + "digest": "sha256:layer1", + } + ], + } + + mock_response_auth = mocker.Mock() + mock_response_auth.json.return_value = {"token": "dummy_token"} + mock_response_auth.raise_for_status.return_value = None + + mock_response_manifest = mocker.Mock() + mock_response_manifest.json.return_value = manifest_content + mock_response_manifest.status_code = 200 + mock_response_manifest.raise_for_status.return_value = None + + # Setup the mock to return different responses for each URL + def mock_get(url, **kwargs): + if "token" in url: + return mock_response_auth + else: + return mock_response_manifest + + mocker.patch("requests.get", side_effect=mock_get) + + # Call the function + response = get_manifest(image_str) + + # Verify the result + assert response.status_code == 200 + assert response.json() == manifest_content + + +def test_get_manifest_digest(): + """Test that get_manifest_digest correctly calculates the manifest digest.""" + # Create a sample manifest content + manifest_content = b'{"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json"}' + + # Calculate the expected digest manually + import hashlib + + expected_digest = hashlib.sha256(manifest_content).hexdigest() + + # Call the function with the content directly + digest = get_manifest_digest("unused_image_str", manifest_content) + + # Verify the result + assert digest == expected_digest + + +def test_get_manifest_digest_from_registry(mocker: MockerFixture): + """Test that get_manifest_digest correctly retrieves and calculates digests from the registry.""" + image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2" + + # Sample manifest content + manifest_content = b'{"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json"}' + expected_digest = hashlib.sha256(manifest_content).hexdigest() + + # Mock get_manifest + mock_response = mocker.Mock() + mock_response.content = manifest_content + mocker.patch("dangerzone.updater.registry.get_manifest", return_value=mock_response) + + # Call the function + digest = get_manifest_digest(image_str) + + # Verify the result + assert digest == expected_digest diff --git a/tests/test_signatures.py b/tests/test_signatures.py index 2681896..da9fb99 100644 --- a/tests/test_signatures.py +++ b/tests/test_signatures.py @@ -32,6 +32,29 @@ TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered" RANDOM_DIGEST = "aacc9b586648bbe3040f2822153b1d5ead2779af45ff750fd6f04daf4a9f64b4" +@pytest.fixture +def valid_signature(): + signature_file = next(VALID_SIGNATURES_PATH.glob("**/*.json")) + with open(signature_file, "r") as signature_file: + signatures = json.load(signature_file) + return signatures.pop() + + +@pytest.fixture +def tempered_signature(): + signature_file = next(TEMPERED_SIGNATURES_PATH.glob("**/*.json")) + with open(signature_file, "r") as signature_file: + signatures = json.load(signature_file) + return signatures.pop() + + +@pytest.fixture +def signature_other_digest(valid_signature): + signature = valid_signature.copy() + signature["Bundle"]["Payload"]["digest"] = "sha256:123456" + return signature + + def test_load_valid_signatures(mocker): mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", VALID_SIGNATURES_PATH) valid_signatures = list(VALID_SIGNATURES_PATH.glob("**/*.json")) @@ -167,21 +190,30 @@ def test_get_remote_signatures_empty(fp: FakeProcess, mocker): mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True) fp.register_subprocess( ["cosign", "download", "signature", f"{image}@sha256:{digest}"], - stdout=json.dumps([]), + stdout=json.dumps({}), ) with pytest.raises(errors.NoRemoteSignatures): get_remote_signatures(image, digest) -def test_get_remote_signatures_cosign_error(): - pass +def test_get_remote_signatures_cosign_error(mocker, fp: FakeProcess): + image = "ghcr.io/freedomofpress/dangerzone/dangerzone" + digest = "123456" + mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True) + fp.register_subprocess( + ["cosign", "download", "signature", f"{image}@sha256:{digest}"], + returncode=1, + stderr="Error: no signatures associated", + ) + with pytest.raises(errors.NoRemoteSignatures): + get_remote_signatures(image, digest) def test_store_signatures_with_different_digests( valid_signature, signature_other_digest, mocker, tmp_path ): """Test that store_signatures raises an error when a signature's digest doesn't match.""" - + signatures = [valid_signature, signature_other_digest] image_digest = "sha256:123456" # Mock the signatures path @@ -196,8 +228,19 @@ def test_store_signatures_with_different_digests( ) # Mock get_last_log_index + mocker.patch( + "dangerzone.updater.signatures.get_last_log_index", + return_value=50, + ) + + # Call store_signatures + with pytest.raises(errors.SignatureMismatch): + store_signatures(signatures, image_digest, TEST_PUBKEY_PATH) + # Verify that the signatures file was not created assert not (signatures_path / f"{image_digest}.json").exists() + + # Verify that the log index file was not updated assert not (signatures_path / "last_log_index").exists() @@ -238,6 +281,34 @@ def test_stores_signatures_updates_last_log_index(valid_signature, mocker, tmp_p signatures = [valid_signature] # Extract the digest from the signature image_digest = Signature(valid_signature).manifest_digest + signatures = [valid_signature, signature_other_digest] + breakpoint() + valid_signature, signature_other_digest, mocker, tmp_path + + """Test that store_signatures raises an error when a signature's digest doesn't match.""" + + image_digest = "sha256:123456" + + # Mock the signatures path + signatures_path = tmp_path / "signatures" + signatures_path.mkdir() + mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path) + + # Mock get_log_index_from_signatures + mocker.patch( + "dangerzone.updater.signatures.get_log_index_from_signatures", + return_value=100, + ) + + # Mock get_last_log_index + mocker.patch( + "dangerzone.updater.signatures.get_last_log_index", + return_value=50, + ) + + +def test_stores_signatures_updates_last_log_index(): + pass def test_get_file_digest(): @@ -310,5 +381,6 @@ def test_verify_signature_tempered(tempered_signature): ) -def test_verify_signatures_not_0(): - pass +def test_verify_signatures_empty_list(): + with pytest.raises(errors.SignatureVerificationError): + verify_signatures([], "1234", TEST_PUBKEY_PATH)