Compare commits

..

41 commits

Author SHA1 Message Date
21e726e901
Merge 760948b5b5 into a6aa66f925 2025-02-25 17:20:31 +01:00
Alexis Métaireau
760948b5b5
Add tests for registry 2025-02-25 17:20:25 +01:00
Alexis Métaireau
3ea491761c fixup! Add a dangerzone-image CLI script 2025-02-25 17:20:12 +01:00
Alexis Métaireau
f175739b20 fixup! Add a dangerzone-image CLI script 2025-02-25 17:20:12 +01:00
Alexis Métaireau
3d579c8097 fixup! Add a dangerzone-image CLI script 2025-02-25 17:20:12 +01:00
Alexis Métaireau
356d848e47 fixup! Add a dangerzone-image CLI script 2025-02-25 17:20:12 +01:00
Alexis Métaireau
49c4cee898
make the signature tests pass 2025-02-25 15:44:46 +01:00
Alexis Métaireau
22d01a4045 fixup! c9c301d833 2025-02-25 15:44:24 +01:00
Alexis Métaireau
7e4cd66d2b fixup! b4818ce854 2025-02-25 15:44:24 +01:00
Alexis Métaireau
d93c99f8e2 fixup! b4818ce854 2025-02-25 15:44:24 +01:00
Alexis Métaireau
43f6d89bbb fixup! b37815a96c 2025-02-25 15:44:24 +01:00
Alexis Métaireau
bba427d619 fixup! 83418f09f2 2025-02-25 15:44:24 +01:00
Alexis Métaireau
4a4bf7c571 fixup! 3e861cc0cd 2025-02-25 15:44:24 +01:00
Alexis Métaireau
2476ed6daa fixup! Download and verify cosign signatures 2025-02-25 15:44:24 +01:00
Alexis Métaireau
30ec1f10e9 fixup! Download and verify cosign signatures 2025-02-25 15:44:24 +01:00
Alexis Métaireau
4073a62fd4 fixup! Download and verify cosign signatures 2025-02-25 15:44:24 +01:00
Alexis Métaireau
7f83505ae9 fixup! Download and verify cosign signatures 2025-02-25 15:44:24 +01:00
Alexis Métaireau
33ee158cf2 fixup! Download and verify cosign signatures 2025-02-25 15:44:24 +01:00
Alexis Métaireau
d5d3038bfa fixup! Download and verify cosign signatures 2025-02-25 15:44:24 +01:00
Alexis Métaireau
7e283196d8 fixup! 35704b8a18 2025-02-25 15:44:24 +01:00
Alexis Métaireau
8381b2fb7b fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
7baddd0064 fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
0c063b5b27 fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
01f7b37151 fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
9bf663fdb9 fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
cf7a3dbb56 fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
4621902a2b fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
ec4028b486 fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
43cb02bcca fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
ab51a71bdf fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
b5bfbb5d6e fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
3e861cc0cd fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
83418f09f2 fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
fb89f00c73 fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
ecb3d87b1f fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
a4fa6aaed8 fixup! (WIP) Add tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
df3efa8157 fixup! 6aff845493 2025-02-25 15:44:08 +01:00
Alexis Métaireau
c9c301d833 fixup! (WIP) some more tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
b37815a96c fixup! (WIP) some more tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
35704b8a18 fixup! (WIP) some more tests 2025-02-25 15:44:08 +01:00
Alexis Métaireau
b4818ce854 fixup! (WIP) some more tests 2025-02-25 15:44:08 +01:00
4 changed files with 376 additions and 29 deletions

View file

