Allow installation on air-gapped systems

- Verify the archive against the known public signature
- Prepare a new archive format (with signature removed)
- Load the new image and retag it with the expected tag

During this process, the signatures are lost and should instead be
converted to a known format. Additionally, the name fo the repository
should ideally come from the signatures rather than from the command
line.
This commit is contained in:
Alexis Métaireau 2025-02-03 17:39:03 +01:00
parent d4547b8964
commit f30ced7834
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
6 changed files with 373 additions and 48 deletions

View file

@ -116,7 +116,7 @@ def get_expected_tag() -> str:
return f.read().strip()
def load_image_tarball() -> None:
def load_image_tarball_in_memory() -> None:
log.info("Installing Dangerzone container image...")
p = subprocess.Popen(
[get_runtime(), "load"],
@ -147,6 +147,36 @@ def load_image_tarball() -> None:
log.info("Successfully installed container image from")
def load_image_tarball_file(tarball_path: str) -> None:
cmd = [get_runtime(), "load", "-i", tarball_path]
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
log.info("Successfully installed container image from %s", tarball_path)
def tag_image_by_digest(digest: str, tag: str) -> None:
image_id = get_image_id_by_digest(digest)
cmd = [get_runtime(), "tag", image_id, tag]
log.debug(" ".join(cmd))
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
def get_image_id_by_digest(digest: str) -> str:
cmd = [
get_runtime(),
"images",
"-f",
f"digest={digest}",
"--format",
"{{.Id}}",
]
log.debug(" ".join(cmd))
process = subprocess.run(
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
)
return process.stdout.decode().strip()
def container_pull(image: str) -> bool:
"""Pull a container image from a registry."""
cmd = [get_runtime_name(), "pull", f"{image}"]
@ -155,8 +185,10 @@ def container_pull(image: str) -> bool:
return process.returncode == 0
def load_image_hash(image: str) -> str:
"""Returns a image hash from a local image name"""
def get_local_image_hash(image: str) -> str:
"""
Returns a image hash from a local image name
"""
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout.strip().decode().strip("sha256:")

View file

@ -102,7 +102,7 @@ class Container(IsolationProvider):
return True
# Load the image tarball into the container runtime.
container_utils.load_image_tarball()
container_utils.load_image_tarball_in_memory()
# Check that the container image has the expected image tag.
# See https://github.com/freedomofpress/dangerzone/issues/988 for an example

189
dangerzone/rntime.py Normal file
View file

@ -0,0 +1,189 @@
import gzip
import logging
import platform
import shutil
import subprocess
from typing import List, Optional, Tuple
from . import errors
from .util import get_resource_path, get_subprocess_startupinfo
CONTAINER_NAME = "dangerzone.rocks/dangerzone"
log = logging.getLogger(__name__)
def get_runtime_name() -> str:
if platform.system() == "Linux":
return "podman"
# Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
return "docker"
def get_runtime_version() -> Tuple[int, int]:
"""Get the major/minor parts of the Docker/Podman version.
Some of the operations we perform in this module rely on some Podman features
that are not available across all of our platforms. In order to have a proper
fallback, we need to know the Podman version. More specifically, we're fine with
just knowing the major and minor version, since writing/installing a full-blown
semver parser is an overkill.
"""
# Get the Docker/Podman version, using a Go template.
runtime = get_runtime_name()
if runtime == "podman":
query = "{{.Client.Version}}"
else:
query = "{{.Server.Version}}"
cmd = [runtime, "version", "-f", query]
try:
version = subprocess.run(
cmd,
startupinfo=get_subprocess_startupinfo(),
capture_output=True,
check=True,
).stdout.decode()
except Exception as e:
msg = f"Could not get the version of the {runtime.capitalize()} tool: {e}"
raise RuntimeError(msg) from e
# Parse this version and return the major/minor parts, since we don't need the
# rest.
try:
major, minor, _ = version.split(".", 3)
return (int(major), int(minor))
except Exception as e:
msg = (
f"Could not parse the version of the {runtime.capitalize()} tool"
f" (found: '{version}') due to the following error: {e}"
)
raise RuntimeError(msg)
def get_runtime() -> str:
container_tech = get_runtime_name()
runtime = shutil.which(container_tech)
if runtime is None:
raise errors.NoContainerTechException(container_tech)
return runtime
def list_image_tags() -> List[str]:
"""Get the tags of all loaded Dangerzone images.
This method returns a mapping of image tags to image IDs, for all Dangerzone
images. This can be useful when we want to find which are the local image tags,
and which image ID does the "latest" tag point to.
"""
return (
subprocess.check_output(
[
get_runtime(),
"image",
"list",
"--format",
"{{ .Tag }}",
CONTAINER_NAME,
],
text=True,
startupinfo=get_subprocess_startupinfo(),
)
.strip()
.split()
)
def delete_image_tag(tag: str) -> None:
"""Delete a Dangerzone image tag."""
name = CONTAINER_NAME + ":" + tag
log.warning(f"Deleting old container image: {name}")
try:
subprocess.check_output(
[get_runtime(), "rmi", "--force", name],
startupinfo=get_subprocess_startupinfo(),
)
except Exception as e:
log.warning(
f"Couldn't delete old container image '{name}', so leaving it there."
f" Original error: {e}"
)
def get_expected_tag() -> str:
"""Get the tag of the Dangerzone image tarball from the image-id.txt file."""
with open(get_resource_path("image-id.txt")) as f:
return f.read().strip()
def tag_image_by_digest(digest: str, tag: str) -> None:
image_id = get_image_id_by_digest(digest)
cmd = [get_runtime(), "tag", image_id, tag]
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
def get_image_id_by_digest(digest: str) -> str:
cmd = [
get_runtime(),
"image",
"tag",
"-f",
f'digest="{digest}"',
"--format ",
"{{.Id}}",
]
process = subprocess.run(
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
)
return process.stdout.decode().strip()
def load_image_tarball_in_memory(
compressed_container_path: Optional[str] = None,
) -> None:
if compressed_container_path is None:
compressed_container_path = get_resource_path("container.tar.gz")
log.info("Installing Dangerzone container image...")
p = subprocess.Popen(
[get_runtime(), "load"],
stdin=subprocess.PIPE,
startupinfo=get_subprocess_startupinfo(),
)
chunk_size = 4 << 20
with gzip.open(compressed_container_path) as f:
while True:
chunk = f.read(chunk_size)
if len(chunk) > 0:
if p.stdin:
p.stdin.write(chunk)
else:
break
_, err = p.communicate()
if p.returncode < 0:
if err:
error = err.decode()
else:
error = "No output"
raise errors.ImageInstallationException(
f"Could not install container image: {error}"
)
log.info("Successfully installed container image from")
def load_image_tarball_file(container_path: str) -> None:
cmd = [get_runtime(), "load", "-i", container_path]
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
log.info("Successfully installed container image from %s", container_path)
def container_pull(image: str) -> bool:
# XXX - Move to container_utils.py
cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
process.communicate()
return process.returncode == 0

View file

@ -7,16 +7,20 @@ import click
from ..util import get_resource_path
from . import errors, log, registry
from .attestations import verify_attestation
from .signatures import upgrade_container_image, verify_offline_image_signature
from .signatures import (
upgrade_container_image,
upgrade_container_image_airgapped,
verify_offline_image_signature,
)
DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone"
PUBKEY_DEFAULT_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
@click.group()
@click.option("--debug", is_flag=True)
def main(debug=False) -> None:
def main(debug: bool) -> None:
if debug:
click.echo("Debug mode enabled")
level = logging.DEBUG
@ -26,11 +30,9 @@ def main(debug=False) -> None:
@main.command()
@click.option("--image")
@click.argument("image")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
@click.option("--airgap", is_flag=True)
# XXX Add options to do airgap upgrade
def upgrade(image: str, pubkey: str, airgap: bool) -> None:
def upgrade(image: str, pubkey: str) -> None:
"""Upgrade the image to the latest signed version."""
manifest_hash = registry.get_manifest_hash(image)
try:
@ -41,6 +43,20 @@ def upgrade(image: str, pubkey: str, airgap: bool) -> None:
raise click.Abort()
@main.command()
@click.argument("image_filename")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
@click.option("--image-name", default=DEFAULT_IMAGE_NAME)
def upgrade_airgapped(image_filename: str, pubkey: str, image_name: str) -> None:
"""Upgrade the image to the latest signed version."""
try:
upgrade_container_image_airgapped(image_filename, pubkey, image_name)
click.echo(f"✅ Installed image {image_filename} on the system")
except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}")
raise click.Abort()
@main.command()
@click.argument("image")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)

