mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-05-19 03:30:35 +02:00
Compare commits
20 commits
5c9a38d370
...
5acb302acf
Author | SHA1 | Date | |
---|---|---|---|
![]() |
5acb302acf | ||
![]() |
537d23e233 | ||
![]() |
0724f86b13 | ||
![]() |
668ee71895 | ||
![]() |
988971096c | ||
![]() |
5202d79270 | ||
![]() |
ccae6c5b16 | ||
![]() |
aac6c6334a | ||
![]() |
431f0cb803 | ||
![]() |
d667c284c7 | ||
![]() |
379c9f8f00 | ||
![]() |
1e9e468e37 | ||
![]() |
5a4ddb17c9 | ||
![]() |
22d235cabd | ||
![]() |
5001328ae9 | ||
![]() |
db33038c23 | ||
![]() |
6aff845493 | ||
![]() |
7002ab85a0 | ||
![]() |
f6562ae59c | ||
![]() |
27647cc309 |
5 changed files with 28 additions and 27 deletions
|
@ -187,7 +187,7 @@ def container_pull(image: str) -> bool:
|
|||
return process.returncode == 0
|
||||
|
||||
|
||||
def get_local_image_digest(image: str) -> Optional[str]:
|
||||
def get_local_image_digest(image: str) -> str:
|
||||
"""
|
||||
Returns a image hash from a local image name
|
||||
"""
|
||||
|
@ -204,6 +204,6 @@ def get_local_image_digest(image: str) -> Optional[str]:
|
|||
)
|
||||
return lines[0].replace("sha256:", "")
|
||||
except subprocess.CalledProcessError as e:
|
||||
return None
|
||||
else:
|
||||
return result.stdout.strip().decode().strip("sha256:")
|
||||
raise errors.ImageNotPresentException(
|
||||
f"The image {image} does not exist locally"
|
||||
)
|
||||
|
|
|
@ -34,7 +34,7 @@ predicate: {{
|
|||
// This condition verifies that the image was generated from
|
||||
// the source repository we expect. Replace this with your
|
||||
// repository.
|
||||
uri: =~"^git\\+https://github.com/{repo}@refs/heads/{branch}"
|
||||
uri: =~"^git\\+https://github.com/{repository}@refs/heads/{branch}"
|
||||
// Add a condition to check for a specific commit hash
|
||||
digest: {{
|
||||
sha1: "{commit}"
|
||||
|
@ -45,10 +45,6 @@ predicate: {{
|
|||
"""
|
||||
|
||||
|
||||
def generate_cue_policy(repo, workflow, commit, branch):
|
||||
return CUE_POLICY.format(repo=repo, workflow=workflow, commit=commit, branch=branch)
|
||||
|
||||
|
||||
def verify(
|
||||
image_name: str,
|
||||
branch: str,
|
||||
|
@ -61,7 +57,9 @@ def verify(
|
|||
on Github runners, and from a given repository.
|
||||
"""
|
||||
cosign.ensure_installed()
|
||||
policy = generate_cue_policy(repository, workflow, commit, branch)
|
||||
policy = CUE_POLICY.format(
|
||||
repository=repository, workflow=workflow, commit=commit, branch=branch
|
||||
)
|
||||
|
||||
# Put the value in files and verify with cosign
|
||||
with (
|
||||
|
|
|
@ -52,3 +52,7 @@ class LocalSignatureNotFound(SignatureError):
|
|||
|
||||
class CosignNotInstalledError(SignatureError):
|
||||
pass
|
||||
|
||||
|
||||
class InvalidLogIndex(SignatureError):
|
||||
pass
|
||||
|
|
|
@ -52,7 +52,7 @@ def parse_image_location(input_string: str) -> Image:
|
|||
)
|
||||
|
||||
|
||||
def _get_auth_header(image) -> Dict[str, str]:
|
||||
def _get_auth_header(image: Image) -> Dict[str, str]:
|
||||
auth_url = f"https://{image.registry}/token"
|
||||
response = requests.get(
|
||||
auth_url,
|
||||
|
@ -66,7 +66,7 @@ def _get_auth_header(image) -> Dict[str, str]:
|
|||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
|
||||
def _url(image):
|
||||
def _url(image: Image) -> str:
|
||||
return f"https://{image.registry}/v2/{image.namespace}/{image.image_name}"
|
||||
|
||||
|
||||
|
@ -79,7 +79,7 @@ def list_tags(image_str: str) -> list:
|
|||
return tags
|
||||
|
||||
|
||||
def get_manifest(image_str) -> requests.Response:
|
||||
def get_manifest(image_str: str) -> requests.Response:
|
||||
"""Get manifest information for a specific tag"""
|
||||
image = parse_image_location(image_str)
|
||||
manifest_url = f"{_url(image)}/manifests/{image.tag}"
|
||||
|
@ -93,16 +93,13 @@ def get_manifest(image_str) -> requests.Response:
|
|||
return response
|
||||
|
||||
|
||||
def list_manifests(image_str) -> list:
|
||||
def list_manifests(image_str: str) -> list:
|
||||
return get_manifest(image_str).json().get("manifests")
|
||||
|
||||
|
||||
def get_blob(image, digest: str) -> requests.Response:
|
||||
def get_blob(image: Image, digest: str) -> requests.Response:
|
||||
response = requests.get(
|
||||
f"{_url(image)}/blobs/{digest}",
|
||||
headers={
|
||||
"Authorization": f"Bearer {_get_auth_token(image)}",
|
||||
},
|
||||
f"{_url(image)}/blobs/{digest}", headers=_get_auth_header(image)
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response
|
||||
|
@ -111,8 +108,7 @@ def get_blob(image, digest: str) -> requests.Response:
|
|||
def get_manifest_digest(
|
||||
image_str: str, tag_manifest_content: Optional[bytes] = None
|
||||
) -> str:
|
||||
image = parse_image_location(image_str)
|
||||
if not tag_manifest_content:
|
||||
tag_manifest_content = get_manifest(image).content
|
||||
tag_manifest_content = get_manifest(image_str).content
|
||||
|
||||
return sha256(tag_manifest_content).hexdigest()
|
||||
|
|
|
@ -108,7 +108,7 @@ def verify_signature(signature: dict, image_digest: str, pubkey: str) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
def is_update_available(image: str) -> (bool, Optional[str]):
|
||||
def is_update_available(image: str) -> Tuple[bool, Optional[str]]:
|
||||
remote_digest = registry.get_manifest_digest(image)
|
||||
local_digest = runtime.get_local_image_digest(image)
|
||||
log.debug("Remote digest: %s", remote_digest)
|
||||
|
@ -198,7 +198,7 @@ def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
|
|||
in ("dev.cosignproject.cosign/imageIndex", "dev.cosignproject.cosign/image")
|
||||
]
|
||||
|
||||
with open(signature_filename, "rb") as f:
|
||||
with open(signature_filename, "r") as f:
|
||||
image_name, signatures = convert_oci_images_signatures(json.load(f), tmpdir)
|
||||
log.info(f"Found image name: {image_name}")
|
||||
|
||||
|
@ -232,8 +232,8 @@ def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
|
|||
|
||||
|
||||
def convert_oci_images_signatures(
|
||||
signatures_manifest: List[Dict], tmpdir: str
|
||||
) -> (str, List[Dict]):
|
||||
signatures_manifest: Dict, tmpdir: str
|
||||
) -> Tuple[str, List[Dict]]:
|
||||
def _to_cosign_signature(layer: Dict) -> Dict:
|
||||
signature = layer["annotations"]["dev.cosignproject.cosign/signature"]
|
||||
bundle = json.loads(layer["annotations"]["dev.sigstore.cosign/bundle"])
|
||||
|
@ -252,9 +252,12 @@ def convert_oci_images_signatures(
|
|||
"RFC3161Timestamp": None,
|
||||
}
|
||||
|
||||
layers = signatures_manifest["layers"]
|
||||
layers = signatures_manifest.get("layers", [])
|
||||
signatures = [_to_cosign_signature(layer) for layer in layers]
|
||||
|
||||
if not signatures:
|
||||
raise errors.SignatureExtractionError()
|
||||
|
||||
payload_location = _get_blob(tmpdir, layers[0]["digest"])
|
||||
with open(payload_location, "r") as f:
|
||||
payload = json.load(f)
|
||||
|
@ -380,7 +383,7 @@ def get_remote_signatures(image: str, digest: str) -> List[Dict]:
|
|||
return signatures
|
||||
|
||||
|
||||
def prepare_airgapped_archive(image_name, destination):
|
||||
def prepare_airgapped_archive(image_name: str, destination: str):
|
||||
if "@sha256:" not in image_name:
|
||||
raise errors.AirgappedImageDownloadError(
|
||||
"The image name must include a digest, e.g. ghcr.io/freedomofpress/dangerzone/dangerzone@sha256:123456"
|
||||
|
|
Loading…
Reference in a new issue