Compare commits


1 commit

SHA1: f36dd56205
Message: Merge f7069a9c16 into 88a6b37770
Date: 2025-01-29 18:32:18 +00:00
7 changed files with 50 additions and 428 deletions

View file

@@ -116,7 +116,7 @@ def get_expected_tag() -> str:
         return f.read().strip()
 
 
-def load_image_tarball_in_memory() -> None:
+def load_image_tarball() -> None:
     log.info("Installing Dangerzone container image...")
     p = subprocess.Popen(
         [get_runtime(), "load"],
@@ -147,44 +147,6 @@ def load_image_tarball_in_memory() -> None:
     log.info("Successfully installed container image from")
 
 
-def load_image_tarball_file(tarball_path: str) -> None:
-    cmd = [get_runtime(), "load", "-i", tarball_path]
-    subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
-
-    log.info("Successfully installed container image from %s", tarball_path)
-
-
-def tag_image_by_digest(digest: str, tag: str) -> None:
-    """Tag a container image by digest.
-
-    The sha256: prefix should be omitted from the digest.
-    """
-    image_id = get_image_id_by_digest(digest)
-    cmd = [get_runtime(), "tag", image_id, tag]
-    log.debug(" ".join(cmd))
-    subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
-
-
-def get_image_id_by_digest(digest: str) -> str:
-    """Get an image ID from a digest.
-
-    The sha256: prefix should be omitted from the digest.
-    """
-    cmd = [
-        get_runtime(),
-        "images",
-        "-f",
-        f"digest=sha256:{digest}",
-        "--format",
-        "{{.Id}}",
-    ]
-    log.debug(" ".join(cmd))
-    process = subprocess.run(
-        cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
-    )
-    breakpoint()
-    # In case we have multiple lines, we only want the first one.
-    return process.stdout.decode().strip().split("\n")[0]
-
-
 def container_pull(image: str) -> bool:
     """Pull a container image from a registry."""
     cmd = [get_runtime_name(), "pull", f"{image}"]
@@ -193,10 +155,8 @@ def container_pull(image: str) -> bool:
     return process.returncode == 0
 
 
-def get_local_image_hash(image: str) -> str:
-    """
-    Returns a image hash from a local image name
-    """
+def load_image_hash(image: str) -> str:
+    """Returns a image hash from a local image name"""
     cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
     result = subprocess.run(cmd, capture_output=True, check=True)
     return result.stdout.strip().decode().strip("sha256:")

View file

@@ -102,7 +102,7 @@ class Container(IsolationProvider):
             return True
 
         # Load the image tarball into the container runtime.
-        container_utils.load_image_tarball_in_memory()
+        container_utils.load_image_tarball()
 
         # Check that the container image has the expected image tag.
         # See https://github.com/freedomofpress/dangerzone/issues/988 for an example

View file

@@ -1,189 +0,0 @@
-import gzip
-import logging
-import platform
-import shutil
-import subprocess
-from typing import List, Optional, Tuple
-
-from . import errors
-from .util import get_resource_path, get_subprocess_startupinfo
-
-CONTAINER_NAME = "dangerzone.rocks/dangerzone"
-
-log = logging.getLogger(__name__)
-
-
-def get_runtime_name() -> str:
-    if platform.system() == "Linux":
-        return "podman"
-    # Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
-    return "docker"
-
-
-def get_runtime_version() -> Tuple[int, int]:
-    """Get the major/minor parts of the Docker/Podman version.
-
-    Some of the operations we perform in this module rely on some Podman features
-    that are not available across all of our platforms. In order to have a proper
-    fallback, we need to know the Podman version. More specifically, we're fine with
-    just knowing the major and minor version, since writing/installing a full-blown
-    semver parser is an overkill.
-    """
-    # Get the Docker/Podman version, using a Go template.
-    runtime = get_runtime_name()
-    if runtime == "podman":
-        query = "{{.Client.Version}}"
-    else:
-        query = "{{.Server.Version}}"
-
-    cmd = [runtime, "version", "-f", query]
-    try:
-        version = subprocess.run(
-            cmd,
-            startupinfo=get_subprocess_startupinfo(),
-            capture_output=True,
-            check=True,
-        ).stdout.decode()
-    except Exception as e:
-        msg = f"Could not get the version of the {runtime.capitalize()} tool: {e}"
-        raise RuntimeError(msg) from e
-
-    # Parse this version and return the major/minor parts, since we don't need the
-    # rest.
-    try:
-        major, minor, _ = version.split(".", 3)
-        return (int(major), int(minor))
-    except Exception as e:
-        msg = (
-            f"Could not parse the version of the {runtime.capitalize()} tool"
-            f" (found: '{version}') due to the following error: {e}"
-        )
-        raise RuntimeError(msg)
-
-
-def get_runtime() -> str:
-    container_tech = get_runtime_name()
-    runtime = shutil.which(container_tech)
-    if runtime is None:
-        raise errors.NoContainerTechException(container_tech)
-    return runtime
-
-
-def list_image_tags() -> List[str]:
-    """Get the tags of all loaded Dangerzone images.
-
-    This method returns a mapping of image tags to image IDs, for all Dangerzone
-    images. This can be useful when we want to find which are the local image tags,
-    and which image ID does the "latest" tag point to.
-    """
-    return (
-        subprocess.check_output(
-            [
-                get_runtime(),
-                "image",
-                "list",
-                "--format",
-                "{{ .Tag }}",
-                CONTAINER_NAME,
-            ],
-            text=True,
-            startupinfo=get_subprocess_startupinfo(),
-        )
-        .strip()
-        .split()
-    )
-
-
-def delete_image_tag(tag: str) -> None:
-    """Delete a Dangerzone image tag."""
-    name = CONTAINER_NAME + ":" + tag
-    log.warning(f"Deleting old container image: {name}")
-    try:
-        subprocess.check_output(
-            [get_runtime(), "rmi", "--force", name],
-            startupinfo=get_subprocess_startupinfo(),
-        )
-    except Exception as e:
-        log.warning(
-            f"Couldn't delete old container image '{name}', so leaving it there."
-            f" Original error: {e}"
-        )
-
-
-def get_expected_tag() -> str:
-    """Get the tag of the Dangerzone image tarball from the image-id.txt file."""
-    with open(get_resource_path("image-id.txt")) as f:
-        return f.read().strip()
-
-
-def tag_image_by_digest(digest: str, tag: str) -> None:
-    image_id = get_image_id_by_digest(digest)
-    cmd = [get_runtime(), "tag", image_id, tag]
-    subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
-
-
-def get_image_id_by_digest(digest: str) -> str:
-    cmd = [
-        get_runtime(),
-        "image",
-        "tag",
-        "-f",
-        f'digest="{digest}"',
-        "--format ",
-        "{{.Id}}",
-    ]
-    process = subprocess.run(
-        cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
-    )
-    return process.stdout.decode().strip()
-
-
-def load_image_tarball_in_memory(
-    compressed_container_path: Optional[str] = None,
-) -> None:
-    if compressed_container_path is None:
-        compressed_container_path = get_resource_path("container.tar.gz")
-
-    log.info("Installing Dangerzone container image...")
-    p = subprocess.Popen(
-        [get_runtime(), "load"],
-        stdin=subprocess.PIPE,
-        startupinfo=get_subprocess_startupinfo(),
-    )
-
-    chunk_size = 4 << 20
-    with gzip.open(compressed_container_path) as f:
-        while True:
-            chunk = f.read(chunk_size)
-            if len(chunk) > 0:
-                if p.stdin:
-                    p.stdin.write(chunk)
-            else:
-                break
-    _, err = p.communicate()
-    if p.returncode < 0:
-        if err:
-            error = err.decode()
-        else:
-            error = "No output"
-        raise errors.ImageInstallationException(
-            f"Could not install container image: {error}"
-        )
-
-    log.info("Successfully installed container image from")
-
-
-def load_image_tarball_file(container_path: str) -> None:
-    cmd = [get_runtime(), "load", "-i", container_path]
-    subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
-
-    log.info("Successfully installed container image from %s", container_path)
-
-
-def container_pull(image: str) -> bool:
-    # XXX - Move to container_utils.py
-    cmd = [get_runtime_name(), "pull", f"{image}"]
-    process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
-    process.communicate()
-    return process.returncode == 0

View file

@@ -7,20 +7,16 @@ import click
 from ..util import get_resource_path
 from . import errors, log, registry
 from .attestations import verify_attestation
-from .signatures import (
-    upgrade_container_image,
-    upgrade_container_image_airgapped,
-    verify_offline_image_signature,
-)
+from .signatures import upgrade_container_image, verify_offline_image_signature
 
 DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
-DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone"
 PUBKEY_DEFAULT_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
 
 
 @click.group()
 @click.option("--debug", is_flag=True)
-def main(debug: bool) -> None:
+def main(debug=False) -> None:
     if debug:
         click.echo("Debug mode enabled")
         level = logging.DEBUG
@@ -30,9 +26,11 @@ def main(debug: bool) -> None:
 
 
 @main.command()
-@click.argument("image")
+@click.option("--image")
 @click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
-def upgrade(image: str, pubkey: str) -> None:
+@click.option("--airgap", is_flag=True)
+# XXX Add options to do airgap upgrade
+def upgrade(image: str, pubkey: str, airgap: bool) -> None:
     """Upgrade the image to the latest signed version."""
     manifest_hash = registry.get_manifest_hash(image)
     try:
@@ -43,20 +41,6 @@ def upgrade(image: str, pubkey: str) -> None:
         raise click.Abort()
 
 
-@main.command()
-@click.argument("image_filename")
-@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
-@click.option("--image-name", default=DEFAULT_IMAGE_NAME)
-def upgrade_airgapped(image_filename: str, pubkey: str, image_name: str) -> None:
-    """Upgrade the image to the latest signed version."""
-    try:
-        upgrade_container_image_airgapped(image_filename, pubkey, image_name)
-        click.echo(f"✅ Installed image {image_filename} on the system")
-    except errors.ImageAlreadyUpToDate as e:
-        click.echo(f"{e}")
-        raise click.Abort()
-
-
 @main.command()
 @click.argument("image")
 @click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)

