Locally store the signatures for oci-images archives

On air-gapped environements, it's now possible to load signatures
generated by `cosign save` commands. The signatures embedded in this
format will be converted to the one used by `cosign download signature`.
This commit is contained in:
Alexis Métaireau 2025-02-03 20:18:10 +01:00
parent f30ced7834
commit 4d27449351
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
3 changed files with 70 additions and 17 deletions

View file

@ -155,6 +155,9 @@ def load_image_tarball_file(tarball_path: str) -> None:
def tag_image_by_digest(digest: str, tag: str) -> None:
"""Tag a container image by digest.
The sha256: prefix should be omitted from the digest.
"""
image_id = get_image_id_by_digest(digest)
cmd = [get_runtime(), "tag", image_id, tag]
log.debug(" ".join(cmd))
@ -162,11 +165,14 @@ def tag_image_by_digest(digest: str, tag: str) -> None:
def get_image_id_by_digest(digest: str) -> str:
"""Get an image ID from a digest.
The sha256: prefix should be omitted from the digest.
"""
cmd = [
get_runtime(),
"images",
"-f",
f"digest={digest}",
f"digest=sha256:{digest}",
"--format",
"{{.Id}}",
]
@ -174,7 +180,9 @@ def get_image_id_by_digest(digest: str) -> str:
process = subprocess.run(
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
)
return process.stdout.decode().strip()
breakpoint()
# In case we have multiple lines, we only want the first one.
return process.stdout.decode().strip().split("\n")[0]
def container_pull(image: str) -> bool:

View file

@ -22,6 +22,10 @@ class SignatureVerificationError(SignatureError):
pass
class SignatureExtractionError(SignatureError):
pass
class SignaturesFolderDoesNotExist(SignatureError):
pass

View file

@ -3,7 +3,7 @@ import platform
import re
import subprocess
import tarfile
from base64 import b64decode
from base64 import b64decode, b64encode
from hashlib import sha256
from io import BytesIO
from pathlib import Path
@ -156,6 +156,10 @@ def upgrade_container_image(image: str, manifest_hash: str, pubkey: str) -> bool
return runtime.container_pull(image)
def _get_blob(tmpdir: str, hash: str) -> Path:
return Path(tmpdir) / "blobs" / "sha256" / hash.replace("sha256:", "")
def upgrade_container_image_airgapped(
container_tar: str, pubkey: str, image_name: str
) -> bool:
@ -166,8 +170,19 @@ def upgrade_container_image_airgapped(
Right now, the archive is extracted and reconstructed, requiring some space
on the filesystem.
"""
# XXX Use a memory buffer instead of the filesystem
with TemporaryDirectory() as tmpdir:
def _get_signature_filename(manifests: List[Dict]) -> Path:
for manifest in manifests:
if (
manifest["annotations"].get("kind")
== "dev.cosignproject.cosign/sigs"
):
return _get_blob(tmpdir, manifest["digest"])
raise errors.SignatureExtractionError()
with tarfile.open(container_tar, "r") as archive:
archive.extractall(tmpdir)
@ -179,14 +194,19 @@ def upgrade_container_image_airgapped(
# Remove the signatures from the archive.
with open(Path(tmpdir) / "index.json") as f:
index_json = json.load(f)
index_json["manifests"] = [
manifest
for manifest in index_json["manifests"]
if manifest["annotations"].get("kind")
!= "dev.cosignproject.cosign/sigs"
]
image_digest = index_json["manifests"][0].get("digest")
signature_filename = _get_signature_filename(index_json["manifests"])
index_json["manifests"] = [
manifest
for manifest in index_json["manifests"]
if manifest["annotations"].get("kind") != "dev.cosignproject.cosign/sigs"
]
with open(signature_filename, "rb") as f:
signatures = convert_oci_images_signatures(json.load(f), tmpdir)
image_digest = index_json["manifests"][0].get("digest").replace("sha256:", "")
# Write the new index.json to the temp folder
with open(Path(tmpdir) / "index.json", "w") as f:
@ -202,15 +222,34 @@ def upgrade_container_image_airgapped(
runtime.load_image_tarball_file(temporary_tar.name)
runtime.tag_image_by_digest(image_digest, image_name)
# XXX Convert the signatures to the expected format
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
# store_signatures(signatures, image_hash, pubkey)
store_signatures(signatures, image_digest, pubkey)
return True
def convert_oci_images_signatures(
signatures_manifest: List[Dict], tmpdir: str
) -> List[Dict]:
def _to_cosign_signature(layer: Dict) -> Dict:
signature = layer["annotations"]["dev.cosignproject.cosign/signature"]
bundle = json.loads(layer["annotations"]["dev.sigstore.cosign/bundle"])
payload_body = json.loads(b64decode(bundle["Payload"]["body"]))
payload_location = _get_blob(tmpdir, layer["digest"])
with open(payload_location, "rb") as f:
payload_b64 = b64encode(f.read()).decode()
return {
"Base64Signature": payload_body["spec"]["signature"]["content"],
"Payload": payload_b64,
"Cert": None,
"Chain": None,
"rekorBundle": bundle,
"RFC3161Timestamp": None,
}
return [_to_cosign_signature(layer) for layer in signatures_manifest["layers"]]
def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) -> str:
"""Get the sha256 hash of a file or content"""
if not file and not content:
@ -268,7 +307,9 @@ def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> No
raise errors.InvalidSignatures("Signatures do not share the same image hash")
if f"sha256:{image_hash}" != hashes[0]:
raise errors.SignatureMismatch("Signatures do not match the given image hash")
raise errors.SignatureMismatch(
f"Signatures do not match the given image hash ({image_hash}, {hashes[0]})"
)
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
pubkey_signatures.mkdir(exist_ok=True)