mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-04-28 18:02:38 +02:00
Allow installation on air-gapped systems
- Verify the archive against the known public signature - Prepare a new archive format (with signature removed) - Load the new image and retag it with the expected tag During this process, the signatures are lost and should instead be converted to a known format. Additionally, the name fo the repository should ideally come from the signatures rather than from the command line.
This commit is contained in:
parent
f7069a9c16
commit
087e5bd1ad
6 changed files with 373 additions and 48 deletions
|
@ -116,7 +116,7 @@ def get_expected_tag() -> str:
|
|||
return f.read().strip()
|
||||
|
||||
|
||||
def load_image_tarball() -> None:
|
||||
def load_image_tarball_in_memory() -> None:
|
||||
log.info("Installing Dangerzone container image...")
|
||||
p = subprocess.Popen(
|
||||
[get_runtime(), "load"],
|
||||
|
@ -147,6 +147,36 @@ def load_image_tarball() -> None:
|
|||
log.info("Successfully installed container image from")
|
||||
|
||||
|
||||
def load_image_tarball_file(tarball_path: str) -> None:
|
||||
cmd = [get_runtime(), "load", "-i", tarball_path]
|
||||
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
|
||||
|
||||
log.info("Successfully installed container image from %s", tarball_path)
|
||||
|
||||
|
||||
def tag_image_by_digest(digest: str, tag: str) -> None:
|
||||
image_id = get_image_id_by_digest(digest)
|
||||
cmd = [get_runtime(), "tag", image_id, tag]
|
||||
log.debug(" ".join(cmd))
|
||||
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
|
||||
|
||||
|
||||
def get_image_id_by_digest(digest: str) -> str:
|
||||
cmd = [
|
||||
get_runtime(),
|
||||
"images",
|
||||
"-f",
|
||||
f"digest={digest}",
|
||||
"--format",
|
||||
"{{.Id}}",
|
||||
]
|
||||
log.debug(" ".join(cmd))
|
||||
process = subprocess.run(
|
||||
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
|
||||
)
|
||||
return process.stdout.decode().strip()
|
||||
|
||||
|
||||
def container_pull(image: str) -> bool:
|
||||
"""Pull a container image from a registry."""
|
||||
cmd = [get_runtime_name(), "pull", f"{image}"]
|
||||
|
@ -155,8 +185,10 @@ def container_pull(image: str) -> bool:
|
|||
return process.returncode == 0
|
||||
|
||||
|
||||
def load_image_hash(image: str) -> str:
|
||||
"""Returns a image hash from a local image name"""
|
||||
def get_local_image_hash(image: str) -> str:
|
||||
"""
|
||||
Returns a image hash from a local image name
|
||||
"""
|
||||
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
|
||||
result = subprocess.run(cmd, capture_output=True, check=True)
|
||||
return result.stdout.strip().decode().strip("sha256:")
|
||||
|
|
|
@ -102,7 +102,7 @@ class Container(IsolationProvider):
|
|||
return True
|
||||
|
||||
# Load the image tarball into the container runtime.
|
||||
container_utils.load_image_tarball()
|
||||
container_utils.load_image_tarball_in_memory()
|
||||
|
||||
# Check that the container image has the expected image tag.
|
||||
# See https://github.com/freedomofpress/dangerzone/issues/988 for an example
|
||||
|
|
189
dangerzone/rntime.py
Normal file
189
dangerzone/rntime.py
Normal file
|
@ -0,0 +1,189 @@
|
|||
import gzip
|
||||
import logging
|
||||
import platform
|
||||
import shutil
|
||||
import subprocess
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from . import errors
|
||||
from .util import get_resource_path, get_subprocess_startupinfo
|
||||
|
||||
CONTAINER_NAME = "dangerzone.rocks/dangerzone"
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_runtime_name() -> str:
|
||||
if platform.system() == "Linux":
|
||||
return "podman"
|
||||
# Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
|
||||
return "docker"
|
||||
|
||||
|
||||
def get_runtime_version() -> Tuple[int, int]:
|
||||
"""Get the major/minor parts of the Docker/Podman version.
|
||||
|
||||
Some of the operations we perform in this module rely on some Podman features
|
||||
that are not available across all of our platforms. In order to have a proper
|
||||
fallback, we need to know the Podman version. More specifically, we're fine with
|
||||
just knowing the major and minor version, since writing/installing a full-blown
|
||||
semver parser is an overkill.
|
||||
"""
|
||||
# Get the Docker/Podman version, using a Go template.
|
||||
runtime = get_runtime_name()
|
||||
if runtime == "podman":
|
||||
query = "{{.Client.Version}}"
|
||||
else:
|
||||
query = "{{.Server.Version}}"
|
||||
|
||||
cmd = [runtime, "version", "-f", query]
|
||||
try:
|
||||
version = subprocess.run(
|
||||
cmd,
|
||||
startupinfo=get_subprocess_startupinfo(),
|
||||
capture_output=True,
|
||||
check=True,
|
||||
).stdout.decode()
|
||||
except Exception as e:
|
||||
msg = f"Could not get the version of the {runtime.capitalize()} tool: {e}"
|
||||
raise RuntimeError(msg) from e
|
||||
|
||||
# Parse this version and return the major/minor parts, since we don't need the
|
||||
# rest.
|
||||
try:
|
||||
major, minor, _ = version.split(".", 3)
|
||||
return (int(major), int(minor))
|
||||
except Exception as e:
|
||||
msg = (
|
||||
f"Could not parse the version of the {runtime.capitalize()} tool"
|
||||
f" (found: '{version}') due to the following error: {e}"
|
||||
)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
|
||||
def get_runtime() -> str:
|
||||
container_tech = get_runtime_name()
|
||||
runtime = shutil.which(container_tech)
|
||||
if runtime is None:
|
||||
raise errors.NoContainerTechException(container_tech)
|
||||
return runtime
|
||||
|
||||
|
||||
def list_image_tags() -> List[str]:
|
||||
"""Get the tags of all loaded Dangerzone images.
|
||||
|
||||
This method returns a mapping of image tags to image IDs, for all Dangerzone
|
||||
images. This can be useful when we want to find which are the local image tags,
|
||||
and which image ID does the "latest" tag point to.
|
||||
"""
|
||||
return (
|
||||
subprocess.check_output(
|
||||
[
|
||||
get_runtime(),
|
||||
"image",
|
||||
"list",
|
||||
"--format",
|
||||
"{{ .Tag }}",
|
||||
CONTAINER_NAME,
|
||||
],
|
||||
text=True,
|
||||
startupinfo=get_subprocess_startupinfo(),
|
||||
)
|
||||
.strip()
|
||||
.split()
|
||||
)
|
||||
|
||||
|
||||
def delete_image_tag(tag: str) -> None:
|
||||
"""Delete a Dangerzone image tag."""
|
||||
name = CONTAINER_NAME + ":" + tag
|
||||
log.warning(f"Deleting old container image: {name}")
|
||||
try:
|
||||
subprocess.check_output(
|
||||
[get_runtime(), "rmi", "--force", name],
|
||||
startupinfo=get_subprocess_startupinfo(),
|
||||
)
|
||||
except Exception as e:
|
||||
log.warning(
|
||||
f"Couldn't delete old container image '{name}', so leaving it there."
|
||||
f" Original error: {e}"
|
||||
)
|
||||
|
||||
|
||||
def get_expected_tag() -> str:
|
||||
"""Get the tag of the Dangerzone image tarball from the image-id.txt file."""
|
||||
with open(get_resource_path("image-id.txt")) as f:
|
||||
return f.read().strip()
|
||||
|
||||
|
||||
def tag_image_by_digest(digest: str, tag: str) -> None:
|
||||
image_id = get_image_id_by_digest(digest)
|
||||
cmd = [get_runtime(), "tag", image_id, tag]
|
||||
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
|
||||
|
||||
|
||||
def get_image_id_by_digest(digest: str) -> str:
|
||||
cmd = [
|
||||
get_runtime(),
|
||||
"image",
|
||||
"tag",
|
||||
"-f",
|
||||
f'digest="{digest}"',
|
||||
"--format ",
|
||||
"{{.Id}}",
|
||||
]
|
||||
process = subprocess.run(
|
||||
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
|
||||
)
|
||||
return process.stdout.decode().strip()
|
||||
|
||||
|
||||
def load_image_tarball_in_memory(
|
||||
compressed_container_path: Optional[str] = None,
|
||||
) -> None:
|
||||
if compressed_container_path is None:
|
||||
compressed_container_path = get_resource_path("container.tar.gz")
|
||||
|
||||
log.info("Installing Dangerzone container image...")
|
||||
p = subprocess.Popen(
|
||||
[get_runtime(), "load"],
|
||||
stdin=subprocess.PIPE,
|
||||
startupinfo=get_subprocess_startupinfo(),
|
||||
)
|
||||
|
||||
chunk_size = 4 << 20
|
||||
|
||||
with gzip.open(compressed_container_path) as f:
|
||||
while True:
|
||||
chunk = f.read(chunk_size)
|
||||
if len(chunk) > 0:
|
||||
if p.stdin:
|
||||
p.stdin.write(chunk)
|
||||
else:
|
||||
break
|
||||
_, err = p.communicate()
|
||||
if p.returncode < 0:
|
||||
if err:
|
||||
error = err.decode()
|
||||
else:
|
||||
error = "No output"
|
||||
raise errors.ImageInstallationException(
|
||||
f"Could not install container image: {error}"
|
||||
)
|
||||
|
||||
log.info("Successfully installed container image from")
|
||||
|
||||
|
||||
def load_image_tarball_file(container_path: str) -> None:
|
||||
cmd = [get_runtime(), "load", "-i", container_path]
|
||||
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
|
||||
|
||||
log.info("Successfully installed container image from %s", container_path)
|
||||
|
||||
|
||||
def container_pull(image: str) -> bool:
|
||||
# XXX - Move to container_utils.py
|
||||
cmd = [get_runtime_name(), "pull", f"{image}"]
|
||||
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
|
||||
process.communicate()
|
||||
return process.returncode == 0
|
|
@ -7,16 +7,20 @@ import click
|
|||
from ..util import get_resource_path
|
||||
from . import errors, log, registry
|
||||
from .attestations import verify_attestation
|
||||
from .signatures import upgrade_container_image, verify_offline_image_signature
|
||||
from .signatures import (
|
||||
upgrade_container_image,
|
||||
upgrade_container_image_airgapped,
|
||||
verify_offline_image_signature,
|
||||
)
|
||||
|
||||
DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
|
||||
|
||||
DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone"
|
||||
PUBKEY_DEFAULT_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.option("--debug", is_flag=True)
|
||||
def main(debug=False) -> None:
|
||||
def main(debug: bool) -> None:
|
||||
if debug:
|
||||
click.echo("Debug mode enabled")
|
||||
level = logging.DEBUG
|
||||
|
@ -26,11 +30,9 @@ def main(debug=False) -> None:
|
|||
|
||||
|
||||
@main.command()
|
||||
@click.option("--image")
|
||||
@click.argument("image")
|
||||
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
|
||||
@click.option("--airgap", is_flag=True)
|
||||
# XXX Add options to do airgap upgrade
|
||||
def upgrade(image: str, pubkey: str, airgap: bool) -> None:
|
||||
def upgrade(image: str, pubkey: str) -> None:
|
||||
"""Upgrade the image to the latest signed version."""
|
||||
manifest_hash = registry.get_manifest_hash(image)
|
||||
try:
|
||||
|
@ -41,6 +43,20 @@ def upgrade(image: str, pubkey: str, airgap: bool) -> None:
|
|||
raise click.Abort()
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument("image_filename")
|
||||
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
|
||||
@click.option("--image-name", default=DEFAULT_IMAGE_NAME)
|
||||
def upgrade_airgapped(image_filename: str, pubkey: str, image_name: str) -> None:
|
||||
"""Upgrade the image to the latest signed version."""
|
||||
try:
|
||||
upgrade_container_image_airgapped(image_filename, pubkey, image_name)
|
||||
click.echo(f"✅ Installed image {image_filename} on the system")
|
||||
except errors.ImageAlreadyUpToDate as e:
|
||||
click.echo(f"✅ {e}")
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument("image")
|
||||
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
|
||||
|
|
|
@ -12,7 +12,6 @@ __all__ = [
|
|||
"list_tags",
|
||||
"get_manifest",
|
||||
"get_attestation",
|
||||
"Image",
|
||||
"parse_image_location",
|
||||
]
|
||||
|
||||
|
|
|
@ -2,15 +2,16 @@ import json
|
|||
import platform
|
||||
import re
|
||||
import subprocess
|
||||
import tarfile
|
||||
from base64 import b64decode
|
||||
from hashlib import sha256
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import Dict, List, Tuple
|
||||
from tempfile import NamedTemporaryFile, TemporaryDirectory
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
from ..container_utils import container_pull, load_image_hash
|
||||
from . import errors, log, utils
|
||||
from .registry import get_manifest_hash
|
||||
from .. import container_utils as runtime
|
||||
from . import errors, log, registry, utils
|
||||
|
||||
try:
|
||||
import platformdirs
|
||||
|
@ -54,12 +55,40 @@ def signature_to_bundle(sig: Dict) -> Dict:
|
|||
}
|
||||
|
||||
|
||||
def verify_signature(signature: dict, pubkey: str) -> bool:
|
||||
def cosign_verify_local_image(oci_image_folder: str, pubkey: str) -> bool:
|
||||
"""Verify the given path against the given public key"""
|
||||
|
||||
utils.ensure_cosign()
|
||||
cmd = [
|
||||
"cosign",
|
||||
"verify",
|
||||
"--key",
|
||||
pubkey,
|
||||
"--offline",
|
||||
"--local-image",
|
||||
oci_image_folder,
|
||||
]
|
||||
log.debug(" ".join(cmd))
|
||||
result = subprocess.run(cmd, capture_output=True)
|
||||
if result.returncode == 0:
|
||||
log.debug("Signature verified")
|
||||
return True
|
||||
log.debug("Failed to verify signature", result.stderr)
|
||||
return False
|
||||
|
||||
|
||||
def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
|
||||
"""Verify a signature against a given public key"""
|
||||
# XXX - Also verfy the identity/docker-reference field against the expected value
|
||||
# e.g. ghcr.io/freedomofpress/dangerzone/dangerzone
|
||||
|
||||
utils.ensure_cosign()
|
||||
signature_bundle = signature_to_bundle(signature)
|
||||
|
||||
payload_bytes = b64decode(signature_bundle["Payload"])
|
||||
if json.loads(payload_bytes)["critical"]["type"] != f"sha256:{image_hash}":
|
||||
raise errors.SignatureMismatch("The signature does not match the image hash")
|
||||
|
||||
with (
|
||||
NamedTemporaryFile(mode="w") as signature_file,
|
||||
NamedTemporaryFile(mode="bw") as payload_file,
|
||||
|
@ -67,7 +96,6 @@ def verify_signature(signature: dict, pubkey: str) -> bool:
|
|||
json.dump(signature_bundle, signature_file)
|
||||
signature_file.flush()
|
||||
|
||||
payload_bytes = b64decode(signature_bundle["Payload"])
|
||||
payload_file.write(payload_bytes)
|
||||
payload_file.flush()
|
||||
|
||||
|
@ -92,47 +120,107 @@ def verify_signature(signature: dict, pubkey: str) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
def new_image_release(image) -> bool:
|
||||
remote_hash = get_manifest_hash(image)
|
||||
local_hash = load_image_hash(image)
|
||||
def new_image_release(image: str) -> bool:
|
||||
remote_hash = registry.get_manifest_hash(image)
|
||||
local_hash = runtime.get_local_image_hash(image)
|
||||
log.debug("Remote hash: %s", remote_hash)
|
||||
log.debug("Local hash: %s", local_hash)
|
||||
return remote_hash != local_hash
|
||||
|
||||
|
||||
def upgrade_container_image(
|
||||
image: str,
|
||||
manifest_hash: str,
|
||||
def verify_signatures(
|
||||
signatures: List[Dict],
|
||||
image_hash: str,
|
||||
pubkey: str,
|
||||
) -> bool:
|
||||
for signature in signatures:
|
||||
if not verify_signature(signature, image_hash, pubkey):
|
||||
raise errors.SignatureVerificationError()
|
||||
return True
|
||||
|
||||
|
||||
def upgrade_container_image(image: str, manifest_hash: str, pubkey: str) -> bool:
|
||||
"""Verify and upgrade the image to the latest, if signed."""
|
||||
if not new_image_release(image):
|
||||
raise errors.ImageAlreadyUpToDate("The image is already up to date")
|
||||
return False
|
||||
|
||||
signatures = get_signatures(image, manifest_hash)
|
||||
log.debug("Signatures: %s", signatures)
|
||||
|
||||
if len(signatures) < 1:
|
||||
raise errors.NoRemoteSignatures("No remote signatures found")
|
||||
|
||||
for signature in signatures:
|
||||
signature_is_valid = verify_signature(signature, pubkey)
|
||||
if not signature_is_valid:
|
||||
raise errors.SignatureVerificationError()
|
||||
signatures = get_remote_signatures(image, manifest_hash)
|
||||
verify_signatures(signatures, manifest_hash, pubkey)
|
||||
|
||||
# At this point, the signatures are verified
|
||||
# We store the signatures just now to avoid storing unverified signatures
|
||||
store_signatures(signatures, manifest_hash, pubkey)
|
||||
|
||||
# let's upgrade the image
|
||||
# XXX Use the hash here to avoid race conditions
|
||||
return container_pull(image)
|
||||
# XXX Use the image digest here to avoid race conditions
|
||||
return runtime.container_pull(image)
|
||||
|
||||
|
||||
def get_file_hash(file: str) -> str:
|
||||
def upgrade_container_image_airgapped(
|
||||
container_tar: str, pubkey: str, image_name: str
|
||||
) -> bool:
|
||||
"""
|
||||
Verify the given archive against its self-contained signatures, then
|
||||
upgrade the image and retag it to the expected tag.
|
||||
|
||||
Right now, the archive is extracted and reconstructed, requiring some space
|
||||
on the filesystem.
|
||||
"""
|
||||
# XXX Use a memory buffer instead of the filesystem
|
||||
with TemporaryDirectory() as tmpdir:
|
||||
with tarfile.open(container_tar, "r") as archive:
|
||||
archive.extractall(tmpdir)
|
||||
|
||||
# XXX Check if the contained signatures match the given ones?
|
||||
# Or maybe store both signatures?
|
||||
if not cosign_verify_local_image(tmpdir, pubkey):
|
||||
raise errors.SignatureVerificationError()
|
||||
|
||||
# Remove the signatures from the archive.
|
||||
with open(Path(tmpdir) / "index.json") as f:
|
||||
index_json = json.load(f)
|
||||
index_json["manifests"] = [
|
||||
manifest
|
||||
for manifest in index_json["manifests"]
|
||||
if manifest["annotations"].get("kind")
|
||||
!= "dev.cosignproject.cosign/sigs"
|
||||
]
|
||||
|
||||
image_digest = index_json["manifests"][0].get("digest")
|
||||
|
||||
# Write the new index.json to the temp folder
|
||||
with open(Path(tmpdir) / "index.json", "w") as f:
|
||||
json.dump(index_json, f)
|
||||
|
||||
with NamedTemporaryFile(suffix=".tar") as temporary_tar:
|
||||
with tarfile.open(temporary_tar.name, "w") as archive:
|
||||
# The root is the tmpdir
|
||||
archive.add(Path(tmpdir) / "index.json", arcname="index.json")
|
||||
archive.add(Path(tmpdir) / "oci-layout", arcname="oci-layout")
|
||||
archive.add(Path(tmpdir) / "blobs", arcname="blobs")
|
||||
|
||||
runtime.load_image_tarball_file(temporary_tar.name)
|
||||
runtime.tag_image_by_digest(image_digest, image_name)
|
||||
|
||||
# XXX Convert the signatures to the expected format
|
||||
|
||||
# At this point, the signatures are verified
|
||||
# We store the signatures just now to avoid storing unverified signatures
|
||||
# store_signatures(signatures, image_hash, pubkey)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) -> str:
|
||||
"""Get the sha256 hash of a file or content"""
|
||||
if not file and not content:
|
||||
raise errors.UpdaterError("No file or content provided")
|
||||
if file:
|
||||
with open(file, "rb") as f:
|
||||
content = f.read()
|
||||
if content:
|
||||
return sha256(content).hexdigest()
|
||||
return ""
|
||||
|
||||
|
||||
def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
|
||||
|
@ -197,23 +285,21 @@ def verify_offline_image_signature(image: str, pubkey: str) -> bool:
|
|||
Verifies that a local image has a valid signature
|
||||
"""
|
||||
log.info(f"Verifying local image {image} against pubkey {pubkey}")
|
||||
image_hash = load_image_hash(image)
|
||||
image_hash = runtime.get_local_image_hash(image)
|
||||
log.debug(f"Image hash: {image_hash}")
|
||||
signatures = load_signatures(image_hash, pubkey)
|
||||
if len(signatures) < 1:
|
||||
raise errors.LocalSignatureNotFound("No signatures found")
|
||||
|
||||
for signature in signatures:
|
||||
if not verify_signature(signature, pubkey):
|
||||
if not verify_signature(signature, image_hash, pubkey):
|
||||
msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
|
||||
raise errors.SignatureVerificationError(msg)
|
||||
return True
|
||||
|
||||
|
||||
def get_signatures(image: str, hash: str) -> List[Dict]:
|
||||
"""
|
||||
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.
|
||||
"""
|
||||
def get_remote_signatures(image: str, hash: str) -> List[Dict]:
|
||||
"""Retrieve the signatures from the registry, via `cosign download`."""
|
||||
utils.ensure_cosign()
|
||||
|
||||
process = subprocess.run(
|
||||
|
@ -225,4 +311,7 @@ def get_signatures(image: str, hash: str) -> List[Dict]:
|
|||
# XXX: Check the output first.
|
||||
# Remove the last return, split on newlines, convert from JSON
|
||||
signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
|
||||
return list(map(json.loads, signatures_raw))
|
||||
signatures = list(map(json.loads, signatures_raw))
|
||||
if len(signatures) < 1:
|
||||
raise errors.NoRemoteSignatures("No signatures found for the image")
|
||||
return signatures
|
||||
|
|
Loading…
Reference in a new issue