diff --git a/dangerzone/updater/registry.py b/dangerzone/updater/registry.py index fe57364..0caee26 100644 --- a/dangerzone/updater/registry.py +++ b/dangerzone/updater/registry.py @@ -5,6 +5,8 @@ from typing import Dict, Optional, Tuple import requests +from .. import container_utils as runtime +from .. import errors as dzerrors from . import errors, log __all__ = [ @@ -114,3 +116,24 @@ def get_manifest_digest( tag_manifest_content = get_manifest(image_str).content return sha256(tag_manifest_content).hexdigest() + + +def is_new_remote_image_available(image_str: str) -> Tuple[bool, str]: + """ + Check if a new remote image is available on the registry. + """ + remote_digest = get_manifest_digest(image_str) + image = parse_image_location(image_str) + if image.digest: + local_digest = image.digest + else: + try: + local_digest = runtime.get_local_image_digest(image_str) + except dzerrors.ImageNotPresentException: + log.debug("No local image found") + return True, remote_digest + + log.debug("Remote digest: %s", remote_digest) + log.debug("Local digest: %s", local_digest) + + return (remote_digest != local_digest, remote_digest) diff --git a/dangerzone/updater/signatures.py b/dangerzone/updater/signatures.py index d452967..0c58b8c 100644 --- a/dangerzone/updater/signatures.py +++ b/dangerzone/updater/signatures.py @@ -136,19 +136,39 @@ class Signature: return full_digest.replace("sha256:", "") -def is_update_available(image: str) -> Tuple[bool, Optional[str]]: - remote_digest = registry.get_manifest_digest(image) +def is_update_available(image_str: str, pubkey: str) -> Tuple[bool, Optional[str]]: + """ + Check if a new image is available, doing all the necessary checks ensuring it + would be safe to upgrade. + """ + new_image_available, remote_digest = registry.is_new_remote_image_available( + image_str + ) + if not new_image_available: + return False, None + try: - local_digest = runtime.get_local_image_digest(image) - except dzerrors.ImageNotPresentException: - log.debug("No local image found") + check_signatures_and_logindex(image_str, remote_digest, pubkey) return True, remote_digest - log.debug("Remote digest: %s", remote_digest) - log.debug("Local digest: %s", local_digest) - has_update = remote_digest != local_digest - if has_update: - return True, remote_digest - return False, None + except errors.InvalidLogIndex: + return False, None + + +def check_signatures_and_logindex( + image_str: str, remote_digest: str, pubkey: str +) -> list[Dict]: + signatures = get_remote_signatures(image_str, remote_digest) + verify_signatures(signatures, remote_digest, pubkey) + + incoming_log_index = get_log_index_from_signatures(signatures) + last_log_index = get_last_log_index() + + if incoming_log_index < last_log_index: + raise errors.InvalidLogIndex( + f"The incoming log index ({incoming_log_index}) is " + f"lower than the last known log index ({last_log_index})" + ) + return signatures def verify_signatures( @@ -461,22 +481,11 @@ def prepare_airgapped_archive(image_name: str, destination: str) -> None: def upgrade_container_image(image: str, manifest_digest: str, pubkey: str) -> str: """Verify and upgrade the image to the latest, if signed.""" - update_available, _ = is_update_available(image) + update_available, remote_digest = registry.is_new_remote_image_available(image) if not update_available: raise errors.ImageAlreadyUpToDate("The image is already up to date") - signatures = get_remote_signatures(image, manifest_digest) - verify_signatures(signatures, manifest_digest, pubkey) - - # Only upgrade if the log index is higher than the last known one - incoming_log_index = get_log_index_from_signatures(signatures) - last_log_index = get_last_log_index() - - if incoming_log_index < last_log_index: - raise errors.InvalidLogIndex( - "Trying to upgrade to an image with a lower log index" - ) - + signatures = check_signatures_and_logindex(image, remote_digest, pubkey) runtime.container_pull(image, manifest_digest) # Store the signatures just now to avoid storing them unverified