This commit is contained in:
Alexis Métaireau 2025-01-29 18:32:18 +00:00 committed by GitHub
commit f36dd56205
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 763 additions and 6 deletions

View file

@ -0,0 +1,57 @@
name: Release container image
on:
push:
tags:
- "container-image/**"
branches:
- "test/image-**"
workflow_dispatch:
permissions:
id-token: write
packages: write
contents: read
attestations: write
env:
REGISTRY: ghcr.io/${{ github.repository_owner }}
REGISTRY_USER: ${{ github.actor }}
REGISTRY_PASSWORD: ${{ github.token }}
IMAGE_NAME: dangerzone/dangerzone
jobs:
build-container-image:
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: USERNAME
password: ${{ github.token }}
- name: Build and push the dangerzone image
id: build-image
run: |
sudo apt-get install -y python3-poetry
python3 ./install/common/build-image.py
echo ${{ github.token }} | podman login ghcr.io -u USERNAME --password-stdin
# Load the image with the final name directly
gunzip -c share/container.tar.gz | podman load
FINAL_IMAGE_NAME="${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}"
TAG=$(git describe --long --first-parent | tail -c +2)
podman tag dangerzone.rocks/dangerzone:$TAG "$FINAL_IMAGE_NAME"
podman push "$FINAL_IMAGE_NAME" --digestfile=digest
echo "digest=$(cat digest)" >> "$GITHUB_OUTPUT"
- name: Generate artifact attestation
uses: actions/attest-build-provenance@v1
with:
subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
subject-digest: "${{ steps.build-image.outputs.digest }}"
push-to-registry: true

View file

@ -15,11 +15,9 @@ log = logging.getLogger(__name__)
def get_runtime_name() -> str:
if platform.system() == "Linux":
runtime_name = "podman"
else:
return "podman"
# Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
runtime_name = "docker"
return runtime_name
return "docker"
def get_runtime_version() -> Tuple[int, int]:
@ -147,3 +145,18 @@ def load_image_tarball() -> None:
)
log.info("Successfully installed container image from")
def container_pull(image: str) -> bool:
"""Pull a container image from a registry."""
cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
process.communicate()
return process.returncode == 0
def load_image_hash(image: str) -> str:
"""Returns a image hash from a local image name"""
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout.strip().decode().strip("sha256:")

View file

@ -0,0 +1,3 @@
import logging
log = logging.getLogger(__name__)

View file

@ -0,0 +1,44 @@
import subprocess
from tempfile import NamedTemporaryFile
from . import utils
def verify_attestation(
manifest: bytes, attestation_bundle: bytes, image_tag: str, expected_repo: str
) -> bool:
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
utils.ensure_cosign()
# Put the value in files and verify with cosign
with (
NamedTemporaryFile(mode="wb") as manifest_json,
NamedTemporaryFile(mode="wb") as attestation_bundle_json,
):
manifest_json.write(manifest)
manifest_json.flush()
attestation_bundle_json.write(attestation_bundle)
attestation_bundle_json.flush()
# Call cosign with the temporary file paths
cmd = [
"cosign",
"verify-blob-attestation",
"--bundle",
attestation_bundle_json.name,
"--new-bundle-format",
"--certificate-oidc-issuer",
"https://token.actions.githubusercontent.com",
"--certificate-identity-regexp",
f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign",
manifest_json.name,
]
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
error = result.stderr.decode()
raise Exception(f"Attestation cannot be verified. {error}")
return True

104
dangerzone/updater/cli.py Normal file
View file

