This commit is contained in:
Alexis Métaireau 2025-02-04 15:19:24 +00:00 committed by GitHub
commit 223b061d27
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 1241 additions and 9 deletions

View file

@ -0,0 +1,57 @@
name: Release container image
on:
push:
tags:
- "container-image/**"
branches:
- "test/image-**"
workflow_dispatch:
permissions:
id-token: write
packages: write
contents: read
attestations: write
env:
REGISTRY: ghcr.io/${{ github.repository_owner }}
REGISTRY_USER: ${{ github.actor }}
REGISTRY_PASSWORD: ${{ github.token }}
IMAGE_NAME: dangerzone/dangerzone
jobs:
build-container-image:
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: USERNAME
password: ${{ github.token }}
- name: Build and push the dangerzone image
id: build-image
run: |
sudo apt-get install -y python3-poetry
python3 ./install/common/build-image.py
echo ${{ github.token }} | podman login ghcr.io -u USERNAME --password-stdin
# Load the image with the final name directly
gunzip -c share/container.tar.gz | podman load
FINAL_IMAGE_NAME="${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}"
TAG=$(git describe --long --first-parent | tail -c +2)
podman tag dangerzone.rocks/dangerzone:$TAG "$FINAL_IMAGE_NAME"
podman push "$FINAL_IMAGE_NAME" --digestfile=digest
echo "digest=$(cat digest)" >> "$GITHUB_OUTPUT"
- name: Generate artifact attestation
uses: actions/attest-build-provenance@v1
with:
subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
subject-digest: "${{ steps.build-image.outputs.digest }}"
push-to-registry: true

View file

@ -3,7 +3,7 @@ import logging
import platform
import shutil
import subprocess
from typing import List, Tuple
from typing import List, Optional, Tuple
from . import errors
from .util import get_resource_path, get_subprocess_startupinfo
@ -15,11 +15,9 @@ log = logging.getLogger(__name__)
def get_runtime_name() -> str:
if platform.system() == "Linux":
runtime_name = "podman"
else:
# Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
runtime_name = "docker"
return runtime_name
return "podman"
# Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
return "docker"
def get_runtime_version() -> Tuple[int, int]:
@ -118,7 +116,7 @@ def get_expected_tag() -> str:
return f.read().strip()
def load_image_tarball() -> None:
def load_image_tarball_in_memory() -> None:
log.info("Installing Dangerzone container image...")
p = subprocess.Popen(
[get_runtime(), "load"],
@ -147,3 +145,61 @@ def load_image_tarball() -> None:
)
log.info("Successfully installed container image from")
def load_image_tarball_file(tarball_path: str) -> None:
cmd = [get_runtime(), "load", "-i", tarball_path]
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
log.info("Successfully installed container image from %s", tarball_path)
def tag_image_by_digest(digest: str, tag: str) -> None:
"""Tag a container image by digest.
The sha256: prefix should be omitted from the digest.
"""
image_id = get_image_id_by_digest(digest)
cmd = [get_runtime(), "tag", image_id, tag]
log.debug(" ".join(cmd))
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
def get_image_id_by_digest(digest: str) -> str:
"""Get an image ID from a digest.
The sha256: prefix should be omitted from the digest.
"""
cmd = [
get_runtime(),
"images",
"-f",
f"digest=sha256:{digest}",
"--format",
"{{.Id}}",
]
log.debug(" ".join(cmd))
process = subprocess.run(
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
)
# In case we have multiple lines, we only want the first one.
return process.stdout.decode().strip().split("\n")[0]
def container_pull(image: str) -> bool:
"""Pull a container image from a registry."""
cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
process.communicate()
return process.returncode == 0
def get_local_image_hash(image: str) -> Optional[str]:
"""
Returns a image hash from a local image name
"""
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
try:
result = subprocess.run(cmd, capture_output=True, check=True)
except subprocess.CalledProcessError as e:
return None
else:
return result.stdout.strip().decode().strip("sha256:")

View file

@ -102,7 +102,7 @@ class Container(IsolationProvider):
return True
# Load the image tarball into the container runtime.
container_utils.load_image_tarball()
container_utils.load_image_tarball_in_memory()
# Check that the container image has the expected image tag.
# See https://github.com/freedomofpress/dangerzone/issues/988 for an example

189
dangerzone/rntime.py Normal file
View file

