Check if the logIndex is greater than the last known one before upgrading

Each signature is logged to Rekor, and the log index is then part of
the signature itself. Ensuring that the logIndex is greater in the given
container image signature makes it possible to ensure that we're only
going forward in time, and avoid installing older container images
thinking that they are new than the current one.
This commit is contained in:
Alexis Métaireau 2025-02-11 16:09:14 +01:00
parent 8159d6ccb7
commit 46f510ab79
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
2 changed files with 61 additions and 8 deletions

View file

@ -52,3 +52,7 @@ class LocalSignatureNotFound(SignatureError):
class CosignNotInstalledError(SignatureError): class CosignNotInstalledError(SignatureError):
pass pass
class InvalidLogIndex(SignatureError):
pass

View file

@ -4,6 +4,7 @@ import re
import subprocess import subprocess
import tarfile import tarfile
from base64 import b64decode, b64encode from base64 import b64decode, b64encode
from functools import reduce
from hashlib import sha256 from hashlib import sha256
from io import BytesIO from io import BytesIO
from pathlib import Path from pathlib import Path
@ -27,6 +28,8 @@ def get_config_dir() -> Path:
# XXX Store this somewhere else. # XXX Store this somewhere else.
DEFAULT_PUBKEY_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key") DEFAULT_PUBKEY_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
SIGNATURES_PATH = get_config_dir() / "signatures" SIGNATURES_PATH = get_config_dir() / "signatures"
LAST_LOG_INDEX = SIGNATURES_PATH / "last_log_index"
__all__ = [ __all__ = [
"verify_signature", "verify_signature",
"load_signatures", "load_signatures",
@ -127,6 +130,28 @@ def verify_signatures(
return True return True
def get_last_log_index() -> int:
SIGNATURES_PATH.mkdir(parents=True, exist_ok=True)
if not LAST_LOG_INDEX.exists():
return 0
with open(LAST_LOG_INDEX) as f:
return int(f.read())
def get_log_index_from_signatures(signatures: List[Dict]) -> int:
return reduce(
lambda acc, sig: max(acc, sig["Bundle"]["Payload"]["logIndex"]), signatures, 0
)
def write_log_index(log_index: int) -> None:
last_log_index_path = SIGNATURES_PATH / "last_log_index"
with open(log_index, "w") as f:
f.write(str(log_index))
def upgrade_container_image(image: str, manifest_digest: str, pubkey: str) -> bool: def upgrade_container_image(image: str, manifest_digest: str, pubkey: str) -> bool:
"""Verify and upgrade the image to the latest, if signed.""" """Verify and upgrade the image to the latest, if signed."""
update_available, _ = is_update_available(image) update_available, _ = is_update_available(image)
@ -136,13 +161,23 @@ def upgrade_container_image(image: str, manifest_digest: str, pubkey: str) -> bo
signatures = get_remote_signatures(image, manifest_digest) signatures = get_remote_signatures(image, manifest_digest)
verify_signatures(signatures, manifest_digest, pubkey) verify_signatures(signatures, manifest_digest, pubkey)
# At this point, the signatures are verified # Ensure that we only upgrade if the log index is higher than the last known one
# We store the signatures just now to avoid storing unverified signatures incoming_log_index = get_log_index_from_signatures(signatures)
store_signatures(signatures, manifest_digest, pubkey) last_log_index = get_last_log_index()
if incoming_log_index < last_log_index:
raise errors.InvalidLogIndex(
"The log index is not higher than the last known one"
)
# let's upgrade the image # let's upgrade the image
# XXX Use the image digest here to avoid race conditions # XXX Use the image digest here to avoid race conditions
return runtime.container_pull(image) upgraded = runtime.container_pull(image)
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
store_signatures(signatures, manifest_digest, pubkey)
return upgraded
def _get_blob(tmpdir: str, digest: str) -> Path: def _get_blob(tmpdir: str, digest: str) -> Path:
@ -178,7 +213,7 @@ def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
if not cosign.verify_local_image(tmpdir, pubkey): if not cosign.verify_local_image(tmpdir, pubkey):
raise errors.SignatureVerificationError() raise errors.SignatureVerificationError()
# Remove the signatures from the archive. # Remove the signatures from the archive, otherwise podman is not able to load it
with open(Path(tmpdir) / "index.json") as f: with open(Path(tmpdir) / "index.json") as f:
index_json = json.load(f) index_json = json.load(f)
@ -195,6 +230,15 @@ def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
image_name, signatures = convert_oci_images_signatures(json.load(f), tmpdir) image_name, signatures = convert_oci_images_signatures(json.load(f), tmpdir)
log.info(f"Found image name: {image_name}") log.info(f"Found image name: {image_name}")
# Ensure that we only upgrade if the log index is higher than the last known one
incoming_log_index = get_log_index_from_signatures(signatures)
last_log_index = get_last_log_index()
if incoming_log_index < last_log_index:
raise errors.InvalidLogIndex(
"The log index is not higher than the last known one"
)
image_digest = index_json["manifests"][0].get("digest").replace("sha256:", "") image_digest = index_json["manifests"][0].get("digest").replace("sha256:", "")
# Write the new index.json to the temp folder # Write the new index.json to the temp folder
@ -283,9 +327,13 @@ def store_signatures(signatures: list[Dict], image_digest: str, pubkey: str) ->
Store signatures locally in the SIGNATURE_PATH folder, like this: Store signatures locally in the SIGNATURE_PATH folder, like this:
~/.config/dangerzone/signatures/ ~/.config/dangerzone/signatures/
<pubkey-digest> <pubkey-digest>
<image-digest>.json <image-digest>.json
<image-digest>.json <image-digest>.json
last_log_index
The last_log_index file is used to keep track of the last log index
processed by the updater.
The format used in the `.json` file is the one of `cosign download The format used in the `.json` file is the one of `cosign download
signature`, which differs from the "bundle" one used afterwards. signature`, which differs from the "bundle" one used afterwards.
@ -344,6 +392,7 @@ def get_remote_signatures(image: str, digest: str) -> List[Dict]:
"""Retrieve the signatures from the registry, via `cosign download`.""" """Retrieve the signatures from the registry, via `cosign download`."""
cosign.ensure_installed() cosign.ensure_installed()
# XXX: try/catch here
process = subprocess.run( process = subprocess.run(
["cosign", "download", "signature", f"{image}@sha256:{digest}"], ["cosign", "download", "signature", f"{image}@sha256:{digest}"],
capture_output=True, capture_output=True,