View file

@@ -22,10 +22,6 @@ class SignatureVerificationError(SignatureError):
     pass
 
 
-class SignatureExtractionError(SignatureError):
-    pass
-
-
 class SignaturesFolderDoesNotExist(SignatureError):
     pass
 

View file

@@ -12,6 +12,7 @@ __all__ = [
     "list_tags",
     "get_manifest",
     "get_attestation",
+    "Image",
     "parse_image_location",
 ]
 

View file

@@ -2,16 +2,15 @@ import json
 import platform
 import re
 import subprocess
-import tarfile
-from base64 import b64decode, b64encode
+from base64 import b64decode
 from hashlib import sha256
-from io import BytesIO
 from pathlib import Path
-from tempfile import NamedTemporaryFile, TemporaryDirectory
-from typing import Dict, List, Optional, Tuple
+from tempfile import NamedTemporaryFile
+from typing import Dict, List, Tuple
 
-from .. import container_utils as runtime
-from . import errors, log, registry, utils
+from ..container_utils import container_pull, load_image_hash
+from . import errors, log, utils
+from .registry import get_manifest_hash
 
 try:
     import platformdirs
@@ -55,40 +54,12 @@ def signature_to_bundle(sig: Dict) -> Dict:
     }
 
 
-def cosign_verify_local_image(oci_image_folder: str, pubkey: str) -> bool:
-    """Verify the given path against the given public key"""
-    utils.ensure_cosign()
-    cmd = [
-        "cosign",
-        "verify",
-        "--key",
-        pubkey,
-        "--offline",
-        "--local-image",
-        oci_image_folder,
-    ]
-    log.debug(" ".join(cmd))
-    result = subprocess.run(cmd, capture_output=True)
-    if result.returncode == 0:
-        log.debug("Signature verified")
-        return True
-    log.debug("Failed to verify signature", result.stderr)
-    return False
-
-
-def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
+def verify_signature(signature: dict, pubkey: str) -> bool:
     """Verify a signature against a given public key"""