@ -0,0 +1,189 @@
import gzip
import logging
import platform
import shutil
import subprocess
from typing import List, Optional, Tuple
from . import errors
from .util import get_resource_path, get_subprocess_startupinfo
CONTAINER_NAME = "dangerzone.rocks/dangerzone"
log = logging.getLogger(__name__)
def get_runtime_name() -> str:
if platform.system() == "Linux":
return "podman"
# Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
return "docker"
def get_runtime_version() -> Tuple[int, int]:
"""Get the major/minor parts of the Docker/Podman version.
Some of the operations we perform in this module rely on some Podman features
that are not available across all of our platforms. In order to have a proper
fallback, we need to know the Podman version. More specifically, we're fine with
just knowing the major and minor version, since writing/installing a full-blown
semver parser is an overkill.
"""
# Get the Docker/Podman version, using a Go template.
runtime = get_runtime_name()
if runtime == "podman":
query = "{{.Client.Version}}"
else:
query = "{{.Server.Version}}"
cmd = [runtime, "version", "-f", query]
try:
version = subprocess.run(
cmd,
startupinfo=get_subprocess_startupinfo(),
capture_output=True,
check=True,
).stdout.decode()
except Exception as e:
msg = f"Could not get the version of the {runtime.capitalize()} tool: {e}"
raise RuntimeError(msg) from e
# Parse this version and return the major/minor parts, since we don't need the
# rest.
try:
major, minor, _ = version.split(".", 3)
return (int(major), int(minor))
except Exception as e:
msg = (
f"Could not parse the version of the {runtime.capitalize()} tool"
f" (found: '{version}') due to the following error: {e}"
)
raise RuntimeError(msg)
def get_runtime() -> str:
container_tech = get_runtime_name()
runtime = shutil.which(container_tech)
if runtime is None:
raise errors.NoContainerTechException(container_tech)
return runtime
def list_image_tags() -> List[str]:
"""Get the tags of all loaded Dangerzone images.
This method returns a mapping of image tags to image IDs, for all Dangerzone
images. This can be useful when we want to find which are the local image tags,
and which image ID does the "latest" tag point to.
"""
return (
subprocess.check_output(
[
get_runtime(),
"image",
"list",
"--format",
"{{ .Tag }}",
CONTAINER_NAME,
],
text=True,
startupinfo=get_subprocess_startupinfo(),
)
.strip()
.split()
)
def delete_image_tag(tag: str) -> None:
"""Delete a Dangerzone image tag."""
name = CONTAINER_NAME + ":" + tag
log.warning(f"Deleting old container image: {name}")
try:
subprocess.check_output(
[get_runtime(), "rmi", "--force", name],
startupinfo=get_subprocess_startupinfo(),
)
except Exception as e:
log.warning(
f"Couldn't delete old container image '{name}', so leaving it there."
f" Original error: {e}"
)
def get_expected_tag() -> str:
"""Get the tag of the Dangerzone image tarball from the image-id.txt file."""
with open(get_resource_path("image-id.txt")) as f:
return f.read().strip()
def tag_image_by_digest(digest: str, tag: str) -> None:
image_id = get_image_id_by_digest(digest)
cmd = [get_runtime(), "tag", image_id, tag]
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
def get_image_id_by_digest(digest: str) -> str:
cmd = [
get_runtime(),
"image",
"tag",
"-f",
f'digest="{digest}"',
"--format ",
"{{.Id}}",
]
process = subprocess.run(
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
)
return process.stdout.decode().strip()
def load_image_tarball_in_memory(
compressed_container_path: Optional[str] = None,
) -> None:
if compressed_container_path is None:
compressed_container_path = get_resource_path("container.tar.gz")
log.info("Installing Dangerzone container image...")
p = subprocess.Popen(
[get_runtime(), "load"],
stdin=subprocess.PIPE,
startupinfo=get_subprocess_startupinfo(),
)
chunk_size = 4 << 20
with gzip.open(compressed_container_path) as f:
while True:
chunk = f.read(chunk_size)
if len(chunk) > 0:
if p.stdin:
p.stdin.write(chunk)
else:
break
_, err = p.communicate()
if p.returncode < 0:
if err:
error = err.decode()
else:
error = "No output"
raise errors.ImageInstallationException(
f"Could not install container image: {error}"
)
log.info("Successfully installed container image from")
def load_image_tarball_file(container_path: str) -> None:
cmd = [get_runtime(), "load", "-i", container_path]
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
log.info("Successfully installed container image from %s", container_path)
def container_pull(image: str) -> bool:
# XXX - Move to container_utils.py
cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
process.communicate()
return process.returncode == 0

