Verify podman/docker images against locally stored signatures

This commit is contained in:
Alexis Métaireau 2025-01-28 15:51:46 +01:00
parent 83a38eab0d
commit 225839960c
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E

View file

@ -7,11 +7,24 @@ import re
import shutil import shutil
import subprocess import subprocess
from base64 import b64decode from base64 import b64decode
from pathlib import Path
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
import click import click
import requests import requests
try:
import platformdirs
except ImportError:
import appdirs as platformdirs
def get_config_dir() -> str:
return Path(platformdirs.user_config_dir("dangerzone"))
SIGNATURES_PATH = get_config_dir() / "signatures"
DEFAULT_REPO = "freedomofpress/dangerzone" DEFAULT_REPO = "freedomofpress/dangerzone"
SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json" SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json"
DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json" DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json"
@ -190,13 +203,37 @@ def verify_attestation(
return True return True
def new_version_available(): def new_image_release():
# XXX - Implement # XXX - Implement
return True return True
def check_signature(signature_bundle, pub_key): def signature_to_bundle(sig):
"""Ensure that the signature bundle has been signed by the given public key.""" # Convert cosign-download signatures to the format expected by cosign bundle.
bundle = sig["Bundle"]
payload = bundle["Payload"]
return {
"base64Signature": sig["Base64Signature"],
"Payload": sig["Payload"],
"cert": sig["Cert"],
"chain": sig["Chain"],
"rekorBundle": {
"SignedEntryTimestamp": bundle["SignedEntryTimestamp"],
"Payload": {
"body": payload["body"],
"integratedTime": payload["integratedTime"],
"logIndex": payload["logIndex"],
"logID": payload["logID"],
},
},
"RFC3161Timestamp": sig["RFC3161Timestamp"],
}
def verify_signature(signature, pubkey):
"""Verify a signature against a given public key"""
signature_bundle = signature_to_bundle(signature)
# Put the value in files and verify with cosign # Put the value in files and verify with cosign
with ( with (
@ -213,13 +250,14 @@ def check_signature(signature_bundle, pub_key):
"cosign", "cosign",
"verify-blob", "verify-blob",
"--key", "--key",
pub_key, pubkey,
"--bundle", "--bundle",
signature_file.name, signature_file.name,
payload_file.name, payload_file.name,
] ]
result = subprocess.run(cmd, capture_output=True) result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0: if result.returncode != 0:
# XXX Raise instead?
return False return False
return result.stderr == b"Verified OK\n" return result.stderr == b"Verified OK\n"
@ -236,53 +274,116 @@ def container_pull(image):
process.communicate() process.communicate()
def upgrade_container_image(image, tag, pub_key, registry: RegistryClient): def upgrade_container_image(image, tag, pubkey, registry: RegistryClient):
if not new_version_available(): if not new_image_release():
return return
hash = registry.get_manifest_hash(tag) hash = registry.get_manifest_hash(tag)
signatures = get_signatures(image, hash) signatures = get_signatures(image, hash)
if len(signatures) < 1: if len(signatures) < 1:
raise Exception("Unable to retrieve signatures") raise Exception("Unable to retrieve signatures")
print(f"Found {len(signatures)} signature(s) for {image}") print(f"Found {len(signatures)} signature(s) for {image}")
for signature in signatures: for signature in signatures:
signature_is_valid = check_signature(signature, pub_key) signature_is_valid = verify_signature(signature, pubkey)
if not signature_is_valid: if not signature_is_valid:
raise Exception("Unable to verify signature") raise Exception("Unable to verify signature")
print("✅ Signature is valid") print("✅ Signature is valid")
# At this point, the signature is verified, let's upgrade # At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
store_signatures(signatures, hash, pubkey)
# let's upgrade the image
# XXX Use the hash here to avoid race conditions # XXX Use the hash here to avoid race conditions
container_pull(image) container_pull(image)
def get_file_hash(file):
with open(file, "rb") as f:
content = f.read()
return hashlib.sha256(content).hexdigest()
def load_signatures(image_hash, pubkey):
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
if not pubkey_signatures.exists():
msg = (
f"Cannot find a '{pubkey_signatures}' folder."
"You might need to download the image signatures first."
)
raise Exception(msg)
with open(pubkey_signatures / f"{image_hash}.json") as f:
return json.load(f)
def store_signatures(signatures, image_hash, pubkey):
"""
Store signatures locally in the SIGNATURE_PATH folder, like this:
~/.config/dangerzone/signatures/
<pubkey-hash>
<image-hash>.json
<image-hash>.json
The format used in the `.json` file is the one of `cosign download
signature`, which differs from the "bundle" one used afterwards.
It can be converted to the one expected by cosign verify --bundle with
the `signature_to_bundle()` function.
"""
def _get_digest(sig):
payload = json.loads(b64decode(sig["Payload"]))
return payload["critical"]["image"]["docker-manifest-digest"]
# All the signatures should share the same hash.
hashes = list(map(_get_digest, signatures))
if len(set(hashes)) != 1:
raise Exception("Signatures do not share the same image hash")
if f"sha256:{image_hash}" != hashes[0]:
raise Exception("Signatures do not match the given image hash")
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
pubkey_signatures.mkdir(exist_ok=True)
with open(pubkey_signatures / f"{image_hash}.json", "w") as f:
json.dump(signatures, f)
def verify_local_image_signature(image, pubkey):
"""
Verifies that a local image has a valid signature
"""
image_hash = get_image_hash(image)
signatures = load_signatures(image_hash, pubkey)
if len(signatures) < 1:
raise Exception("No signatures found")
for signature in signatures:
if not verify_signature(signature, pubkey):
msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
raise Exception(msg)
return True
def get_image_hash(image):
"""
Returns a image hash from a local image name
"""
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout.strip().decode().strip("sha256:")
def get_signatures(image, hash): def get_signatures(image, hash):
""" """
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format. Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.
""" """
def _to_bundle(sig):
# Convert cosign-download signatures to the format expected by cosign bundle.
bundle = sig["Bundle"]
payload = bundle["Payload"]
return {
"base64Signature": sig["Base64Signature"],
"Payload": sig["Payload"],
"cert": sig["Cert"],
"chain": sig["Chain"],
"rekorBundle": {
"SignedEntryTimestamp": bundle["SignedEntryTimestamp"],
"Payload": {
"body": payload["body"],
"integratedTime": payload["integratedTime"],
"logIndex": payload["logIndex"],
"logID": payload["logID"],
},
},
"RFC3161Timestamp": sig["RFC3161Timestamp"],
}
process = subprocess.run( process = subprocess.run(
["cosign", "download", "signature", f"{image}@sha256:{hash}"], ["cosign", "download", "signature", f"{image}@sha256:{hash}"],
capture_output=True, capture_output=True,
@ -290,32 +391,54 @@ def get_signatures(image, hash):
) )
# XXX: Check the output first. # XXX: Check the output first.
signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
# Remove the last return, split on newlines, convert from JSON # Remove the last return, split on newlines, convert from JSON
return [_to_bundle(json.loads(sig)) for sig in signatures_raw] signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
return list(map(json.loads, signatures_raw))
def parse_image_location(input_string): class Image:
"""Parses container image location into (registry, namespace, repository, tag)""" def __init__(self, registry, namespace, repository, tag="latest"):
pattern = ( self.registry = registry
r"^" self.namespace = namespace
r"(?P<registry>[a-zA-Z0-9.-]+)/" self.repository = repository
r"(?P<namespace>[a-zA-Z0-9-]+)/" self.tag = tag
r"(?P<repository>[^:]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"$"
)
match = re.match(pattern, input_string)
if not match:
raise ValueError("Malformed image location")
return ( def properties(self):
match.group("registry"), return (self.registry, self.namespace, self.repository, self.tag)
match.group("namespace"),
match.group("repository"), @property
match.group("tag") or "latest", def name_without_tag(self):
) return f"{self.registry}/{self.namespace}/{self.repository}"
@property
def name_with_tag(self):
return f"{self.name_without_tag}:{self.tag}"
@classmethod
def from_string(cls, input_string):
"""Parses container image location into (registry, namespace, repository, tag)"""
pattern = (
r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<repository>[^:]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"$"
)
match = re.match(pattern, input_string)
if not match:
raise ValueError("Malformed image location")
return cls(
match.group("registry"),
match.group("namespace"),
match.group("repository"),
match.group("tag") or "latest",
)
def parse_image_location(string):
return Image.from_string(string).properties
@click.group() @click.group()
@ -327,12 +450,21 @@ def main():
@click.argument("image") @click.argument("image")
@click.option("--pubkey", default="pub.key") @click.option("--pubkey", default="pub.key")
def upgrade_image(image, pubkey): def upgrade_image(image, pubkey):
registry, org, package, tag = parse_image_location(image) registry, namespace, repository, tag = parse_image_location(image)
registry_client = RegistryClient(registry, org, package) registry_client = RegistryClient(registry, namespace, repository)
upgrade_container_image(image, tag, pubkey, registry_client) upgrade_container_image(image, tag, pubkey, registry_client)
@main.command()
@click.argument("image")
@click.option("--pubkey", default="pub.key")
def verify_local_image(image, pubkey):
# XXX remove a potentiel :tag
if verify_local_image_signature(image, pubkey):
click.echo(f"✅ The local image {image} has been signed with {pubkey}")
@main.command() @main.command()
@click.argument("image") @click.argument("image")
def list_tags(image): def list_tags(image):