mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-04-28 18:02:38 +02:00
Fixup: use digest instead of hash
This commit is contained in:
parent
4542f0b4c4
commit
af6b4e0d73
3 changed files with 56 additions and 57 deletions
|
@ -29,9 +29,9 @@ def main(debug: bool) -> None:
|
|||
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
|
||||
def upgrade(image: str, pubkey: str) -> None:
|
||||
"""Upgrade the image to the latest signed version."""
|
||||
manifest_hash = registry.get_manifest_hash(image)
|
||||
manifest_digest = registry.get_manifest_digest(image)
|
||||
try:
|
||||
is_upgraded = signatures.upgrade_container_image(image, manifest_hash, pubkey)
|
||||
is_upgraded = signatures.upgrade_container_image(image, manifest_digest, pubkey)
|
||||
if is_upgraded:
|
||||
click.echo(f"✅ The local image {image} has been upgraded")
|
||||
click.echo(f"✅ The image has been signed with {pubkey}")
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import hashlib
|
||||
import re
|
||||
from collections import namedtuple
|
||||
from hashlib import sha256
|
||||
from typing import Dict, Optional, Tuple
|
||||
|
||||
import requests
|
||||
|
@ -8,7 +8,7 @@ import requests
|
|||
from . import errors, log
|
||||
|
||||
__all__ = [
|
||||
"get_manifest_hash",
|
||||
"get_manifest_digest",
|
||||
"list_tags",
|
||||
"get_manifest",
|
||||
"parse_image_location",
|
||||
|
@ -28,9 +28,7 @@ ACCEPT_MANIFESTS_HEADER = ",".join(
|
|||
)
|
||||
|
||||
|
||||
class Image(
|
||||
namedtuple("Image", ["registry", "namespace", "image_name", "tag", "digest"])
|
||||
):
|
||||
class Image(namedtuple("Image", ["registry", "namespace", "image_name", "tag"])):
|
||||
__slots__ = ()
|
||||
|
||||
@property
|
||||
|
@ -101,7 +99,8 @@ class RegistryClient:
|
|||
return tags
|
||||
|
||||
def get_manifest(
|
||||
self, tag: str,
|
||||
self,
|
||||
tag: str,
|
||||
) -> requests.Response:
|
||||
"""Get manifest information for a specific tag"""
|
||||
manifest_url = f"{self._image_url}/manifests/{tag}"
|
||||
|
@ -123,8 +122,8 @@ class RegistryClient:
|
|||
.get("manifests")
|
||||
)
|
||||
|
||||
def get_blob(self, hash: str) -> requests.Response:
|
||||
url = f"{self._image_url}/blobs/{hash}"
|
||||
def get_blob(self, digest: str) -> requests.Response:
|
||||
url = f"{self._image_url}/blobs/{digest}"
|
||||
response = requests.get(
|
||||
url,
|
||||
headers={
|
||||
|
@ -134,19 +133,19 @@ class RegistryClient:
|
|||
response.raise_for_status()
|
||||
return response
|
||||
|
||||
def get_manifest_hash(
|
||||
def get_manifest_digest(
|
||||
self, tag: str, tag_manifest_content: Optional[bytes] = None
|
||||
) -> str:
|
||||
if not tag_manifest_content:
|
||||
tag_manifest_content = self.get_manifest(tag).content
|
||||
|
||||
return hashlib.sha256(tag_manifest_content).hexdigest()
|
||||
return sha256(tag_manifest_content).hexdigest()
|
||||
|
||||
|
||||
# XXX Refactor this with regular functions rather than a class
|
||||
def get_manifest_hash(image_str: str) -> str:
|
||||
def get_manifest_digest(image_str: str) -> str:
|
||||
image = parse_image_location(image_str)
|
||||
return RegistryClient(image).get_manifest_hash(image.tag)
|
||||
return RegistryClient(image).get_manifest_digest(image.tag)
|
||||
|
||||
|
||||
def list_tags(image_str: str) -> list:
|
||||
|
|
|
@ -55,7 +55,7 @@ def signature_to_bundle(sig: Dict) -> Dict:
|
|||
}
|
||||
|
||||
|
||||
def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
|
||||
def verify_signature(signature: dict, image_digest: str, pubkey: str) -> bool:
|
||||
"""Verify a signature against a given public key"""
|
||||
# XXX - Also verfy the identity/docker-reference field against the expected value
|
||||
# e.g. ghcr.io/freedomofpress/dangerzone/dangerzone
|
||||
|
@ -64,12 +64,12 @@ def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
|
|||
signature_bundle = signature_to_bundle(signature)
|
||||
|
||||
payload_bytes = b64decode(signature_bundle["Payload"])
|
||||
payload_hash = json.loads(payload_bytes)["critical"]["image"][
|
||||
payload_digest = json.loads(payload_bytes)["critical"]["image"][
|
||||
"docker-manifest-digest"
|
||||
]
|
||||
if payload_hash != f"sha256:{image_hash}":
|
||||
if payload_digest != f"sha256:{image_digest}":
|
||||
raise errors.SignatureMismatch(
|
||||
f"The signature does not match the image hash ({payload_hash}, {image_hash})"
|
||||
f"The signature does not match the image digest ({payload_digest}, {image_digest})"
|
||||
)
|
||||
|
||||
with (
|
||||
|
@ -103,44 +103,44 @@ def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
def new_image_release(image: str) -> bool:
|
||||
remote_hash = registry.get_manifest_hash(image)
|
||||
local_hash = runtime.get_local_image_digest(image)
|
||||
log.debug("Remote hash: %s", remote_hash)
|
||||
log.debug("Local hash: %s", local_hash)
|
||||
return remote_hash != local_hash
|
||||
def is_update_available(image: str) -> bool:
|
||||
remote_digest = registry.get_manifest_digest(image)
|
||||
local_digest = runtime.get_local_image_digest(image)
|
||||
log.debug("Remote digest: %s", remote_digest)
|
||||
log.debug("Local digest: %s", local_digest)
|
||||
return remote_digest != local_digest
|
||||
|
||||
|
||||
def verify_signatures(
|
||||
signatures: List[Dict],
|
||||
image_hash: str,
|
||||
image_digest: str,
|
||||
pubkey: str,
|
||||
) -> bool:
|
||||
for signature in signatures:
|
||||
if not verify_signature(signature, image_hash, pubkey):
|
||||
if not verify_signature(signature, image_digest, pubkey):
|
||||
raise errors.SignatureVerificationError()
|
||||
return True
|
||||
|
||||
|
||||
def upgrade_container_image(image: str, manifest_hash: str, pubkey: str) -> bool:
|
||||
def upgrade_container_image(image: str, manifest_digest: str, pubkey: str) -> bool:
|
||||
"""Verify and upgrade the image to the latest, if signed."""
|
||||
if not new_image_release(image):
|
||||
if not is_update_available(image):
|
||||
raise errors.ImageAlreadyUpToDate("The image is already up to date")
|
||||
|
||||
signatures = get_remote_signatures(image, manifest_hash)
|
||||
verify_signatures(signatures, manifest_hash, pubkey)
|
||||
signatures = get_remote_signatures(image, manifest_digest)
|
||||
verify_signatures(signatures, manifest_digest, pubkey)
|
||||
|
||||
# At this point, the signatures are verified
|
||||
# We store the signatures just now to avoid storing unverified signatures
|
||||
store_signatures(signatures, manifest_hash, pubkey)
|
||||
store_signatures(signatures, manifest_digest, pubkey)
|
||||
|
||||
# let's upgrade the image
|
||||
# XXX Use the image digest here to avoid race conditions
|
||||
return runtime.container_pull(image)
|
||||
|
||||
|
||||
def _get_blob(tmpdir: str, hash: str) -> Path:
|
||||
return Path(tmpdir) / "blobs" / "sha256" / hash.replace("sha256:", "")
|
||||
def _get_blob(tmpdir: str, digest: str) -> Path:
|
||||
return Path(tmpdir) / "blobs" / "sha256" / digest.replace("sha256:", "")
|
||||
|
||||
|
||||
def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
|
||||
|
@ -241,8 +241,8 @@ def convert_oci_images_signatures(
|
|||
return image_name, signatures
|
||||
|
||||
|
||||
def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) -> str:
|
||||
"""Get the sha256 hash of a file or content"""
|
||||
def get_file_digest(file: Optional[str] = None, content: Optional[bytes] = None) -> str:
|
||||
"""Get the sha256 digest of a file or content"""
|
||||
if not file and not content:
|
||||
raise errors.UpdaterError("No file or content provided")
|
||||
if file:
|
||||
|
@ -253,13 +253,13 @@ def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) -
|
|||
return ""
|
||||
|
||||
|
||||
def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
|
||||
def load_signatures(image_digest: str, pubkey: str) -> List[Dict]:
|
||||
"""
|
||||
Load signatures from the local filesystem
|
||||
|
||||
See store_signatures() for the expected format.
|
||||
"""
|
||||
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
|
||||
pubkey_signatures = SIGNATURES_PATH / get_file_digest(pubkey)
|
||||
if not pubkey_signatures.exists():
|
||||
msg = (
|
||||
f"Cannot find a '{pubkey_signatures}' folder."
|
||||
|
@ -267,19 +267,19 @@ def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
|
|||
)
|
||||
raise errors.SignaturesFolderDoesNotExist(msg)
|
||||
|
||||
with open(pubkey_signatures / f"{image_hash}.json") as f:
|
||||
with open(pubkey_signatures / f"{image_digest}.json") as f:
|
||||
log.debug("Loading signatures from %s", f.name)
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> None:
|
||||
def store_signatures(signatures: list[Dict], image_digest: str, pubkey: str) -> None:
|
||||
"""
|
||||
Store signatures locally in the SIGNATURE_PATH folder, like this:
|
||||
|
||||
~/.config/dangerzone/signatures/
|
||||
└── <pubkey-hash>
|
||||
└── <image-hash>.json
|
||||
└── <image-hash>.json
|
||||
└── <pubkey-digest>
|
||||
└── <image-digest>.json
|
||||
└── <image-digest>.json
|
||||
|
||||
The format used in the `.json` file is the one of `cosign download
|
||||
signature`, which differs from the "bundle" one used afterwards.
|
||||
|
@ -292,22 +292,22 @@ def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> No
|
|||
payload = json.loads(b64decode(sig["Payload"]))
|
||||
return payload["critical"]["image"]["docker-manifest-digest"]
|
||||
|
||||
# All the signatures should share the same hash.
|
||||
hashes = list(map(_get_digest, signatures))
|
||||
if len(set(hashes)) != 1:
|
||||
raise errors.InvalidSignatures("Signatures do not share the same image hash")
|
||||
# All the signatures should share the same digest.
|
||||
digests = list(map(_get_digest, signatures))
|
||||
if len(set(digests)) != 1:
|
||||
raise errors.InvalidSignatures("Signatures do not share the same image digest")
|
||||
|
||||
if f"sha256:{image_hash}" != hashes[0]:
|
||||
if f"sha256:{image_digest}" != digests[0]:
|
||||
raise errors.SignatureMismatch(
|
||||
f"Signatures do not match the given image hash ({image_hash}, {hashes[0]})"
|
||||
f"Signatures do not match the given image digest ({image_digest}, {digests[0]})"
|
||||
)
|
||||
|
||||
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
|
||||
pubkey_signatures = SIGNATURES_PATH / get_file_digest(pubkey)
|
||||
pubkey_signatures.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
with open(pubkey_signatures / f"{image_hash}.json", "w") as f:
|
||||
with open(pubkey_signatures / f"{image_digest}.json", "w") as f:
|
||||
log.info(
|
||||
f"Storing signatures for {image_hash} in {pubkey_signatures}/{image_hash}.json"
|
||||
f"Storing signatures for {image_digest} in {pubkey_signatures}/{image_digest}.json"
|
||||
)
|
||||
json.dump(signatures, f)
|
||||
|
||||
|
@ -318,28 +318,28 @@ def verify_local_image(image: str, pubkey: str) -> bool:
|
|||
"""
|
||||
log.info(f"Verifying local image {image} against pubkey {pubkey}")
|
||||
try:
|
||||
image_hash = runtime.get_local_image_digest(image)
|
||||
image_digest = runtime.get_local_image_digest(image)
|
||||
except subprocess.CalledProcessError:
|
||||
raise errors.ImageNotFound(f"The image {image} does not exist locally")
|
||||
|
||||
log.debug(f"Image hash: {image_hash}")
|
||||
signatures = load_signatures(image_hash, pubkey)
|
||||
log.debug(f"Image digest: {image_digest}")
|
||||
signatures = load_signatures(image_digest, pubkey)
|
||||
if len(signatures) < 1:
|
||||
raise errors.LocalSignatureNotFound("No signatures found")
|
||||
|
||||
for signature in signatures:
|
||||
if not verify_signature(signature, image_hash, pubkey):
|
||||
if not verify_signature(signature, image_digest, pubkey):
|
||||
msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
|
||||
raise errors.SignatureVerificationError(msg)
|
||||
return True
|
||||
|
||||
|
||||
def get_remote_signatures(image: str, hash: str) -> List[Dict]:
|
||||
def get_remote_signatures(image: str, digest: str) -> List[Dict]:
|
||||
"""Retrieve the signatures from the registry, via `cosign download`."""
|
||||
cosign.ensure_installed()
|
||||
|
||||
process = subprocess.run(
|
||||
["cosign", "download", "signature", f"{image}@sha256:{hash}"],
|
||||
["cosign", "download", "signature", f"{image}@sha256:{digest}"],
|
||||
capture_output=True,
|
||||
check=True,
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue