Automate the verification of image signatures

This commit is contained in:
Alexis Métaireau 2025-01-28 11:12:21 +01:00
parent bcd1ec2173
commit 47252cc31d
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E

View file

@ -1,9 +1,12 @@
#!/usr/bin/python
import hashlib
import json
import platform
import re
import shutil
import subprocess
from base64 import b64decode
from tempfile import NamedTemporaryFile
import click
@ -90,6 +93,12 @@ class RegistryClient:
response.raise_for_status()
return response
def get_manifest_hash(self, tag, tag_manifest_content=None):
if not tag_manifest_content:
tag_manifest_content = self.get_manifest(tag).content
return hashlib.sha256(tag_manifest_content).hexdigest()
def get_attestation(self, tag):
"""
Retrieve an attestation from a given tag.
@ -114,7 +123,7 @@ class RegistryClient:
# The attestation is available on the same container registry, with a
# specific tag named "sha256-{sha256(manifest)}"
tag_manifest_hash = hashlib.sha256(tag_manifest_content).hexdigest()
tag_manifest_hash = self.get_manifest_hash(tag, tag_manifest_content)
# This will get us a "list" of manifests...
manifests = self.list_manifests(f"sha256-{tag_manifest_hash}")
@ -138,17 +147,21 @@ class RegistryClient:
bundle = self.get_blob(blob_digest)
return tag_manifest_content, bundle.content
def verify_attestation(self, image_tag: str, expected_repo: str):
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
manifest, bundle = self.get_attestation(image_tag)
def _write(file, content):
file.write(content)
file.flush()
def verify_attestation(
registry_client: RegistryClient, image_tag: str, expected_repo: str
):
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
manifest, bundle = registry_client.get_attestation(image_tag)
# Put the value in files and verify with cosign
with (
NamedTemporaryFile(mode="wb") as manifest_json,
@ -177,6 +190,112 @@ class RegistryClient:
return True
def new_version_available():
# XXX - Implement
return True
def check_signature(signature_bundle, pub_key):
"""Ensure that the signature bundle has been signed by the given public key."""
# Put the value in files and verify with cosign
with (
NamedTemporaryFile(mode="w") as signature_file,
NamedTemporaryFile(mode="bw") as payload_file,
):
json.dump(signature_bundle, signature_file)
signature_file.flush()
payload_bytes = b64decode(signature_bundle["Payload"])
_write(payload_file, payload_bytes)
cmd = [
"cosign",
"verify-blob",
"--key",
pub_key,
"--bundle",
signature_file.name,
payload_file.name,
]
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
return False
return result.stderr == b"Verified OK\n"
def get_runtime_name() -> str:
if platform.system() == "Linux":
return "podman"
return "docker"
def container_pull(image):
cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
process.communicate()
def upgrade_container_image(image, tag, pub_key, registry: RegistryClient):
if not new_version_available():
return
hash = registry.get_manifest_hash(tag)
signatures = get_signatures(image, hash)
if len(signatures) < 1:
raise Exception("Unable to retrieve signatures")
print(f"Found {len(signatures)} signature(s) for {image}")
for signature in signatures:
signature_is_valid = check_signature(signature, pub_key)
if not signature_is_valid:
raise Exception("Unable to verify signature")
print("✅ Signature is valid")
# At this point, the signature is verified, let's upgrade
# XXX Use the hash here to avoid race conditions
container_pull(image)
def get_signatures(image, hash):
"""
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.
"""
def _to_bundle(sig):
# Convert cosign-download signatures to the format expected by cosign bundle.
bundle = sig["Bundle"]
payload = bundle["Payload"]
return {
"base64Signature": sig["Base64Signature"],
"Payload": sig["Payload"],
"cert": sig["Cert"],
"chain": sig["Chain"],
"rekorBundle": {
"SignedEntryTimestamp": bundle["SignedEntryTimestamp"],
"Payload": {
"body": payload["body"],
"integratedTime": payload["integratedTime"],
"logIndex": payload["logIndex"],
"logID": payload["logID"],
},
},
"RFC3161Timestamp": sig["RFC3161Timestamp"],
}
process = subprocess.run(
["cosign", "download", "signature", f"{image}@sha256:{hash}"],
capture_output=True,
check=True,
)
# XXX: Check the output first.
signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
# Remove the last return, split on newlines, convert from JSON
return [_to_bundle(json.loads(sig)) for sig in signatures_raw]
def parse_image_location(input_string):
"""Parses container image location into (registry, namespace, repository, tag)"""
pattern = (
@ -190,7 +309,13 @@ def parse_image_location(input_string):
match = re.match(pattern, input_string)
if not match:
raise ValueError("Malformed image location")
return match.group("registry", "namespace", "repository", "tag")
return (
match.group("registry"),
match.group("namespace"),
match.group("repository"),
match.group("tag") or "latest",
)
@click.group()
@ -198,6 +323,16 @@ def main():
pass
@main.command()
@click.argument("image")
@click.option("--pubkey", default="pub.key")
def upgrade_image(image, pubkey):
registry, org, package, tag = parse_image_location(image)
registry_client = RegistryClient(registry, org, package)
upgrade_container_image(image, tag, pubkey, registry_client)
@main.command()
@click.argument("image")
def list_tags(image):
@ -239,7 +374,7 @@ def attest(image: str, repo: str):
tag = tag or "latest"
client = RegistryClient(registry, org, package)
verified = client.verify_attestation(tag, repo)
verified = verify_attestation(client, tag, repo)
if verified:
click.echo(
f"🎉 The image available at `{client.image}:{tag}` has been built by Github Runners from the `{repo}` repository"