mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-04-28 18:02:38 +02:00
Locally store the signatures for oci-images archives
On air-gapped environements, it's now possible to load signatures generated by `cosign save` commands. The signatures embedded in this format will be converted to the one used by `cosign download signature`.
This commit is contained in:
parent
087e5bd1ad
commit
8ae4af8698
3 changed files with 70 additions and 17 deletions
|
@ -155,6 +155,9 @@ def load_image_tarball_file(tarball_path: str) -> None:
|
|||
|
||||
|
||||
def tag_image_by_digest(digest: str, tag: str) -> None:
|
||||
"""Tag a container image by digest.
|
||||
The sha256: prefix should be omitted from the digest.
|
||||
"""
|
||||
image_id = get_image_id_by_digest(digest)
|
||||
cmd = [get_runtime(), "tag", image_id, tag]
|
||||
log.debug(" ".join(cmd))
|
||||
|
@ -162,11 +165,14 @@ def tag_image_by_digest(digest: str, tag: str) -> None:
|
|||
|
||||
|
||||
def get_image_id_by_digest(digest: str) -> str:
|
||||
"""Get an image ID from a digest.
|
||||
The sha256: prefix should be omitted from the digest.
|
||||
"""
|
||||
cmd = [
|
||||
get_runtime(),
|
||||
"images",
|
||||
"-f",
|
||||
f"digest={digest}",
|
||||
f"digest=sha256:{digest}",
|
||||
"--format",
|
||||
"{{.Id}}",
|
||||
]
|
||||
|
@ -174,7 +180,9 @@ def get_image_id_by_digest(digest: str) -> str:
|
|||
process = subprocess.run(
|
||||
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
|
||||
)
|
||||
return process.stdout.decode().strip()
|
||||
breakpoint()
|
||||
# In case we have multiple lines, we only want the first one.
|
||||
return process.stdout.decode().strip().split("\n")[0]
|
||||
|
||||
|
||||
def container_pull(image: str) -> bool:
|
||||
|
|
|
@ -22,6 +22,10 @@ class SignatureVerificationError(SignatureError):
|
|||
pass
|
||||
|
||||
|
||||
class SignatureExtractionError(SignatureError):
|
||||
pass
|
||||
|
||||
|
||||
class SignaturesFolderDoesNotExist(SignatureError):
|
||||
pass
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@ import platform
|
|||
import re
|
||||
import subprocess
|
||||
import tarfile
|
||||
from base64 import b64decode
|
||||
from base64 import b64decode, b64encode
|
||||
from hashlib import sha256
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
|
@ -156,6 +156,10 @@ def upgrade_container_image(image: str, manifest_hash: str, pubkey: str) -> bool
|
|||
return runtime.container_pull(image)
|
||||
|
||||
|
||||
def _get_blob(tmpdir: str, hash: str) -> Path:
|
||||
return Path(tmpdir) / "blobs" / "sha256" / hash.replace("sha256:", "")
|
||||
|
||||
|
||||
def upgrade_container_image_airgapped(
|
||||
container_tar: str, pubkey: str, image_name: str
|
||||
) -> bool:
|
||||
|
@ -166,8 +170,19 @@ def upgrade_container_image_airgapped(
|
|||
Right now, the archive is extracted and reconstructed, requiring some space
|
||||
on the filesystem.
|
||||
"""
|
||||
|
||||
# XXX Use a memory buffer instead of the filesystem
|
||||
with TemporaryDirectory() as tmpdir:
|
||||
|
||||
def _get_signature_filename(manifests: List[Dict]) -> Path:
|
||||
for manifest in manifests:
|
||||
if (
|
||||
manifest["annotations"].get("kind")
|
||||
== "dev.cosignproject.cosign/sigs"
|
||||
):
|
||||
return _get_blob(tmpdir, manifest["digest"])
|
||||
raise errors.SignatureExtractionError()
|
||||
|
||||
with tarfile.open(container_tar, "r") as archive:
|
||||
archive.extractall(tmpdir)
|
||||
|
||||
|
@ -179,14 +194,19 @@ def upgrade_container_image_airgapped(
|
|||
# Remove the signatures from the archive.
|
||||
with open(Path(tmpdir) / "index.json") as f:
|
||||
index_json = json.load(f)
|
||||
index_json["manifests"] = [
|
||||
manifest
|
||||
for manifest in index_json["manifests"]
|
||||
if manifest["annotations"].get("kind")
|
||||
!= "dev.cosignproject.cosign/sigs"
|
||||
]
|
||||
|
||||
image_digest = index_json["manifests"][0].get("digest")
|
||||
signature_filename = _get_signature_filename(index_json["manifests"])
|
||||
|
||||
index_json["manifests"] = [
|
||||
manifest
|
||||
for manifest in index_json["manifests"]
|
||||
if manifest["annotations"].get("kind") != "dev.cosignproject.cosign/sigs"
|
||||
]
|
||||
|
||||
with open(signature_filename, "rb") as f:
|
||||
signatures = convert_oci_images_signatures(json.load(f), tmpdir)
|
||||
|
||||
image_digest = index_json["manifests"][0].get("digest").replace("sha256:", "")
|
||||
|
||||
# Write the new index.json to the temp folder
|
||||
with open(Path(tmpdir) / "index.json", "w") as f:
|
||||
|
@ -202,15 +222,34 @@ def upgrade_container_image_airgapped(
|
|||
runtime.load_image_tarball_file(temporary_tar.name)
|
||||
runtime.tag_image_by_digest(image_digest, image_name)
|
||||
|
||||
# XXX Convert the signatures to the expected format
|
||||
|
||||
# At this point, the signatures are verified
|
||||
# We store the signatures just now to avoid storing unverified signatures
|
||||
# store_signatures(signatures, image_hash, pubkey)
|
||||
|
||||
store_signatures(signatures, image_digest, pubkey)
|
||||
return True
|
||||
|
||||
|
||||
def convert_oci_images_signatures(
|
||||
signatures_manifest: List[Dict], tmpdir: str
|
||||
) -> List[Dict]:
|
||||
def _to_cosign_signature(layer: Dict) -> Dict:
|
||||
signature = layer["annotations"]["dev.cosignproject.cosign/signature"]
|
||||
bundle = json.loads(layer["annotations"]["dev.sigstore.cosign/bundle"])
|
||||
payload_body = json.loads(b64decode(bundle["Payload"]["body"]))
|
||||
|
||||
payload_location = _get_blob(tmpdir, layer["digest"])
|
||||
with open(payload_location, "rb") as f:
|
||||
payload_b64 = b64encode(f.read()).decode()
|
||||
|
||||
return {
|
||||
"Base64Signature": payload_body["spec"]["signature"]["content"],
|
||||
"Payload": payload_b64,
|
||||
"Cert": None,
|
||||
"Chain": None,
|
||||
"rekorBundle": bundle,
|
||||
"RFC3161Timestamp": None,
|
||||
}
|
||||
|
||||
return [_to_cosign_signature(layer) for layer in signatures_manifest["layers"]]
|
||||
|
||||
|
||||
def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) -> str:
|
||||
"""Get the sha256 hash of a file or content"""
|
||||
if not file and not content:
|
||||
|
@ -268,7 +307,9 @@ def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> No
|
|||
raise errors.InvalidSignatures("Signatures do not share the same image hash")
|
||||
|
||||
if f"sha256:{image_hash}" != hashes[0]:
|
||||
raise errors.SignatureMismatch("Signatures do not match the given image hash")
|
||||
raise errors.SignatureMismatch(
|
||||
f"Signatures do not match the given image hash ({image_hash}, {hashes[0]})"
|
||||
)
|
||||
|
||||
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
|
||||
pubkey_signatures.mkdir(exist_ok=True)
|
||||
|
|
Loading…
Reference in a new issue