@ -0,0 +1,104 @@
#!/usr/bin/python
import logging
import click
from ..util import get_resource_path
from . import errors, log, registry
from .attestations import verify_attestation
from .signatures import upgrade_container_image, verify_offline_image_signature
DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
PUBKEY_DEFAULT_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")
@click.group()
@click.option("--debug", is_flag=True)
def main(debug=False) -> None:
if debug:
click.echo("Debug mode enabled")
level = logging.DEBUG
else:
level = logging.INFO
logging.basicConfig(level=level)
@main.command()
@click.option("--image")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
@click.option("--airgap", is_flag=True)
# XXX Add options to do airgap upgrade
def upgrade(image: str, pubkey: str, airgap: bool) -> None:
"""Upgrade the image to the latest signed version."""
manifest_hash = registry.get_manifest_hash(image)
try:
is_upgraded = upgrade_container_image(image, manifest_hash, pubkey)
click.echo(f"✅ The local image {image} has been upgraded")
except errors.ImageAlreadyUpToDate as e:
click.echo(f"{e}")
raise click.Abort()
@main.command()
@click.argument("image")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
def verify_offline(image: str, pubkey: str) -> None:
"""
Verify the local image signature against a public key and the stored signatures.
"""
# XXX remove a potentiel :tag
if verify_offline_image_signature(image, pubkey):
click.echo(
(
f"Verifying the local image:\n\n"
f"pubkey: {pubkey}\n"
f"image: {image}\n\n"
f"✅ The local image {image} has been signed with {pubkey}"
)
)
@main.command()
@click.argument("image")
def list_remote_tags(image: str) -> None:
click.echo(f"Existing tags for {image}")
for tag in registry.list_tags(image):
click.echo(tag)
@main.command()
@click.argument("image")
def get_manifest(image: str) -> None:
click.echo(registry.get_manifest(image))
@main.command()
@click.argument("image")
@click.option(
"--repository",
default=DEFAULT_REPOSITORY,
help="The github repository to check the attestation for",
)
def attest_provenance(image: str, repository: str) -> None:
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
# XXX put this inside a module
# if shutil.which("cosign") is None:
# click.echo("The cosign binary is needed but not installed.")
# raise click.Abort()
parsed = registry.parse_image_location(image)
manifest, bundle = registry.get_attestation(image)
verified = verify_attestation(manifest, bundle, parsed.tag, repository)
if verified:
click.echo(
f"🎉 The image available at `{parsed.full_name}` has been built by Github Runners from the `{repository}` repository"
)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,42 @@
class UpdaterError(Exception):
pass
class ImageAlreadyUpToDate(UpdaterError):
pass
class SignatureError(UpdaterError):
pass
class RegistryError(UpdaterError):
pass
class NoRemoteSignatures(SignatureError):
pass
class SignatureVerificationError(SignatureError):
pass
class SignaturesFolderDoesNotExist(SignatureError):
pass
class InvalidSignatures(SignatureError):
pass
class SignatureMismatch(SignatureError):
pass
class LocalSignatureNotFound(SignatureError):
pass
class CosignNotInstalledError(SignatureError):
pass

View file