@ -28,7 +28,7 @@ ACCEPT_MANIFESTS_HEADER = ",".join(
) )
Image = namedtuple("Image", ["registry", "namespace", "image_name", "tag"]) Image = namedtuple("Image", ["registry", "namespace", "image_name", "tag", "digest"])
def parse_image_location(input_string: str) -> Image: def parse_image_location(input_string: str) -> Image:
@ -37,8 +37,9 @@ def parse_image_location(input_string: str) -> Image:
r"^" r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/" r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/" r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<image_name>[^:]+)" r"(?P<image_name>[^:@]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?" r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"(?:@(?P<digest>sha256:[a-zA-Z0-9]+))?"
r"$" r"$"
) )
match = re.match(pattern, input_string) match = re.match(pattern, input_string)
@ -49,6 +50,7 @@ def parse_image_location(input_string: str) -> Image:
namespace=match.group("namespace"), namespace=match.group("namespace"),
image_name=match.group("image_name"), image_name=match.group("image_name"),
tag=match.group("tag") or "latest", tag=match.group("tag") or "latest",
digest=match.group("digest"),
) )

View file

@ -12,6 +12,7 @@ from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
from .. import container_utils as runtime from .. import container_utils as runtime
from .. import errors as dzerrors
from ..util import get_resource_path from ..util import get_resource_path
from . import cosign, errors, log, registry from . import cosign, errors, log, registry
@ -108,16 +109,34 @@ def verify_signature(signature: dict, image_digest: str, pubkey: str | Path) ->
if result.returncode != 0: if result.returncode != 0:
# XXX Raise instead? # XXX Raise instead?
log.debug("Failed to verify signature", result.stderr) log.debug("Failed to verify signature", result.stderr)
return False raise errors.SignatureVerificationError("Failed to verify signature")
if result.stderr == b"Verified OK\n": if result.stderr == b"Verified OK\n":
log.debug("Signature verified") log.debug("Signature verified")
return True return True
return False return False
class Signature:
def __init__(self, signature: Dict):
self.signature = signature
@property
def payload(self) -> Dict:
return json.loads(b64decode(self.signature["Payload"]))
@property
def manifest_digest(self) -> str:
full_digest = self.payload["critical"]["image"]["docker-manifest-digest"]
return full_digest.replace("sha256:", "")
def is_update_available(image: str) -> Tuple[bool, Optional[str]]: def is_update_available(image: str) -> Tuple[bool, Optional[str]]:
remote_digest = registry.get_manifest_digest(image) remote_digest = registry.get_manifest_digest(image)
local_digest = runtime.get_local_image_digest(image) try:
local_digest = runtime.get_local_image_digest(image)
except dzerrors.ImageNotPresentException:
log.debug("No local image found")
return True, remote_digest
log.debug("Remote digest: %s", remote_digest) log.debug("Remote digest: %s", remote_digest)
log.debug("Local digest: %s", local_digest) log.debug("Local digest: %s", local_digest)
has_update = remote_digest != local_digest has_update = remote_digest != local_digest
@ -165,7 +184,7 @@ def get_log_index_from_signatures(signatures: List[Dict]) -> int:
def write_log_index(log_index: int) -> None: def write_log_index(log_index: int) -> None:
last_log_index_path = SIGNATURES_PATH / "last_log_index" last_log_index_path = SIGNATURES_PATH / "last_log_index"
with open(log_index, "w") as f: with open(last_log_index_path, "w") as f:
f.write(str(log_index)) f.write(str(log_index))
@ -358,7 +377,7 @@ def store_signatures(signatures: list[Dict], image_digest: str, pubkey: str) ->
if f"sha256:{image_digest}" != digests[0]: if f"sha256:{image_digest}" != digests[0]:
raise errors.SignatureMismatch( raise errors.SignatureMismatch(
f"Signatures do not match the given image digest ({image_digest}, {digests[0]})" f"Signatures do not match the given image digest (sha256:{image_digest}, {digests[0]})"
) )
pubkey_signatures = SIGNATURES_PATH / get_file_digest(pubkey) pubkey_signatures = SIGNATURES_PATH / get_file_digest(pubkey)
@ -370,6 +389,8 @@ def store_signatures(signatures: list[Dict], image_digest: str, pubkey: str) ->
) )
json.dump(signatures, f) json.dump(signatures, f)
write_log_index(get_log_index_from_signatures(signatures))
def verify_local_image(image: str, pubkey: str) -> bool: def verify_local_image(image: str, pubkey: str) -> bool:
""" """

238
tests/test_registry.py Normal file
View file

