Reorganize the registry.py module to be simpler

This commit is contained in:
Alexis Métaireau 2025-02-11 17:20:01 +01:00
parent 46f510ab79
commit 769a78dd27
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E
5 changed files with 100 additions and 138 deletions

View file

@ -9,7 +9,7 @@ from . import errors
from .util import get_resource_path, get_subprocess_startupinfo from .util import get_resource_path, get_subprocess_startupinfo
OLD_CONTAINER_NAME = "dangerzone.rocks/dangerzone" OLD_CONTAINER_NAME = "dangerzone.rocks/dangerzone"
CONTAINER_NAME = "ghcr.io/almet/dangerzone/dangerzone" CONTAINER_NAME = "ghcr.io/freedomofpress/dangerzone/dangerzone"
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -111,7 +111,7 @@ def delete_image_tag(tag: str) -> None:
) )
def load_image_tarball_in_memory() -> None: def load_image_tarball_from_gzip() -> None:
log.info("Installing Dangerzone container image...") log.info("Installing Dangerzone container image...")
p = subprocess.Popen( p = subprocess.Popen(
[get_runtime(), "load"], [get_runtime(), "load"],
@ -142,7 +142,7 @@ def load_image_tarball_in_memory() -> None:
log.info("Successfully installed container image from") log.info("Successfully installed container image from")
def load_image_tarball_file(tarball_path: str) -> None: def load_image_tarball_from_tar(tarball_path: str) -> None:
cmd = [get_runtime(), "load", "-i", tarball_path] cmd = [get_runtime(), "load", "-i", tarball_path]
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True) subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)

View file

@ -3,7 +3,6 @@ from tempfile import NamedTemporaryFile
from . import cosign from . import cosign
# NOTE: You can grab the SLSA attestation for an image/tag pair with the following # NOTE: You can grab the SLSA attestation for an image/tag pair with the following
# commands: # commands:
# #
@ -51,7 +50,11 @@ def generate_cue_policy(repo, workflow, commit, branch):
def verify( def verify(
image_name: str, branch: str, commit: str, repository: str, workflow: str, image_name: str,
branch: str,
commit: str,
repository: str,
workflow: str,
) -> bool: ) -> bool:
""" """
Look up the image attestation to see if the image has been built Look up the image attestation to see if the image has been built

View file

@ -6,9 +6,9 @@ import click
from . import attestations, errors, log, registry, signatures from . import attestations, errors, log, registry, signatures
DEFAULT_REPOSITORY = "almet/dangerzone" DEFAULT_REPOSITORY = "freedomofpress/dangerzone"
DEFAULT_BRANCH = "main" DEFAULT_BRANCH = "main"
DEFAULT_IMAGE_NAME = "ghcr.io/almet/dangerzone/dangerzone" DEFAULT_IMAGE_NAME = "ghcr.io/freedomofpress/dangerzone/dangerzone"
@click.group() @click.group()
@ -97,8 +97,8 @@ def list_remote_tags(image: str) -> None:
@main.command() @main.command()
@click.argument("image") @click.argument("image")
def get_manifest(image: str) -> None: def get_manifest(image: str) -> None:
"""Retrieves a remove manifest for a given image and displays it.""" """Retrieves a remote manifest for a given image and displays it."""
click.echo(registry.get_manifest(image)) click.echo(registry.get_manifest(image).content)
@main.command() @main.command()

View file

