Compare commits

..

1 commit

Author SHA1 Message Date
f8b1263a58
Merge 0f2d81dbd6 into a6aa66f925 2025-02-24 12:02:47 +01:00
4 changed files with 29 additions and 376 deletions

View file

@ -28,7 +28,7 @@ ACCEPT_MANIFESTS_HEADER = ",".join(
)
Image = namedtuple("Image", ["registry", "namespace", "image_name", "tag", "digest"])
Image = namedtuple("Image", ["registry", "namespace", "image_name", "tag"])
def parse_image_location(input_string: str) -> Image:
@ -37,9 +37,8 @@ def parse_image_location(input_string: str) -> Image:
r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<image_name>[^:@]+)"
r"(?P<image_name>[^:]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"(?:@(?P<digest>sha256:[a-zA-Z0-9]+))?"
r"$"
)
match = re.match(pattern, input_string)
@ -50,7 +49,6 @@ def parse_image_location(input_string: str) -> Image:
namespace=match.group("namespace"),
image_name=match.group("image_name"),
tag=match.group("tag") or "latest",
digest=match.group("digest"),
)

View file

@ -12,7 +12,6 @@ from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import Dict, List, Optional, Tuple
from .. import container_utils as runtime
from .. import errors as dzerrors
from ..util import get_resource_path
from . import cosign, errors, log, registry
@ -109,34 +108,16 @@ def verify_signature(signature: dict, image_digest: str, pubkey: str | Path) ->
if result.returncode != 0:
# XXX Raise instead?
log.debug("Failed to verify signature", result.stderr)
raise errors.SignatureVerificationError("Failed to verify signature")
return False
if result.stderr == b"Verified OK\n":
log.debug("Signature verified")
return True
return False
class Signature:
def __init__(self, signature: Dict):
self.signature = signature
@property
def payload(self) -> Dict:
return json.loads(b64decode(self.signature["Payload"]))
@property
def manifest_digest(self) -> str:
full_digest = self.payload["critical"]["image"]["docker-manifest-digest"]
return full_digest.replace("sha256:", "")
def is_update_available(image: str) -> Tuple[bool, Optional[str]]:
remote_digest = registry.get_manifest_digest(image)
try:
local_digest = runtime.get_local_image_digest(image)
except dzerrors.ImageNotPresentException:
log.debug("No local image found")
return True, remote_digest
local_digest = runtime.get_local_image_digest(image)
log.debug("Remote digest: %s", remote_digest)
log.debug("Local digest: %s", local_digest)
has_update = remote_digest != local_digest
@ -184,7 +165,7 @@ def get_log_index_from_signatures(signatures: List[Dict]) -> int:
def write_log_index(log_index: int) -> None:
last_log_index_path = SIGNATURES_PATH / "last_log_index"
with open(last_log_index_path, "w") as f:
with open(log_index, "w") as f:
f.write(str(log_index))
@ -377,7 +358,7 @@ def store_signatures(signatures: list[Dict], image_digest: str, pubkey: str) ->
if f"sha256:{image_digest}" != digests[0]:
raise errors.SignatureMismatch(
f"Signatures do not match the given image digest (sha256:{image_digest}, {digests[0]})"
f"Signatures do not match the given image digest ({image_digest}, {digests[0]})"
)
pubkey_signatures = SIGNATURES_PATH / get_file_digest(pubkey)
@ -389,8 +370,6 @@ def store_signatures(signatures: list[Dict], image_digest: str, pubkey: str) ->
)
json.dump(signatures, f)
write_log_index(get_log_index_from_signatures(signatures))
def verify_local_image(image: str, pubkey: str) -> bool:
"""

View file

@ -1,238 +0,0 @@
import hashlib
import pytest
import requests
from pytest_mock import MockerFixture
from dangerzone.updater.registry import (
Image,
_get_auth_header,
_url,
get_manifest,
get_manifest_digest,
list_tags,
parse_image_location,
)
def test_parse_image_location_no_tag():
"""Test that parse_image_location correctly handles an image location without a tag."""
image_str = "ghcr.io/freedomofpress/dangerzone"
image = parse_image_location(image_str)
assert isinstance(image, Image)
assert image.registry == "ghcr.io"
assert image.namespace == "freedomofpress"
assert image.image_name == "dangerzone"
assert image.tag == "latest" # Default tag should be "latest"
assert image.digest is None
def test_parse_image_location_with_tag():
"""Test that parse_image_location correctly handles an image location with a tag."""
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
image = parse_image_location(image_str)
assert isinstance(image, Image)
assert image.registry == "ghcr.io"
assert image.namespace == "freedomofpress"
assert image.image_name == "dangerzone"
assert image.tag == "v0.4.2"
def test_parse_image_location_tag_plus_digest():
"""Test that parse_image_location handles an image location with a tag that includes a digest."""
image_str = (
"ghcr.io/freedomofpress/dangerzone"
":20250205-0.8.0-148-ge67fbc1"
"@sha256:19e8eacd75879d05f6621c2ea8dd955e68ee3e07b41b9d53f4c8cc9929a68a67"
)
image = parse_image_location(image_str)
assert isinstance(image, Image)
assert image.registry == "ghcr.io"
assert image.namespace == "freedomofpress"
assert image.image_name == "dangerzone"
assert image.tag == "20250205-0.8.0-148-ge67fbc1"
assert (
image.digest
== "sha256:19e8eacd75879d05f6621c2ea8dd955e68ee3e07b41b9d53f4c8cc9929a68a67"
)
def test_parse_invalid_image_location():
"""Test that parse_image_location raises an error for invalid image locations."""
invalid_image_locations = [
"ghcr.io/dangerzone", # Missing namespace
"ghcr.io/freedomofpress/dangerzone:", # Empty tag
"freedomofpress/dangerzone", # Missing registry
"ghcr.io:freedomofpress/dangerzone", # Invalid format
"", # Empty string
]
for invalid_image in invalid_image_locations:
with pytest.raises(ValueError, match="Malformed image location"):
parse_image_location(invalid_image)
def test_list_tags(mocker: MockerFixture):
"""Test that list_tags correctly retrieves tags from the registry."""
# Mock the authentication response
image_str = "ghcr.io/freedomofpress/dangerzone"
# Mock requests.get to return appropriate values for both calls
mock_response_auth = mocker.Mock()
mock_response_auth.json.return_value = {"token": "dummy_token"}
mock_response_auth.raise_for_status.return_value = None
mock_response_tags = mocker.Mock()
mock_response_tags.json.return_value = {
"tags": ["v0.4.0", "v0.4.1", "v0.4.2", "latest"]
}
mock_response_tags.raise_for_status.return_value = None
# Setup the mock to return different responses for each URL
def mock_get(url, **kwargs):
if "token" in url:
return mock_response_auth
else:
return mock_response_tags
mocker.patch("requests.get", side_effect=mock_get)
# Call the function
tags = list_tags(image_str)
# Verify the result
assert tags == ["v0.4.0", "v0.4.1", "v0.4.2", "latest"]
def test_list_tags_auth_error(mocker: MockerFixture):
"""Test that list_tags handles authentication errors correctly."""
image_str = "ghcr.io/freedomofpress/dangerzone"
# Mock requests.get to raise an HTTPError
mock_response = mocker.Mock()
mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
"401 Client Error: Unauthorized"
)
mocker.patch("requests.get", return_value=mock_response)
# Call the function and expect an error
with pytest.raises(requests.exceptions.HTTPError):
list_tags(image_str)
def test_list_tags_registry_error(mocker: MockerFixture):
"""Test that list_tags handles registry errors correctly."""
image_str = "ghcr.io/freedomofpress/dangerzone"
# Mock requests.get to return success for auth but error for tags
mock_response_auth = mocker.Mock()
mock_response_auth.json.return_value = {"token": "dummy_token"}
mock_response_auth.raise_for_status.return_value = None
mock_response_tags = mocker.Mock()
mock_response_tags.raise_for_status.side_effect = requests.exceptions.HTTPError(
"404 Client Error: Not Found"
)
# Setup the mock to return different responses for each URL
def mock_get(url, **kwargs):
if "token" in url:
return mock_response_auth
else:
return mock_response_tags
mocker.patch("requests.get", side_effect=mock_get)
# Call the function and expect an error
with pytest.raises(requests.exceptions.HTTPError):
list_tags(image_str)
def test_get_manifest(mocker: MockerFixture):
"""Test that get_manifest correctly retrieves manifests from the registry."""
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
# Mock the responses
manifest_content = {
"schemaVersion": 2,
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"config": {
"mediaType": "application/vnd.docker.container.image.v1+json",
"size": 1234,
"digest": "sha256:abc123def456",
},
"layers": [
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 12345,
"digest": "sha256:layer1",
}
],
}
mock_response_auth = mocker.Mock()
mock_response_auth.json.return_value = {"token": "dummy_token"}
mock_response_auth.raise_for_status.return_value = None
mock_response_manifest = mocker.Mock()
mock_response_manifest.json.return_value = manifest_content
mock_response_manifest.status_code = 200
mock_response_manifest.raise_for_status.return_value = None
# Setup the mock to return different responses for each URL
def mock_get(url, **kwargs):
if "token" in url:
return mock_response_auth
else:
return mock_response_manifest
mocker.patch("requests.get", side_effect=mock_get)
# Call the function
response = get_manifest(image_str)
# Verify the result
assert response.status_code == 200
assert response.json() == manifest_content
def test_get_manifest_digest():
"""Test that get_manifest_digest correctly calculates the manifest digest."""
# Create a sample manifest content
manifest_content = b'{"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json"}'
# Calculate the expected digest manually
import hashlib
expected_digest = hashlib.sha256(manifest_content).hexdigest()
# Call the function with the content directly
digest = get_manifest_digest("unused_image_str", manifest_content)
# Verify the result
assert digest == expected_digest
def test_get_manifest_digest_from_registry(mocker: MockerFixture):
"""Test that get_manifest_digest correctly retrieves and calculates digests from the registry."""
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
# Sample manifest content
manifest_content = b'{"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json"}'
expected_digest = hashlib.sha256(manifest_content).hexdigest()
# Mock get_manifest
mock_response = mocker.Mock()
mock_response.content = manifest_content
mocker.patch("dangerzone.updater.registry.get_manifest", return_value=mock_response)
# Call the function
digest = get_manifest_digest(image_str)
# Verify the result
assert digest == expected_digest

View file

@ -5,10 +5,8 @@ from pathlib import Path
import pytest
from pytest_subprocess import FakeProcess
from dangerzone import errors as dzerrors
from dangerzone.updater import errors
from dangerzone.updater.signatures import (
Signature,
get_config_dir,
get_last_log_index,
get_log_index_from_signatures,
@ -29,8 +27,6 @@ INVALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "invalid"
VALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "valid"
TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered"
RANDOM_DIGEST = "aacc9b586648bbe3040f2822153b1d5ead2779af45ff750fd6f04daf4a9f64b4"
@pytest.fixture
def valid_signature():
@ -40,14 +36,6 @@ def valid_signature():
return signatures.pop()
@pytest.fixture
def tempered_signature():
signature_file = next(TEMPERED_SIGNATURES_PATH.glob("**/*.json"))
with open(signature_file, "r") as signature_file:
signatures = json.load(signature_file)
return signatures.pop()
@pytest.fixture
def signature_other_digest(valid_signature):
signature = valid_signature.copy()
@ -210,113 +198,39 @@ def test_get_remote_signatures_cosign_error(mocker, fp: FakeProcess):
def test_store_signatures_with_different_digests(
valid_signature, signature_other_digest, mocker, tmp_path
valid_signature, signature_other_digest
):
"""Test that store_signatures raises an error when a signature's digest doesn't match."""
signatures = [valid_signature, signature_other_digest]
image_digest = "sha256:123456"
# Mock the signatures path
signatures_path = tmp_path / "signatures"
signatures_path.mkdir()
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path)
# Mock get_log_index_from_signatures
mocker.patch(
"dangerzone.updater.signatures.get_log_index_from_signatures",
return_value=100,
)
# Mock get_last_log_index
mocker.patch(
"dangerzone.updater.signatures.get_last_log_index",
return_value=50,
)
# Call store_signatures
with pytest.raises(errors.SignatureMismatch):
store_signatures(signatures, image_digest, TEST_PUBKEY_PATH)
# Verify that the signatures file was not created
assert not (signatures_path / f"{image_digest}.json").exists()
# Verify that the log index file was not updated
assert not (signatures_path / "last_log_index").exists()
breakpoint()
pass
def test_stores_signatures_updates_last_log_index(valid_signature, mocker, tmp_path):
"""Test that store_signatures updates the last log index file."""
signatures = [valid_signature]
# Extract the digest from the signature
image_digest = Signature(valid_signature).manifest_digest
# Mock the signatures path
signatures_path = tmp_path / "signatures"
signatures_path.mkdir()
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path)
# Create an existing last_log_index file with a lower value
with open(signatures_path / "last_log_index", "w") as f:
f.write("50")
# Mock get_log_index_from_signatures to return a higher value
mocker.patch(
"dangerzone.updater.signatures.get_log_index_from_signatures",
return_value=100,
)
# Call store_signatures
store_signatures(signatures, image_digest, TEST_PUBKEY_PATH)
# Verify that the log index file was updated
assert (signatures_path / "last_log_index").exists()
with open(signatures_path / "last_log_index", "r") as f:
assert f.read() == "100"
def test_store_signatures_digest_mismatch():
pass
def test_is_update_available_when_no_local_image(mocker):
"""
Test that is_update_available returns True when no local image is
currently present.
"""
# Mock container_image_exists to return False
mocker.patch(
"dangerzone.container_utils.get_local_image_digest",
side_effect=dzerrors.ImageNotPresentException,
)
# Mock get_manifest_digest to return a digest
mocker.patch(
"dangerzone.updater.registry.get_manifest_digest",
return_value=RANDOM_DIGEST,
)
# Call is_update_available
update_available, digest = is_update_available("ghcr.io/freedomofpress/dangerzone")
# Verify the result
assert update_available is True
assert digest == RANDOM_DIGEST
def test_stores_signatures_updates_last_log_index():
pass
def test_verify_signature(valid_signature):
"""Test that verify_signature raises an error when the payload digest doesn't match."""
verify_signature(
valid_signature,
Signature(valid_signature).manifest_digest,
TEST_PUBKEY_PATH,
)
def test_get_file_digest():
pass
def test_verify_signature_tempered(tempered_signature):
"""Test that verify_signature raises an error when the payload digest doesn't match."""
# Call verify_signature and expect an error
with pytest.raises(errors.SignatureError):
verify_signature(
tempered_signature,
Signature(tempered_signature).manifest_digest,
TEST_PUBKEY_PATH,
)
def test_convert_oci_images_signatures():
pass
def test_is_update_available_nothing_local():
pass
def test_is_update_available_trims():
pass
def test_verify_signature_wrong_payload_digest():
pass
def test_verify_signatures_empty_list():