View file

@ -0,0 +1,3 @@
import logging
log = logging.getLogger(__name__)

View file

@ -0,0 +1,44 @@
import subprocess
from tempfile import NamedTemporaryFile
from . import cosign
def verify(
manifest: bytes, attestation_bundle: bytes, image_tag: str, expected_repo: str
) -> bool:
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
cosign.ensure_installed()
# Put the value in files and verify with cosign
with (
NamedTemporaryFile(mode="wb") as manifest_json,
NamedTemporaryFile(mode="wb") as attestation_bundle_json,
):
manifest_json.write(manifest)
manifest_json.flush()
attestation_bundle_json.write(attestation_bundle)
attestation_bundle_json.flush()
# Call cosign with the temporary file paths
cmd = [
"cosign",
"verify-blob-attestation",
"--bundle",
attestation_bundle_json.name,
"--new-bundle-format",
"--certificate-oidc-issuer",
"https://token.actions.githubusercontent.com",
"--certificate-identity-regexp",
f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign",
manifest_json.name,
]
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
error = result.stderr.decode()
raise Exception(f"Attestation cannot be verified. {error}")
return True

132
dangerzone/updater/cli.py Normal file
View file

@ -0,0 +1,132 @@
#!/usr/bin/python
import logging
import click
from ..util import get_resource_path
from . import attestations, errors, log, registry, signatures
DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone/dangerzone"
PUBKEY_DEFAULT_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
@click.group()
@click.option("--debug", is_flag=True)
def main(debug: bool) -> None:
if debug:
click.echo("Debug mode enabled")
level = logging.DEBUG
else:
level = logging.INFO
logging.basicConfig(level=level)
@main.command()
@click.argument("image", default=DEFAULT_IMAGE_NAME)
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
def upgrade(image: str, pubkey: str) -> None:
"""Upgrade the image to the latest signed version."""
manifest_hash = registry.get_manifest_hash(image)
try:
is_upgraded = signatures.upgrade_container_image(image, manifest_hash, pubkey)
if is_upgraded:
click.echo(f"✅ The local image {image} has been upgraded")
click.echo(f"✅ The image has been signed with {pubkey}")
click.echo(f"✅ Signatures has been verified and stored locally")
except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}")
raise click.Abort()
@main.command()
@click.argument("image_filename")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
def load_archive(image_filename: str, pubkey: str) -> None:
"""Upgrade the local image to the one in the archive."""
try:
loaded_image = signatures.upgrade_container_image_airgapped(
image_filename, pubkey
)
click.echo(
f"✅ Installed image {image_filename} on the system as {loaded_image}"
)
except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}")
raise click.Abort()
@main.command()
@click.argument("image")
@click.option("--output", default="dangerzone-airgapped.tar")
def prepare_archive(image: str, output: str) -> None:
"""Prepare an archive to upgrade the dangerzone image on an airgapped environment."""
signatures.prepare_airgapped_archive(image, output)
click.echo(f"✅ Archive {output} created")
@main.command()
@click.argument("image", default=DEFAULT_IMAGE_NAME)
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
def verify_local(image: str, pubkey: str) -> None:
"""
Verify the local image signature against a public key and the stored signatures.
"""
# XXX remove a potentiel :tag
if signatures.verify_local_image(image, pubkey):
click.echo(
(
f"Verifying the local image:\n\n"
f"pubkey: {pubkey}\n"
f"image: {image}\n\n"
f"✅ The local image {image} has been signed with {pubkey}"
)
)
@main.command()
@click.argument("image")
def list_remote_tags(image: str) -> None:
"""List the tags available for a given image."""
click.echo(f"Existing tags for {image}")
for tag in registry.list_tags(image):
click.echo(tag)
@main.command()
@click.argument("image")
def get_manifest(image: str) -> None:
"""Retrieves a remove manifest for a given image and displays it."""
click.echo(registry.get_manifest(image))
@main.command()
@click.argument("image")
@click.option(
"--repository",
default=DEFAULT_REPOSITORY,
help="The github repository to check the attestation for",
)
def attest_provenance(image: str, repository: str) -> None:
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
# XXX put this inside a module
# if shutil.which("cosign") is None:
# click.echo("The cosign binary is needed but not installed.")
# raise click.Abort()
parsed = registry.parse_image_location(image)
manifest, bundle = registry.get_attestation(image)
verified = attestations.verify(manifest, bundle, parsed.tag, repository)
if verified:
click.echo(
f"🎉 The image available at `{parsed.full_name}` has been built by Github Runners from the `{repository}` repository"
)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,32 @@
import subprocess
from . import errors, log
def ensure_installed() -> None:
try:
subprocess.run(["cosign", "version"], capture_output=True, check=True)
except subprocess.CalledProcessError:
raise errors.CosignNotInstalledError()
def verify_local_image(oci_image_folder: str, pubkey: str) -> bool:
"""Verify the given path against the given public key"""
ensure_installed()
cmd = [
"cosign",
"verify",
"--key",
pubkey,
"--offline",
"--local-image",
oci_image_folder,
]
log.debug(" ".join(cmd))
result = subprocess.run(cmd, capture_output=True)
if result.returncode == 0:
log.info("Signature verified")
return True
log.info("Failed to verify signature", result.stderr)
return False

