Fixup: use digest instead of hash

This commit is contained in:
Alexis Métaireau 2025-02-05 15:40:21 +01:00
parent 9a44110313
commit 02e62c93f6
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
3 changed files with 56 additions and 57 deletions

View file

@ -29,9 +29,9 @@ def main(debug: bool) -> None:
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
def upgrade(image: str, pubkey: str) -> None:
"""Upgrade the image to the latest signed version."""
manifest_hash = registry.get_manifest_hash(image)
manifest_digest = registry.get_manifest_digest(image)
try:
is_upgraded = signatures.upgrade_container_image(image, manifest_hash, pubkey)
is_upgraded = signatures.upgrade_container_image(image, manifest_digest, pubkey)
if is_upgraded:
click.echo(f"✅ The local image {image} has been upgraded")
click.echo(f"✅ The image has been signed with {pubkey}")

View file

@ -1,6 +1,6 @@
import hashlib
import re
from collections import namedtuple
from hashlib import sha256
from typing import Dict, Optional, Tuple
import requests
@ -8,7 +8,7 @@ import requests
from . import errors, log
__all__ = [
"get_manifest_hash",
"get_manifest_digest",
"list_tags",
"get_manifest",
"parse_image_location",
@ -28,9 +28,7 @@ ACCEPT_MANIFESTS_HEADER = ",".join(
)
class Image(
namedtuple("Image", ["registry", "namespace", "image_name", "tag", "digest"])
):
class Image(namedtuple("Image", ["registry", "namespace", "image_name", "tag"])):
__slots__ = ()
@property
@ -101,7 +99,8 @@ class RegistryClient:
return tags
def get_manifest(
self, tag: str,
self,
tag: str,
) -> requests.Response:
"""Get manifest information for a specific tag"""
manifest_url = f"{self._image_url}/manifests/{tag}"
@ -123,8 +122,8 @@ class RegistryClient:
.get("manifests")
)
def get_blob(self, hash: str) -> requests.Response:
url = f"{self._image_url}/blobs/{hash}"
def get_blob(self, digest: str) -> requests.Response:
url = f"{self._image_url}/blobs/{digest}"
response = requests.get(
url,
headers={
@ -134,19 +133,19 @@ class RegistryClient:
response.raise_for_status()
return response
def get_manifest_hash(
def get_manifest_digest(
self, tag: str, tag_manifest_content: Optional[bytes] = None
) -> str:
if not tag_manifest_content:
tag_manifest_content = self.get_manifest(tag).content
return hashlib.sha256(tag_manifest_content).hexdigest()
return sha256(tag_manifest_content).hexdigest()
# XXX Refactor this with regular functions rather than a class
def get_manifest_hash(image_str: str) -> str:
def get_manifest_digest(image_str: str) -> str:
image = parse_image_location(image_str)
return RegistryClient(image).get_manifest_hash(image.tag)
return RegistryClient(image).get_manifest_digest(image.tag)
def list_tags(image_str: str) -> list:

View file

@ -55,7 +55,7 @@ def signature_to_bundle(sig: Dict) -> Dict:
}
def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
def verify_signature(signature: dict, image_digest: str, pubkey: str) -> bool:
"""Verify a signature against a given public key"""
# XXX - Also verfy the identity/docker-reference field against the expected value
# e.g. ghcr.io/freedomofpress/dangerzone/dangerzone
@ -64,12 +64,12 @@ def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
signature_bundle = signature_to_bundle(signature)
payload_bytes = b64decode(signature_bundle["Payload"])
payload_hash = json.loads(payload_bytes)["critical"]["image"][
payload_digest = json.loads(payload_bytes)["critical"]["image"][
"docker-manifest-digest"
]
if payload_hash != f"sha256:{image_hash}":
if payload_digest != f"sha256:{image_digest}":
raise errors.SignatureMismatch(
f"The signature does not match the image hash ({payload_hash}, {image_hash})"
f"The signature does not match the image digest ({payload_digest}, {image_digest})"
)
with (
@ -103,44 +103,44 @@ def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
return False
def new_image_release(image: str) -> bool:
remote_hash = registry.get_manifest_hash(image)
local_hash = runtime.get_local_image_digest(image)
log.debug("Remote hash: %s", remote_hash)
log.debug("Local hash: %s", local_hash)
return remote_hash != local_hash
def is_update_available(image: str) -> bool:
remote_digest = registry.get_manifest_digest(image)
local_digest = runtime.get_local_image_digest(image)
log.debug("Remote digest: %s", remote_digest)
log.debug("Local digest: %s", local_digest)
return remote_digest != local_digest
def verify_signatures(
signatures: List[Dict],
image_hash: str,
image_digest: str,
pubkey: str,
) -> bool:
for signature in signatures:
if not verify_signature(signature, image_hash, pubkey):
if not verify_signature(signature, image_digest, pubkey):
raise errors.SignatureVerificationError()
return True
def upgrade_container_image(image: str, manifest_hash: str, pubkey: str) -> bool:
def upgrade_container_image(image: str, manifest_digest: str, pubkey: str) -> bool:
"""Verify and upgrade the image to the latest, if signed."""
if not new_image_release(image):
if not is_update_available(image):
raise errors.ImageAlreadyUpToDate("The image is already up to date")
signatures = get_remote_signatures(image, manifest_hash)
verify_signatures(signatures, manifest_hash, pubkey)
signatures = get_remote_signatures(image, manifest_digest)
verify_signatures(signatures, manifest_digest, pubkey)
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
store_signatures(signatures, manifest_hash, pubkey)
store_signatures(signatures, manifest_digest, pubkey)
# let's upgrade the image
# XXX Use the image digest here to avoid race conditions
return runtime.container_pull(image)
def _get_blob(tmpdir: str, hash: str) -> Path:
return Path(tmpdir) / "blobs" / "sha256" / hash.replace("sha256:", "")
def _get_blob(tmpdir: str, digest: str) -> Path:
return Path(tmpdir) / "blobs" / "sha256" / digest.replace("sha256:", "")
def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
@ -241,8 +241,8 @@ def convert_oci_images_signatures(
return image_name, signatures
def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) -> str:
"""Get the sha256 hash of a file or content"""
def get_file_digest(file: Optional[str] = None, content: Optional[bytes] = None) -> str:
"""Get the sha256 digest of a file or content"""
if not file and not content:
raise errors.UpdaterError("No file or content provided")
if file:
@ -253,13 +253,13 @@ def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) -
return ""
def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
def load_signatures(image_digest: str, pubkey: str) -> List[Dict]:
"""
Load signatures from the local filesystem
See store_signatures() for the expected format.
"""
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
pubkey_signatures = SIGNATURES_PATH / get_file_digest(pubkey)
if not pubkey_signatures.exists():
msg = (
f"Cannot find a '{pubkey_signatures}' folder."
@ -267,19 +267,19 @@ def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
)
raise errors.SignaturesFolderDoesNotExist(msg)
with open(pubkey_signatures / f"{image_hash}.json") as f:
with open(pubkey_signatures / f"{image_digest}.json") as f:
log.debug("Loading signatures from %s", f.name)
return json.load(f)
def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> None:
def store_signatures(signatures: list[Dict], image_digest: str, pubkey: str) -> None:
"""
Store signatures locally in the SIGNATURE_PATH folder, like this:
~/.config/dangerzone/signatures/
<pubkey-hash>
<image-hash>.json
<image-hash>.json
<pubkey-digest>
<image-digest>.json
<image-digest>.json
The format used in the `.json` file is the one of `cosign download
signature`, which differs from the "bundle" one used afterwards.
@ -292,22 +292,22 @@ def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> No
payload = json.loads(b64decode(sig["Payload"]))
return payload["critical"]["image"]["docker-manifest-digest"]
# All the signatures should share the same hash.
hashes = list(map(_get_digest, signatures))
if len(set(hashes)) != 1:
raise errors.InvalidSignatures("Signatures do not share the same image hash")
# All the signatures should share the same digest.
digests = list(map(_get_digest, signatures))
if len(set(digests)) != 1:
raise errors.InvalidSignatures("Signatures do not share the same image digest")
if f"sha256:{image_hash}" != hashes[0]:
if f"sha256:{image_digest}" != digests[0]:
raise errors.SignatureMismatch(
f"Signatures do not match the given image hash ({image_hash}, {hashes[0]})"
f"Signatures do not match the given image digest ({image_digest}, {digests[0]})"
)
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
pubkey_signatures = SIGNATURES_PATH / get_file_digest(pubkey)
pubkey_signatures.mkdir(parents=True, exist_ok=True)
with open(pubkey_signatures / f"{image_hash}.json", "w") as f:
with open(pubkey_signatures / f"{image_digest}.json", "w") as f:
log.info(
f"Storing signatures for {image_hash} in {pubkey_signatures}/{image_hash}.json"
f"Storing signatures for {image_digest} in {pubkey_signatures}/{image_digest}.json"
)
json.dump(signatures, f)
@ -318,28 +318,28 @@ def verify_local_image(image: str, pubkey: str) -> bool:
"""
log.info(f"Verifying local image {image} against pubkey {pubkey}")
try:
image_hash = runtime.get_local_image_digest(image)
image_digest = runtime.get_local_image_digest(image)
except subprocess.CalledProcessError:
raise errors.ImageNotFound(f"The image {image} does not exist locally")
log.debug(f"Image hash: {image_hash}")
signatures = load_signatures(image_hash, pubkey)
log.debug(f"Image digest: {image_digest}")
signatures = load_signatures(image_digest, pubkey)
if len(signatures) < 1:
raise errors.LocalSignatureNotFound("No signatures found")
for signature in signatures:
if not verify_signature(signature, image_hash, pubkey):
if not verify_signature(signature, image_digest, pubkey):
msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
raise errors.SignatureVerificationError(msg)
return True
def get_remote_signatures(image: str, hash: str) -> List[Dict]:
def get_remote_signatures(image: str, digest: str) -> List[Dict]:
"""Retrieve the signatures from the registry, via `cosign download`."""
cosign.ensure_installed()
process = subprocess.run(
["cosign", "download", "signature", f"{image}@sha256:{hash}"],
["cosign", "download", "signature", f"{image}@sha256:{digest}"],
capture_output=True,
check=True,
)