@ -0,0 +1,219 @@
import hashlib
import re
from collections import namedtuple
from typing import Dict, Optional, Tuple
import requests
from . import errors, log
__all__ = [
"get_manifest_hash",
"list_tags",
"get_manifest",
"get_attestation",
"Image",
"parse_image_location",
]
SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json"
DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json"
DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json"
OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json"
class Image(namedtuple("Image", ["registry", "namespace", "image_name", "tag"])):
__slots__ = ()
@property
def full_name(self) -> str:
tag = f":{self.tag}" if self.tag else ""
return f"{self.registry}/{self.namespace}/{self.image_name}{tag}"
def parse_image_location(input_string: str) -> Image:
"""Parses container image location into an Image namedtuple"""
pattern = (
r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<image_name>[^:]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"$"
)
match = re.match(pattern, input_string)
if not match:
raise ValueError("Malformed image location")
return Image(
registry=match.group("registry"),
namespace=match.group("namespace"),
image_name=match.group("image_name"),
tag=match.group("tag") or "latest",
)
class RegistryClient:
def __init__(
self,
image: Image | str,
):
if isinstance(image, str):
image = parse_image_location(image)
self._image = image
self._registry = image.registry
self._namespace = image.namespace
self._image_name = image.image_name
self._auth_token = None
self._base_url = f"https://{self._registry}"
self._image_url = f"{self._base_url}/v2/{self._namespace}/{self._image_name}"
def get_auth_token(self) -> Optional[str]:
if not self._auth_token:
auth_url = f"{self._base_url}/token"
response = requests.get(
auth_url,
params={
"service": f"{self._registry}",
"scope": f"repository:{self._namespace}/{self._image_name}:pull",
},
)
response.raise_for_status()
self._auth_token = response.json()["token"]
return self._auth_token
def get_auth_header(self) -> Dict[str, str]:
return {"Authorization": f"Bearer {self.get_auth_token()}"}
def list_tags(self) -> list:
url = f"{self._image_url}/tags/list"
response = requests.get(url, headers=self.get_auth_header())
response.raise_for_status()
tags = response.json().get("tags", [])
return tags
def get_manifest(
self, tag: str, extra_headers: Optional[dict] = None
) -> requests.Response:
"""Get manifest information for a specific tag"""
manifest_url = f"{self._image_url}/manifests/{tag}"
headers = {
"Accept": DOCKER_MANIFEST_DISTRIBUTION,
"Authorization": f"Bearer {self.get_auth_token()}",
}
if extra_headers:
headers.update(extra_headers)
response = requests.get(manifest_url, headers=headers)
response.raise_for_status()
return response
def list_manifests(self, tag: str) -> list:
return (
self.get_manifest(
tag,
{
"Accept": DOCKER_MANIFEST_INDEX,
},
)
.json()
.get("manifests")
)
def get_blob(self, hash: str) -> requests.Response:
url = f"{self._image_url}/blobs/{hash}"
response = requests.get(
url,
headers={
"Authorization": f"Bearer {self.get_auth_token()}",
},
)
response.raise_for_status()
return response
def get_manifest_hash(
self, tag: str, tag_manifest_content: Optional[bytes] = None
) -> str:
if not tag_manifest_content:
tag_manifest_content = self.get_manifest(tag).content
return hashlib.sha256(tag_manifest_content).hexdigest()
def get_attestation(self, tag: str) -> Tuple[bytes, bytes]:
"""
Retrieve an attestation from a given tag.
The attestation needs to be attached using the Cosign Bundle
Specification defined at:
https://github.com/sigstore/cosign/blob/main/specs/BUNDLE_SPEC.md
Returns a tuple with the tag manifest content and the bundle content.
"""
# FIXME: do not only rely on the first layer
def _find_sigstore_bundle_manifest(
manifests: list,
) -> Tuple[Optional[str], Optional[str]]:
for manifest in manifests:
if manifest["artifactType"] == SIGSTORE_BUNDLE:
return manifest["mediaType"], manifest["digest"]
return None, None
def _get_bundle_blob_digest(layers: list) -> Optional[str]:
for layer in layers:
if layer.get("mediaType") == SIGSTORE_BUNDLE:
return layer["digest"]
return None
tag_manifest_content = self.get_manifest(tag).content
# The attestation is available on the same container registry, with a
# specific tag named "sha256-{sha256(manifest)}"
tag_manifest_hash = self.get_manifest_hash(tag, tag_manifest_content)
# This will get us a "list" of manifests...
manifests = self.list_manifests(f"sha256-{tag_manifest_hash}")
# ... from which we want the sigstore bundle
bundle_manifest_mediatype, bundle_manifest_digest = (
_find_sigstore_bundle_manifest(manifests)
)
if not bundle_manifest_digest:
raise errors.RegistryError("Not able to find sigstore bundle manifest info")
bundle_manifest = self.get_manifest(
bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype}
).json()
# From there, we will get the attestation in a blob.
# It will be the first layer listed at this manifest hash location
layers = bundle_manifest.get("layers", [])
blob_digest = _get_bundle_blob_digest(layers)
log.info(f"Found sigstore bundle blob digest: {blob_digest}")
if not blob_digest:
raise errors.RegistryError("Not able to find sigstore bundle blob info")
bundle = self.get_blob(blob_digest)
return tag_manifest_content, bundle.content
def get_manifest_hash(image_str: str) -> str:
image = parse_image_location(image_str)
return RegistryClient(image).get_manifest_hash(image.tag)
def list_tags(image_str: str) -> list:
return RegistryClient(image_str).list_tags()
def get_manifest(image_str: str) -> bytes:
image = parse_image_location(image_str)
client = RegistryClient(image)
resp = client.get_manifest(image.tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST})
return resp.content
def get_attestation(image_str: str) -> Tuple[bytes, bytes]:
image = parse_image_location(image_str)
return RegistryClient(image).get_attestation(image.tag)

