Compare commits

..

3 commits

Author SHA1 Message Date
75330b335c
Merge 12aafa2606 into 88a6b37770 2025-02-03 19:20:52 +00:00
Alexis Métaireau
12aafa2606
Locally store the signatures for oci-images archives
Some checks are pending
Tests / run tests (ubuntu 22.04) (push) Blocked by required conditions
Tests / run tests (ubuntu 24.04) (push) Blocked by required conditions
Tests / run tests (ubuntu 24.10) (push) Blocked by required conditions
Tests / run-lint (push) Waiting to run
Tests / build-container-image (push) Waiting to run
Tests / Download and cache Tesseract data (push) Waiting to run
Tests / windows (push) Blocked by required conditions
Tests / macOS (arch64) (push) Blocked by required conditions
Tests / macOS (x86_64) (push) Blocked by required conditions
Tests / build-deb (debian bullseye) (push) Blocked by required conditions
Tests / build-deb (debian trixie) (push) Blocked by required conditions
Tests / build-deb (ubuntu 20.04) (push) Blocked by required conditions
Tests / build-deb (ubuntu 22.04) (push) Blocked by required conditions
Tests / build-deb (ubuntu 24.04) (push) Blocked by required conditions
Tests / build-deb (ubuntu 24.10) (push) Blocked by required conditions
Tests / install-deb (debian bookworm) (push) Blocked by required conditions
Tests / install-deb (debian bullseye) (push) Blocked by required conditions
Tests / install-deb (debian trixie) (push) Blocked by required conditions
Tests / install-deb (ubuntu 20.04) (push) Blocked by required conditions
Tests / install-deb (ubuntu 22.04) (push) Blocked by required conditions
Tests / install-deb (ubuntu 24.04) (push) Blocked by required conditions
Tests / install-deb (ubuntu 24.10) (push) Blocked by required conditions
Tests / build-install-rpm (fedora 40) (push) Blocked by required conditions
Tests / build-install-rpm (fedora 41) (push) Blocked by required conditions
Tests / run tests (debian bookworm) (push) Blocked by required conditions
Tests / run tests (debian bullseye) (push) Blocked by required conditions
Tests / run tests (debian trixie) (push) Blocked by required conditions
Tests / check-reproducibility (push) Waiting to run
Release container image / build-container-image (push) Waiting to run
Tests / build-deb (debian bookworm) (push) Blocked by required conditions
On air-gapped environements, it's now possible to load signatures
generated by `cosign save` commands. The signatures embedded in this
format will be converted to the one used by `cosign download signature`.
2025-02-03 20:18:10 +01:00
Alexis Métaireau
087e5bd1ad
Allow installation on air-gapped systems
- Verify the archive against the known public signature
- Prepare a new archive format (with signature removed)
- Load the new image and retag it with the expected tag

During this process, the signatures are lost and should instead be
converted to a known format. Additionally, the name fo the repository
should ideally come from the signatures rather than from the command
line.
2025-02-03 18:04:24 +01:00
7 changed files with 428 additions and 50 deletions

View file

