Provide an is_update_available function

This function does all the needed checks before returning `True`, making it a good external API.

Under the hood, the registry now has an `is_new_remote_image_available`
which is just for checking the presence of a new image, but doesn't do
any verirications on it, and there is also a new `check_signatures_and_logindex` that ensures that these two are valid.
This commit is contained in:
Alexis Métaireau 2025-02-26 19:22:33 +01:00
parent 238ea527e6
commit 8d6e5cb8b8
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
2 changed files with 56 additions and 24 deletions

View file

@ -5,6 +5,8 @@ from typing import Dict, Optional, Tuple
import requests import requests
from .. import container_utils as runtime
from .. import errors as dzerrors
from . import errors, log from . import errors, log
__all__ = [ __all__ = [
@ -114,3 +116,24 @@ def get_manifest_digest(
tag_manifest_content = get_manifest(image_str).content tag_manifest_content = get_manifest(image_str).content
return sha256(tag_manifest_content).hexdigest() return sha256(tag_manifest_content).hexdigest()
def is_new_remote_image_available(image_str: str) -> Tuple[bool, str]:
"""
Check if a new remote image is available on the registry.
"""
remote_digest = get_manifest_digest(image_str)
image = parse_image_location(image_str)
if image.digest:
local_digest = image.digest
else:
try:
local_digest = runtime.get_local_image_digest(image_str)
except dzerrors.ImageNotPresentException:
log.debug("No local image found")
return True, remote_digest
log.debug("Remote digest: %s", remote_digest)
log.debug("Local digest: %s", local_digest)
return (remote_digest != local_digest, remote_digest)

View file

@ -136,20 +136,40 @@ class Signature:
return full_digest.replace("sha256:", "") return full_digest.replace("sha256:", "")
def is_update_available(image: str) -> Tuple[bool, Optional[str]]: def is_update_available(image_str: str, pubkey: str) -> Tuple[bool, Optional[str]]:
remote_digest = registry.get_manifest_digest(image) """
try: Check if a new image is available, doing all the necessary checks ensuring it
local_digest = runtime.get_local_image_digest(image) would be safe to upgrade.
except dzerrors.ImageNotPresentException: """
log.debug("No local image found") new_image_available, remote_digest = registry.is_new_remote_image_available(
return True, remote_digest image_str
log.debug("Remote digest: %s", remote_digest) )
log.debug("Local digest: %s", local_digest) if not new_image_available:
has_update = remote_digest != local_digest
if has_update:
return True, remote_digest
return False, None return False, None
try:
check_signatures_and_logindex(image_str, remote_digest, pubkey)
return True, remote_digest
except errors.InvalidLogIndex:
return False, None
def check_signatures_and_logindex(
image_str: str, remote_digest: str, pubkey: str
) -> list[Dict]:
signatures = get_remote_signatures(image_str, remote_digest)
verify_signatures(signatures, remote_digest, pubkey)
incoming_log_index = get_log_index_from_signatures(signatures)
last_log_index = get_last_log_index()
if incoming_log_index < last_log_index:
raise errors.InvalidLogIndex(
f"The incoming log index ({incoming_log_index}) is "
f"lower than the last known log index ({last_log_index})"
)
return signatures
def verify_signatures( def verify_signatures(
signatures: List[Dict], signatures: List[Dict],
@ -461,22 +481,11 @@ def prepare_airgapped_archive(image_name: str, destination: str) -> None:
def upgrade_container_image(image: str, manifest_digest: str, pubkey: str) -> str: def upgrade_container_image(image: str, manifest_digest: str, pubkey: str) -> str:
"""Verify and upgrade the image to the latest, if signed.""" """Verify and upgrade the image to the latest, if signed."""
update_available, _ = is_update_available(image) update_available, remote_digest = registry.is_new_remote_image_available(image)
if not update_available: if not update_available:
raise errors.ImageAlreadyUpToDate("The image is already up to date") raise errors.ImageAlreadyUpToDate("The image is already up to date")
signatures = get_remote_signatures(image, manifest_digest) signatures = check_signatures_and_logindex(image, remote_digest, pubkey)
verify_signatures(signatures, manifest_digest, pubkey)
# Only upgrade if the log index is higher than the last known one
incoming_log_index = get_log_index_from_signatures(signatures)
last_log_index = get_last_log_index()
if incoming_log_index < last_log_index:
raise errors.InvalidLogIndex(
"Trying to upgrade to an image with a lower log index"
)
runtime.container_pull(image, manifest_digest) runtime.container_pull(image, manifest_digest)
# Store the signatures just now to avoid storing them unverified # Store the signatures just now to avoid storing them unverified