View file

@ -0,0 +1,228 @@
import json
import platform
import re
import subprocess
from base64 import b64decode
from hashlib import sha256
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Dict, List, Tuple
from ..container_utils import container_pull, load_image_hash
from . import errors, log, utils
from .registry import get_manifest_hash
try:
import platformdirs
except ImportError:
import appdirs as platformdirs # type: ignore[no-redef]
def get_config_dir() -> Path:
return Path(platformdirs.user_config_dir("dangerzone"))
# XXX Store this somewhere else.
SIGNATURES_PATH = get_config_dir() / "signatures"
__all__ = [
"verify_signature",
"load_signatures",
"store_signatures",
"verify_offline_image_signature",
]
def signature_to_bundle(sig: Dict) -> Dict:
"""Convert a cosign-download signature to the format expected by cosign bundle."""
bundle = sig["Bundle"]
payload = bundle["Payload"]
return {
"base64Signature": sig["Base64Signature"],
"Payload": sig["Payload"],
"cert": sig["Cert"],
"chain": sig["Chain"],
"rekorBundle": {
"SignedEntryTimestamp": bundle["SignedEntryTimestamp"],
"Payload": {
"body": payload["body"],
"integratedTime": payload["integratedTime"],
"logIndex": payload["logIndex"],
"logID": payload["logID"],
},
},
"RFC3161Timestamp": sig["RFC3161Timestamp"],
}
def verify_signature(signature: dict, pubkey: str) -> bool:
"""Verify a signature against a given public key"""
utils.ensure_cosign()
signature_bundle = signature_to_bundle(signature)
with (
NamedTemporaryFile(mode="w") as signature_file,
NamedTemporaryFile(mode="bw") as payload_file,
):
json.dump(signature_bundle, signature_file)
signature_file.flush()
payload_bytes = b64decode(signature_bundle["Payload"])
payload_file.write(payload_bytes)
payload_file.flush()
cmd = [
"cosign",
"verify-blob",
"--key",
pubkey,
"--bundle",
signature_file.name,
payload_file.name,
]
log.debug(" ".join(cmd))
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
# XXX Raise instead?
log.debug("Failed to verify signature", result.stderr)
return False
if result.stderr == b"Verified OK\n":
log.debug("Signature verified")
return True
return False
def new_image_release(image) -> bool:
remote_hash = get_manifest_hash(image)
local_hash = load_image_hash(image)
log.debug("Remote hash: %s", remote_hash)
log.debug("Local hash: %s", local_hash)
return remote_hash != local_hash
def upgrade_container_image(
image: str,
manifest_hash: str,
pubkey: str,
) -> bool:
if not new_image_release(image):
raise errors.ImageAlreadyUpToDate("The image is already up to date")
return False
signatures = get_signatures(image, manifest_hash)
log.debug("Signatures: %s", signatures)
if len(signatures) < 1:
raise errors.NoRemoteSignatures("No remote signatures found")
for signature in signatures:
signature_is_valid = verify_signature(signature, pubkey)
if not signature_is_valid:
raise errors.SignatureVerificationError()
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
store_signatures(signatures, manifest_hash, pubkey)
# let's upgrade the image
# XXX Use the hash here to avoid race conditions
return container_pull(image)
def get_file_hash(file: str) -> str:
with open(file, "rb") as f:
content = f.read()
return sha256(content).hexdigest()
def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
"""
Load signatures from the local filesystem
See store_signatures() for the expected format.
"""
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
if not pubkey_signatures.exists():
msg = (
f"Cannot find a '{pubkey_signatures}' folder."
"You might need to download the image signatures first."
)
raise errors.SignaturesFolderDoesNotExist(msg)
with open(pubkey_signatures / f"{image_hash}.json") as f:
log.debug("Loading signatures from %s", f.name)
return json.load(f)
def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> None:
"""
Store signatures locally in the SIGNATURE_PATH folder, like this:
~/.config/dangerzone/signatures/
<pubkey-hash>
<image-hash>.json
<image-hash>.json
The format used in the `.json` file is the one of `cosign download
signature`, which differs from the "bundle" one used afterwards.
It can be converted to the one expected by cosign verify --bundle with
the `signature_to_bundle()` function.
"""
def _get_digest(sig: Dict) -> str:
payload = json.loads(b64decode(sig["Payload"]))
return payload["critical"]["image"]["docker-manifest-digest"]
# All the signatures should share the same hash.
hashes = list(map(_get_digest, signatures))
if len(set(hashes)) != 1:
raise errors.InvalidSignatures("Signatures do not share the same image hash")
if f"sha256:{image_hash}" != hashes[0]:
raise errors.SignatureMismatch("Signatures do not match the given image hash")
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
pubkey_signatures.mkdir(exist_ok=True)
with open(pubkey_signatures / f"{image_hash}.json", "w") as f:
log.debug(
f"Storing signatures for {image_hash} in {pubkey_signatures}/{image_hash}.json"
)
json.dump(signatures, f)
def verify_offline_image_signature(image: str, pubkey: str) -> bool:
"""
Verifies that a local image has a valid signature
"""
log.info(f"Verifying local image {image} against pubkey {pubkey}")
image_hash = load_image_hash(image)
log.debug(f"Image hash: {image_hash}")
signatures = load_signatures(image_hash, pubkey)
if len(signatures) < 1:
raise errors.LocalSignatureNotFound("No signatures found")
for signature in signatures:
if not verify_signature(signature, pubkey):
msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
raise errors.SignatureVerificationError(msg)
return True
def get_signatures(image: str, hash: str) -> List[Dict]:
"""
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.
"""
utils.ensure_cosign()
process = subprocess.run(
["cosign", "download", "signature", f"{image}@sha256:{hash}"],
capture_output=True,
check=True,
)
# XXX: Check the output first.
# Remove the last return, split on newlines, convert from JSON
signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
return list(map(json.loads, signatures_raw))

