Make prepared container images compatible with podman load -i

This reverses how the airgapped container images were working. During
preparation, the `index.json` is now trimmed down to be loadable by
podman and docker. The original `index.json` is kept in the archive as
`dangerzone.json`. This has two benefits:

1. The image is now loadable by a container engine without changes;
2. There is no longer a need to recreate the archive at runtime, leading
   to less CPU and space usage on the user's device.
This commit is contained in:
Alexis Métaireau 2025-05-06 19:47:38 +02:00
parent 7045525293
commit 7d88e4b7bb
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
2 changed files with 146 additions and 64 deletions

View file

@ -58,6 +58,21 @@ class InvalidLogIndex(SignatureError):
pass
class InvalidImageArchive(UpdaterError):
    """Raised when a container image tarball is not in the expected format,
    e.g. when it is missing its dangerzone.json manifest.
    """

    pass
class InvalidDangerzoneManifest(InvalidImageArchive):
    """Raised when the dangerzone.json manifest doesn't match the index.json
    manifest in a container.tar image.

    This could mean that the container image has been tampered with and is
    not safe to load, so we bail out.
    """

    pass
class NeedUserInput(UpdaterError):
"""The user has not yet been prompted to know if they want to check for updates."""

View file

@ -4,10 +4,11 @@ import re
import subprocess
import tarfile
from base64 import b64decode, b64encode
from dataclasses import dataclass
from functools import reduce
from hashlib import sha256
from io import BytesIO
from pathlib import Path
from pathlib import Path, PurePath
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import Callable, Dict, List, Optional, Tuple
@ -27,13 +28,13 @@ def appdata_dir() -> Path:
# RELEASE: Bump this value to the log index of the latest signature
# to ensures the software can't upgrade to container images that predates it.
# to ensure the software can't upgrade to container images that predates it.
DEFAULT_LOG_INDEX = 0
# FIXME Store this somewhere else.
DEFAULT_PUBKEY_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
SIGNATURES_PATH = appdata_dir() / "signatures"
LAST_LOG_INDEX = SIGNATURES_PATH / "last_log_index"
DANGERZONE_MANIFEST = "dangerzone.json"
__all__ = [
"verify_signature",
@ -72,7 +73,8 @@ def verify_signature(signature: dict, image_digest: str, pubkey: Path) -> None:
- the signature has been signed by the given public key
- the signature matches the given image digest
"""
# XXX - Also verify the identity/docker-reference field against the expected value
# FIXME Also verify the identity/docker-reference field against
# `container_utils.expected_image_name()`
# e.g. ghcr.io/freedomofpress/dangerzone/dangerzone
cosign.ensure_installed()
@ -119,9 +121,11 @@ def verify_signature(signature: dict, image_digest: str, pubkey: Path) -> None:
log.debug("Signature verified")
@dataclass
class Signature:
def __init__(self, signature: Dict):
self.signature = signature
"""Utility class to interact with signatures"""
signature: Dict
@property
def payload(self) -> Dict:
@ -179,6 +183,7 @@ def verify_signatures(
raise errors.SignatureVerificationError("No signatures found")
for signature in signatures:
# Will raise on errors
verify_signature(signature, image_digest, pubkey)
return True
@ -211,8 +216,27 @@ def write_log_index(log_index: int) -> None:
f.write(str(log_index))
def _get_blob(tmpdir: str, digest: str) -> Path:
return Path(tmpdir) / "blobs" / "sha256" / digest.replace("sha256:", "")
def _get_images_only_manifest(input: dict) -> dict:
"""Filter out all the non-images from a loaded manifest"""
output = input.copy()
output["manifests"] = [
manifest
for manifest in input["manifests"]
if manifest["annotations"].get("kind")
in ("dev.cosignproject.cosign/imageIndex", "dev.cosignproject.cosign/image")
]
return output
def _get_blob(digest: str) -> PurePath:
return PurePath() / "blobs" / "sha256" / digest.replace("sha256:", "")
def _get_signature_filename(input: Dict) -> PurePath:
    """Return the relative path of the signatures blob in an OCI index.

    Looks for the manifest entry annotated by cosign as holding the
    signatures and returns its blob path.

    :param input: The parsed contents of an OCI ``index.json``.
    :raises errors.SignatureExtractionError: if no signatures entry exists.
    """
    for manifest in input["manifests"]:
        # Entries without an "annotations" key cannot be the signatures
        # manifest; skip them instead of raising a KeyError.
        kind = manifest.get("annotations", {}).get("kind")
        if kind == "dev.cosignproject.cosign/sigs":
            return _get_blob(manifest["digest"])
    raise errors.SignatureExtractionError()
def upgrade_container_image_airgapped(
@ -222,45 +246,54 @@ def upgrade_container_image_airgapped(
Verify the given archive against its self-contained signatures, then
upgrade the image and retag it to the expected tag.
Right now, the archive is extracted and reconstructed, requiring some space
on the filesystem.
The logic supports both "dangerzone archives" and "cosign archives".
The presence of a `dangerzone.json` file at the root of the tarball
meaning it's a "dangerzone archive".
See `prepare_airgapped_archive` for more details.
:return: The loaded image name
"""
# XXX Use a memory buffer instead of the filesystem
with TemporaryDirectory() as tmpdir:
with TemporaryDirectory() as _tempdir, tarfile.open(container_tar, "r") as archive:
# First, check that we have a "signatures.json" file
files = archive.getnames()
tmppath = Path(_tempdir)
def _get_signature_filename(manifests: List[Dict]) -> Path:
for manifest in manifests:
if (
manifest["annotations"].get("kind")
== "dev.cosignproject.cosign/sigs"
has_dangerzone_manifest = f"./{DANGERZONE_MANIFEST}" in files
if not has_dangerzone_manifest:
raise errors.InvalidImageArchive()
# Ensure that the signatures.json is the same as the index.json
# with only the images remaining, to avoid situations where we
# check the signatures but the index.json differs, making us
# think that we're with valid signatures where we indeed aren't.
archive.extract(f"./{DANGERZONE_MANIFEST}", tmppath)
archive.extract("./index.json", tmppath)
with (
(tmppath / DANGERZONE_MANIFEST).open() as dzf,
(tmppath / "index.json").open() as indexf,
):
return _get_blob(tmpdir, manifest["digest"])
raise errors.SignatureExtractionError()
dz_manifest = json.load(dzf)
index_manifest = json.load(indexf)
with tarfile.open(container_tar, "r") as archive:
archive.extractall(tmpdir)
expected_manifest = _get_images_only_manifest(dz_manifest)
if expected_manifest != index_manifest:
raise errors.InvalidDangerzoneManifest()
if not cosign.verify_local_image(tmpdir, pubkey):
raise errors.SignatureVerificationError()
# FIXME: remove this once we check the signatures validity
# if not cosign.verify_local_image(tmpdir, pubkey):
# raise errors.SignatureVerificationError()
# Remove the signatures from the archive, otherwise podman is not able to load it
with open(Path(tmpdir) / "index.json") as f:
index_json = json.load(f)
signature_filename = _get_signature_filename(dz_manifest)
archive.extract(f"./{str(signature_filename)}", tmppath)
signature_filename = _get_signature_filename(index_json["manifests"])
index_json["manifests"] = [
manifest
for manifest in index_json["manifests"]
if manifest["annotations"].get("kind")
in ("dev.cosignproject.cosign/imageIndex", "dev.cosignproject.cosign/image")
]
with open(signature_filename, "r") as f:
image_name, signatures = convert_oci_images_signatures(json.load(f), tmpdir)
with (tmppath / signature_filename).open() as f:
image_name, signatures = convert_oci_images_signatures(
json.load(f), archive, tmppath
)
log.info(f"Found image name: {image_name}")
if not bypass_logindex:
@ -273,36 +306,36 @@ def upgrade_container_image_airgapped(
"The log index is not higher than the last known one"
)
image_digest = index_json["manifests"][0].get("digest").replace("sha256:", "")
image_digest = dz_manifest["manifests"][0].get("digest").replace("sha256:", "")
# Write the new index.json to the temp folder
with open(Path(tmpdir) / "index.json", "w") as f:
json.dump(index_json, f)
with NamedTemporaryFile(suffix=".tar") as temporary_tar:
with tarfile.open(temporary_tar.name, "w") as archive:
# The root is the tmpdir
archive.add(Path(tmpdir) / "index.json", arcname="index.json")
archive.add(Path(tmpdir) / "oci-layout", arcname="oci-layout")
archive.add(Path(tmpdir) / "blobs", arcname="blobs")
runtime.load_image_tarball(Path(temporary_tar.name))
runtime.load_image_tarball(container_tar)
runtime.tag_image_by_digest(image_digest, image_name)
store_signatures(signatures, image_digest, pubkey)
return image_name
def get_blob_from_archive(digest: str, tmppath: Path, archive: tarfile.TarFile) -> Path:
    """Extract the blob for *digest* from *archive* into *tmppath*.

    :return: The filesystem path of the extracted blob.
    """
    blob_relpath = _get_blob(digest)
    # Tar members are stored with a leading "./" component.
    member_name = f"./{blob_relpath}"
    archive.extract(member_name, tmppath)
    return tmppath / blob_relpath
def convert_oci_images_signatures(
signatures_manifest: Dict, tmpdir: str
signatures_manifest: Dict, archive: tarfile.TarFile, tmppath: Path
) -> Tuple[str, List[Dict]]:
def _to_cosign_signature(layer: Dict) -> Dict:
signature = layer["annotations"]["dev.cosignproject.cosign/signature"]
bundle = json.loads(layer["annotations"]["dev.sigstore.cosign/bundle"])
payload_body = json.loads(b64decode(bundle["Payload"]["body"]))
payload_location = _get_blob(tmpdir, layer["digest"])
with open(payload_location, "rb") as f:
payload_path = get_blob_from_archive(layer["digest"], tmppath, archive)
with (payload_path).open("rb") as f:
payload_b64 = b64encode(f.read()).decode()
return {
@ -320,7 +353,7 @@ def convert_oci_images_signatures(
if not signatures:
raise errors.SignatureExtractionError()
payload_location = _get_blob(tmpdir, layers[0]["digest"])
payload_location = get_blob_from_archive(layers[0]["digest"], tmppath, archive)
with open(payload_location, "r") as f:
payload = json.load(f)
image_name = payload["critical"]["identity"]["docker-reference"]
@ -402,7 +435,7 @@ def store_signatures(
processed by the updater.
The format used in the `.json` file is the one of `cosign download
signature`, which differs from the "bundle" one used afterwards.
signature`, which differs from the "bundle" one used in the code.
It can be converted to the one expected by cosign verify --bundle with
the `signature_to_bundle()` function.
@ -478,15 +511,29 @@ def get_remote_signatures(image: str, digest: str) -> List[Dict]:
def prepare_airgapped_archive(image_name: str, destination: str) -> None:
"""
Prepare a container image tarball to be used in environments that do not
want to make a {podman,docker} pull.
Podman and Docker are not able to load archives for which the index.json file
contains signatures and attestations, so we need to remove them from the
index.json present in the archive.
Because we still want to retain the signatures somehow, we copy original
index.json to signatures.json, and refer to it when we need to verify the
signatures.
"""
if "@sha256:" not in image_name:
raise errors.AirgappedImageDownloadError(
"The image name must include a digest, e.g. ghcr.io/freedomofpress/dangerzone/dangerzone@sha256:123456"
"The image name must include a digest, "
"e.g. ghcr.io/freedomofpress/dangerzone/dangerzone@sha256:123456"
)
cosign.ensure_installed()
# Get the image from the registry
with TemporaryDirectory() as tmpdir:
tmppath = Path(tmpdir)
msg = f"Downloading image {image_name}. \nIt might take a while."
log.info(msg)
@ -498,8 +545,28 @@ def prepare_airgapped_archive(image_name: str, destination: str) -> None:
if process.returncode != 0:
raise errors.AirgappedImageDownloadError()
# Read from index.json, save it as DANGERZONE_MANIFEST
# and then change the index.json contents to only contain
# images (noting this as the naming might sound awkward)
with (
(tmppath / "index.json").open() as indexf,
(tmppath / DANGERZONE_MANIFEST).open("w+") as dzf,
):
original_index_json = json.load(indexf)
json.dump(original_index_json, dzf)
new_index_json = _get_images_only_manifest(original_index_json)
# Write the new index.json to the temp folder
with open(tmppath / "index.json", "w") as f:
json.dump(new_index_json, f)
with tarfile.open(destination, "w") as archive:
archive.add(tmpdir, arcname=".")
# The root is the tmpdir
# archive.add(tmppath / "index.json", arcname="index.json")
# archive.add(tmppath / "oci-layout", arcname="oci-layout")
# archive.add(tmppath / "blobs", arcname="blobs")
archive.add(str(tmppath), arcname=".")
def upgrade_container_image(