mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-05-18 11:11:49 +02:00
Compare commits
1 commit
21e726e901
...
f8b1263a58
Author | SHA1 | Date | |
---|---|---|---|
f8b1263a58 |
4 changed files with 29 additions and 376 deletions
|
@ -28,7 +28,7 @@ ACCEPT_MANIFESTS_HEADER = ",".join(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
Image = namedtuple("Image", ["registry", "namespace", "image_name", "tag", "digest"])
|
Image = namedtuple("Image", ["registry", "namespace", "image_name", "tag"])
|
||||||
|
|
||||||
|
|
||||||
def parse_image_location(input_string: str) -> Image:
|
def parse_image_location(input_string: str) -> Image:
|
||||||
|
@ -37,9 +37,8 @@ def parse_image_location(input_string: str) -> Image:
|
||||||
r"^"
|
r"^"
|
||||||
r"(?P<registry>[a-zA-Z0-9.-]+)/"
|
r"(?P<registry>[a-zA-Z0-9.-]+)/"
|
||||||
r"(?P<namespace>[a-zA-Z0-9-]+)/"
|
r"(?P<namespace>[a-zA-Z0-9-]+)/"
|
||||||
r"(?P<image_name>[^:@]+)"
|
r"(?P<image_name>[^:]+)"
|
||||||
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
|
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
|
||||||
r"(?:@(?P<digest>sha256:[a-zA-Z0-9]+))?"
|
|
||||||
r"$"
|
r"$"
|
||||||
)
|
)
|
||||||
match = re.match(pattern, input_string)
|
match = re.match(pattern, input_string)
|
||||||
|
@ -50,7 +49,6 @@ def parse_image_location(input_string: str) -> Image:
|
||||||
namespace=match.group("namespace"),
|
namespace=match.group("namespace"),
|
||||||
image_name=match.group("image_name"),
|
image_name=match.group("image_name"),
|
||||||
tag=match.group("tag") or "latest",
|
tag=match.group("tag") or "latest",
|
||||||
digest=match.group("digest"),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,6 @@ from tempfile import NamedTemporaryFile, TemporaryDirectory
|
||||||
from typing import Dict, List, Optional, Tuple
|
from typing import Dict, List, Optional, Tuple
|
||||||
|
|
||||||
from .. import container_utils as runtime
|
from .. import container_utils as runtime
|
||||||
from .. import errors as dzerrors
|
|
||||||
from ..util import get_resource_path
|
from ..util import get_resource_path
|
||||||
from . import cosign, errors, log, registry
|
from . import cosign, errors, log, registry
|
||||||
|
|
||||||
|
@ -109,34 +108,16 @@ def verify_signature(signature: dict, image_digest: str, pubkey: str | Path) ->
|
||||||
if result.returncode != 0:
|
if result.returncode != 0:
|
||||||
# XXX Raise instead?
|
# XXX Raise instead?
|
||||||
log.debug("Failed to verify signature", result.stderr)
|
log.debug("Failed to verify signature", result.stderr)
|
||||||
raise errors.SignatureVerificationError("Failed to verify signature")
|
return False
|
||||||
if result.stderr == b"Verified OK\n":
|
if result.stderr == b"Verified OK\n":
|
||||||
log.debug("Signature verified")
|
log.debug("Signature verified")
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
class Signature:
|
|
||||||
def __init__(self, signature: Dict):
|
|
||||||
self.signature = signature
|
|
||||||
|
|
||||||
@property
|
|
||||||
def payload(self) -> Dict:
|
|
||||||
return json.loads(b64decode(self.signature["Payload"]))
|
|
||||||
|
|
||||||
@property
|
|
||||||
def manifest_digest(self) -> str:
|
|
||||||
full_digest = self.payload["critical"]["image"]["docker-manifest-digest"]
|
|
||||||
return full_digest.replace("sha256:", "")
|
|
||||||
|
|
||||||
|
|
||||||
def is_update_available(image: str) -> Tuple[bool, Optional[str]]:
|
def is_update_available(image: str) -> Tuple[bool, Optional[str]]:
|
||||||
remote_digest = registry.get_manifest_digest(image)
|
remote_digest = registry.get_manifest_digest(image)
|
||||||
try:
|
local_digest = runtime.get_local_image_digest(image)
|
||||||
local_digest = runtime.get_local_image_digest(image)
|
|
||||||
except dzerrors.ImageNotPresentException:
|
|
||||||
log.debug("No local image found")
|
|
||||||
return True, remote_digest
|
|
||||||
log.debug("Remote digest: %s", remote_digest)
|
log.debug("Remote digest: %s", remote_digest)
|
||||||
log.debug("Local digest: %s", local_digest)
|
log.debug("Local digest: %s", local_digest)
|
||||||
has_update = remote_digest != local_digest
|
has_update = remote_digest != local_digest
|
||||||
|
@ -184,7 +165,7 @@ def get_log_index_from_signatures(signatures: List[Dict]) -> int:
|
||||||
def write_log_index(log_index: int) -> None:
|
def write_log_index(log_index: int) -> None:
|
||||||
last_log_index_path = SIGNATURES_PATH / "last_log_index"
|
last_log_index_path = SIGNATURES_PATH / "last_log_index"
|
||||||
|
|
||||||
with open(last_log_index_path, "w") as f:
|
with open(log_index, "w") as f:
|
||||||
f.write(str(log_index))
|
f.write(str(log_index))
|
||||||
|
|
||||||
|
|
||||||
|
@ -377,7 +358,7 @@ def store_signatures(signatures: list[Dict], image_digest: str, pubkey: str) ->
|
||||||
|
|
||||||
if f"sha256:{image_digest}" != digests[0]:
|
if f"sha256:{image_digest}" != digests[0]:
|
||||||
raise errors.SignatureMismatch(
|
raise errors.SignatureMismatch(
|
||||||
f"Signatures do not match the given image digest (sha256:{image_digest}, {digests[0]})"
|
f"Signatures do not match the given image digest ({image_digest}, {digests[0]})"
|
||||||
)
|
)
|
||||||
|
|
||||||
pubkey_signatures = SIGNATURES_PATH / get_file_digest(pubkey)
|
pubkey_signatures = SIGNATURES_PATH / get_file_digest(pubkey)
|
||||||
|
@ -389,8 +370,6 @@ def store_signatures(signatures: list[Dict], image_digest: str, pubkey: str) ->
|
||||||
)
|
)
|
||||||
json.dump(signatures, f)
|
json.dump(signatures, f)
|
||||||
|
|
||||||
write_log_index(get_log_index_from_signatures(signatures))
|
|
||||||
|
|
||||||
|
|
||||||
def verify_local_image(image: str, pubkey: str) -> bool:
|
def verify_local_image(image: str, pubkey: str) -> bool:
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -1,238 +0,0 @@
|
||||||
import hashlib
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
import requests
|
|
||||||
from pytest_mock import MockerFixture
|
|
||||||
|
|
||||||
from dangerzone.updater.registry import (
|
|
||||||
Image,
|
|
||||||
_get_auth_header,
|
|
||||||
_url,
|
|
||||||
get_manifest,
|
|
||||||
get_manifest_digest,
|
|
||||||
list_tags,
|
|
||||||
parse_image_location,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_parse_image_location_no_tag():
|
|
||||||
"""Test that parse_image_location correctly handles an image location without a tag."""
|
|
||||||
image_str = "ghcr.io/freedomofpress/dangerzone"
|
|
||||||
image = parse_image_location(image_str)
|
|
||||||
|
|
||||||
assert isinstance(image, Image)
|
|
||||||
assert image.registry == "ghcr.io"
|
|
||||||
assert image.namespace == "freedomofpress"
|
|
||||||
assert image.image_name == "dangerzone"
|
|
||||||
assert image.tag == "latest" # Default tag should be "latest"
|
|
||||||
assert image.digest is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_parse_image_location_with_tag():
|
|
||||||
"""Test that parse_image_location correctly handles an image location with a tag."""
|
|
||||||
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
|
|
||||||
image = parse_image_location(image_str)
|
|
||||||
|
|
||||||
assert isinstance(image, Image)
|
|
||||||
assert image.registry == "ghcr.io"
|
|
||||||
assert image.namespace == "freedomofpress"
|
|
||||||
assert image.image_name == "dangerzone"
|
|
||||||
assert image.tag == "v0.4.2"
|
|
||||||
|
|
||||||
|
|
||||||
def test_parse_image_location_tag_plus_digest():
|
|
||||||
"""Test that parse_image_location handles an image location with a tag that includes a digest."""
|
|
||||||
image_str = (
|
|
||||||
"ghcr.io/freedomofpress/dangerzone"
|
|
||||||
":20250205-0.8.0-148-ge67fbc1"
|
|
||||||
"@sha256:19e8eacd75879d05f6621c2ea8dd955e68ee3e07b41b9d53f4c8cc9929a68a67"
|
|
||||||
)
|
|
||||||
|
|
||||||
image = parse_image_location(image_str)
|
|
||||||
|
|
||||||
assert isinstance(image, Image)
|
|
||||||
assert image.registry == "ghcr.io"
|
|
||||||
assert image.namespace == "freedomofpress"
|
|
||||||
assert image.image_name == "dangerzone"
|
|
||||||
assert image.tag == "20250205-0.8.0-148-ge67fbc1"
|
|
||||||
assert (
|
|
||||||
image.digest
|
|
||||||
== "sha256:19e8eacd75879d05f6621c2ea8dd955e68ee3e07b41b9d53f4c8cc9929a68a67"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_parse_invalid_image_location():
|
|
||||||
"""Test that parse_image_location raises an error for invalid image locations."""
|
|
||||||
invalid_image_locations = [
|
|
||||||
"ghcr.io/dangerzone", # Missing namespace
|
|
||||||
"ghcr.io/freedomofpress/dangerzone:", # Empty tag
|
|
||||||
"freedomofpress/dangerzone", # Missing registry
|
|
||||||
"ghcr.io:freedomofpress/dangerzone", # Invalid format
|
|
||||||
"", # Empty string
|
|
||||||
]
|
|
||||||
|
|
||||||
for invalid_image in invalid_image_locations:
|
|
||||||
with pytest.raises(ValueError, match="Malformed image location"):
|
|
||||||
parse_image_location(invalid_image)
|
|
||||||
|
|
||||||
|
|
||||||
def test_list_tags(mocker: MockerFixture):
|
|
||||||
"""Test that list_tags correctly retrieves tags from the registry."""
|
|
||||||
# Mock the authentication response
|
|
||||||
image_str = "ghcr.io/freedomofpress/dangerzone"
|
|
||||||
|
|
||||||
# Mock requests.get to return appropriate values for both calls
|
|
||||||
mock_response_auth = mocker.Mock()
|
|
||||||
mock_response_auth.json.return_value = {"token": "dummy_token"}
|
|
||||||
mock_response_auth.raise_for_status.return_value = None
|
|
||||||
|
|
||||||
mock_response_tags = mocker.Mock()
|
|
||||||
mock_response_tags.json.return_value = {
|
|
||||||
"tags": ["v0.4.0", "v0.4.1", "v0.4.2", "latest"]
|
|
||||||
}
|
|
||||||
mock_response_tags.raise_for_status.return_value = None
|
|
||||||
|
|
||||||
# Setup the mock to return different responses for each URL
|
|
||||||
def mock_get(url, **kwargs):
|
|
||||||
if "token" in url:
|
|
||||||
return mock_response_auth
|
|
||||||
else:
|
|
||||||
return mock_response_tags
|
|
||||||
|
|
||||||
mocker.patch("requests.get", side_effect=mock_get)
|
|
||||||
|
|
||||||
# Call the function
|
|
||||||
tags = list_tags(image_str)
|
|
||||||
|
|
||||||
# Verify the result
|
|
||||||
assert tags == ["v0.4.0", "v0.4.1", "v0.4.2", "latest"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_list_tags_auth_error(mocker: MockerFixture):
|
|
||||||
"""Test that list_tags handles authentication errors correctly."""
|
|
||||||
image_str = "ghcr.io/freedomofpress/dangerzone"
|
|
||||||
|
|
||||||
# Mock requests.get to raise an HTTPError
|
|
||||||
mock_response = mocker.Mock()
|
|
||||||
mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
|
|
||||||
"401 Client Error: Unauthorized"
|
|
||||||
)
|
|
||||||
|
|
||||||
mocker.patch("requests.get", return_value=mock_response)
|
|
||||||
|
|
||||||
# Call the function and expect an error
|
|
||||||
with pytest.raises(requests.exceptions.HTTPError):
|
|
||||||
list_tags(image_str)
|
|
||||||
|
|
||||||
|
|
||||||
def test_list_tags_registry_error(mocker: MockerFixture):
|
|
||||||
"""Test that list_tags handles registry errors correctly."""
|
|
||||||
image_str = "ghcr.io/freedomofpress/dangerzone"
|
|
||||||
|
|
||||||
# Mock requests.get to return success for auth but error for tags
|
|
||||||
mock_response_auth = mocker.Mock()
|
|
||||||
mock_response_auth.json.return_value = {"token": "dummy_token"}
|
|
||||||
mock_response_auth.raise_for_status.return_value = None
|
|
||||||
|
|
||||||
mock_response_tags = mocker.Mock()
|
|
||||||
mock_response_tags.raise_for_status.side_effect = requests.exceptions.HTTPError(
|
|
||||||
"404 Client Error: Not Found"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Setup the mock to return different responses for each URL
|
|
||||||
def mock_get(url, **kwargs):
|
|
||||||
if "token" in url:
|
|
||||||
return mock_response_auth
|
|
||||||
else:
|
|
||||||
return mock_response_tags
|
|
||||||
|
|
||||||
mocker.patch("requests.get", side_effect=mock_get)
|
|
||||||
|
|
||||||
# Call the function and expect an error
|
|
||||||
with pytest.raises(requests.exceptions.HTTPError):
|
|
||||||
list_tags(image_str)
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_manifest(mocker: MockerFixture):
|
|
||||||
"""Test that get_manifest correctly retrieves manifests from the registry."""
|
|
||||||
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
|
|
||||||
|
|
||||||
# Mock the responses
|
|
||||||
manifest_content = {
|
|
||||||
"schemaVersion": 2,
|
|
||||||
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
|
|
||||||
"config": {
|
|
||||||
"mediaType": "application/vnd.docker.container.image.v1+json",
|
|
||||||
"size": 1234,
|
|
||||||
"digest": "sha256:abc123def456",
|
|
||||||
},
|
|
||||||
"layers": [
|
|
||||||
{
|
|
||||||
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
|
|
||||||
"size": 12345,
|
|
||||||
"digest": "sha256:layer1",
|
|
||||||
}
|
|
||||||
],
|
|
||||||
}
|
|
||||||
|
|
||||||
mock_response_auth = mocker.Mock()
|
|
||||||
mock_response_auth.json.return_value = {"token": "dummy_token"}
|
|
||||||
mock_response_auth.raise_for_status.return_value = None
|
|
||||||
|
|
||||||
mock_response_manifest = mocker.Mock()
|
|
||||||
mock_response_manifest.json.return_value = manifest_content
|
|
||||||
mock_response_manifest.status_code = 200
|
|
||||||
mock_response_manifest.raise_for_status.return_value = None
|
|
||||||
|
|
||||||
# Setup the mock to return different responses for each URL
|
|
||||||
def mock_get(url, **kwargs):
|
|
||||||
if "token" in url:
|
|
||||||
return mock_response_auth
|
|
||||||
else:
|
|
||||||
return mock_response_manifest
|
|
||||||
|
|
||||||
mocker.patch("requests.get", side_effect=mock_get)
|
|
||||||
|
|
||||||
# Call the function
|
|
||||||
response = get_manifest(image_str)
|
|
||||||
|
|
||||||
# Verify the result
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert response.json() == manifest_content
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_manifest_digest():
|
|
||||||
"""Test that get_manifest_digest correctly calculates the manifest digest."""
|
|
||||||
# Create a sample manifest content
|
|
||||||
manifest_content = b'{"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json"}'
|
|
||||||
|
|
||||||
# Calculate the expected digest manually
|
|
||||||
import hashlib
|
|
||||||
|
|
||||||
expected_digest = hashlib.sha256(manifest_content).hexdigest()
|
|
||||||
|
|
||||||
# Call the function with the content directly
|
|
||||||
digest = get_manifest_digest("unused_image_str", manifest_content)
|
|
||||||
|
|
||||||
# Verify the result
|
|
||||||
assert digest == expected_digest
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_manifest_digest_from_registry(mocker: MockerFixture):
|
|
||||||
"""Test that get_manifest_digest correctly retrieves and calculates digests from the registry."""
|
|
||||||
image_str = "ghcr.io/freedomofpress/dangerzone:v0.4.2"
|
|
||||||
|
|
||||||
# Sample manifest content
|
|
||||||
manifest_content = b'{"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json"}'
|
|
||||||
expected_digest = hashlib.sha256(manifest_content).hexdigest()
|
|
||||||
|
|
||||||
# Mock get_manifest
|
|
||||||
mock_response = mocker.Mock()
|
|
||||||
mock_response.content = manifest_content
|
|
||||||
mocker.patch("dangerzone.updater.registry.get_manifest", return_value=mock_response)
|
|
||||||
|
|
||||||
# Call the function
|
|
||||||
digest = get_manifest_digest(image_str)
|
|
||||||
|
|
||||||
# Verify the result
|
|
||||||
assert digest == expected_digest
|
|
|
@ -5,10 +5,8 @@ from pathlib import Path
|
||||||
import pytest
|
import pytest
|
||||||
from pytest_subprocess import FakeProcess
|
from pytest_subprocess import FakeProcess
|
||||||
|
|
||||||
from dangerzone import errors as dzerrors
|
|
||||||
from dangerzone.updater import errors
|
from dangerzone.updater import errors
|
||||||
from dangerzone.updater.signatures import (
|
from dangerzone.updater.signatures import (
|
||||||
Signature,
|
|
||||||
get_config_dir,
|
get_config_dir,
|
||||||
get_last_log_index,
|
get_last_log_index,
|
||||||
get_log_index_from_signatures,
|
get_log_index_from_signatures,
|
||||||
|
@ -29,8 +27,6 @@ INVALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "invalid"
|
||||||
VALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "valid"
|
VALID_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "valid"
|
||||||
TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered"
|
TEMPERED_SIGNATURES_PATH = ASSETS_PATH / "signatures" / "tempered"
|
||||||
|
|
||||||
RANDOM_DIGEST = "aacc9b586648bbe3040f2822153b1d5ead2779af45ff750fd6f04daf4a9f64b4"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def valid_signature():
|
def valid_signature():
|
||||||
|
@ -40,14 +36,6 @@ def valid_signature():
|
||||||
return signatures.pop()
|
return signatures.pop()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def tempered_signature():
|
|
||||||
signature_file = next(TEMPERED_SIGNATURES_PATH.glob("**/*.json"))
|
|
||||||
with open(signature_file, "r") as signature_file:
|
|
||||||
signatures = json.load(signature_file)
|
|
||||||
return signatures.pop()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def signature_other_digest(valid_signature):
|
def signature_other_digest(valid_signature):
|
||||||
signature = valid_signature.copy()
|
signature = valid_signature.copy()
|
||||||
|
@ -210,113 +198,39 @@ def test_get_remote_signatures_cosign_error(mocker, fp: FakeProcess):
|
||||||
|
|
||||||
|
|
||||||
def test_store_signatures_with_different_digests(
|
def test_store_signatures_with_different_digests(
|
||||||
valid_signature, signature_other_digest, mocker, tmp_path
|
valid_signature, signature_other_digest
|
||||||
):
|
):
|
||||||
"""Test that store_signatures raises an error when a signature's digest doesn't match."""
|
|
||||||
signatures = [valid_signature, signature_other_digest]
|
signatures = [valid_signature, signature_other_digest]
|
||||||
image_digest = "sha256:123456"
|
breakpoint()
|
||||||
|
pass
|
||||||
# Mock the signatures path
|
|
||||||
signatures_path = tmp_path / "signatures"
|
|
||||||
signatures_path.mkdir()
|
|
||||||
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path)
|
|
||||||
|
|
||||||
# Mock get_log_index_from_signatures
|
|
||||||
mocker.patch(
|
|
||||||
"dangerzone.updater.signatures.get_log_index_from_signatures",
|
|
||||||
return_value=100,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Mock get_last_log_index
|
|
||||||
mocker.patch(
|
|
||||||
"dangerzone.updater.signatures.get_last_log_index",
|
|
||||||
return_value=50,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Call store_signatures
|
|
||||||
with pytest.raises(errors.SignatureMismatch):
|
|
||||||
store_signatures(signatures, image_digest, TEST_PUBKEY_PATH)
|
|
||||||
|
|
||||||
# Verify that the signatures file was not created
|
|
||||||
assert not (signatures_path / f"{image_digest}.json").exists()
|
|
||||||
|
|
||||||
# Verify that the log index file was not updated
|
|
||||||
assert not (signatures_path / "last_log_index").exists()
|
|
||||||
|
|
||||||
|
|
||||||
def test_stores_signatures_updates_last_log_index(valid_signature, mocker, tmp_path):
|
def test_store_signatures_digest_mismatch():
|
||||||
"""Test that store_signatures updates the last log index file."""
|
pass
|
||||||
signatures = [valid_signature]
|
|
||||||
# Extract the digest from the signature
|
|
||||||
image_digest = Signature(valid_signature).manifest_digest
|
|
||||||
|
|
||||||
# Mock the signatures path
|
|
||||||
signatures_path = tmp_path / "signatures"
|
|
||||||
signatures_path.mkdir()
|
|
||||||
mocker.patch("dangerzone.updater.signatures.SIGNATURES_PATH", signatures_path)
|
|
||||||
|
|
||||||
# Create an existing last_log_index file with a lower value
|
|
||||||
with open(signatures_path / "last_log_index", "w") as f:
|
|
||||||
f.write("50")
|
|
||||||
|
|
||||||
# Mock get_log_index_from_signatures to return a higher value
|
|
||||||
mocker.patch(
|
|
||||||
"dangerzone.updater.signatures.get_log_index_from_signatures",
|
|
||||||
return_value=100,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Call store_signatures
|
|
||||||
store_signatures(signatures, image_digest, TEST_PUBKEY_PATH)
|
|
||||||
|
|
||||||
# Verify that the log index file was updated
|
|
||||||
assert (signatures_path / "last_log_index").exists()
|
|
||||||
with open(signatures_path / "last_log_index", "r") as f:
|
|
||||||
assert f.read() == "100"
|
|
||||||
|
|
||||||
|
|
||||||
def test_is_update_available_when_no_local_image(mocker):
|
def test_stores_signatures_updates_last_log_index():
|
||||||
"""
|
pass
|
||||||
Test that is_update_available returns True when no local image is
|
|
||||||
currently present.
|
|
||||||
"""
|
|
||||||
# Mock container_image_exists to return False
|
|
||||||
mocker.patch(
|
|
||||||
"dangerzone.container_utils.get_local_image_digest",
|
|
||||||
side_effect=dzerrors.ImageNotPresentException,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Mock get_manifest_digest to return a digest
|
|
||||||
mocker.patch(
|
|
||||||
"dangerzone.updater.registry.get_manifest_digest",
|
|
||||||
return_value=RANDOM_DIGEST,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Call is_update_available
|
|
||||||
update_available, digest = is_update_available("ghcr.io/freedomofpress/dangerzone")
|
|
||||||
|
|
||||||
# Verify the result
|
|
||||||
assert update_available is True
|
|
||||||
assert digest == RANDOM_DIGEST
|
|
||||||
|
|
||||||
|
|
||||||
def test_verify_signature(valid_signature):
|
def test_get_file_digest():
|
||||||
"""Test that verify_signature raises an error when the payload digest doesn't match."""
|
pass
|
||||||
verify_signature(
|
|
||||||
valid_signature,
|
|
||||||
Signature(valid_signature).manifest_digest,
|
|
||||||
TEST_PUBKEY_PATH,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_verify_signature_tempered(tempered_signature):
|
def test_convert_oci_images_signatures():
|
||||||
"""Test that verify_signature raises an error when the payload digest doesn't match."""
|
pass
|
||||||
# Call verify_signature and expect an error
|
|
||||||
with pytest.raises(errors.SignatureError):
|
|
||||||
verify_signature(
|
def test_is_update_available_nothing_local():
|
||||||
tempered_signature,
|
pass
|
||||||
Signature(tempered_signature).manifest_digest,
|
|
||||||
TEST_PUBKEY_PATH,
|
|
||||||
)
|
def test_is_update_available_trims():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def test_verify_signature_wrong_payload_digest():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def test_verify_signatures_empty_list():
|
def test_verify_signatures_empty_list():
|
||||||
|
|
Loading…
Reference in a new issue