View file

@ -0,0 +1,54 @@
class UpdaterError(Exception):
pass
class ImageAlreadyUpToDate(UpdaterError):
pass
class ImageNotFound(UpdaterError):
pass
class SignatureError(UpdaterError):
pass
class RegistryError(UpdaterError):
pass
class AirgappedImageDownloadError(UpdaterError):
pass
class NoRemoteSignatures(SignatureError):
pass
class SignatureVerificationError(SignatureError):
pass
class SignatureExtractionError(SignatureError):
pass
class SignaturesFolderDoesNotExist(SignatureError):
pass
class InvalidSignatures(SignatureError):
pass
class SignatureMismatch(SignatureError):
pass
class LocalSignatureNotFound(SignatureError):
pass
class CosignNotInstalledError(SignatureError):
pass

View file

@ -0,0 +1,218 @@
import hashlib
import re
from collections import namedtuple
from typing import Dict, Optional, Tuple
import requests
from . import errors, log
__all__ = [
"get_manifest_hash",
"list_tags",
"get_manifest",
"get_attestation",
"parse_image_location",
]
SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json"
DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json"
DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json"
OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json"
class Image(namedtuple("Image", ["registry", "namespace", "image_name", "tag"])):
__slots__ = ()
@property
def full_name(self) -> str:
tag = f":{self.tag}" if self.tag else ""
return f"{self.registry}/{self.namespace}/{self.image_name}{tag}"
def parse_image_location(input_string: str) -> Image:
"""Parses container image location into an Image namedtuple"""
pattern = (
r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<image_name>[^:]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"$"
)
match = re.match(pattern, input_string)
if not match:
raise ValueError("Malformed image location")
return Image(
registry=match.group("registry"),
namespace=match.group("namespace"),
image_name=match.group("image_name"),
tag=match.group("tag") or "latest",
)
class RegistryClient:
def __init__(
self,
image: Image | str,
):
if isinstance(image, str):
image = parse_image_location(image)
self._image = image
self._registry = image.registry
self._namespace = image.namespace
self._image_name = image.image_name
self._auth_token = None
self._base_url = f"https://{self._registry}"
self._image_url = f"{self._base_url}/v2/{self._namespace}/{self._image_name}"
def get_auth_token(self) -> Optional[str]:
if not self._auth_token:
auth_url = f"{self._base_url}/token"
response = requests.get(
auth_url,
params={
"service": f"{self._registry}",
"scope": f"repository:{self._namespace}/{self._image_name}:pull",
},
)
response.raise_for_status()
self._auth_token = response.json()["token"]
return self._auth_token
def get_auth_header(self) -> Dict[str, str]:
return {"Authorization": f"Bearer {self.get_auth_token()}"}
def list_tags(self) -> list:
url = f"{self._image_url}/tags/list"
response = requests.get(url, headers=self.get_auth_header())
response.raise_for_status()
tags = response.json().get("tags", [])
return tags
def get_manifest(
self, tag: str, extra_headers: Optional[dict] = None
) -> requests.Response:
"""Get manifest information for a specific tag"""
manifest_url = f"{self._image_url}/manifests/{tag}"
headers = {
"Accept": DOCKER_MANIFEST_DISTRIBUTION,
"Authorization": f"Bearer {self.get_auth_token()}",
}
if extra_headers:
headers.update(extra_headers)
response = requests.get(manifest_url, headers=headers)
response.raise_for_status()
return response
def list_manifests(self, tag: str) -> list:
return (
self.get_manifest(
tag,
{
"Accept": DOCKER_MANIFEST_INDEX,
},
)
.json()
.get("manifests")
)
def get_blob(self, hash: str) -> requests.Response:
url = f"{self._image_url}/blobs/{hash}"
response = requests.get(
url,
headers={
"Authorization": f"Bearer {self.get_auth_token()}",
},
)
response.raise_for_status()
return response
def get_manifest_hash(
self, tag: str, tag_manifest_content: Optional[bytes] = None
) -> str:
if not tag_manifest_content:
tag_manifest_content = self.get_manifest(tag).content
return hashlib.sha256(tag_manifest_content).hexdigest()
def get_attestation(self, tag: str) -> Tuple[bytes, bytes]:
"""
Retrieve an attestation from a given tag.
The attestation needs to be attached using the Cosign Bundle
Specification defined at:
https://github.com/sigstore/cosign/blob/main/specs/BUNDLE_SPEC.md
Returns a tuple with the tag manifest content and the bundle content.
"""
# FIXME: do not only rely on the first layer
def _find_sigstore_bundle_manifest(
manifests: list,
) -> Tuple[Optional[str], Optional[str]]:
for manifest in manifests:
if manifest["artifactType"] == SIGSTORE_BUNDLE:
return manifest["mediaType"], manifest["digest"]
return None, None
def _get_bundle_blob_digest(layers: list) -> Optional[str]:
for layer in layers:
if layer.get("mediaType") == SIGSTORE_BUNDLE:
return layer["digest"]
return None
tag_manifest_content = self.get_manifest(tag).content
# The attestation is available on the same container registry, with a
# specific tag named "sha256-{sha256(manifest)}"
tag_manifest_hash = self.get_manifest_hash(tag, tag_manifest_content)
# This will get us a "list" of manifests...
manifests = self.list_manifests(f"sha256-{tag_manifest_hash}")
# ... from which we want the sigstore bundle
bundle_manifest_mediatype, bundle_manifest_digest = (
_find_sigstore_bundle_manifest(manifests)
)
if not bundle_manifest_digest:
raise errors.RegistryError("Not able to find sigstore bundle manifest info")
bundle_manifest = self.get_manifest(
bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype}
).json()
# From there, we will get the attestation in a blob.
# It will be the first layer listed at this manifest hash location
layers = bundle_manifest.get("layers", [])
blob_digest = _get_bundle_blob_digest(layers)
log.info(f"Found sigstore bundle blob digest: {blob_digest}")
if not blob_digest:
raise errors.RegistryError("Not able to find sigstore bundle blob info")
bundle = self.get_blob(blob_digest)
return tag_manifest_content, bundle.content
def get_manifest_hash(image_str: str) -> str:
image = parse_image_location(image_str)
return RegistryClient(image).get_manifest_hash(image.tag)
def list_tags(image_str: str) -> list:
return RegistryClient(image_str).list_tags()
def get_manifest(image_str: str) -> bytes:
image = parse_image_location(image_str)
client = RegistryClient(image)
resp = client.get_manifest(image.tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST})
return resp.content
def get_attestation(image_str: str) -> Tuple[bytes, bytes]:
image = parse_image_location(image_str)
return RegistryClient(image).get_attestation(image.tag)