@ -0,0 +1,238 @@
import hashlib
import pytest
import requests
from pytest_mock import MockerFixture
from dangerzone.updater.registry import (
Image,
_get_auth_header,
_url,
get_manifest,
get_manifest_digest,
list_tags,
parse_image_location,
)
def test_parse_image_location_no_tag():
"""Test that parse_image_location correctly handles an image location without a tag."""
image_str = "ghcr.io/freedomofpress/dangerzone"
image = parse_image_location(image_str)
assert isinstance(image, Image)
assert image.registry == "ghcr.io"
assert image.namespace == "freedomofpress"
assert image.image_name == "dangerzone"
assert image.tag == "latest" # Default tag should be "latest"
assert image.digest is None
def test_parse_image_location_with_tag():
"""Test that parse_image_location correctly handles an image location with a tag."""
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
image = parse_image_location(image_str)
assert isinstance(image, Image)
assert image.registry == "ghcr.io"
assert image.namespace == "freedomofpress"
assert image.image_name == "dangerzone"
assert image.tag == "v0.4.2"
def test_parse_image_location_tag_plus_digest():
"""Test that parse_image_location handles an image location with a tag that includes a digest."""
image_str = (
"ghcr.io/freedomofpress/dangerzone"
":20250205-0.8.0-148-ge67fbc1"
"@sha256:19e8eacd75879d05f6621c2ea8dd955e68ee3e07b41b9d53f4c8cc9929a68a67"
)
image = parse_image_location(image_str)
assert isinstance(image, Image)
assert image.registry == "ghcr.io"
assert image.namespace == "freedomofpress"
assert image.image_name == "dangerzone"
assert image.tag == "20250205-0.8.0-148-ge67fbc1"
assert (
image.digest
== "sha256:19e8eacd75879d05f6621c2ea8dd955e68ee3e07b41b9d53f4c8cc9929a68a67"
)
def test_parse_invalid_image_location():
"""Test that parse_image_location raises an error for invalid image locations."""
invalid_image_locations = [
"ghcr.io/dangerzone", # Missing namespace
"ghcr.io/freedomofpress/dangerzone:", # Empty tag
"freedomofpress/dangerzone", # Missing registry
"ghcr.io:freedomofpress/dangerzone", # Invalid format
"", # Empty string
]
for invalid_image in invalid_image_locations:
with pytest.raises(ValueError, match="Malformed image location"):
parse_image_location(invalid_image)
def test_list_tags(mocker: MockerFixture):
"""Test that list_tags correctly retrieves tags from the registry."""
# Mock the authentication response
image_str = "ghcr.io/freedomofpress/dangerzone"
# Mock requests.get to return appropriate values for both calls
mock_response_auth = mocker.Mock()
mock_response_auth.json.return_value = {"token": "dummy_token"}
mock_response_auth.raise_for_status.return_value = None
mock_response_tags = mocker.Mock()
mock_response_tags.json.return_value = {
"tags": ["v0.4.0", "v0.4.1", "v0.4.2", "latest"]
}
mock_response_tags.raise_for_status.return_value = None
# Setup the mock to return different responses for each URL
def mock_get(url, **kwargs):
if "token" in url:
return mock_response_auth
else:
return mock_response_tags
mocker.patch("requests.get", side_effect=mock_get)
# Call the function
tags = list_tags(image_str)
# Verify the result
assert tags == ["v0.4.0", "v0.4.1", "v0.4.2", "latest"]
def test_list_tags_auth_error(mocker: MockerFixture):
"""Test that list_tags handles authentication errors correctly."""
image_str = "ghcr.io/freedomofpress/dangerzone"
# Mock requests.get to raise an HTTPError
mock_response = mocker.Mock()
mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
"401 Client Error: Unauthorized"
)
mocker.patch("requests.get", return_value=mock_response)
# Call the function and expect an error
with pytest.raises(requests.exceptions.HTTPError):
list_tags(image_str)
def test_list_tags_registry_error(mocker: MockerFixture):
"""Test that list_tags handles registry errors correctly."""
image_str = "ghcr.io/freedomofpress/dangerzone"
# Mock requests.get to return success for auth but error for tags
mock_response_auth = mocker.Mock()
mock_response_auth.json.return_value = {"token": "dummy_token"}
mock_response_auth.raise_for_status.return_value = None
mock_response_tags = mocker.Mock()
mock_response_tags.raise_for_status.side_effect = requests.exceptions.HTTPError(
"404 Client Error: Not Found"
)
# Setup the mock to return different responses for each URL
def mock_get(url, **kwargs):
if "token" in url:
return mock_response_auth
else:
return mock_response_tags
mocker.patch("requests.get", side_effect=mock_get)
# Call the function and expect an error
with pytest.raises(requests.exceptions.HTTPError):
list_tags(image_str)
def test_get_manifest(mocker: MockerFixture):
"""Test that get_manifest correctly retrieves manifests from the registry."""
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
# Mock the responses
manifest_content = {
"schemaVersion": 2,
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"config": {
"mediaType": "application/vnd.docker.container.image.v1+json",
"size": 1234,
"digest": "sha256:abc123def456",
},
"layers": [
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 12345,
"digest": "sha256:layer1",
}
],
}
mock_response_auth = mocker.Mock()
mock_response_auth.json.return_value = {"token": "dummy_token"}
mock_response_auth.raise_for_status.return_value = None
mock_response_manifest = mocker.Mock()
mock_response_manifest.json.return_value = manifest_content
mock_response_manifest.status_code = 200
mock_response_manifest.raise_for_status.return_value = None
# Setup the mock to return different responses for each URL
def mock_get(url, **kwargs):
if "token" in url:
return mock_response_auth
else:
return mock_response_manifest
mocker.patch("requests.get", side_effect=mock_get)
# Call the function
response = get_manifest(image_str)
# Verify the result
assert response.status_code == 200
assert response.json() == manifest_content
def test_get_manifest_digest():
"""Test that get_manifest_digest correctly calculates the manifest digest."""
# Create a sample manifest content
manifest_content = b'{"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json"}'
# Calculate the expected digest manually
import hashlib
expected_digest = hashlib.sha256(manifest_content).hexdigest()
# Call the function with the content directly
digest = get_manifest_digest("unused_image_str", manifest_content)
# Verify the result
assert digest == expected_digest
def test_get_manifest_digest_from_registry(mocker: MockerFixture):
"""Test that get_manifest_digest correctly retrieves and calculates digests from the registry."""
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
# Sample manifest content
manifest_content = b'{"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json"}'
expected_digest = hashlib.sha256(manifest_content).hexdigest()
# Mock get_manifest
mock_response = mocker.Mock()
mock_response.content = manifest_content
mocker.patch("dangerzone.updater.registry.get_manifest", return_value=mock_response)
# Call the function
digest = get_manifest_digest(image_str)
# Verify the result
assert digest == expected_digest