View file

@ -0,0 +1,10 @@
import subprocess
from . import errors
def ensure_cosign() -> None:
try:
subprocess.run(["cosign", "version"], capture_output=True, check=True)
except subprocess.CalledProcessError:
raise errors.CosignNotInstalledError()

View file

@ -8,7 +8,7 @@ import unicodedata
try:
import platformdirs
except ImportError:
import appdirs as platformdirs
import appdirs as platformdirs # type: ignore[no-redef]
def get_config_dir() -> str:

13
dev_scripts/dangerzone-image Executable file
View file

@ -0,0 +1,13 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
# Load dangerzone module and resources from the source code tree
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.dangerzone_dev = True
from dangerzone.updater import cli
cli.main()

View file

@ -0,0 +1,23 @@
# Independent Container Updates
Since version 0.9.0, Dangerzone is able to ship container images independently
from issuing a new release of the software.
This is useful as images need to be kept updated with the latest security fixes.
## Nightly images and attestations
Each night, new images are built and pushed to our container registry, alongside
with a provenance attestation, enabling anybody to ensure that the image has
been originally built by Github CI runners, from a defined source repository (in our case `freedomofpress/dangerzone`).
To verify the attestations against our expectations, use the following command:
```bash
poetry run ./dev_scripts/registry.py attest ghcr.io/freedomofpress/dangerzone/dangerzone:latest --repo freedomofpress/dangerzone
```
In case of sucess, it will report back:
```
🎉 The image available at `ghcr.io/freedomofpress/dangerzone/dangerzone:latest` has been built by Github runners from the `freedomofpress/dangerzone` repository.
```

View file

@ -34,6 +34,7 @@ shiboken6 = [
[tool.poetry.scripts]
dangerzone = 'dangerzone:main'
dangerzone-cli = 'dangerzone:main'
dangerzone-image = "dangerzone.updater.cli:main"
# Dependencies required for packaging the code on various platforms.
[tool.poetry.group.package.dependencies]