View file

@ -0,0 +1,377 @@
import json
import platform
import re
import subprocess
import tarfile
from base64 import b64decode, b64encode
from hashlib import sha256
from io import BytesIO
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import Dict, List, Optional, Tuple
from .. import container_utils as runtime
from . import cosign, errors, log, registry
try:
import platformdirs
except ImportError:
import appdirs as platformdirs # type: ignore[no-redef]
def get_config_dir() -> Path:
return Path(platformdirs.user_config_dir("dangerzone"))
# XXX Store this somewhere else.
SIGNATURES_PATH = get_config_dir() / "signatures"
__all__ = [
"verify_signature",
"load_signatures",
"store_signatures",
"verify_offline_image_signature",
]
def signature_to_bundle(sig: Dict) -> Dict:
"""Convert a cosign-download signature to the format expected by cosign bundle."""
bundle = sig["Bundle"]
payload = bundle["Payload"]
return {
"base64Signature": sig["Base64Signature"],
"Payload": sig["Payload"],
"cert": sig["Cert"],
"chain": sig["Chain"],
"rekorBundle": {
"SignedEntryTimestamp": bundle["SignedEntryTimestamp"],
"Payload": {
"body": payload["body"],
"integratedTime": payload["integratedTime"],
"logIndex": payload["logIndex"],
"logID": payload["logID"],
},
},
"RFC3161Timestamp": sig["RFC3161Timestamp"],
}
def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool:
"""Verify a signature against a given public key"""
# XXX - Also verfy the identity/docker-reference field against the expected value
# e.g. ghcr.io/freedomofpress/dangerzone/dangerzone
cosign.ensure_installed()
signature_bundle = signature_to_bundle(signature)
payload_bytes = b64decode(signature_bundle["Payload"])
payload_hash = json.loads(payload_bytes)["critical"]["image"][
"docker-manifest-digest"
]
if payload_hash != f"sha256:{image_hash}":
raise errors.SignatureMismatch(
f"The signature does not match the image hash ({payload_hash}, {image_hash})"
)
with (
NamedTemporaryFile(mode="w") as signature_file,
NamedTemporaryFile(mode="bw") as payload_file,
):
json.dump(signature_bundle, signature_file)
signature_file.flush()
payload_file.write(payload_bytes)
payload_file.flush()
cmd = [
"cosign",
"verify-blob",
"--key",
pubkey,
"--bundle",
signature_file.name,
payload_file.name,
]
log.debug(" ".join(cmd))
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
# XXX Raise instead?
log.debug("Failed to verify signature", result.stderr)
return False
if result.stderr == b"Verified OK\n":
log.debug("Signature verified")
return True
return False
def new_image_release(image: str) -> bool:
remote_hash = registry.get_manifest_hash(image)
local_hash = runtime.get_local_image_hash(image)
log.debug("Remote hash: %s", remote_hash)
log.debug("Local hash: %s", local_hash)
return remote_hash != local_hash
def verify_signatures(
signatures: List[Dict],
image_hash: str,
pubkey: str,
) -> bool:
for signature in signatures:
if not verify_signature(signature, image_hash, pubkey):
raise errors.SignatureVerificationError()
return True
def upgrade_container_image(image: str, manifest_hash: str, pubkey: str) -> bool:
"""Verify and upgrade the image to the latest, if signed."""
if not new_image_release(image):
raise errors.ImageAlreadyUpToDate("The image is already up to date")
signatures = get_remote_signatures(image, manifest_hash)
verify_signatures(signatures, manifest_hash, pubkey)
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
store_signatures(signatures, manifest_hash, pubkey)
# let's upgrade the image
# XXX Use the image digest here to avoid race conditions
return runtime.container_pull(image)
def _get_blob(tmpdir: str, hash: str) -> Path:
return Path(tmpdir) / "blobs" / "sha256" / hash.replace("sha256:", "")
def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
"""
Verify the given archive against its self-contained signatures, then
upgrade the image and retag it to the expected tag.
Right now, the archive is extracted and reconstructed, requiring some space
on the filesystem.
:return: The loaded image name
"""
# XXX Use a memory buffer instead of the filesystem
with TemporaryDirectory() as tmpdir:
def _get_signature_filename(manifests: List[Dict]) -> Path:
for manifest in manifests:
if (
manifest["annotations"].get("kind")
== "dev.cosignproject.cosign/sigs"
):
return _get_blob(tmpdir, manifest["digest"])
raise errors.SignatureExtractionError()
with tarfile.open(container_tar, "r") as archive:
archive.extractall(tmpdir)
if not cosign.verify_local_image(tmpdir, pubkey):
raise errors.SignatureVerificationError()
# Remove the signatures from the archive.
with open(Path(tmpdir) / "index.json") as f:
index_json = json.load(f)
signature_filename = _get_signature_filename(index_json["manifests"])
index_json["manifests"] = [
manifest
for manifest in index_json["manifests"]
if manifest["annotations"].get("kind") != "dev.cosignproject.cosign/sigs"
]
with open(signature_filename, "rb") as f:
image_name, signatures = convert_oci_images_signatures(json.load(f), tmpdir)
log.info(f"Found image name: {image_name}")
image_digest = index_json["manifests"][0].get("digest").replace("sha256:", "")
# Write the new index.json to the temp folder
with open(Path(tmpdir) / "index.json", "w") as f:
json.dump(index_json, f)
with NamedTemporaryFile(suffix=".tar") as temporary_tar:
with tarfile.open(temporary_tar.name, "w") as archive:
# The root is the tmpdir
archive.add(Path(tmpdir) / "index.json", arcname="index.json")
archive.add(Path(tmpdir) / "oci-layout", arcname="oci-layout")
archive.add(Path(tmpdir) / "blobs", arcname="blobs")
runtime.load_image_tarball_file(temporary_tar.name)
runtime.tag_image_by_digest(image_digest, image_name)
store_signatures(signatures, image_digest, pubkey)
return image_name
def convert_oci_images_signatures(
signatures_manifest: List[Dict], tmpdir: str
) -> (str, List[Dict]):
def _to_cosign_signature(layer: Dict) -> Dict:
signature = layer["annotations"]["dev.cosignproject.cosign/signature"]
bundle = json.loads(layer["annotations"]["dev.sigstore.cosign/bundle"])
payload_body = json.loads(b64decode(bundle["Payload"]["body"]))
payload_location = _get_blob(tmpdir, layer["digest"])
with open(payload_location, "rb") as f:
payload_b64 = b64encode(f.read()).decode()
return {
"Base64Signature": payload_body["spec"]["signature"]["content"],
"Payload": payload_b64,
"Cert": None,
"Chain": None,
"Bundle": bundle,
"RFC3161Timestamp": None,
}
layers = signatures_manifest["layers"]
signatures = [_to_cosign_signature(layer) for layer in layers]
payload_location = _get_blob(tmpdir, layers[0]["digest"])
with open(payload_location, "r") as f:
payload = json.load(f)
image_name = payload["critical"]["identity"]["docker-reference"]
return image_name, signatures
def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) -> str:
"""Get the sha256 hash of a file or content"""
if not file and not content:
raise errors.UpdaterError("No file or content provided")
if file:
with open(file, "rb") as f:
content = f.read()
if content:
return sha256(content).hexdigest()
return ""
def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
"""
Load signatures from the local filesystem
See store_signatures() for the expected format.
"""
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
if not pubkey_signatures.exists():
msg = (
f"Cannot find a '{pubkey_signatures}' folder."
"You might need to download the image signatures first."
)
raise errors.SignaturesFolderDoesNotExist(msg)
with open(pubkey_signatures / f"{image_hash}.json") as f:
log.debug("Loading signatures from %s", f.name)
return json.load(f)
def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> None:
"""
Store signatures locally in the SIGNATURE_PATH folder, like this:
~/.config/dangerzone/signatures/
<pubkey-hash>
<image-hash>.json
<image-hash>.json
The format used in the `.json` file is the one of `cosign download
signature`, which differs from the "bundle" one used afterwards.
It can be converted to the one expected by cosign verify --bundle with
the `signature_to_bundle()` function.
"""
def _get_digest(sig: Dict) -> str:
payload = json.loads(b64decode(sig["Payload"]))
return payload["critical"]["image"]["docker-manifest-digest"]
# All the signatures should share the same hash.
hashes = list(map(_get_digest, signatures))
if len(set(hashes)) != 1:
raise errors.InvalidSignatures("Signatures do not share the same image hash")
if f"sha256:{image_hash}" != hashes[0]:
raise errors.SignatureMismatch(
f"Signatures do not match the given image hash ({image_hash}, {hashes[0]})"
)
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
pubkey_signatures.mkdir(exist_ok=True)
with open(pubkey_signatures / f"{image_hash}.json", "w") as f:
log.info(
f"Storing signatures for {image_hash} in {pubkey_signatures}/{image_hash}.json"
)
json.dump(signatures, f)
def verify_local_image(image: str, pubkey: str) -> bool:
"""
Verifies that a local image has a valid signature
"""
log.info(f"Verifying local image {image} against pubkey {pubkey}")
try:
image_hash = runtime.get_local_image_hash(image)
except subprocess.CalledProcessError:
raise errors.ImageNotFound(f"The image {image} does not exist locally")
log.debug(f"Image hash: {image_hash}")
signatures = load_signatures(image_hash, pubkey)
if len(signatures) < 1:
raise errors.LocalSignatureNotFound("No signatures found")
for signature in signatures:
if not verify_signature(signature, image_hash, pubkey):
msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
raise errors.SignatureVerificationError(msg)
return True
def get_remote_signatures(image: str, hash: str) -> List[Dict]:
"""Retrieve the signatures from the registry, via `cosign download`."""
cosign.ensure_installed()
process = subprocess.run(
["cosign", "download", "signature", f"{image}@sha256:{hash}"],
capture_output=True,
check=True,
)
# XXX: Check the output first.
# Remove the last return, split on newlines, convert from JSON
signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
signatures = list(map(json.loads, signatures_raw))
if len(signatures) < 1:
raise errors.NoRemoteSignatures("No signatures found for the image")
return signatures
def prepare_airgapped_archive(image_name, destination):
if "@sha256:" not in image_name:
raise errors.AirgappedImageDownloadError(
"The image name must include a digest, e.g. ghcr.io/freedomofpress/dangerzone/dangerzone@sha256:123456"
)
cosign.ensure_installed()
# Get the image from the registry
with TemporaryDirectory() as tmpdir:
msg = f"Downloading image {image_name}. \nIt might take a while."
log.info(msg)
process = subprocess.run(
["cosign", "save", image_name, "--dir", tmpdir],
capture_output=True,
check=True,
)
if process.returncode != 0:
raise errors.AirgappedImageDownloadError()
with tarfile.open(destination, "w") as archive:
archive.add(tmpdir, arcname=".")

