mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-05-19 11:40:36 +02:00
Compare commits
21 commits
72075f8cd4
...
eb1821de78
Author | SHA1 | Date | |
---|---|---|---|
eb1821de78 | |||
![]() |
5acb302acf | ||
![]() |
537d23e233 | ||
![]() |
0724f86b13 | ||
![]() |
668ee71895 | ||
![]() |
988971096c | ||
![]() |
5202d79270 | ||
![]() |
ccae6c5b16 | ||
![]() |
aac6c6334a | ||
![]() |
431f0cb803 | ||
![]() |
d667c284c7 | ||
![]() |
379c9f8f00 | ||
![]() |
1e9e468e37 | ||
![]() |
5a4ddb17c9 | ||
![]() |
22d235cabd | ||
![]() |
5001328ae9 | ||
![]() |
db33038c23 | ||
![]() |
6aff845493 | ||
![]() |
7002ab85a0 | ||
![]() |
f6562ae59c | ||
![]() |
27647cc309 |
5 changed files with 28 additions and 27 deletions
|
@ -187,7 +187,7 @@ def container_pull(image: str) -> bool:
|
||||||
return process.returncode == 0
|
return process.returncode == 0
|
||||||
|
|
||||||
|
|
||||||
def get_local_image_digest(image: str) -> Optional[str]:
|
def get_local_image_digest(image: str) -> str:
|
||||||
"""
|
"""
|
||||||
Returns a image hash from a local image name
|
Returns a image hash from a local image name
|
||||||
"""
|
"""
|
||||||
|
@ -204,6 +204,6 @@ def get_local_image_digest(image: str) -> Optional[str]:
|
||||||
)
|
)
|
||||||
return lines[0].replace("sha256:", "")
|
return lines[0].replace("sha256:", "")
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
return None
|
raise errors.ImageNotPresentException(
|
||||||
else:
|
f"The image {image} does not exist locally"
|
||||||
return result.stdout.strip().decode().strip("sha256:")
|
)
|
||||||
|
|
|
@ -34,7 +34,7 @@ predicate: {{
|
||||||
// This condition verifies that the image was generated from
|
// This condition verifies that the image was generated from
|
||||||
// the source repository we expect. Replace this with your
|
// the source repository we expect. Replace this with your
|
||||||
// repository.
|
// repository.
|
||||||
uri: =~"^git\\+https://github.com/{repo}@refs/heads/{branch}"
|
uri: =~"^git\\+https://github.com/{repository}@refs/heads/{branch}"
|
||||||
// Add a condition to check for a specific commit hash
|
// Add a condition to check for a specific commit hash
|
||||||
digest: {{
|
digest: {{
|
||||||
sha1: "{commit}"
|
sha1: "{commit}"
|
||||||
|
@ -45,10 +45,6 @@ predicate: {{
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
def generate_cue_policy(repo, workflow, commit, branch):
|
|
||||||
return CUE_POLICY.format(repo=repo, workflow=workflow, commit=commit, branch=branch)
|
|
||||||
|
|
||||||
|
|
||||||
def verify(
|
def verify(
|
||||||
image_name: str,
|
image_name: str,
|
||||||
branch: str,
|
branch: str,
|
||||||
|
@ -61,7 +57,9 @@ def verify(
|
||||||
on Github runners, and from a given repository.
|
on Github runners, and from a given repository.
|
||||||
"""
|
"""
|
||||||
cosign.ensure_installed()
|
cosign.ensure_installed()
|
||||||
policy = generate_cue_policy(repository, workflow, commit, branch)
|
policy = CUE_POLICY.format(
|
||||||
|
repository=repository, workflow=workflow, commit=commit, branch=branch
|
||||||
|
)
|
||||||
|
|
||||||
# Put the value in files and verify with cosign
|
# Put the value in files and verify with cosign
|
||||||
with (
|
with (
|
||||||
|
|
|
@ -52,3 +52,7 @@ class LocalSignatureNotFound(SignatureError):
|
||||||
|
|
||||||
class CosignNotInstalledError(SignatureError):
|
class CosignNotInstalledError(SignatureError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidLogIndex(SignatureError):
|
||||||
|
pass
|
||||||
|
|
|
@ -52,7 +52,7 @@ def parse_image_location(input_string: str) -> Image:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _get_auth_header(image) -> Dict[str, str]:
|
def _get_auth_header(image: Image) -> Dict[str, str]:
|
||||||
auth_url = f"https://{image.registry}/token"
|
auth_url = f"https://{image.registry}/token"
|
||||||
response = requests.get(
|
response = requests.get(
|
||||||
auth_url,
|
auth_url,
|
||||||
|
@ -66,7 +66,7 @@ def _get_auth_header(image) -> Dict[str, str]:
|
||||||
return {"Authorization": f"Bearer {token}"}
|
return {"Authorization": f"Bearer {token}"}
|
||||||
|
|
||||||
|
|
||||||
def _url(image):
|
def _url(image: Image) -> str:
|
||||||
return f"https://{image.registry}/v2/{image.namespace}/{image.image_name}"
|
return f"https://{image.registry}/v2/{image.namespace}/{image.image_name}"
|
||||||
|
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ def list_tags(image_str: str) -> list:
|
||||||
return tags
|
return tags
|
||||||
|
|
||||||
|
|
||||||
def get_manifest(image_str) -> requests.Response:
|
def get_manifest(image_str: str) -> requests.Response:
|
||||||
"""Get manifest information for a specific tag"""
|
"""Get manifest information for a specific tag"""
|
||||||
image = parse_image_location(image_str)
|
image = parse_image_location(image_str)
|
||||||
manifest_url = f"{_url(image)}/manifests/{image.tag}"
|
manifest_url = f"{_url(image)}/manifests/{image.tag}"
|
||||||
|
@ -93,16 +93,13 @@ def get_manifest(image_str) -> requests.Response:
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
def list_manifests(image_str) -> list:
|
def list_manifests(image_str: str) -> list:
|
||||||
return get_manifest(image_str).json().get("manifests")
|
return get_manifest(image_str).json().get("manifests")
|
||||||
|
|
||||||
|
|
||||||
def get_blob(image, digest: str) -> requests.Response:
|
def get_blob(image: Image, digest: str) -> requests.Response:
|
||||||
response = requests.get(
|
response = requests.get(
|
||||||
f"{_url(image)}/blobs/{digest}",
|
f"{_url(image)}/blobs/{digest}", headers=_get_auth_header(image)
|
||||||
headers={
|
|
||||||
"Authorization": f"Bearer {_get_auth_token(image)}",
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return response
|
return response
|
||||||
|
@ -111,8 +108,7 @@ def get_blob(image, digest: str) -> requests.Response:
|
||||||
def get_manifest_digest(
|
def get_manifest_digest(
|
||||||
image_str: str, tag_manifest_content: Optional[bytes] = None
|
image_str: str, tag_manifest_content: Optional[bytes] = None
|
||||||
) -> str:
|
) -> str:
|
||||||
image = parse_image_location(image_str)
|
|
||||||
if not tag_manifest_content:
|
if not tag_manifest_content:
|
||||||
tag_manifest_content = get_manifest(image).content
|
tag_manifest_content = get_manifest(image_str).content
|
||||||
|
|
||||||
return sha256(tag_manifest_content).hexdigest()
|
return sha256(tag_manifest_content).hexdigest()
|
||||||
|
|
|
@ -108,7 +108,7 @@ def verify_signature(signature: dict, image_digest: str, pubkey: str) -> bool:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def is_update_available(image: str) -> (bool, Optional[str]):
|
def is_update_available(image: str) -> Tuple[bool, Optional[str]]:
|
||||||
remote_digest = registry.get_manifest_digest(image)
|
remote_digest = registry.get_manifest_digest(image)
|
||||||
local_digest = runtime.get_local_image_digest(image)
|
local_digest = runtime.get_local_image_digest(image)
|
||||||
log.debug("Remote digest: %s", remote_digest)
|
log.debug("Remote digest: %s", remote_digest)
|
||||||
|
@ -198,7 +198,7 @@ def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
|
||||||
in ("dev.cosignproject.cosign/imageIndex", "dev.cosignproject.cosign/image")
|
in ("dev.cosignproject.cosign/imageIndex", "dev.cosignproject.cosign/image")
|
||||||
]
|
]
|
||||||
|
|
||||||
with open(signature_filename, "rb") as f:
|
with open(signature_filename, "r") as f:
|
||||||
image_name, signatures = convert_oci_images_signatures(json.load(f), tmpdir)
|
image_name, signatures = convert_oci_images_signatures(json.load(f), tmpdir)
|
||||||
log.info(f"Found image name: {image_name}")
|
log.info(f"Found image name: {image_name}")
|
||||||
|
|
||||||
|
@ -232,8 +232,8 @@ def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str:
|
||||||
|
|
||||||
|
|
||||||
def convert_oci_images_signatures(
|
def convert_oci_images_signatures(
|
||||||
signatures_manifest: List[Dict], tmpdir: str
|
signatures_manifest: Dict, tmpdir: str
|
||||||
) -> (str, List[Dict]):
|
) -> Tuple[str, List[Dict]]:
|
||||||
def _to_cosign_signature(layer: Dict) -> Dict:
|
def _to_cosign_signature(layer: Dict) -> Dict:
|
||||||
signature = layer["annotations"]["dev.cosignproject.cosign/signature"]
|
signature = layer["annotations"]["dev.cosignproject.cosign/signature"]
|
||||||
bundle = json.loads(layer["annotations"]["dev.sigstore.cosign/bundle"])
|
bundle = json.loads(layer["annotations"]["dev.sigstore.cosign/bundle"])
|
||||||
|
@ -252,9 +252,12 @@ def convert_oci_images_signatures(
|
||||||
"RFC3161Timestamp": None,
|
"RFC3161Timestamp": None,
|
||||||
}
|
}
|
||||||
|
|
||||||
layers = signatures_manifest["layers"]
|
layers = signatures_manifest.get("layers", [])
|
||||||
signatures = [_to_cosign_signature(layer) for layer in layers]
|
signatures = [_to_cosign_signature(layer) for layer in layers]
|
||||||
|
|
||||||
|
if not signatures:
|
||||||
|
raise errors.SignatureExtractionError()
|
||||||
|
|
||||||
payload_location = _get_blob(tmpdir, layers[0]["digest"])
|
payload_location = _get_blob(tmpdir, layers[0]["digest"])
|
||||||
with open(payload_location, "r") as f:
|
with open(payload_location, "r") as f:
|
||||||
payload = json.load(f)
|
payload = json.load(f)
|
||||||
|
@ -380,7 +383,7 @@ def get_remote_signatures(image: str, digest: str) -> List[Dict]:
|
||||||
return signatures
|
return signatures
|
||||||
|
|
||||||
|
|
||||||
def prepare_airgapped_archive(image_name, destination):
|
def prepare_airgapped_archive(image_name: str, destination: str):
|
||||||
if "@sha256:" not in image_name:
|
if "@sha256:" not in image_name:
|
||||||
raise errors.AirgappedImageDownloadError(
|
raise errors.AirgappedImageDownloadError(
|
||||||
"The image name must include a digest, e.g. ghcr.io/freedomofpress/dangerzone/dangerzone@sha256:123456"
|
"The image name must include a digest, e.g. ghcr.io/freedomofpress/dangerzone/dangerzone@sha256:123456"
|
||||||
|
|
Loading…
Reference in a new issue