@ -116,7 +116,7 @@ def get_expected_tag() -> str:
return f.read().strip()
def load_image_tarball() -> None:
def load_image_tarball_in_memory() -> None:
log.info("Installing Dangerzone container image...")
p = subprocess.Popen(
[get_runtime(), "load"],
@ -147,6 +147,44 @@ def load_image_tarball() -> None:
log.info("Successfully installed container image from")
def load_image_tarball_file(tarball_path: str) -> None:
cmd = [get_runtime(), "load", "-i", tarball_path]
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
log.info("Successfully installed container image from %s", tarball_path)
def tag_image_by_digest(digest: str, tag: str) -> None:
"""Tag a container image by digest.
The sha256: prefix should be omitted from the digest.
"""
image_id = get_image_id_by_digest(digest)
cmd = [get_runtime(), "tag", image_id, tag]
log.debug(" ".join(cmd))
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
def get_image_id_by_digest(digest: str) -> str:
"""Get an image ID from a digest.
The sha256: prefix should be omitted from the digest.
"""
cmd = [
get_runtime(),
"images",
"-f",
f"digest=sha256:{digest}",
"--format",
"{{.Id}}",
]
log.debug(" ".join(cmd))
process = subprocess.run(
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
)
breakpoint()
# In case we have multiple lines, we only want the first one.
return process.stdout.decode().strip().split("\n")[0]
def container_pull(image: str) -> bool:
"""Pull a container image from a registry."""
cmd = [get_runtime_name(), "pull", f"{image}"]
@ -155,8 +193,10 @@ def container_pull(image: str) -> bool:
return process.returncode == 0
def load_image_hash(image: str) -> str:
"""Returns a image hash from a local image name"""
def get_local_image_hash(image: str) -> str:
"""
Returns a image hash from a local image name
"""
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout.strip().decode().strip("sha256:")

View file

@ -102,7 +102,7 @@ class Container(IsolationProvider):
return True
# Load the image tarball into the container runtime.
container_utils.load_image_tarball()
container_utils.load_image_tarball_in_memory()
# Check that the container image has the expected image tag.
# See https://github.com/freedomofpress/dangerzone/issues/988 for an example

189
dangerzone/rntime.py Normal file
View file

@ -0,0 +1,189 @@
import gzip
import logging
import platform
import shutil
import subprocess
from typing import List, Optional, Tuple
from . import errors
from .util import get_resource_path, get_subprocess_startupinfo
CONTAINER_NAME = "dangerzone.rocks/dangerzone"
log = logging.getLogger(__name__)
def get_runtime_name() -> str:
if platform.system() == "Linux":
return "podman"
# Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
return "docker"
def get_runtime_version() -> Tuple[int, int]:
"""Get the major/minor parts of the Docker/Podman version.
Some of the operations we perform in this module rely on some Podman features
that are not available across all of our platforms. In order to have a proper
fallback, we need to know the Podman version. More specifically, we're fine with
just knowing the major and minor version, since writing/installing a full-blown
semver parser is an overkill.
"""
# Get the Docker/Podman version, using a Go template.
runtime = get_runtime_name()
if runtime == "podman":
query = "{{.Client.Version}}"
else:
query = "{{.Server.Version}}"
cmd = [runtime, "version", "-f", query]
try:
version = subprocess.run(
cmd,
startupinfo=get_subprocess_startupinfo(),
capture_output=True,
check=True,
).stdout.decode()
except Exception as e:
msg = f"Could not get the version of the {runtime.capitalize()} tool: {e}"
raise RuntimeError(msg) from e
# Parse this version and return the major/minor parts, since we don't need the
# rest.
try:
major, minor, _ = version.split(".", 3)
return (int(major), int(minor))
except Exception as e:
msg = (
f"Could not parse the version of the {runtime.capitalize()} tool"
f" (found: '{version}') due to the following error: {e}"
)
raise RuntimeError(msg)
def get_runtime() -> str:
container_tech = get_runtime_name()
runtime = shutil.which(container_tech)
if runtime is None:
raise errors.NoContainerTechException(container_tech)
return runtime
def list_image_tags() -> List[str]:
"""Get the tags of all loaded Dangerzone images.
This method returns a mapping of image tags to image IDs, for all Dangerzone
images. This can be useful when we want to find which are the local image tags,
and which image ID does the "latest" tag point to.
"""
return (
subprocess.check_output(
[
get_runtime(),
"image",
"list",
"--format",
"{{ .Tag }}",
CONTAINER_NAME,
],
text=True,
startupinfo=get_subprocess_startupinfo(),
)
.strip()
.split()
)
def delete_image_tag(tag: str) -> None:
"""Delete a Dangerzone image tag."""
name = CONTAINER_NAME + ":" + tag
log.warning(f"Deleting old container image: {name}")
try:
subprocess.check_output(
[get_runtime(), "rmi", "--force", name],
startupinfo=get_subprocess_startupinfo(),
)
except Exception as e:
log.warning(
f"Couldn't delete old container image '{name}', so leaving it there."
f" Original error: {e}"
)
def get_expected_tag() -> str:
"""Get the tag of the Dangerzone image tarball from the image-id.txt file."""
with open(get_resource_path("image-id.txt")) as f:
return f.read().strip()
def tag_image_by_digest(digest: str, tag: str) -> None:
image_id = get_image_id_by_digest(digest)
cmd = [get_runtime(), "tag", image_id, tag]
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
def get_image_id_by_digest(digest: str) -> str:
cmd = [
get_runtime(),
"image",
"tag",
"-f",
f'digest="{digest}"',
"--format ",
"{{.Id}}",
]
process = subprocess.run(
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
)
return process.stdout.decode().strip()
def load_image_tarball_in_memory(
compressed_container_path: Optional[str] = None,
) -> None:
if compressed_container_path is None:
compressed_container_path = get_resource_path("container.tar.gz")
log.info("Installing Dangerzone container image...")
p = subprocess.Popen(
[get_runtime(), "load"],
stdin=subprocess.PIPE,
startupinfo=get_subprocess_startupinfo(),
)
chunk_size = 4 << 20
with gzip.open(compressed_container_path) as f:
while True:
chunk = f.read(chunk_size)
if len(chunk) > 0:
if p.stdin:
p.stdin.write(chunk)
else:
break
_, err = p.communicate()
if p.returncode < 0:
if err:
error = err.decode()
else:
error = "No output"
raise errors.ImageInstallationException(
f"Could not install container image: {error}"
)
log.info("Successfully installed container image from")
def load_image_tarball_file(container_path: str) -> None:
cmd = [get_runtime(), "load", "-i", container_path]
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
log.info("Successfully installed container image from %s", container_path)
def container_pull(image: str) -> bool:
# XXX - Move to container_utils.py
cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
process.communicate()
return process.returncode == 0

View file

@ -7,16 +7,20 @@ import click
from ..util import get_resource_path
from . import errors, log, registry
from .attestations import verify_attestation
from .signatures import upgrade_container_image, verify_offline_image_signature
from .signatures import (
upgrade_container_image,
upgrade_container_image_airgapped,
verify_offline_image_signature,
)
DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone"
PUBKEY_DEFAULT_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
@click.group()
@click.option("--debug", is_flag=True)
def main(debug=False) -> None:
def main(debug: bool) -> None:
if debug:
click.echo("Debug mode enabled")
level = logging.DEBUG
@ -26,11 +30,9 @@ def main(debug=False) -> None:
@main.command()
@click.option("--image")
@click.argument("image")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
@click.option("--airgap", is_flag=True)
# XXX Add options to do airgap upgrade
def upgrade(image: str, pubkey: str, airgap: bool) -> None:
def upgrade(image: str, pubkey: str) -> None:
"""Upgrade the image to the latest signed version."""
manifest_hash = registry.get_manifest_hash(image)
try:
@ -41,6 +43,20 @@ def upgrade(image: str, pubkey: str, airgap: bool) -> None:
raise click.Abort()
@main.command()
@click.argument("image_filename")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
@click.option("--image-name", default=DEFAULT_IMAGE_NAME)
def upgrade_airgapped(image_filename: str, pubkey: str, image_name: str) -> None:
"""Upgrade the image to the latest signed version."""
try:
upgrade_container_image_airgapped(image_filename, pubkey, image_name)
click.echo(f"✅ Installed image {image_filename} on the system")
except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}")
raise click.Abort()
@main.command()
@click.argument("image")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)