View file

@ -5,8 +5,10 @@ from pathlib import Path
import pytest import pytest
from pytest_subprocess import FakeProcess from pytest_subprocess import FakeProcess
from dangerzone import errors as dzerrors
from dangerzone.updater import errors from dangerzone.updater import errors
from dangerzone.updater.signatures import ( from dangerzone.updater.signatures import (
Signature,
get_config_dir, get_config_dir,
get_last_log_index, get_last_log_index,
get_log_index_from_signatures, get_log_index_from_signatures,
@ -27,6 +29,8 @@ INVALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "invalid"
VALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "valid" VALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "valid"
TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered" TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered"
RANDOM_DIGEST = "aacc9b586648bbe3040f2822153b1d5ead2779af45ff750fd6f04daf4a9f64b4"
@pytest.fixture @pytest.fixture
def valid_signature(): def valid_signature():
@ -36,6 +40,14 @@ def valid_signature():
return signatures.pop() return signatures.pop()
@pytest.fixture
def tempered_signature():
signature_file = next(TEMPERED_SIGNATURES_PATH.glob("**/*.json"))
with open(signature_file, "r") as signature_file:
signatures = json.load(signature_file)
return signatures.pop()
@pytest.fixture @pytest.fixture
def signature_other_digest(valid_signature): def signature_other_digest(valid_signature):
signature = valid_signature.copy() signature = valid_signature.copy()
@ -198,39 +210,113 @@ def test_get_remote_signatures_cosign_error(mocker, fp: FakeProcess):
def test_store_signatures_with_different_digests( def test_store_signatures_with_different_digests(
valid_signature, signature_other_digest valid_signature, signature_other_digest, mocker, tmp_path
): ):
"""Test that store_signatures raises an error when a signature's digest doesn't match."""
signatures = [valid_signature, signature_other_digest] signatures = [valid_signature, signature_other_digest]
breakpoint() image_digest = "sha256:123456"
pass
# Mock the signatures path
signatures_path = tmp_path / "signatures"
signatures_path.mkdir()
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path)
# Mock get_log_index_from_signatures
mocker.patch(
"dangerzone.updater.signatures.get_log_index_from_signatures",
return_value=100,
)
# Mock get_last_log_index
mocker.patch(
"dangerzone.updater.signatures.get_last_log_index",
return_value=50,
)
# Call store_signatures
with pytest.raises(errors.SignatureMismatch):
store_signatures(signatures, image_digest, TEST_PUBKEY_PATH)
# Verify that the signatures file was not created
assert not (signatures_path / f"{image_digest}.json").exists()
# Verify that the log index file was not updated
assert not (signatures_path / "last_log_index").exists()
def test_store_signatures_digest_mismatch(): def test_stores_signatures_updates_last_log_index(valid_signature, mocker, tmp_path):
pass """Test that store_signatures updates the last log index file."""
signatures = [valid_signature]
# Extract the digest from the signature
image_digest = Signature(valid_signature).manifest_digest
# Mock the signatures path
signatures_path = tmp_path / "signatures"
signatures_path.mkdir()
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path)
# Create an existing last_log_index file with a lower value
with open(signatures_path / "last_log_index", "w") as f:
f.write("50")
# Mock get_log_index_from_signatures to return a higher value
mocker.patch(
"dangerzone.updater.signatures.get_log_index_from_signatures",
return_value=100,
)
# Call store_signatures
store_signatures(signatures, image_digest, TEST_PUBKEY_PATH)
# Verify that the log index file was updated
assert (signatures_path / "last_log_index").exists()
with open(signatures_path / "last_log_index", "r") as f:
assert f.read() == "100"
def test_stores_signatures_updates_last_log_index(): def test_is_update_available_when_no_local_image(mocker):
pass """
Test that is_update_available returns True when no local image is
currently present.
"""
# Mock container_image_exists to return False
mocker.patch(
"dangerzone.container_utils.get_local_image_digest",
side_effect=dzerrors.ImageNotPresentException,
)
# Mock get_manifest_digest to return a digest
mocker.patch(
"dangerzone.updater.registry.get_manifest_digest",
return_value=RANDOM_DIGEST,
)
# Call is_update_available
update_available, digest = is_update_available("ghcr.io/freedomofpress/dangerzone")
# Verify the result
assert update_available is True
assert digest == RANDOM_DIGEST
def test_get_file_digest(): def test_verify_signature(valid_signature):
pass """Test that verify_signature raises an error when the payload digest doesn't match."""
verify_signature(
valid_signature,
Signature(valid_signature).manifest_digest,
TEST_PUBKEY_PATH,
)
def test_convert_oci_images_signatures(): def test_verify_signature_tempered(tempered_signature):
pass """Test that verify_signature raises an error when the payload digest doesn't match."""
# Call verify_signature and expect an error
with pytest.raises(errors.SignatureError):
def test_is_update_available_nothing_local(): verify_signature(
pass tempered_signature,
Signature(tempered_signature).manifest_digest,
TEST_PUBKEY_PATH,
def test_is_update_available_trims(): )
pass
def test_verify_signature_wrong_payload_digest():
pass
def test_verify_signatures_empty_list(): def test_verify_signatures_empty_list():