View file

@ -12,7 +12,6 @@ __all__ = [
"list_tags",
"get_manifest",
"get_attestation",
"Image",
"parse_image_location",
]

View file

@ -2,15 +2,16 @@ import json
import platform
import re
import subprocess
import tarfile
from base64 import b64decode
from hashlib import sha256
from io import BytesIO
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Dict, List, Tuple
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import Dict, List, Optional, Tuple
from ..container_utils import container_pull, load_image_hash
from . import errors, log, utils
from .registry import get_manifest_hash
from .. import container_utils as runtime
from . import errors, log, registry, utils
try:
import platformdirs
@ -54,12 +55,40 @@ def signature_to_bundle(sig: Dict) -> Dict:
}
def verify_signature(signature: dict, pubkey: str) -> bool:
def cosign_verify_local_image(oci_image_folder: str, pubkey: str) -> bool:
"""Verify the given path against the given public key"""
utils.ensure_cosign()
cmd = [
"cosign",
"verify",
"--key",
pubkey,
"--offline",
"--local-image",
oci_image_folder,
]
log.debug(" ".join(cmd))
result = subprocess.run(cmd, capture_output=True)
if result.returncode == 0:
log.debug("Signature verified")
return True
log.debug("Failed to verify signature", result.stderr)
return False
def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
"""Verify a signature against a given public key"""
# XXX - Also verfy the identity/docker-reference field against the expected value
# e.g. ghcr.io/freedomofpress/dangerzone/dangerzone
utils.ensure_cosign()
signature_bundle = signature_to_bundle(signature)
payload_bytes = b64decode(signature_bundle["Payload"])
if json.loads(payload_bytes)["critical"]["type"] != f"sha256:{image_hash}":
raise errors.SignatureMismatch("The signature does not match the image hash")
with (
NamedTemporaryFile(mode="w") as signature_file,
NamedTemporaryFile(mode="bw") as payload_file,
@ -67,7 +96,6 @@ def verify_signature(signature: dict, pubkey: str) -> bool:
json.dump(signature_bundle, signature_file)
signature_file.flush()
payload_bytes = b64decode(signature_bundle["Payload"])
payload_file.write(payload_bytes)
payload_file.flush()
@ -92,47 +120,107 @@ def verify_signature(signature: dict, pubkey: str) -> bool:
return False
def new_image_release(image) -> bool:
remote_hash = get_manifest_hash(image)
local_hash = load_image_hash(image)
def new_image_release(image: str) -> bool:
remote_hash = registry.get_manifest_hash(image)
local_hash = runtime.get_local_image_hash(image)
log.debug("Remote hash: %s", remote_hash)
log.debug("Local hash: %s", local_hash)
return remote_hash != local_hash
def upgrade_container_image(
image: str,
manifest_hash: str,
def verify_signatures(
signatures: List[Dict],
image_hash: str,
pubkey: str,
) -> bool:
for signature in signatures:
if not verify_signature(signature, image_hash, pubkey):
raise errors.SignatureVerificationError()
return True
def upgrade_container_image(image: str, manifest_hash: str, pubkey: str) -> bool:
"""Verify and upgrade the image to the latest, if signed."""
if not new_image_release(image):
raise errors.ImageAlreadyUpToDate("The image is already up to date")
return False
signatures = get_signatures(image, manifest_hash)
log.debug("Signatures: %s", signatures)
if len(signatures) < 1:
raise errors.NoRemoteSignatures("No remote signatures found")
for signature in signatures:
signature_is_valid = verify_signature(signature, pubkey)
if not signature_is_valid:
raise errors.SignatureVerificationError()
signatures = get_remote_signatures(image, manifest_hash)
verify_signatures(signatures, manifest_hash, pubkey)
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
store_signatures(signatures, manifest_hash, pubkey)
# let's upgrade the image
# XXX Use the hash here to avoid race conditions
return container_pull(image)
# XXX Use the image digest here to avoid race conditions
return runtime.container_pull(image)
def get_file_hash(file: str) -> str:
with open(file, "rb") as f:
content = f.read()
def upgrade_container_image_airgapped(
container_tar: str, pubkey: str, image_name: str
) -> bool:
"""
Verify the given archive against its self-contained signatures, then
upgrade the image and retag it to the expected tag.
Right now, the archive is extracted and reconstructed, requiring some space
on the filesystem.
"""
# XXX Use a memory buffer instead of the filesystem
with TemporaryDirectory() as tmpdir:
with tarfile.open(container_tar, "r") as archive:
archive.extractall(tmpdir)
# XXX Check if the contained signatures match the given ones?
# Or maybe store both signatures?
if not cosign_verify_local_image(tmpdir, pubkey):
raise errors.SignatureVerificationError()
# Remove the signatures from the archive.
with open(Path(tmpdir) / "index.json") as f:
index_json = json.load(f)
index_json["manifests"] = [
manifest
for manifest in index_json["manifests"]
if manifest["annotations"].get("kind")
!= "dev.cosignproject.cosign/sigs"
]
image_digest = index_json["manifests"][0].get("digest")
# Write the new index.json to the temp folder
with open(Path(tmpdir) / "index.json", "w") as f:
json.dump(index_json, f)
with NamedTemporaryFile(suffix=".tar") as temporary_tar:
with tarfile.open(temporary_tar.name, "w") as archive:
# The root is the tmpdir
archive.add(Path(tmpdir) / "index.json", arcname="index.json")
archive.add(Path(tmpdir) / "oci-layout", arcname="oci-layout")
archive.add(Path(tmpdir) / "blobs", arcname="blobs")
runtime.load_image_tarball_file(temporary_tar.name)
runtime.tag_image_by_digest(image_digest, image_name)
# XXX Convert the signatures to the expected format
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
# store_signatures(signatures, image_hash, pubkey)
return True
def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) -> str:
"""Get the sha256 hash of a file or content"""
if not file and not content:
raise errors.UpdaterError("No file or content provided")
if file:
with open(file, "rb") as f:
content = f.read()
if content:
return sha256(content).hexdigest()
return ""
def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
@ -197,23 +285,21 @@ def verify_offline_image_signature(image: str, pubkey: str) -> bool:
Verifies that a local image has a valid signature
"""
log.info(f"Verifying local image {image} against pubkey {pubkey}")
image_hash = load_image_hash(image)
image_hash = runtime.get_local_image_hash(image)
log.debug(f"Image hash: {image_hash}")
signatures = load_signatures(image_hash, pubkey)
if len(signatures) < 1:
raise errors.LocalSignatureNotFound("No signatures found")
for signature in signatures:
if not verify_signature(signature, pubkey):
if not verify_signature(signature, image_hash, pubkey):
msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
raise errors.SignatureVerificationError(msg)
return True
def get_signatures(image: str, hash: str) -> List[Dict]:
"""
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.
"""
def get_remote_signatures(image: str, hash: str) -> List[Dict]:
"""Retrieve the signatures from the registry, via `cosign download`."""
utils.ensure_cosign()
process = subprocess.run(
@ -225,4 +311,7 @@ def get_signatures(image: str, hash: str) -> List[Dict]:
# XXX: Check the output first.
# Remove the last return, split on newlines, convert from JSON
signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
return list(map(json.loads, signatures_raw))
signatures = list(map(json.loads, signatures_raw))
if len(signatures) < 1:
raise errors.NoRemoteSignatures("No signatures found for the image")
return signatures