View file

@ -8,7 +8,7 @@ import unicodedata
try:
import platformdirs
except ImportError:
import appdirs as platformdirs
import appdirs as platformdirs # type: ignore[no-redef]
def get_config_dir() -> str:

13
dev_scripts/dangerzone-image Executable file
View file

@ -0,0 +1,13 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
# Load dangerzone module and resources from the source code tree
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.dangerzone_dev = True
from dangerzone.updater import cli
cli.main()

View file

@ -0,0 +1,56 @@
# Independent Container Updates
Since version 0.9.0, Dangerzone is able to ship container images independently
from releases.
One of the main benefits of doing so is to lower the time needed to patch security issues inside the containers.
## Checking attestations
Each night, new images are built and pushed to the container registry, alongside
with a provenance attestation, enabling anybody to ensure that the image has
been originally built by Github CI runners, from a defined source repository (in our case `freedomofpress/dangerzone`).
To verify the attestations against our expectations, use the following command:
```bash
dangerzone-image attest-provenance ghcr.io/freedomofpress/dangerzone/dangerzone --repository freedomofpress/dangerzone
```
In case of sucess, it will report back:
```
🎉 The image available at `ghcr.io/freedomofpress/dangerzone/dangerzone:latest` has been built by Github runners from the `freedomofpress/dangerzone` repository.
```
## Install updates
To check if a new container image has been released, and update your local installation with it, you can use the following commands:
```bash
./dev_scripts/dangerzone-image --debug upgrade ghcr.io/almet/dangerzone/dangerzone
```
## Verify local
You can verify that the image you have locally matches the stored signatures, and that these have been signed with a trusted public key:
```bash
dangerzone-image verify-local ghcr.io/almet/dangerzone/dangerzone
```
## Air-gapped environments
In order to make updates on an air-gapped environment, you will need to prepare an archive for the air-gapped environment. This archive will contain all the needed material to validate that the new container image has been signed and is valid.
On the machine on which you prepare the packages:
```bash
dangerzone-image prepare-archive --output dz-fa94872.tar ghcr.io/almet/dangerzone/dangerzone@sha256:fa948726aac29a6ac49f01ec8fbbac18522b35b2491fdf716236a0b3502a2ca7
```
On the airgapped machine, copy the file and run the following command:
```bash
dangerzone-image load-archive dz-fa94872.tar
```

View file

@ -34,6 +34,7 @@ shiboken6 = [
[tool.poetry.scripts]
dangerzone = 'dangerzone:main'
dangerzone-cli = 'dangerzone:main'
dangerzone-image = "dangerzone.updater.cli:main"
# Dependencies required for packaging the code on various platforms.
[tool.poetry.group.package.dependencies]