View file

@ -22,6 +22,10 @@ class SignatureVerificationError(SignatureError):
pass
class SignatureExtractionError(SignatureError):
pass
class SignaturesFolderDoesNotExist(SignatureError):
pass

View file

@ -12,7 +12,6 @@ __all__ = [
"list_tags",
"get_manifest",
"get_attestation",
"Image",
"parse_image_location",
]

View file

@ -2,15 +2,16 @@ import json
import platform
import re
import subprocess
from base64 import b64decode
import tarfile
from base64 import b64decode, b64encode
from hashlib import sha256
from io import BytesIO
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Dict, List, Tuple
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import Dict, List, Optional, Tuple
from ..container_utils import container_pull, load_image_hash
from . import errors, log, utils
from .registry import get_manifest_hash
from .. import container_utils as runtime
from . import errors, log, registry, utils
try:
import platformdirs
@ -54,12 +55,40 @@ def signature_to_bundle(sig: Dict) -> Dict:
}
def verify_signature(signature: dict, pubkey: str) -> bool:
def cosign_verify_local_image(oci_image_folder: str, pubkey: str) -> bool:
"""Verify the given path against the given public key"""
utils.ensure_cosign()
cmd = [
"cosign",
"verify",
"--key",
pubkey,
"--offline",
"--local-image",
oci_image_folder,
]
log.debug(" ".join(cmd))
result = subprocess.run(cmd, capture_output=True)
if result.returncode == 0:
log.debug("Signature verified")
return True
log.debug("Failed to verify signature", result.stderr)
return False
def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
"""Verify a signature against a given public key"""
# XXX - Also verfy the identity/docker-reference field against the expected value
# e.g. ghcr.io/freedomofpress/dangerzone/dangerzone
utils.ensure_cosign()
signature_bundle = signature_to_bundle(signature)
payload_bytes = b64decode(signature_bundle["Payload"])
if json.loads(payload_bytes)["critical"]["type"] != f"sha256:{image_hash}":
raise errors.SignatureMismatch("The signature does not match the image hash")
with (
NamedTemporaryFile(mode="w") as signature_file,
NamedTemporaryFile(mode="bw") as payload_file,
@ -67,7 +96,6 @@ def verify_signature(signature: dict, pubkey: str) -> bool:
json.dump(signature_bundle, signature_file)
signature_file.flush()
payload_bytes = b64decode(signature_bundle["Payload"])
payload_file.write(payload_bytes)
payload_file.flush()
@ -92,47 +120,146 @@ def verify_signature(signature: dict, pubkey: str) -> bool:
return False
def new_image_release(image) -> bool:
remote_hash = get_manifest_hash(image)
local_hash = load_image_hash(image)
def new_image_release(image: str) -> bool:
remote_hash = registry.get_manifest_hash(image)
local_hash = runtime.get_local_image_hash(image)
log.debug("Remote hash: %s", remote_hash)
log.debug("Local hash: %s", local_hash)
return remote_hash != local_hash
def upgrade_container_image(
image: str,
manifest_hash: str,
def verify_signatures(
signatures: List[Dict],
image_hash: str,
pubkey: str,
) -> bool:
for signature in signatures:
if not verify_signature(signature, image_hash, pubkey):
raise errors.SignatureVerificationError()
return True
def upgrade_container_image(image: str, manifest_hash: str, pubkey: str) -> bool:
"""Verify and upgrade the image to the latest, if signed."""
if not new_image_release(image):
raise errors.ImageAlreadyUpToDate("The image is already up to date")
return False
signatures = get_signatures(image, manifest_hash)
log.debug("Signatures: %s", signatures)
if len(signatures) < 1:
raise errors.NoRemoteSignatures("No remote signatures found")
for signature in signatures:
signature_is_valid = verify_signature(signature, pubkey)
if not signature_is_valid:
raise errors.SignatureVerificationError()
signatures = get_remote_signatures(image, manifest_hash)
verify_signatures(signatures, manifest_hash, pubkey)
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
store_signatures(signatures, manifest_hash, pubkey)
# let's upgrade the image
# XXX Use the hash here to avoid race conditions
return container_pull(image)
# XXX Use the image digest here to avoid race conditions
return runtime.container_pull(image)
def get_file_hash(file: str) -> str:
def _get_blob(tmpdir: str, hash: str) -> Path:
return Path(tmpdir) / "blobs" / "sha256" / hash.replace("sha256:", "")
def upgrade_container_image_airgapped(
container_tar: str, pubkey: str, image_name: str
) -> bool:
"""
Verify the given archive against its self-contained signatures, then
upgrade the image and retag it to the expected tag.
Right now, the archive is extracted and reconstructed, requiring some space
on the filesystem.
"""
# XXX Use a memory buffer instead of the filesystem
with TemporaryDirectory() as tmpdir:
def _get_signature_filename(manifests: List[Dict]) -> Path:
for manifest in manifests:
if (
manifest["annotations"].get("kind")
== "dev.cosignproject.cosign/sigs"
):
return _get_blob(tmpdir, manifest["digest"])
raise errors.SignatureExtractionError()
with tarfile.open(container_tar, "r") as archive:
archive.extractall(tmpdir)
# XXX Check if the contained signatures match the given ones?
# Or maybe store both signatures?
if not cosign_verify_local_image(tmpdir, pubkey):
raise errors.SignatureVerificationError()
# Remove the signatures from the archive.
with open(Path(tmpdir) / "index.json") as f:
index_json = json.load(f)
signature_filename = _get_signature_filename(index_json["manifests"])
index_json["manifests"] = [
manifest
for manifest in index_json["manifests"]
if manifest["annotations"].get("kind") != "dev.cosignproject.cosign/sigs"
]
with open(signature_filename, "rb") as f:
signatures = convert_oci_images_signatures(json.load(f), tmpdir)
image_digest = index_json["manifests"][0].get("digest").replace("sha256:", "")
# Write the new index.json to the temp folder
with open(Path(tmpdir) / "index.json", "w") as f:
json.dump(index_json, f)
with NamedTemporaryFile(suffix=".tar") as temporary_tar:
with tarfile.open(temporary_tar.name, "w") as archive:
# The root is the tmpdir
archive.add(Path(tmpdir) / "index.json", arcname="index.json")
archive.add(Path(tmpdir) / "oci-layout", arcname="oci-layout")
archive.add(Path(tmpdir) / "blobs", arcname="blobs")
runtime.load_image_tarball_file(temporary_tar.name)
runtime.tag_image_by_digest(image_digest, image_name)
store_signatures(signatures, image_digest, pubkey)
return True
def convert_oci_images_signatures(
signatures_manifest: List[Dict], tmpdir: str
) -> List[Dict]:
def _to_cosign_signature(layer: Dict) -> Dict:
signature = layer["annotations"]["dev.cosignproject.cosign/signature"]
bundle = json.loads(layer["annotations"]["dev.sigstore.cosign/bundle"])
payload_body = json.loads(b64decode(bundle["Payload"]["body"]))
payload_location = _get_blob(tmpdir, layer["digest"])
with open(payload_location, "rb") as f:
payload_b64 = b64encode(f.read()).decode()
return {
"Base64Signature": payload_body["spec"]["signature"]["content"],
"Payload": payload_b64,
"Cert": None,
"Chain": None,
"rekorBundle": bundle,
"RFC3161Timestamp": None,
}
return [_to_cosign_signature(layer) for layer in signatures_manifest["layers"]]
def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) -> str:
"""Get the sha256 hash of a file or content"""
if not file and not content:
raise errors.UpdaterError("No file or content provided")
if file:
with open(file, "rb") as f:
content = f.read()
if content:
return sha256(content).hexdigest()
return ""
def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
@ -180,7 +307,9 @@ def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> No
raise errors.InvalidSignatures("Signatures do not share the same image hash")
if f"sha256:{image_hash}" != hashes[0]:
raise errors.SignatureMismatch("Signatures do not match the given image hash")
raise errors.SignatureMismatch(
f"Signatures do not match the given image hash ({image_hash}, {hashes[0]})"
)
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
pubkey_signatures.mkdir(exist_ok=True)
@ -197,23 +326,21 @@ def verify_offline_image_signature(image: str, pubkey: str) -> bool:
Verifies that a local image has a valid signature
"""
log.info(f"Verifying local image {image} against pubkey {pubkey}")
image_hash = load_image_hash(image)
image_hash = runtime.get_local_image_hash(image)
log.debug(f"Image hash: {image_hash}")
signatures = load_signatures(image_hash, pubkey)
if len(signatures) < 1:
raise errors.LocalSignatureNotFound("No signatures found")
for signature in signatures:
if not verify_signature(signature, pubkey):
if not verify_signature(signature, image_hash, pubkey):
msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
raise errors.SignatureVerificationError(msg)
return True
def get_signatures(image: str, hash: str) -> List[Dict]:
"""
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.
"""
def get_remote_signatures(image: str, hash: str) -> List[Dict]:
"""Retrieve the signatures from the registry, via `cosign download`."""
utils.ensure_cosign()
process = subprocess.run(
@ -225,4 +352,7 @@ def get_signatures(image: str, hash: str) -> List[Dict]:
# XXX: Check the output first.
# Remove the last return, split on newlines, convert from JSON
signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
return list(map(json.loads, signatures_raw))
signatures = list(map(json.loads, signatures_raw))
if len(signatures) < 1:
raise errors.NoRemoteSignatures("No signatures found for the image")
return signatures