Compare commits

..

No commits in common. "5acb302acfceeb93c6738b4235a0323966f41a4f" and "5c9a38d3706ee985be95cb287cd2a6ba6ed40ce5" have entirely different histories.

5 changed files with 27 additions and 28 deletions

View file

@ -187,7 +187,7 @@ def container_pull(image: str) -> bool:
return process.returncode == 0 return process.returncode == 0
def get_local_image_digest(image: str) -> str: def get_local_image_digest(image: str) -> Optional[str]:
""" """
Returns a image hash from a local image name Returns a image hash from a local image name
""" """
@ -204,6 +204,6 @@ def get_local_image_digest(image: str) -> str:
) )
return lines[0].replace("sha256:", "") return lines[0].replace("sha256:", "")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
raise errors.ImageNotPresentException( return None
f"The image {image} does not exist locally" else:
) return result.stdout.strip().decode().strip("sha256:")

View file

@ -34,7 +34,7 @@ predicate: {{
// This condition verifies that the image was generated from // This condition verifies that the image was generated from
// the source repository we expect. Replace this with your // the source repository we expect. Replace this with your
// repository. // repository.
uri: =~"^git\\+https://github.com/{repository}@refs/heads/{branch}" uri: =~"^git\\+https://github.com/{repo}@refs/heads/{branch}"
// Add a condition to check for a specific commit hash // Add a condition to check for a specific commit hash
digest: {{ digest: {{
sha1: "{commit}" sha1: "{commit}"
@ -45,6 +45,10 @@ predicate: {{
""" """
def generate_cue_policy(repo, workflow, commit, branch):
return CUE_POLICY.format(repo=repo, workflow=workflow, commit=commit, branch=branch)
def verify( def verify(
image_name: str, image_name: str,
branch: str, branch: str,
@ -57,9 +61,7 @@ def verify(
on Github runners, and from a given repository. on Github runners, and from a given repository.
""" """
cosign.ensure_installed() cosign.ensure_installed()
policy = CUE_POLICY.format( policy = generate_cue_policy(repository, workflow, commit, branch)
repository=repository, workflow=workflow, commit=commit, branch=branch
)
# Put the value in files and verify with cosign # Put the value in files and verify with cosign
with ( with (

View file

@ -52,7 +52,3 @@ class LocalSignatureNotFound(SignatureError):
class CosignNotInstalledError(SignatureError): class CosignNotInstalledError(SignatureError):
pass pass
class InvalidLogIndex(SignatureError):
pass

View file

@ -52,7 +52,7 @@ def parse_image_location(input_string: str) -> Image:
) )
def _get_auth_header(image: Image) -> Dict[str, str]: def _get_auth_header(image) -> Dict[str, str]:
auth_url = f"https://{image.registry}/token" auth_url = f"https://{image.registry}/token"
response = requests.get( response = requests.get(
auth_url, auth_url,
@ -66,7 +66,7 @@ def _get_auth_header(image: Image) -> Dict[str, str]:
return {"Authorization": f"Bearer {token}"} return {"Authorization": f"Bearer {token}"}
def _url(image: Image) -> str: def _url(image):
return f"https://{image.registry}/v2/{image.namespace}/{image.image_name}" return f"https://{image.registry}/v2/{image.namespace}/{image.image_name}"
@ -79,7 +79,7 @@ def list_tags(image_str: str) -> list:
return tags return tags
def get_manifest(image_str: str) -> requests.Response: def get_manifest(image_str) -> requests.Response:
"""Get manifest information for a specific tag""" """Get manifest information for a specific tag"""
image = parse_image_location(image_str) image = parse_image_location(image_str)
manifest_url = f"{_url(image)}/manifests/{image.tag}" manifest_url = f"{_url(image)}/manifests/{image.tag}"
@ -93,13 +93,16 @@ def get_manifest(image_str: str) -> requests.Response:
return response return response
def list_manifests(image_str: str) -> list: def list_manifests(image_str) -> list:
return get_manifest(image_str).json().get("manifests") return get_manifest(image_str).json().get("manifests")
def get_blob(image: Image, digest: str) -> requests.Response: def get_blob(image, digest: str) -> requests.Response:
response = requests.get( response = requests.get(
f"{_url(image)}/blobs/{digest}", headers=_get_auth_header(image) f"{_url(image)}/blobs/{digest}",
headers={
"Authorization": f"Bearer {_get_auth_token(image)}",
},
) )
response.raise_for_status() response.raise_for_status()
return response return response
@ -108,7 +111,8 @@ def get_blob(image: Image, digest: str) -> requests.Response:
def get_manifest_digest( def get_manifest_digest(
image_str: str, tag_manifest_content: Optional[bytes] = None image_str: str, tag_manifest_content: Optional[bytes] = None
) -> str: ) -> str:
image = parse_image_location(image_str)
if not tag_manifest_content: if not tag_manifest_content:
tag_manifest_content = get_manifest(image_str).content tag_manifest_content = get_manifest(image).content
return sha256(tag_manifest_content).hexdigest() return sha256(tag_manifest_content).hexdigest()

View file

@ -108,7 +108,7 @@ def verify_signature(signature: dict, image_digest: str, pubkey: str) -> bool:
return False return False
def is_update_available(image: str) -> Tuple[bool, Optional[str]]: def is_update_available(image: str) -> (bool, Optional[str]):
remote_digest = registry.get_manifest_digest(image) remote_digest = registry.get_manifest_digest(image)
local_digest = runtime.get_local_image_digest(image) local_digest = runtime.get_local_image_digest(image)
log.debug("Remote digest: %s", remote_digest) log.debug("Remote digest: %s", remote_digest)
@ -198,7 +198,7 @@ def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
in ("dev.cosignproject.cosign/imageIndex", "dev.cosignproject.cosign/image") in ("dev.cosignproject.cosign/imageIndex", "dev.cosignproject.cosign/image")
] ]
with open(signature_filename, "r") as f: with open(signature_filename, "rb") as f:
image_name, signatures = convert_oci_images_signatures(json.load(f), tmpdir) image_name, signatures = convert_oci_images_signatures(json.load(f), tmpdir)
log.info(f"Found image name: {image_name}") log.info(f"Found image name: {image_name}")
@ -232,8 +232,8 @@ def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
def convert_oci_images_signatures( def convert_oci_images_signatures(
signatures_manifest: Dict, tmpdir: str signatures_manifest: List[Dict], tmpdir: str
) -> Tuple[str, List[Dict]]: ) -> (str, List[Dict]):
def _to_cosign_signature(layer: Dict) -> Dict: def _to_cosign_signature(layer: Dict) -> Dict:
signature = layer["annotations"]["dev.cosignproject.cosign/signature"] signature = layer["annotations"]["dev.cosignproject.cosign/signature"]
bundle = json.loads(layer["annotations"]["dev.sigstore.cosign/bundle"]) bundle = json.loads(layer["annotations"]["dev.sigstore.cosign/bundle"])
@ -252,12 +252,9 @@ def convert_oci_images_signatures(
"RFC3161Timestamp": None, "RFC3161Timestamp": None,
} }
layers = signatures_manifest.get("layers", []) layers = signatures_manifest["layers"]
signatures = [_to_cosign_signature(layer) for layer in layers] signatures = [_to_cosign_signature(layer) for layer in layers]
if not signatures:
raise errors.SignatureExtractionError()
payload_location = _get_blob(tmpdir, layers[0]["digest"]) payload_location = _get_blob(tmpdir, layers[0]["digest"])
with open(payload_location, "r") as f: with open(payload_location, "r") as f:
payload = json.load(f) payload = json.load(f)
@ -383,7 +380,7 @@ def get_remote_signatures(image: str, digest: str) -> List[Dict]:
return signatures return signatures
def prepare_airgapped_archive(image_name: str, destination: str): def prepare_airgapped_archive(image_name, destination):
if "@sha256:" not in image_name: if "@sha256:" not in image_name:
raise errors.AirgappedImageDownloadError( raise errors.AirgappedImageDownloadError(
"The image name must include a digest, e.g. ghcr.io/freedomofpress/dangerzone/dangerzone@sha256:123456" "The image name must include a digest, e.g. ghcr.io/freedomofpress/dangerzone/dangerzone@sha256:123456"