mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-04-28 18:02:38 +02:00
Automate the verification of image signatures
This commit is contained in:
parent
1ea76ded9b
commit
83a38eab0d
1 changed files with 171 additions and 36 deletions
|
@ -1,9 +1,12 @@
|
||||||
#!/usr/bin/python
|
#!/usr/bin/python
|
||||||
|
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import json
|
||||||
|
import platform
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
|
from base64 import b64decode
|
||||||
from tempfile import NamedTemporaryFile
|
from tempfile import NamedTemporaryFile
|
||||||
|
|
||||||
import click
|
import click
|
||||||
|
@ -90,6 +93,12 @@ class RegistryClient:
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
def get_manifest_hash(self, tag, tag_manifest_content=None):
|
||||||
|
if not tag_manifest_content:
|
||||||
|
tag_manifest_content = self.get_manifest(tag).content
|
||||||
|
|
||||||
|
return hashlib.sha256(tag_manifest_content).hexdigest()
|
||||||
|
|
||||||
def get_attestation(self, tag):
|
def get_attestation(self, tag):
|
||||||
"""
|
"""
|
||||||
Retrieve an attestation from a given tag.
|
Retrieve an attestation from a given tag.
|
||||||
|
@ -114,7 +123,7 @@ class RegistryClient:
|
||||||
|
|
||||||
# The attestation is available on the same container registry, with a
|
# The attestation is available on the same container registry, with a
|
||||||
# specific tag named "sha256-{sha256(manifest)}"
|
# specific tag named "sha256-{sha256(manifest)}"
|
||||||
tag_manifest_hash = hashlib.sha256(tag_manifest_content).hexdigest()
|
tag_manifest_hash = self.get_manifest_hash(tag, tag_manifest_content)
|
||||||
|
|
||||||
# This will get us a "list" of manifests...
|
# This will get us a "list" of manifests...
|
||||||
manifests = self.list_manifests(f"sha256-{tag_manifest_hash}")
|
manifests = self.list_manifests(f"sha256-{tag_manifest_hash}")
|
||||||
|
@ -138,43 +147,153 @@ class RegistryClient:
|
||||||
bundle = self.get_blob(blob_digest)
|
bundle = self.get_blob(blob_digest)
|
||||||
return tag_manifest_content, bundle.content
|
return tag_manifest_content, bundle.content
|
||||||
|
|
||||||
def verify_attestation(self, image_tag: str, expected_repo: str):
|
|
||||||
"""
|
|
||||||
Look up the image attestation to see if the image has been built
|
|
||||||
on Github runners, and from a given repository.
|
|
||||||
"""
|
|
||||||
manifest, bundle = self.get_attestation(image_tag)
|
|
||||||
|
|
||||||
def _write(file, content):
|
def _write(file, content):
|
||||||
file.write(content)
|
file.write(content)
|
||||||
file.flush()
|
file.flush()
|
||||||
|
|
||||||
# Put the value in files and verify with cosign
|
|
||||||
with (
|
|
||||||
NamedTemporaryFile(mode="wb") as manifest_json,
|
|
||||||
NamedTemporaryFile(mode="wb") as bundle_json,
|
|
||||||
):
|
|
||||||
_write(manifest_json, manifest)
|
|
||||||
_write(bundle_json, bundle)
|
|
||||||
|
|
||||||
# Call cosign with the temporary file paths
|
def verify_attestation(
|
||||||
cmd = [
|
registry_client: RegistryClient, image_tag: str, expected_repo: str
|
||||||
"cosign",
|
):
|
||||||
"verify-blob-attestation",
|
"""
|
||||||
"--bundle",
|
Look up the image attestation to see if the image has been built
|
||||||
bundle_json.name,
|
on Github runners, and from a given repository.
|
||||||
"--new-bundle-format",
|
"""
|
||||||
"--certificate-oidc-issuer",
|
manifest, bundle = registry_client.get_attestation(image_tag)
|
||||||
"https://token.actions.githubusercontent.com",
|
|
||||||
"--certificate-identity-regexp",
|
|
||||||
f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign",
|
|
||||||
manifest_json.name,
|
|
||||||
]
|
|
||||||
|
|
||||||
result = subprocess.run(cmd, capture_output=True)
|
# Put the value in files and verify with cosign
|
||||||
if result.returncode != 0:
|
with (
|
||||||
raise Exception(f"Attestation cannot be verified. {result.stderr}")
|
NamedTemporaryFile(mode="wb") as manifest_json,
|
||||||
return True
|
NamedTemporaryFile(mode="wb") as bundle_json,
|
||||||
|
):
|
||||||
|
_write(manifest_json, manifest)
|
||||||
|
_write(bundle_json, bundle)
|
||||||
|
|
||||||
|
# Call cosign with the temporary file paths
|
||||||
|
cmd = [
|
||||||
|
"cosign",
|
||||||
|
"verify-blob-attestation",
|
||||||
|
"--bundle",
|
||||||
|
bundle_json.name,
|
||||||
|
"--new-bundle-format",
|
||||||
|
"--certificate-oidc-issuer",
|
||||||
|
"https://token.actions.githubusercontent.com",
|
||||||
|
"--certificate-identity-regexp",
|
||||||
|
f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign",
|
||||||
|
manifest_json.name,
|
||||||
|
]
|
||||||
|
|
||||||
|
result = subprocess.run(cmd, capture_output=True)
|
||||||
|
if result.returncode != 0:
|
||||||
|
raise Exception(f"Attestation cannot be verified. {result.stderr}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def new_version_available():
|
||||||
|
# XXX - Implement
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def check_signature(signature_bundle, pub_key):
|
||||||
|
"""Ensure that the signature bundle has been signed by the given public key."""
|
||||||
|
|
||||||
|
# Put the value in files and verify with cosign
|
||||||
|
with (
|
||||||
|
NamedTemporaryFile(mode="w") as signature_file,
|
||||||
|
NamedTemporaryFile(mode="bw") as payload_file,
|
||||||
|
):
|
||||||
|
json.dump(signature_bundle, signature_file)
|
||||||
|
signature_file.flush()
|
||||||
|
|
||||||
|
payload_bytes = b64decode(signature_bundle["Payload"])
|
||||||
|
_write(payload_file, payload_bytes)
|
||||||
|
|
||||||
|
cmd = [
|
||||||
|
"cosign",
|
||||||
|
"verify-blob",
|
||||||
|
"--key",
|
||||||
|
pub_key,
|
||||||
|
"--bundle",
|
||||||
|
signature_file.name,
|
||||||
|
payload_file.name,
|
||||||
|
]
|
||||||
|
result = subprocess.run(cmd, capture_output=True)
|
||||||
|
if result.returncode != 0:
|
||||||
|
return False
|
||||||
|
return result.stderr == b"Verified OK\n"
|
||||||
|
|
||||||
|
|
||||||
|
def get_runtime_name() -> str:
|
||||||
|
if platform.system() == "Linux":
|
||||||
|
return "podman"
|
||||||
|
return "docker"
|
||||||
|
|
||||||
|
|
||||||
|
def container_pull(image):
|
||||||
|
cmd = [get_runtime_name(), "pull", f"{image}"]
|
||||||
|
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
|
||||||
|
process.communicate()
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade_container_image(image, tag, pub_key, registry: RegistryClient):
|
||||||
|
if not new_version_available():
|
||||||
|
return
|
||||||
|
|
||||||
|
hash = registry.get_manifest_hash(tag)
|
||||||
|
signatures = get_signatures(image, hash)
|
||||||
|
if len(signatures) < 1:
|
||||||
|
raise Exception("Unable to retrieve signatures")
|
||||||
|
|
||||||
|
print(f"Found {len(signatures)} signature(s) for {image}")
|
||||||
|
for signature in signatures:
|
||||||
|
signature_is_valid = check_signature(signature, pub_key)
|
||||||
|
if not signature_is_valid:
|
||||||
|
raise Exception("Unable to verify signature")
|
||||||
|
print("✅ Signature is valid")
|
||||||
|
|
||||||
|
# At this point, the signature is verified, let's upgrade
|
||||||
|
# XXX Use the hash here to avoid race conditions
|
||||||
|
container_pull(image)
|
||||||
|
|
||||||
|
|
||||||
|
def get_signatures(image, hash):
|
||||||
|
"""
|
||||||
|
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _to_bundle(sig):
|
||||||
|
# Convert cosign-download signatures to the format expected by cosign bundle.
|
||||||
|
bundle = sig["Bundle"]
|
||||||
|
payload = bundle["Payload"]
|
||||||
|
return {
|
||||||
|
"base64Signature": sig["Base64Signature"],
|
||||||
|
"Payload": sig["Payload"],
|
||||||
|
"cert": sig["Cert"],
|
||||||
|
"chain": sig["Chain"],
|
||||||
|
"rekorBundle": {
|
||||||
|
"SignedEntryTimestamp": bundle["SignedEntryTimestamp"],
|
||||||
|
"Payload": {
|
||||||
|
"body": payload["body"],
|
||||||
|
"integratedTime": payload["integratedTime"],
|
||||||
|
"logIndex": payload["logIndex"],
|
||||||
|
"logID": payload["logID"],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"RFC3161Timestamp": sig["RFC3161Timestamp"],
|
||||||
|
}
|
||||||
|
|
||||||
|
process = subprocess.run(
|
||||||
|
["cosign", "download", "signature", f"{image}@sha256:{hash}"],
|
||||||
|
capture_output=True,
|
||||||
|
check=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX: Check the output first.
|
||||||
|
signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
|
||||||
|
|
||||||
|
# Remove the last return, split on newlines, convert from JSON
|
||||||
|
return [_to_bundle(json.loads(sig)) for sig in signatures_raw]
|
||||||
|
|
||||||
|
|
||||||
def parse_image_location(input_string):
|
def parse_image_location(input_string):
|
||||||
|
@ -190,7 +309,13 @@ def parse_image_location(input_string):
|
||||||
match = re.match(pattern, input_string)
|
match = re.match(pattern, input_string)
|
||||||
if not match:
|
if not match:
|
||||||
raise ValueError("Malformed image location")
|
raise ValueError("Malformed image location")
|
||||||
return match.group("registry", "namespace", "repository", "tag")
|
|
||||||
|
return (
|
||||||
|
match.group("registry"),
|
||||||
|
match.group("namespace"),
|
||||||
|
match.group("repository"),
|
||||||
|
match.group("tag") or "latest",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@click.group()
|
@click.group()
|
||||||
|
@ -198,6 +323,16 @@ def main():
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@main.command()
|
||||||
|
@click.argument("image")
|
||||||
|
@click.option("--pubkey", default="pub.key")
|
||||||
|
def upgrade_image(image, pubkey):
|
||||||
|
registry, org, package, tag = parse_image_location(image)
|
||||||
|
registry_client = RegistryClient(registry, org, package)
|
||||||
|
|
||||||
|
upgrade_container_image(image, tag, pubkey, registry_client)
|
||||||
|
|
||||||
|
|
||||||
@main.command()
|
@main.command()
|
||||||
@click.argument("image")
|
@click.argument("image")
|
||||||
def list_tags(image):
|
def list_tags(image):
|
||||||
|
@ -239,7 +374,7 @@ def attest(image: str, repo: str):
|
||||||
tag = tag or "latest"
|
tag = tag or "latest"
|
||||||
|
|
||||||
client = RegistryClient(registry, org, package)
|
client = RegistryClient(registry, org, package)
|
||||||
verified = client.verify_attestation(tag, repo)
|
verified = verify_attestation(client, tag, repo)
|
||||||
if verified:
|
if verified:
|
||||||
click.echo(
|
click.echo(
|
||||||
f"🎉 The image available at `{client.image}:{tag}` has been built by Github Runners from the `{repo}` repository"
|
f"🎉 The image available at `{client.image}:{tag}` has been built by Github Runners from the `{repo}` repository"
|
||||||
|
|
Loading…
Reference in a new issue