@ -28,13 +28,7 @@ ACCEPT_MANIFESTS_HEADER = ",".join(
) )
class Image(namedtuple("Image", ["registry", "namespace", "image_name", "tag"])): Image = namedtuple("Image", ["registry", "namespace", "image_name", "tag"])
__slots__ = ()
@property
def full_name(self) -> str:
tag = f":{self.tag}" if self.tag else ""
return f"{self.registry}/{self.namespace}/{self.image_name}{tag}"
def parse_image_location(input_string: str) -> Image: def parse_image_location(input_string: str) -> Image:
@ -58,102 +52,67 @@ def parse_image_location(input_string: str) -> Image:
) )
class RegistryClient: def _get_auth_header(image) -> Dict[str, str]:
def __init__( auth_url = f"https://{image.registry}/token"
self,
image: Image | str,
):
if isinstance(image, str):
image = parse_image_location(image)
self._image = image
self._registry = image.registry
self._namespace = image.namespace
self._image_name = image.image_name
self._auth_token = None
self._base_url = f"https://{self._registry}"
self._image_url = f"{self._base_url}/v2/{self._namespace}/{self._image_name}"
def get_auth_token(self) -> Optional[str]:
if not self._auth_token:
auth_url = f"{self._base_url}/token"
response = requests.get( response = requests.get(
auth_url, auth_url,
params={ params={
"service": f"{self._registry}", "service": f"{image.registry}",
"scope": f"repository:{self._namespace}/{self._image_name}:pull", "scope": f"repository:{image.namespace}/{image.image_name}:pull",
}, },
) )
response.raise_for_status() response.raise_for_status()
self._auth_token = response.json()["token"] token = response.json()["token"]
return self._auth_token return {"Authorization": f"Bearer {token}"}
def get_auth_header(self) -> Dict[str, str]:
return {"Authorization": f"Bearer {self.get_auth_token()}"}
def list_tags(self) -> list: def _url(image):
url = f"{self._image_url}/tags/list" return f"https://{image.registry}/v2/{image.namespace}/{image.image_name}"
response = requests.get(url, headers=self.get_auth_header())
def list_tags(image_str: str) -> list:
image = parse_image_location(image_str)
url = f"{_url(image)}/tags/list"
response = requests.get(url, headers=_get_auth_header(image))
response.raise_for_status() response.raise_for_status()
tags = response.json().get("tags", []) tags = response.json().get("tags", [])
return tags return tags
def get_manifest(
self, def get_manifest(image_str) -> requests.Response:
tag: str,
) -> requests.Response:
"""Get manifest information for a specific tag""" """Get manifest information for a specific tag"""
manifest_url = f"{self._image_url}/manifests/{tag}" image = parse_image_location(image_str)
manifest_url = f"{_url(image)}/manifests/{image.tag}"
headers = { headers = {
"Accept": ACCEPT_MANIFESTS_HEADER, "Accept": ACCEPT_MANIFESTS_HEADER,
"Authorization": f"Bearer {self.get_auth_token()}",
} }
headers.update(_get_auth_header(image))
response = requests.get(manifest_url, headers=headers) response = requests.get(manifest_url, headers=headers)
response.raise_for_status() response.raise_for_status()
return response return response
def list_manifests(self, tag: str) -> list:
return (
self.get_manifest(
tag,
)
.json()
.get("manifests")
)
def get_blob(self, digest: str) -> requests.Response: def list_manifests(image_str) -> list:
url = f"{self._image_url}/blobs/{digest}" return get_manifest(image_str).json().get("manifests")
def get_blob(image, digest: str) -> requests.Response:
response = requests.get( response = requests.get(
url, f"{_url(image)}/blobs/{digest}",
headers={ headers={
"Authorization": f"Bearer {self.get_auth_token()}", "Authorization": f"Bearer {_get_auth_token(image)}",
}, },
) )
response.raise_for_status() response.raise_for_status()
return response return response
def get_manifest_digest( def get_manifest_digest(
self, tag: str, tag_manifest_content: Optional[bytes] = None image_str: str, tag_manifest_content: Optional[bytes] = None
) -> str: ) -> str:
image = parse_image_location(image_str)
if not tag_manifest_content: if not tag_manifest_content:
tag_manifest_content = self.get_manifest(tag).content tag_manifest_content = get_manifest(image).content
return sha256(tag_manifest_content).hexdigest() return sha256(tag_manifest_content).hexdigest()
# XXX Refactor this with regular functions rather than a class
def get_manifest_digest(image_str: str) -> str:
image = parse_image_location(image_str)
return RegistryClient(image).get_manifest_digest(image.tag)
def list_tags(image_str: str) -> list:
return RegistryClient(image_str).list_tags()
def get_manifest(image_str: str) -> bytes:
image = parse_image_location(image_str)
client = RegistryClient(image)
resp = client.get_manifest(image.tag)
return resp.content

View file

@ -152,34 +152,6 @@ def write_log_index(log_index: int) -> None:
f.write(str(log_index)) f.write(str(log_index))
def upgrade_container_image(image: str, manifest_digest: str, pubkey: str) -> bool:
"""Verify and upgrade the image to the latest, if signed."""
update_available, _ = is_update_available(image)
if not update_available:
raise errors.ImageAlreadyUpToDate("The image is already up to date")
signatures = get_remote_signatures(image, manifest_digest)
verify_signatures(signatures, manifest_digest, pubkey)
# Ensure that we only upgrade if the log index is higher than the last known one
incoming_log_index = get_log_index_from_signatures(signatures)
last_log_index = get_last_log_index()
if incoming_log_index < last_log_index:
raise errors.InvalidLogIndex(
"The log index is not higher than the last known one"
)
# let's upgrade the image
# XXX Use the image digest here to avoid race conditions
upgraded = runtime.container_pull(image)
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
store_signatures(signatures, manifest_digest, pubkey)
return upgraded
def _get_blob(tmpdir: str, digest: str) -> Path: def _get_blob(tmpdir: str, digest: str) -> Path:
return Path(tmpdir) / "blobs" / "sha256" / digest.replace("sha256:", "") return Path(tmpdir) / "blobs" / "sha256" / digest.replace("sha256:", "")
@ -252,7 +224,7 @@ def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
archive.add(Path(tmpdir) / "oci-layout", arcname="oci-layout") archive.add(Path(tmpdir) / "oci-layout", arcname="oci-layout")
archive.add(Path(tmpdir) / "blobs", arcname="blobs") archive.add(Path(tmpdir) / "blobs", arcname="blobs")
runtime.load_image_tarball_file(temporary_tar.name) runtime.load_image_tarball_from_tar(temporary_tar.name)
runtime.tag_image_by_digest(image_digest, image_name) runtime.tag_image_by_digest(image_digest, image_name)
store_signatures(signatures, image_digest, pubkey) store_signatures(signatures, image_digest, pubkey)
@ -431,3 +403,31 @@ def prepare_airgapped_archive(image_name, destination):
with tarfile.open(destination, "w") as archive: with tarfile.open(destination, "w") as archive:
archive.add(tmpdir, arcname=".") archive.add(tmpdir, arcname=".")
def upgrade_container_image(image: str, manifest_digest: str, pubkey: str) -> bool:
"""Verify and upgrade the image to the latest, if signed."""
update_available, _ = is_update_available(image)
if not update_available:
raise errors.ImageAlreadyUpToDate("The image is already up to date")
signatures = get_remote_signatures(image, manifest_digest)
verify_signatures(signatures, manifest_digest, pubkey)
# Ensure that we only upgrade if the log index is higher than the last known one
incoming_log_index = get_log_index_from_signatures(signatures)
last_log_index = get_last_log_index()
if incoming_log_index < last_log_index:
raise errors.InvalidLogIndex(
"The log index is not higher than the last known one"
)
# let's upgrade the image
# XXX Use the image digest here to avoid race conditions
upgraded = runtime.container_pull(image)
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
store_signatures(signatures, manifest_digest, pubkey)
return upgraded