Add a dangerzone-image CLI script

It contains utilities to interact with OCI registries, like getting the list of
published tags and getting the content of a manifest. It does so
via the use of the Docker Registry API v2 [0].

The script has been added to the `dev_scripts`, and is also installed on
the system under `dangerzone-image`.

[0]  https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-container-registry
This commit is contained in:
Alexis Métaireau 2025-02-11 18:13:39 +01:00
parent 83be5fb151
commit e1bdb75435
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
6 changed files with 185 additions and 0 deletions

View file

@ -0,0 +1,3 @@
import logging
log = logging.getLogger(__name__)

42
dangerzone/updater/cli.py Normal file
View file

@ -0,0 +1,42 @@
#!/usr/bin/python
import logging
import click
from . import attestations, errors, log, registry, signatures
DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
DEFAULT_BRANCH = "main"
DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone/dangerzone"
@click.group()
@click.option("--debug", is_flag=True)
def main(debug: bool) -> None:
if debug:
click.echo("Debug mode enabled")
level = logging.DEBUG
else:
level = logging.INFO
logging.basicConfig(level=level)
@main.command()
@click.argument("image")
def list_remote_tags(image: str) -> None:
"""List the tags available for a given image."""
click.echo(f"Existing tags for {image}")
for tag in registry.list_tags(image):
click.echo(tag)
@main.command()
@click.argument("image")
def get_manifest(image: str) -> None:
"""Retrieves a remote manifest for a given image and displays it."""
click.echo(registry.get_manifest(image).content)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,10 @@
class UpdaterError(Exception):
pass
class ImageNotFound(UpdaterError):
pass
class RegistryError(UpdaterError):
pass

View file

@ -0,0 +1,116 @@
import re
from collections import namedtuple
from hashlib import sha256
from typing import Dict, Optional, Tuple
import requests
from . import errors, log
__all__ = [
"get_manifest_digest",
"list_tags",
"get_manifest",
"parse_image_location",
]
SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json"
IMAGE_INDEX_MEDIA_TYPE = "application/vnd.oci.image.index.v1+json"
ACCEPT_MANIFESTS_HEADER = ",".join(
[
"application/vnd.docker.distribution.manifest.v1+json",
"application/vnd.docker.distribution.manifest.v1+prettyjws",
"application/vnd.docker.distribution.manifest.v2+json",
"application/vnd.oci.image.manifest.v1+json",
"application/vnd.docker.distribution.manifest.list.v2+json",
IMAGE_INDEX_MEDIA_TYPE,
]
)
Image = namedtuple("Image", ["registry", "namespace", "image_name", "tag", "digest"])
def parse_image_location(input_string: str) -> Image:
"""Parses container image location into an Image namedtuple"""
pattern = (
r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<image_name>[^:@]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"(?:@(?P<digest>sha256:[a-zA-Z0-9]+))?"
r"$"
)
match = re.match(pattern, input_string)
if not match:
raise ValueError("Malformed image location")
return Image(
registry=match.group("registry"),
namespace=match.group("namespace"),
image_name=match.group("image_name"),
tag=match.group("tag") or "latest",
digest=match.group("digest"),
)
def _get_auth_header(image: Image) -> Dict[str, str]:
auth_url = f"https://{image.registry}/token"
response = requests.get(
auth_url,
params={
"service": f"{image.registry}",
"scope": f"repository:{image.namespace}/{image.image_name}:pull",
},
)
response.raise_for_status()
token = response.json()["token"]
return {"Authorization": f"Bearer {token}"}
def _url(image: Image) -> str:
return f"https://{image.registry}/v2/{image.namespace}/{image.image_name}"
def list_tags(image_str: str) -> list:
image = parse_image_location(image_str)
url = f"{_url(image)}/tags/list"
response = requests.get(url, headers=_get_auth_header(image))
response.raise_for_status()
tags = response.json().get("tags", [])
return tags
def get_manifest(image_str: str) -> requests.Response:
"""Get manifest information for a specific tag"""
image = parse_image_location(image_str)
manifest_url = f"{_url(image)}/manifests/{image.tag}"
headers = {
"Accept": ACCEPT_MANIFESTS_HEADER,
}
headers.update(_get_auth_header(image))
response = requests.get(manifest_url, headers=headers)
response.raise_for_status()
return response
def list_manifests(image_str: str) -> list:
return get_manifest(image_str).json().get("manifests")
def get_blob(image: Image, digest: str) -> requests.Response:
response = requests.get(
f"{_url(image)}/blobs/{digest}", headers=_get_auth_header(image)
)
response.raise_for_status()
return response
def get_manifest_digest(
image_str: str, tag_manifest_content: Optional[bytes] = None
) -> str:
if not tag_manifest_content:
tag_manifest_content = get_manifest(image_str).content
return sha256(tag_manifest_content).hexdigest()

13
dev_scripts/dangerzone-image Executable file
View file

@ -0,0 +1,13 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
# Load dangerzone module and resources from the source code tree
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.dangerzone_dev = True
from dangerzone.updater import cli
cli.main()

View file

@ -27,6 +27,7 @@ packaging = "*"
[tool.poetry.scripts]
dangerzone = 'dangerzone:main'
dangerzone-cli = 'dangerzone:main'
dangerzone-image = "dangerzone.updater.cli:main"
# Dependencies required for packaging the code on various platforms.
[tool.poetry.group.package.dependencies]