mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-05-01 11:12:24 +02:00
Add signatures tests
This commit is contained in:
parent
b6fa4fa35b
commit
affb954103
4 changed files with 329 additions and 8 deletions
|
@ -424,8 +424,7 @@ def get_remote_signatures(image: str, digest: str) -> List[Dict]:
|
||||||
|
|
||||||
# Remove the last return, split on newlines, convert from JSON
|
# Remove the last return, split on newlines, convert from JSON
|
||||||
signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
|
signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
|
||||||
signatures = list(map(json.loads, signatures_raw))
|
signatures = list(filter(bool, map(json.loads, signatures_raw)))
|
||||||
breakpoint()
|
|
||||||
if len(signatures) < 1:
|
if len(signatures) < 1:
|
||||||
raise errors.NoRemoteSignatures("No signatures found for the image")
|
raise errors.NoRemoteSignatures("No signatures found for the image")
|
||||||
return signatures
|
return signatures
|
||||||
|
|
|
@ -13,6 +13,13 @@ from dangerzone.gui import Application
|
||||||
sys.dangerzone_dev = True # type: ignore[attr-defined]
|
sys.dangerzone_dev = True # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
|
||||||
|
ASSETS_PATH = Path(__file__).parent / "assets"
|
||||||
|
TEST_PUBKEY_PATH = ASSETS_PATH / "test.pub.key"
|
||||||
|
INVALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "invalid"
|
||||||
|
VALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "valid"
|
||||||
|
TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered"
|
||||||
|
|
||||||
|
|
||||||
# Use this fixture to make `pytest-qt` invoke our custom QApplication.
|
# Use this fixture to make `pytest-qt` invoke our custom QApplication.
|
||||||
# See https://pytest-qt.readthedocs.io/en/latest/qapplication.html#testing-custom-qapplications
|
# See https://pytest-qt.readthedocs.io/en/latest/qapplication.html#testing-custom-qapplications
|
||||||
@pytest.fixture(scope="session")
|
@pytest.fixture(scope="session")
|
||||||
|
@ -133,6 +140,11 @@ for_each_doc = pytest.mark.parametrize(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def signature():
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
# External Docs - base64 docs encoded for externally sourced documents
|
# External Docs - base64 docs encoded for externally sourced documents
|
||||||
# XXX to reduce the chance of accidentally opening them
|
# XXX to reduce the chance of accidentally opening them
|
||||||
test_docs_external_dir = Path(__file__).parent.joinpath(SAMPLE_EXTERNAL_DIRECTORY)
|
test_docs_external_dir = Path(__file__).parent.joinpath(SAMPLE_EXTERNAL_DIRECTORY)
|
||||||
|
|
238
tests/test_registry.py
Normal file
238
tests/test_registry.py
Normal file
|
@ -0,0 +1,238 @@
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import requests
|
||||||
|
from pytest_mock import MockerFixture
|
||||||
|
|
||||||
|
from dangerzone.updater.registry import (
|
||||||
|
Image,
|
||||||
|
_get_auth_header,
|
||||||
|
_url,
|
||||||
|
get_manifest,
|
||||||
|
get_manifest_digest,
|
||||||
|
list_tags,
|
||||||
|
parse_image_location,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_image_location_no_tag():
|
||||||
|
"""Test that parse_image_location correctly handles an image location without a tag."""
|
||||||
|
image_str = "ghcr.io/freedomofpress/dangerzone"
|
||||||
|
image = parse_image_location(image_str)
|
||||||
|
|
||||||
|
assert isinstance(image, Image)
|
||||||
|
assert image.registry == "ghcr.io"
|
||||||
|
assert image.namespace == "freedomofpress"
|
||||||
|
assert image.image_name == "dangerzone"
|
||||||
|
assert image.tag == "latest" # Default tag should be "latest"
|
||||||
|
assert image.digest is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_image_location_with_tag():
|
||||||
|
"""Test that parse_image_location correctly handles an image location with a tag."""
|
||||||
|
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
|
||||||
|
image = parse_image_location(image_str)
|
||||||
|
|
||||||
|
assert isinstance(image, Image)
|
||||||
|
assert image.registry == "ghcr.io"
|
||||||
|
assert image.namespace == "freedomofpress"
|
||||||
|
assert image.image_name == "dangerzone"
|
||||||
|
assert image.tag == "v0.4.2"
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_image_location_tag_plus_digest():
|
||||||
|
"""Test that parse_image_location handles an image location with a tag that includes a digest."""
|
||||||
|
image_str = (
|
||||||
|
"ghcr.io/freedomofpress/dangerzone"
|
||||||
|
":20250205-0.8.0-148-ge67fbc1"
|
||||||
|
"@sha256:19e8eacd75879d05f6621c2ea8dd955e68ee3e07b41b9d53f4c8cc9929a68a67"
|
||||||
|
)
|
||||||
|
|
||||||
|
image = parse_image_location(image_str)
|
||||||
|
|
||||||
|
assert isinstance(image, Image)
|
||||||
|
assert image.registry == "ghcr.io"
|
||||||
|
assert image.namespace == "freedomofpress"
|
||||||
|
assert image.image_name == "dangerzone"
|
||||||
|
assert image.tag == "20250205-0.8.0-148-ge67fbc1"
|
||||||
|
assert (
|
||||||
|
image.digest
|
||||||
|
== "sha256:19e8eacd75879d05f6621c2ea8dd955e68ee3e07b41b9d53f4c8cc9929a68a67"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_invalid_image_location():
|
||||||
|
"""Test that parse_image_location raises an error for invalid image locations."""
|
||||||
|
invalid_image_locations = [
|
||||||
|
"ghcr.io/dangerzone", # Missing namespace
|
||||||
|
"ghcr.io/freedomofpress/dangerzone:", # Empty tag
|
||||||
|
"freedomofpress/dangerzone", # Missing registry
|
||||||
|
"ghcr.io:freedomofpress/dangerzone", # Invalid format
|
||||||
|
"", # Empty string
|
||||||
|
]
|
||||||
|
|
||||||
|
for invalid_image in invalid_image_locations:
|
||||||
|
with pytest.raises(ValueError, match="Malformed image location"):
|
||||||
|
parse_image_location(invalid_image)
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_tags(mocker: MockerFixture):
|
||||||
|
"""Test that list_tags correctly retrieves tags from the registry."""
|
||||||
|
# Mock the authentication response
|
||||||
|
image_str = "ghcr.io/freedomofpress/dangerzone"
|
||||||
|
|
||||||
|
# Mock requests.get to return appropriate values for both calls
|
||||||
|
mock_response_auth = mocker.Mock()
|
||||||
|
mock_response_auth.json.return_value = {"token": "dummy_token"}
|
||||||
|
mock_response_auth.raise_for_status.return_value = None
|
||||||
|
|
||||||
|
mock_response_tags = mocker.Mock()
|
||||||
|
mock_response_tags.json.return_value = {
|
||||||
|
"tags": ["v0.4.0", "v0.4.1", "v0.4.2", "latest"]
|
||||||
|
}
|
||||||
|
mock_response_tags.raise_for_status.return_value = None
|
||||||
|
|
||||||
|
# Setup the mock to return different responses for each URL
|
||||||
|
def mock_get(url, **kwargs):
|
||||||
|
if "token" in url:
|
||||||
|
return mock_response_auth
|
||||||
|
else:
|
||||||
|
return mock_response_tags
|
||||||
|
|
||||||
|
mocker.patch("requests.get", side_effect=mock_get)
|
||||||
|
|
||||||
|
# Call the function
|
||||||
|
tags = list_tags(image_str)
|
||||||
|
|
||||||
|
# Verify the result
|
||||||
|
assert tags == ["v0.4.0", "v0.4.1", "v0.4.2", "latest"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_tags_auth_error(mocker: MockerFixture):
|
||||||
|
"""Test that list_tags handles authentication errors correctly."""
|
||||||
|
image_str = "ghcr.io/freedomofpress/dangerzone"
|
||||||
|
|
||||||
|
# Mock requests.get to raise an HTTPError
|
||||||
|
mock_response = mocker.Mock()
|
||||||
|
mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
|
||||||
|
"401 Client Error: Unauthorized"
|
||||||
|
)
|
||||||
|
|
||||||
|
mocker.patch("requests.get", return_value=mock_response)
|
||||||
|
|
||||||
|
# Call the function and expect an error
|
||||||
|
with pytest.raises(requests.exceptions.HTTPError):
|
||||||
|
list_tags(image_str)
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_tags_registry_error(mocker: MockerFixture):
|
||||||
|
"""Test that list_tags handles registry errors correctly."""
|
||||||
|
image_str = "ghcr.io/freedomofpress/dangerzone"
|
||||||
|
|
||||||
|
# Mock requests.get to return success for auth but error for tags
|
||||||
|
mock_response_auth = mocker.Mock()
|
||||||
|
mock_response_auth.json.return_value = {"token": "dummy_token"}
|
||||||
|
mock_response_auth.raise_for_status.return_value = None
|
||||||
|
|
||||||
|
mock_response_tags = mocker.Mock()
|
||||||
|
mock_response_tags.raise_for_status.side_effect = requests.exceptions.HTTPError(
|
||||||
|
"404 Client Error: Not Found"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Setup the mock to return different responses for each URL
|
||||||
|
def mock_get(url, **kwargs):
|
||||||
|
if "token" in url:
|
||||||
|
return mock_response_auth
|
||||||
|
else:
|
||||||
|
return mock_response_tags
|
||||||
|
|
||||||
|
mocker.patch("requests.get", side_effect=mock_get)
|
||||||
|
|
||||||
|
# Call the function and expect an error
|
||||||
|
with pytest.raises(requests.exceptions.HTTPError):
|
||||||
|
list_tags(image_str)
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_manifest(mocker: MockerFixture):
|
||||||
|
"""Test that get_manifest correctly retrieves manifests from the registry."""
|
||||||
|
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
|
||||||
|
|
||||||
|
# Mock the responses
|
||||||
|
manifest_content = {
|
||||||
|
"schemaVersion": 2,
|
||||||
|
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
|
||||||
|
"config": {
|
||||||
|
"mediaType": "application/vnd.docker.container.image.v1+json",
|
||||||
|
"size": 1234,
|
||||||
|
"digest": "sha256:abc123def456",
|
||||||
|
},
|
||||||
|
"layers": [
|
||||||
|
{
|
||||||
|
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
|
||||||
|
"size": 12345,
|
||||||
|
"digest": "sha256:layer1",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
mock_response_auth = mocker.Mock()
|
||||||
|
mock_response_auth.json.return_value = {"token": "dummy_token"}
|
||||||
|
mock_response_auth.raise_for_status.return_value = None
|
||||||
|
|
||||||
|
mock_response_manifest = mocker.Mock()
|
||||||
|
mock_response_manifest.json.return_value = manifest_content
|
||||||
|
mock_response_manifest.status_code = 200
|
||||||
|
mock_response_manifest.raise_for_status.return_value = None
|
||||||
|
|
||||||
|
# Setup the mock to return different responses for each URL
|
||||||
|
def mock_get(url, **kwargs):
|
||||||
|
if "token" in url:
|
||||||
|
return mock_response_auth
|
||||||
|
else:
|
||||||
|
return mock_response_manifest
|
||||||
|
|
||||||
|
mocker.patch("requests.get", side_effect=mock_get)
|
||||||
|
|
||||||
|
# Call the function
|
||||||
|
response = get_manifest(image_str)
|
||||||
|
|
||||||
|
# Verify the result
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json() == manifest_content
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_manifest_digest():
|
||||||
|
"""Test that get_manifest_digest correctly calculates the manifest digest."""
|
||||||
|
# Create a sample manifest content
|
||||||
|
manifest_content = b'{"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json"}'
|
||||||
|
|
||||||
|
# Calculate the expected digest manually
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
expected_digest = hashlib.sha256(manifest_content).hexdigest()
|
||||||
|
|
||||||
|
# Call the function with the content directly
|
||||||
|
digest = get_manifest_digest("unused_image_str", manifest_content)
|
||||||
|
|
||||||
|
# Verify the result
|
||||||
|
assert digest == expected_digest
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_manifest_digest_from_registry(mocker: MockerFixture):
|
||||||
|
"""Test that get_manifest_digest correctly retrieves and calculates digests from the registry."""
|
||||||
|
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
|
||||||
|
|
||||||
|
# Sample manifest content
|
||||||
|
manifest_content = b'{"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json"}'
|
||||||
|
expected_digest = hashlib.sha256(manifest_content).hexdigest()
|
||||||
|
|
||||||
|
# Mock get_manifest
|
||||||
|
mock_response = mocker.Mock()
|
||||||
|
mock_response.content = manifest_content
|
||||||
|
mocker.patch("dangerzone.updater.registry.get_manifest", return_value=mock_response)
|
||||||
|
|
||||||
|
# Call the function
|
||||||
|
digest = get_manifest_digest(image_str)
|
||||||
|
|
||||||
|
# Verify the result
|
||||||
|
assert digest == expected_digest
|
|
@ -32,6 +32,29 @@ TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered"
|
||||||
RANDOM_DIGEST = "aacc9b586648bbe3040f2822153b1d5ead2779af45ff750fd6f04daf4a9f64b4"
|
RANDOM_DIGEST = "aacc9b586648bbe3040f2822153b1d5ead2779af45ff750fd6f04daf4a9f64b4"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def valid_signature():
|
||||||
|
signature_file = next(VALID_SIGNATURES_PATH.glob("**/*.json"))
|
||||||
|
with open(signature_file, "r") as signature_file:
|
||||||
|
signatures = json.load(signature_file)
|
||||||
|
return signatures.pop()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def tempered_signature():
|
||||||
|
signature_file = next(TEMPERED_SIGNATURES_PATH.glob("**/*.json"))
|
||||||
|
with open(signature_file, "r") as signature_file:
|
||||||
|
signatures = json.load(signature_file)
|
||||||
|
return signatures.pop()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def signature_other_digest(valid_signature):
|
||||||
|
signature = valid_signature.copy()
|
||||||
|
signature["Bundle"]["Payload"]["digest"] = "sha256:123456"
|
||||||
|
return signature
|
||||||
|
|
||||||
|
|
||||||
def test_load_valid_signatures(mocker):
|
def test_load_valid_signatures(mocker):
|
||||||
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", VALID_SIGNATURES_PATH)
|
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", VALID_SIGNATURES_PATH)
|
||||||
valid_signatures = list(VALID_SIGNATURES_PATH.glob("**/*.json"))
|
valid_signatures = list(VALID_SIGNATURES_PATH.glob("**/*.json"))
|
||||||
|
@ -167,21 +190,30 @@ def test_get_remote_signatures_empty(fp: FakeProcess, mocker):
|
||||||
mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True)
|
mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True)
|
||||||
fp.register_subprocess(
|
fp.register_subprocess(
|
||||||
["cosign", "download", "signature", f"{image}@sha256:{digest}"],
|
["cosign", "download", "signature", f"{image}@sha256:{digest}"],
|
||||||
stdout=json.dumps([]),
|
stdout=json.dumps({}),
|
||||||
)
|
)
|
||||||
with pytest.raises(errors.NoRemoteSignatures):
|
with pytest.raises(errors.NoRemoteSignatures):
|
||||||
get_remote_signatures(image, digest)
|
get_remote_signatures(image, digest)
|
||||||
|
|
||||||
|
|
||||||
def test_get_remote_signatures_cosign_error():
|
def test_get_remote_signatures_cosign_error(mocker, fp: FakeProcess):
|
||||||
pass
|
image = "ghcr.io/freedomofpress/dangerzone/dangerzone"
|
||||||
|
digest = "123456"
|
||||||
|
mocker.patch("dangerzone.updater.cosign.ensure_installed", return_value=True)
|
||||||
|
fp.register_subprocess(
|
||||||
|
["cosign", "download", "signature", f"{image}@sha256:{digest}"],
|
||||||
|
returncode=1,
|
||||||
|
stderr="Error: no signatures associated",
|
||||||
|
)
|
||||||
|
with pytest.raises(errors.NoRemoteSignatures):
|
||||||
|
get_remote_signatures(image, digest)
|
||||||
|
|
||||||
|
|
||||||
def test_store_signatures_with_different_digests(
|
def test_store_signatures_with_different_digests(
|
||||||
valid_signature, signature_other_digest, mocker, tmp_path
|
valid_signature, signature_other_digest, mocker, tmp_path
|
||||||
):
|
):
|
||||||
"""Test that store_signatures raises an error when a signature's digest doesn't match."""
|
"""Test that store_signatures raises an error when a signature's digest doesn't match."""
|
||||||
|
signatures = [valid_signature, signature_other_digest]
|
||||||
image_digest = "sha256:123456"
|
image_digest = "sha256:123456"
|
||||||
|
|
||||||
# Mock the signatures path
|
# Mock the signatures path
|
||||||
|
@ -196,8 +228,19 @@ def test_store_signatures_with_different_digests(
|
||||||
)
|
)
|
||||||
|
|
||||||
# Mock get_last_log_index
|
# Mock get_last_log_index
|
||||||
|
mocker.patch(
|
||||||
|
"dangerzone.updater.signatures.get_last_log_index",
|
||||||
|
return_value=50,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Call store_signatures
|
||||||
|
with pytest.raises(errors.SignatureMismatch):
|
||||||
|
store_signatures(signatures, image_digest, TEST_PUBKEY_PATH)
|
||||||
|
|
||||||
# Verify that the signatures file was not created
|
# Verify that the signatures file was not created
|
||||||
assert not (signatures_path / f"{image_digest}.json").exists()
|
assert not (signatures_path / f"{image_digest}.json").exists()
|
||||||
|
|
||||||
|
# Verify that the log index file was not updated
|
||||||
assert not (signatures_path / "last_log_index").exists()
|
assert not (signatures_path / "last_log_index").exists()
|
||||||
|
|
||||||
|
|
||||||
|
@ -238,6 +281,34 @@ def test_stores_signatures_updates_last_log_index(valid_signature, mocker, tmp_p
|
||||||
signatures = [valid_signature]
|
signatures = [valid_signature]
|
||||||
# Extract the digest from the signature
|
# Extract the digest from the signature
|
||||||
image_digest = Signature(valid_signature).manifest_digest
|
image_digest = Signature(valid_signature).manifest_digest
|
||||||
|
signatures = [valid_signature, signature_other_digest]
|
||||||
|
breakpoint()
|
||||||
|
valid_signature, signature_other_digest, mocker, tmp_path
|
||||||
|
|
||||||
|
"""Test that store_signatures raises an error when a signature's digest doesn't match."""
|
||||||
|
|
||||||
|
image_digest = "sha256:123456"
|
||||||
|
|
||||||
|
# Mock the signatures path
|
||||||
|
signatures_path = tmp_path / "signatures"
|
||||||
|
signatures_path.mkdir()
|
||||||
|
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path)
|
||||||
|
|
||||||
|
# Mock get_log_index_from_signatures
|
||||||
|
mocker.patch(
|
||||||
|
"dangerzone.updater.signatures.get_log_index_from_signatures",
|
||||||
|
return_value=100,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Mock get_last_log_index
|
||||||
|
mocker.patch(
|
||||||
|
"dangerzone.updater.signatures.get_last_log_index",
|
||||||
|
return_value=50,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_stores_signatures_updates_last_log_index():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def test_get_file_digest():
|
def test_get_file_digest():
|
||||||
|
@ -310,5 +381,6 @@ def test_verify_signature_tempered(tempered_signature):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_verify_signatures_not_0():
|
def test_verify_signatures_empty_list():
|
||||||
pass
|
with pytest.raises(errors.SignatureVerificationError):
|
||||||
|
verify_signatures([], "1234", TEST_PUBKEY_PATH)
|
||||||
|
|
Loading…
Reference in a new issue