-    # XXX - Also verfy the identity/docker-reference field against the expected value
-    # e.g. ghcr.io/freedomofpress/dangerzone/dangerzone
     utils.ensure_cosign()
     signature_bundle = signature_to_bundle(signature)
-    payload_bytes = b64decode(signature_bundle["Payload"])
-    if json.loads(payload_bytes)["critical"]["type"] != f"sha256:{image_hash}":
-        raise errors.SignatureMismatch("The signature does not match the image hash")
 
     with (
         NamedTemporaryFile(mode="w") as signature_file,
         NamedTemporaryFile(mode="bw") as payload_file,
@@ -96,6 +67,7 @@ def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
         json.dump(signature_bundle, signature_file)
         signature_file.flush()
+        payload_bytes = b64decode(signature_bundle["Payload"])
         payload_file.write(payload_bytes)
         payload_file.flush()
 
@@ -120,146 +92,47 @@ def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
     return False
 
 
-def new_image_release(image: str) -> bool:
-    remote_hash = registry.get_manifest_hash(image)
-    local_hash = runtime.get_local_image_hash(image)
+def new_image_release(image) -> bool:
+    remote_hash = get_manifest_hash(image)
+    local_hash = load_image_hash(image)
     log.debug("Remote hash: %s", remote_hash)
     log.debug("Local hash: %s", local_hash)
     return remote_hash != local_hash
 
 
-def verify_signatures(
-    signatures: List[Dict],
-    image_hash: str,
+def upgrade_container_image(
+    image: str,
+    manifest_hash: str,
     pubkey: str,
 ) -> bool:
-    for signature in signatures:
-        if not verify_signature(signature, image_hash, pubkey):
-            raise errors.SignatureVerificationError()
-
-    return True
-
-
-def upgrade_container_image(image: str, manifest_hash: str, pubkey: str) -> bool:
-    """Verify and upgrade the image to the latest, if signed."""
     if not new_image_release(image):
         raise errors.ImageAlreadyUpToDate("The image is already up to date")
+        return False
 
-    signatures = get_remote_signatures(image, manifest_hash)
-    verify_signatures(signatures, manifest_hash, pubkey)
+    signatures = get_signatures(image, manifest_hash)
+    log.debug("Signatures: %s", signatures)
+
+    if len(signatures) < 1:
+        raise errors.NoRemoteSignatures("No remote signatures found")
+
+    for signature in signatures:
+        signature_is_valid = verify_signature(signature, pubkey)
+        if not signature_is_valid:
+            raise errors.SignatureVerificationError()
 
     # At this point, the signatures are verified
     # We store the signatures just now to avoid storing unverified signatures
     store_signatures(signatures, manifest_hash, pubkey)
 
     # let's upgrade the image
-    # XXX Use the image digest here to avoid race conditions
-    return runtime.container_pull(image)
+    # XXX Use the hash here to avoid race conditions
+    return container_pull(image)
 
 
-def _get_blob(tmpdir: str, hash: str) -> Path:
-    return Path(tmpdir) / "blobs" / "sha256" / hash.replace("sha256:", "")
-
-
-def upgrade_container_image_airgapped(
-    container_tar: str, pubkey: str, image_name: str
-) -> bool:
-    """
-    Verify the given archive against its self-contained signatures, then
-    upgrade the image and retag it to the expected tag.
-
-    Right now, the archive is extracted and reconstructed, requiring some space
-    on the filesystem.
-    """
-    # XXX Use a memory buffer instead of the filesystem
-    with TemporaryDirectory() as tmpdir:
-
-        def _get_signature_filename(manifests: List[Dict]) -> Path:
-            for manifest in manifests:
-                if (
-                    manifest["annotations"].get("kind")
-                    == "dev.cosignproject.cosign/sigs"
-                ):
-                    return _get_blob(tmpdir, manifest["digest"])
-            raise errors.SignatureExtractionError()
-
-        with tarfile.open(container_tar, "r") as archive:
-            archive.extractall(tmpdir)
-
-        # XXX Check if the contained signatures match the given ones?
-        # Or maybe store both signatures?
-        if not cosign_verify_local_image(tmpdir, pubkey):
-            raise errors.SignatureVerificationError()
-
-        # Remove the signatures from the archive.
-        with open(Path(tmpdir) / "index.json") as f:
-            index_json = json.load(f)
-
-        signature_filename = _get_signature_filename(index_json["manifests"])
-        index_json["manifests"] = [
-            manifest
-            for manifest in index_json["manifests"]
-            if manifest["annotations"].get("kind") != "dev.cosignproject.cosign/sigs"
-        ]
-
-        with open(signature_filename, "rb") as f:
-            signatures = convert_oci_images_signatures(json.load(f), tmpdir)
-
-        image_digest = index_json["manifests"][0].get("digest").replace("sha256:", "")
-
-        # Write the new index.json to the temp folder
-        with open(Path(tmpdir) / "index.json", "w") as f:
-            json.dump(index_json, f)
-
-        with NamedTemporaryFile(suffix=".tar") as temporary_tar:
-            with tarfile.open(temporary_tar.name, "w") as archive:
-                # The root is the tmpdir
-                archive.add(Path(tmpdir) / "index.json", arcname="index.json")
-                archive.add(Path(tmpdir) / "oci-layout", arcname="oci-layout")
-                archive.add(Path(tmpdir) / "blobs", arcname="blobs")
-
-            runtime.load_image_tarball_file(temporary_tar.name)
-            runtime.tag_image_by_digest(image_digest, image_name)
-
-    store_signatures(signatures, image_digest, pubkey)
-    return True
-
-
-def convert_oci_images_signatures(
-    signatures_manifest: List[Dict], tmpdir: str
-) -> List[Dict]:
-    def _to_cosign_signature(layer: Dict) -> Dict:
-        signature = layer["annotations"]["dev.cosignproject.cosign/signature"]
-        bundle = json.loads(layer["annotations"]["dev.sigstore.cosign/bundle"])
-        payload_body = json.loads(b64decode(bundle["Payload"]["body"]))
-
-        payload_location = _get_blob(tmpdir, layer["digest"])
-        with open(payload_location, "rb") as f:
-            payload_b64 = b64encode(f.read()).decode()
-
-        return {
-            "Base64Signature": payload_body["spec"]["signature"]["content"],
-            "Payload": payload_b64,
-            "Cert": None,
-            "Chain": None,
-            "rekorBundle": bundle,
-            "RFC3161Timestamp": None,
-        }
-
-    return [_to_cosign_signature(layer) for layer in signatures_manifest["layers"]]
-
-
-def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) -> str:
-    """Get the sha256 hash of a file or content"""
-    if not file and not content:
-        raise errors.UpdaterError("No file or content provided")
-
-    if file:
-        with open(file, "rb") as f:
-            content = f.read()
-    if content:
-        return sha256(content).hexdigest()
-    return ""
+def get_file_hash(file: str) -> str:
+    with open(file, "rb") as f:
+        content = f.read()
+        return sha256(content).hexdigest()
 
 
 def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
@@ -307,9 +180,7 @@ def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> No
         raise errors.InvalidSignatures("Signatures do not share the same image hash")
 
     if f"sha256:{image_hash}" != hashes[0]:
-        raise errors.SignatureMismatch(
-            f"Signatures do not match the given image hash ({image_hash}, {hashes[0]})"
-        )
+        raise errors.SignatureMismatch("Signatures do not match the given image hash")
 
     pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
     pubkey_signatures.mkdir(exist_ok=True)
@@ -326,21 +197,23 @@ def verify_offline_image_signature(image: str, pubkey: str) -> bool:
     Verifies that a local image has a valid signature
     """
     log.info(f"Verifying local image {image} against pubkey {pubkey}")
-    image_hash = runtime.get_local_image_hash(image)
+    image_hash = load_image_hash(image)
     log.debug(f"Image hash: {image_hash}")
     signatures = load_signatures(image_hash, pubkey)
     if len(signatures) < 1:
         raise errors.LocalSignatureNotFound("No signatures found")
     for signature in signatures:
-        if not verify_signature(signature, image_hash, pubkey):
+        if not verify_signature(signature, pubkey):
             msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
             raise errors.SignatureVerificationError(msg)
     return True
 
 
-def get_remote_signatures(image: str, hash: str) -> List[Dict]:
-    """Retrieve the signatures from the registry, via `cosign download`."""
+def get_signatures(image: str, hash: str) -> List[Dict]:
+    """
+    Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.
+    """
     utils.ensure_cosign()
     process = subprocess.run(
@@ -352,7 +225,4 @@ def get_remote_signatures(image: str, hash: str) -> List[Dict]:
     # XXX: Check the output first.
     # Remove the last return, split on newlines, convert from JSON
     signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
-    signatures = list(map(json.loads, signatures_raw))
-    if len(signatures) < 1:
-        raise errors.NoRemoteSignatures("No signatures found for the image")
-    return signatures
+    return list(map(json.loads, signatures_raw))