Get image name from signatures for air-gapped archives

This allows to be sure that the image name is verified by a known public
key, rather than relying on an input by the user, which can lead to issues.
This commit is contained in:
Alexis Métaireau 2025-02-04 15:32:08 +01:00
parent c6f5e61e0b
commit 3b858dac27
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
3 changed files with 26 additions and 16 deletions

View file

@ -40,12 +40,15 @@ def upgrade(image: str, pubkey: str) -> None:
@main.command()
@click.argument("image_filename")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
@click.option("--image-name", default=DEFAULT_IMAGE_NAME)
def load_archive(image_filename: str, pubkey: str, image_name: str) -> None:
def load_archive(image_filename: str, pubkey: str) -> None:
"""Upgrade the local image to the one in the archive."""
try:
signatures.upgrade_container_image_airgapped(image_filename, pubkey, image_name)
click.echo(f"✅ Installed image {image_filename} on the system")
loaded_image = signatures.upgrade_container_image_airgapped(
image_filename, pubkey
)
click.echo(
f"✅ Installed image {image_filename} on the system as {loaded_image}"
)
except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}")
raise click.Abort()

View file

@ -26,7 +26,7 @@ def verify_local_image(oci_image_folder: str, pubkey: str) -> bool:
log.debug(" ".join(cmd))
result = subprocess.run(cmd, capture_output=True)
if result.returncode == 0:
log.debug("Signature verified")
log.info("Signature verified")
return True
log.debug("Failed to verify signature", result.stderr)
log.info("Failed to verify signature", result.stderr)
return False

View file

@ -138,15 +138,15 @@ def _get_blob(tmpdir: str, hash: str) -> Path:
return Path(tmpdir) / "blobs" / "sha256" / hash.replace("sha256:", "")
def upgrade_container_image_airgapped(
container_tar: str, pubkey: str, image_name: str
) -> bool:
def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
"""
Verify the given archive against its self-contained signatures, then
upgrade the image and retag it to the expected tag.
Right now, the archive is extracted and reconstructed, requiring some space
on the filesystem.
:return: The loaded image name
"""
# XXX Use a memory buffer instead of the filesystem
@ -164,8 +164,6 @@ def upgrade_container_image_airgapped(
with tarfile.open(container_tar, "r") as archive:
archive.extractall(tmpdir)
# XXX Check if the contained signatures match the given ones?
# Or maybe store both signatures?
if not cosign.verify_local_image(tmpdir, pubkey):
raise errors.SignatureVerificationError()
@ -182,7 +180,8 @@ def upgrade_container_image_airgapped(
]
with open(signature_filename, "rb") as f:
signatures = convert_oci_images_signatures(json.load(f), tmpdir)
image_name, signatures = convert_oci_images_signatures(json.load(f), tmpdir)
log.info(f"Found image name: {image_name}")
image_digest = index_json["manifests"][0].get("digest").replace("sha256:", "")
@ -201,12 +200,12 @@ def upgrade_container_image_airgapped(
runtime.tag_image_by_digest(image_digest, image_name)
store_signatures(signatures, image_digest, pubkey)
return True
return image_name
def convert_oci_images_signatures(
signatures_manifest: List[Dict], tmpdir: str
) -> List[Dict]:
) -> (str, List[Dict]):
def _to_cosign_signature(layer: Dict) -> Dict:
signature = layer["annotations"]["dev.cosignproject.cosign/signature"]
bundle = json.loads(layer["annotations"]["dev.sigstore.cosign/bundle"])
@ -225,7 +224,15 @@ def convert_oci_images_signatures(
"RFC3161Timestamp": None,
}
return [_to_cosign_signature(layer) for layer in signatures_manifest["layers"]]
layers = signatures_manifest["layers"]
signatures = [_to_cosign_signature(layer) for layer in layers]
payload_location = _get_blob(tmpdir, layers[0]["digest"])
with open(payload_location, "r") as f:
payload = json.load(f)
image_name = payload["critical"]["identity"]["docker-reference"]
return image_name, signatures
def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) -> str:
@ -293,7 +300,7 @@ def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> No
pubkey_signatures.mkdir(exist_ok=True)
with open(pubkey_signatures / f"{image_hash}.json", "w") as f:
log.debug(
log.info(
f"Storing signatures for {image_hash} in {pubkey_signatures}/{image_hash}.json"
)
json.dump(signatures, f)