mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-05-18 11:11:49 +02:00
Compare commits
41 commits
f8b1263a58
...
21e726e901
Author | SHA1 | Date | |
---|---|---|---|
21e726e901 | |||
![]() |
760948b5b5 | ||
![]() |
3ea491761c | ||
![]() |
f175739b20 | ||
![]() |
3d579c8097 | ||
![]() |
356d848e47 | ||
![]() |
49c4cee898 | ||
![]() |
22d01a4045 | ||
![]() |
7e4cd66d2b | ||
![]() |
d93c99f8e2 | ||
![]() |
43f6d89bbb | ||
![]() |
bba427d619 | ||
![]() |
4a4bf7c571 | ||
![]() |
2476ed6daa | ||
![]() |
30ec1f10e9 | ||
![]() |
4073a62fd4 | ||
![]() |
7f83505ae9 | ||
![]() |
33ee158cf2 | ||
![]() |
d5d3038bfa | ||
![]() |
7e283196d8 | ||
![]() |
8381b2fb7b | ||
![]() |
7baddd0064 | ||
![]() |
0c063b5b27 | ||
![]() |
01f7b37151 | ||
![]() |
9bf663fdb9 | ||
![]() |
cf7a3dbb56 | ||
![]() |
4621902a2b | ||
![]() |
ec4028b486 | ||
![]() |
43cb02bcca | ||
![]() |
ab51a71bdf | ||
![]() |
b5bfbb5d6e | ||
![]() |
3e861cc0cd | ||
![]() |
83418f09f2 | ||
![]() |
fb89f00c73 | ||
![]() |
ecb3d87b1f | ||
![]() |
a4fa6aaed8 | ||
![]() |
df3efa8157 | ||
![]() |
c9c301d833 | ||
![]() |
b37815a96c | ||
![]() |
35704b8a18 | ||
![]() |
b4818ce854 |
4 changed files with 376 additions and 29 deletions
|
@ -28,7 +28,7 @@ ACCEPT_MANIFESTS_HEADER = ",".join(
|
|||
)
|
||||
|
||||
|
||||
Image = namedtuple("Image", ["registry", "namespace", "image_name", "tag"])
|
||||
Image = namedtuple("Image", ["registry", "namespace", "image_name", "tag", "digest"])
|
||||
|
||||
|
||||
def parse_image_location(input_string: str) -> Image:
|
||||
|
@ -37,8 +37,9 @@ def parse_image_location(input_string: str) -> Image:
|
|||
r"^"
|
||||
r"(?P<registry>[a-zA-Z0-9.-]+)/"
|
||||
r"(?P<namespace>[a-zA-Z0-9-]+)/"
|
||||
r"(?P<image_name>[^:]+)"
|
||||
r"(?P<image_name>[^:@]+)"
|
||||
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
|
||||
r"(?:@(?P<digest>sha256:[a-zA-Z0-9]+))?"
|
||||
r"$"
|
||||
)
|
||||
match = re.match(pattern, input_string)
|
||||
|
@ -49,6 +50,7 @@ def parse_image_location(input_string: str) -> Image:
|
|||
namespace=match.group("namespace"),
|
||||
image_name=match.group("image_name"),
|
||||
tag=match.group("tag") or "latest",
|
||||
digest=match.group("digest"),
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ from tempfile import NamedTemporaryFile, TemporaryDirectory
|
|||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
from .. import container_utils as runtime
|
||||
from .. import errors as dzerrors
|
||||
from ..util import get_resource_path
|
||||
from . import cosign, errors, log, registry
|
||||
|
||||
|
@ -108,16 +109,34 @@ def verify_signature(signature: dict, image_digest: str, pubkey: str | Path) ->
|
|||
if result.returncode != 0:
|
||||
# XXX Raise instead?
|
||||
log.debug("Failed to verify signature", result.stderr)
|
||||
return False
|
||||
raise errors.SignatureVerificationError("Failed to verify signature")
|
||||
if result.stderr == b"Verified OK\n":
|
||||
log.debug("Signature verified")
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class Signature:
|
||||
def __init__(self, signature: Dict):
|
||||
self.signature = signature
|
||||
|
||||
@property
|
||||
def payload(self) -> Dict:
|
||||
return json.loads(b64decode(self.signature["Payload"]))
|
||||
|
||||
@property
|
||||
def manifest_digest(self) -> str:
|
||||
full_digest = self.payload["critical"]["image"]["docker-manifest-digest"]
|
||||
return full_digest.replace("sha256:", "")
|
||||
|
||||
|
||||
def is_update_available(image: str) -> Tuple[bool, Optional[str]]:
|
||||
remote_digest = registry.get_manifest_digest(image)
|
||||
try:
|
||||
local_digest = runtime.get_local_image_digest(image)
|
||||
except dzerrors.ImageNotPresentException:
|
||||
log.debug("No local image found")
|
||||
return True, remote_digest
|
||||
log.debug("Remote digest: %s", remote_digest)
|
||||
log.debug("Local digest: %s", local_digest)
|
||||
has_update = remote_digest != local_digest
|
||||
|
@ -165,7 +184,7 @@ def get_log_index_from_signatures(signatures: List[Dict]) -> int:
|
|||
def write_log_index(log_index: int) -> None:
|
||||
last_log_index_path = SIGNATURES_PATH / "last_log_index"
|
||||
|
||||
with open(log_index, "w") as f:
|
||||
with open(last_log_index_path, "w") as f:
|
||||
f.write(str(log_index))
|
||||
|
||||
|
||||
|
@ -358,7 +377,7 @@ def store_signatures(signatures: list[Dict], image_digest: str, pubkey: str) ->
|
|||
|
||||
if f"sha256:{image_digest}" != digests[0]:
|
||||
raise errors.SignatureMismatch(
|
||||
f"Signatures do not match the given image digest ({image_digest}, {digests[0]})"
|
||||
f"Signatures do not match the given image digest (sha256:{image_digest}, {digests[0]})"
|
||||
)
|
||||
|
||||
pubkey_signatures = SIGNATURES_PATH / get_file_digest(pubkey)
|
||||
|
@ -370,6 +389,8 @@ def store_signatures(signatures: list[Dict], image_digest: str, pubkey: str) ->
|
|||
)
|
||||
json.dump(signatures, f)
|
||||
|
||||
write_log_index(get_log_index_from_signatures(signatures))
|
||||
|
||||
|
||||
def verify_local_image(image: str, pubkey: str) -> bool:
|
||||
"""
|
||||
|
|
238
tests/test_registry.py
Normal file
238
tests/test_registry.py
Normal file
|
@ -0,0 +1,238 @@
|
|||
import hashlib
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from dangerzone.updater.registry import (
|
||||
Image,
|
||||
_get_auth_header,
|
||||
_url,
|
||||
get_manifest,
|
||||
get_manifest_digest,
|
||||
list_tags,
|
||||
parse_image_location,
|
||||
)
|
||||
|
||||
|
||||
def test_parse_image_location_no_tag():
|
||||
"""Test that parse_image_location correctly handles an image location without a tag."""
|
||||
image_str = "ghcr.io/freedomofpress/dangerzone"
|
||||
image = parse_image_location(image_str)
|
||||
|
||||
assert isinstance(image, Image)
|
||||
assert image.registry == "ghcr.io"
|
||||
assert image.namespace == "freedomofpress"
|
||||
assert image.image_name == "dangerzone"
|
||||
assert image.tag == "latest" # Default tag should be "latest"
|
||||
assert image.digest is None
|
||||
|
||||
|
||||
def test_parse_image_location_with_tag():
|
||||
"""Test that parse_image_location correctly handles an image location with a tag."""
|
||||
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
|
||||
image = parse_image_location(image_str)
|
||||
|
||||
assert isinstance(image, Image)
|
||||
assert image.registry == "ghcr.io"
|
||||
assert image.namespace == "freedomofpress"
|
||||
assert image.image_name == "dangerzone"
|
||||
assert image.tag == "v0.4.2"
|
||||
|
||||
|
||||
def test_parse_image_location_tag_plus_digest():
|
||||
"""Test that parse_image_location handles an image location with a tag that includes a digest."""
|
||||
image_str = (
|
||||
"ghcr.io/freedomofpress/dangerzone"
|
||||
":20250205-0.8.0-148-ge67fbc1"
|
||||
"@sha256:19e8eacd75879d05f6621c2ea8dd955e68ee3e07b41b9d53f4c8cc9929a68a67"
|
||||
)
|
||||
|
||||
image = parse_image_location(image_str)
|
||||
|
||||
assert isinstance(image, Image)
|
||||
assert image.registry == "ghcr.io"
|
||||
assert image.namespace == "freedomofpress"
|
||||
assert image.image_name == "dangerzone"
|
||||
assert image.tag == "20250205-0.8.0-148-ge67fbc1"
|
||||
assert (
|
||||
image.digest
|
||||
== "sha256:19e8eacd75879d05f6621c2ea8dd955e68ee3e07b41b9d53f4c8cc9929a68a67"
|
||||
)
|
||||
|
||||
|
||||
def test_parse_invalid_image_location():
|
||||
"""Test that parse_image_location raises an error for invalid image locations."""
|
||||
invalid_image_locations = [
|
||||
"ghcr.io/dangerzone", # Missing namespace
|
||||
"ghcr.io/freedomofpress/dangerzone:", # Empty tag
|
||||
"freedomofpress/dangerzone", # Missing registry
|
||||
"ghcr.io:freedomofpress/dangerzone", # Invalid format
|
||||
"", # Empty string
|
||||
]
|
||||
|
||||
for invalid_image in invalid_image_locations:
|
||||
with pytest.raises(ValueError, match="Malformed image location"):
|
||||
parse_image_location(invalid_image)
|
||||
|
||||
|
||||
def test_list_tags(mocker: MockerFixture):
|
||||
"""Test that list_tags correctly retrieves tags from the registry."""
|
||||
# Mock the authentication response
|
||||
image_str = "ghcr.io/freedomofpress/dangerzone"
|
||||
|
||||
# Mock requests.get to return appropriate values for both calls
|
||||
mock_response_auth = mocker.Mock()
|
||||
mock_response_auth.json.return_value = {"token": "dummy_token"}
|
||||
mock_response_auth.raise_for_status.return_value = None
|
||||
|
||||
mock_response_tags = mocker.Mock()
|
||||
mock_response_tags.json.return_value = {
|
||||
"tags": ["v0.4.0", "v0.4.1", "v0.4.2", "latest"]
|
||||
}
|
||||
mock_response_tags.raise_for_status.return_value = None
|
||||
|
||||
# Setup the mock to return different responses for each URL
|
||||
def mock_get(url, **kwargs):
|
||||
if "token" in url:
|
||||
return mock_response_auth
|
||||
else:
|
||||
return mock_response_tags
|
||||
|
||||
mocker.patch("requests.get", side_effect=mock_get)
|
||||
|
||||
# Call the function
|
||||
tags = list_tags(image_str)
|
||||
|
||||
# Verify the result
|
||||
assert tags == ["v0.4.0", "v0.4.1", "v0.4.2", "latest"]
|
||||
|
||||
|
||||
def test_list_tags_auth_error(mocker: MockerFixture):
|
||||
"""Test that list_tags handles authentication errors correctly."""
|
||||
image_str = "ghcr.io/freedomofpress/dangerzone"
|
||||
|
||||
# Mock requests.get to raise an HTTPError
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
|
||||
"401 Client Error: Unauthorized"
|
||||
)
|
||||
|
||||
mocker.patch("requests.get", return_value=mock_response)
|
||||
|
||||
# Call the function and expect an error
|
||||
with pytest.raises(requests.exceptions.HTTPError):
|
||||
list_tags(image_str)
|
||||
|
||||
|
||||
def test_list_tags_registry_error(mocker: MockerFixture):
|
||||
"""Test that list_tags handles registry errors correctly."""
|
||||
image_str = "ghcr.io/freedomofpress/dangerzone"
|
||||
|
||||
# Mock requests.get to return success for auth but error for tags
|
||||
mock_response_auth = mocker.Mock()
|
||||
mock_response_auth.json.return_value = {"token": "dummy_token"}
|
||||
mock_response_auth.raise_for_status.return_value = None
|
||||
|
||||
mock_response_tags = mocker.Mock()
|
||||
mock_response_tags.raise_for_status.side_effect = requests.exceptions.HTTPError(
|
||||
"404 Client Error: Not Found"
|
||||
)
|
||||
|
||||
# Setup the mock to return different responses for each URL
|
||||
def mock_get(url, **kwargs):
|
||||
if "token" in url:
|
||||
return mock_response_auth
|
||||
else:
|
||||
return mock_response_tags
|
||||
|
||||
mocker.patch("requests.get", side_effect=mock_get)
|
||||
|
||||
# Call the function and expect an error
|
||||
with pytest.raises(requests.exceptions.HTTPError):
|
||||
list_tags(image_str)
|
||||
|
||||
|
||||
def test_get_manifest(mocker: MockerFixture):
|
||||
"""Test that get_manifest correctly retrieves manifests from the registry."""
|
||||
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
|
||||
|
||||
# Mock the responses
|
||||
manifest_content = {
|
||||
"schemaVersion": 2,
|
||||
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
|
||||
"config": {
|
||||
"mediaType": "application/vnd.docker.container.image.v1+json",
|
||||
"size": 1234,
|
||||
"digest": "sha256:abc123def456",
|
||||
},
|
||||
"layers": [
|
||||
{
|
||||
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
|
||||
"size": 12345,
|
||||
"digest": "sha256:layer1",
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
mock_response_auth = mocker.Mock()
|
||||
mock_response_auth.json.return_value = {"token": "dummy_token"}
|
||||
mock_response_auth.raise_for_status.return_value = None
|
||||
|
||||
mock_response_manifest = mocker.Mock()
|
||||
mock_response_manifest.json.return_value = manifest_content
|
||||
mock_response_manifest.status_code = 200
|
||||
mock_response_manifest.raise_for_status.return_value = None
|
||||
|
||||
# Setup the mock to return different responses for each URL
|
||||
def mock_get(url, **kwargs):
|
||||
if "token" in url:
|
||||
return mock_response_auth
|
||||
else:
|
||||
return mock_response_manifest
|
||||
|
||||
mocker.patch("requests.get", side_effect=mock_get)
|
||||
|
||||
# Call the function
|
||||
response = get_manifest(image_str)
|
||||
|
||||
# Verify the result
|
||||
assert response.status_code == 200
|
||||
assert response.json() == manifest_content
|
||||
|
||||
|
||||
def test_get_manifest_digest():
|
||||
"""Test that get_manifest_digest correctly calculates the manifest digest."""
|
||||
# Create a sample manifest content
|
||||
manifest_content = b'{"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json"}'
|
||||
|
||||
# Calculate the expected digest manually
|
||||
import hashlib
|
||||
|
||||
expected_digest = hashlib.sha256(manifest_content).hexdigest()
|
||||
|
||||
# Call the function with the content directly
|
||||
digest = get_manifest_digest("unused_image_str", manifest_content)
|
||||
|
||||
# Verify the result
|
||||
assert digest == expected_digest
|
||||
|
||||
|
||||
def test_get_manifest_digest_from_registry(mocker: MockerFixture):
|
||||
"""Test that get_manifest_digest correctly retrieves and calculates digests from the registry."""
|
||||
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
|
||||
|
||||
# Sample manifest content
|
||||
manifest_content = b'{"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json"}'
|
||||
expected_digest = hashlib.sha256(manifest_content).hexdigest()
|
||||
|
||||
# Mock get_manifest
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.content = manifest_content
|
||||
mocker.patch("dangerzone.updater.registry.get_manifest", return_value=mock_response)
|
||||
|
||||
# Call the function
|
||||
digest = get_manifest_digest(image_str)
|
||||
|
||||
# Verify the result
|
||||
assert digest == expected_digest
|
|
@ -5,8 +5,10 @@ from pathlib import Path
|
|||
import pytest
|
||||
from pytest_subprocess import FakeProcess
|
||||
|
||||
from dangerzone import errors as dzerrors
|
||||
from dangerzone.updater import errors
|
||||
from dangerzone.updater.signatures import (
|
||||
Signature,
|
||||
get_config_dir,
|
||||
get_last_log_index,
|
||||
get_log_index_from_signatures,
|
||||
|
@ -27,6 +29,8 @@ INVALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "invalid"
|
|||
VALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "valid"
|
||||
TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered"
|
||||
|
||||
RANDOM_DIGEST = "aacc9b586648bbe3040f2822153b1d5ead2779af45ff750fd6f04daf4a9f64b4"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def valid_signature():
|
||||
|
@ -36,6 +40,14 @@ def valid_signature():
|
|||
return signatures.pop()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tempered_signature():
|
||||
signature_file = next(TEMPERED_SIGNATURES_PATH.glob("**/*.json"))
|
||||
with open(signature_file, "r") as signature_file:
|
||||
signatures = json.load(signature_file)
|
||||
return signatures.pop()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def signature_other_digest(valid_signature):
|
||||
signature = valid_signature.copy()
|
||||
|
@ -198,39 +210,113 @@ def test_get_remote_signatures_cosign_error(mocker, fp: FakeProcess):
|
|||
|
||||
|
||||
def test_store_signatures_with_different_digests(
|
||||
valid_signature, signature_other_digest
|
||||
valid_signature, signature_other_digest, mocker, tmp_path
|
||||
):
|
||||
"""Test that store_signatures raises an error when a signature's digest doesn't match."""
|
||||
signatures = [valid_signature, signature_other_digest]
|
||||
breakpoint()
|
||||
pass
|
||||
image_digest = "sha256:123456"
|
||||
|
||||
# Mock the signatures path
|
||||
signatures_path = tmp_path / "signatures"
|
||||
signatures_path.mkdir()
|
||||
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path)
|
||||
|
||||
# Mock get_log_index_from_signatures
|
||||
mocker.patch(
|
||||
"dangerzone.updater.signatures.get_log_index_from_signatures",
|
||||
return_value=100,
|
||||
)
|
||||
|
||||
# Mock get_last_log_index
|
||||
mocker.patch(
|
||||
"dangerzone.updater.signatures.get_last_log_index",
|
||||
return_value=50,
|
||||
)
|
||||
|
||||
# Call store_signatures
|
||||
with pytest.raises(errors.SignatureMismatch):
|
||||
store_signatures(signatures, image_digest, TEST_PUBKEY_PATH)
|
||||
|
||||
# Verify that the signatures file was not created
|
||||
assert not (signatures_path / f"{image_digest}.json").exists()
|
||||
|
||||
# Verify that the log index file was not updated
|
||||
assert not (signatures_path / "last_log_index").exists()
|
||||
|
||||
|
||||
def test_store_signatures_digest_mismatch():
|
||||
pass
|
||||
def test_stores_signatures_updates_last_log_index(valid_signature, mocker, tmp_path):
|
||||
"""Test that store_signatures updates the last log index file."""
|
||||
signatures = [valid_signature]
|
||||
# Extract the digest from the signature
|
||||
image_digest = Signature(valid_signature).manifest_digest
|
||||
|
||||
# Mock the signatures path
|
||||
signatures_path = tmp_path / "signatures"
|
||||
signatures_path.mkdir()
|
||||
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path)
|
||||
|
||||
# Create an existing last_log_index file with a lower value
|
||||
with open(signatures_path / "last_log_index", "w") as f:
|
||||
f.write("50")
|
||||
|
||||
# Mock get_log_index_from_signatures to return a higher value
|
||||
mocker.patch(
|
||||
"dangerzone.updater.signatures.get_log_index_from_signatures",
|
||||
return_value=100,
|
||||
)
|
||||
|
||||
# Call store_signatures
|
||||
store_signatures(signatures, image_digest, TEST_PUBKEY_PATH)
|
||||
|
||||
# Verify that the log index file was updated
|
||||
assert (signatures_path / "last_log_index").exists()
|
||||
with open(signatures_path / "last_log_index", "r") as f:
|
||||
assert f.read() == "100"
|
||||
|
||||
|
||||
def test_stores_signatures_updates_last_log_index():
|
||||
pass
|
||||
def test_is_update_available_when_no_local_image(mocker):
|
||||
"""
|
||||
Test that is_update_available returns True when no local image is
|
||||
currently present.
|
||||
"""
|
||||
# Mock container_image_exists to return False
|
||||
mocker.patch(
|
||||
"dangerzone.container_utils.get_local_image_digest",
|
||||
side_effect=dzerrors.ImageNotPresentException,
|
||||
)
|
||||
|
||||
# Mock get_manifest_digest to return a digest
|
||||
mocker.patch(
|
||||
"dangerzone.updater.registry.get_manifest_digest",
|
||||
return_value=RANDOM_DIGEST,
|
||||
)
|
||||
|
||||
# Call is_update_available
|
||||
update_available, digest = is_update_available("ghcr.io/freedomofpress/dangerzone")
|
||||
|
||||
# Verify the result
|
||||
assert update_available is True
|
||||
assert digest == RANDOM_DIGEST
|
||||
|
||||
|
||||
def test_get_file_digest():
|
||||
pass
|
||||
def test_verify_signature(valid_signature):
|
||||
"""Test that verify_signature raises an error when the payload digest doesn't match."""
|
||||
verify_signature(
|
||||
valid_signature,
|
||||
Signature(valid_signature).manifest_digest,
|
||||
TEST_PUBKEY_PATH,
|
||||
)
|
||||
|
||||
|
||||
def test_convert_oci_images_signatures():
|
||||
pass
|
||||
|
||||
|
||||
def test_is_update_available_nothing_local():
|
||||
pass
|
||||
|
||||
|
||||
def test_is_update_available_trims():
|
||||
pass
|
||||
|
||||
|
||||
def test_verify_signature_wrong_payload_digest():
|
||||
pass
|
||||
def test_verify_signature_tempered(tempered_signature):
|
||||
"""Test that verify_signature raises an error when the payload digest doesn't match."""
|
||||
# Call verify_signature and expect an error
|
||||
with pytest.raises(errors.SignatureError):
|
||||
verify_signature(
|
||||
tempered_signature,
|
||||
Signature(tempered_signature).manifest_digest,
|
||||
TEST_PUBKEY_PATH,
|
||||
)
|
||||
|
||||
|
||||
def test_verify_signatures_empty_list():
|
||||
|
|
Loading…
Reference in a new issue