mirror of
https://github.com/freedomofpress/dangerzone.git
synced 2025-04-28 18:02:38 +02:00
Fixup: remove rntime.py
This commit is contained in:
parent
ad3d0e4182
commit
60c144aab0
1 changed files with 0 additions and 189 deletions
|
@ -1,189 +0,0 @@
|
|||
import gzip
|
||||
import logging
|
||||
import platform
|
||||
import shutil
|
||||
import subprocess
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from . import errors
|
||||
from .util import get_resource_path, get_subprocess_startupinfo
|
||||
|
||||
CONTAINER_NAME = "dangerzone.rocks/dangerzone"
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_runtime_name() -> str:
|
||||
if platform.system() == "Linux":
|
||||
return "podman"
|
||||
# Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
|
||||
return "docker"
|
||||
|
||||
|
||||
def get_runtime_version() -> Tuple[int, int]:
|
||||
"""Get the major/minor parts of the Docker/Podman version.
|
||||
|
||||
Some of the operations we perform in this module rely on some Podman features
|
||||
that are not available across all of our platforms. In order to have a proper
|
||||
fallback, we need to know the Podman version. More specifically, we're fine with
|
||||
just knowing the major and minor version, since writing/installing a full-blown
|
||||
semver parser is an overkill.
|
||||
"""
|
||||
# Get the Docker/Podman version, using a Go template.
|
||||
runtime = get_runtime_name()
|
||||
if runtime == "podman":
|
||||
query = "{{.Client.Version}}"
|
||||
else:
|
||||
query = "{{.Server.Version}}"
|
||||
|
||||
cmd = [runtime, "version", "-f", query]
|
||||
try:
|
||||
version = subprocess.run(
|
||||
cmd,
|
||||
startupinfo=get_subprocess_startupinfo(),
|
||||
capture_output=True,
|
||||
check=True,
|
||||
).stdout.decode()
|
||||
except Exception as e:
|
||||
msg = f"Could not get the version of the {runtime.capitalize()} tool: {e}"
|
||||
raise RuntimeError(msg) from e
|
||||
|
||||
# Parse this version and return the major/minor parts, since we don't need the
|
||||
# rest.
|
||||
try:
|
||||
major, minor, _ = version.split(".", 3)
|
||||
return (int(major), int(minor))
|
||||
except Exception as e:
|
||||
msg = (
|
||||
f"Could not parse the version of the {runtime.capitalize()} tool"
|
||||
f" (found: '{version}') due to the following error: {e}"
|
||||
)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
|
||||
def get_runtime() -> str:
|
||||
container_tech = get_runtime_name()
|
||||
runtime = shutil.which(container_tech)
|
||||
if runtime is None:
|
||||
raise errors.NoContainerTechException(container_tech)
|
||||
return runtime
|
||||
|
||||
|
||||
def list_image_tags() -> List[str]:
|
||||
"""Get the tags of all loaded Dangerzone images.
|
||||
|
||||
This method returns a mapping of image tags to image IDs, for all Dangerzone
|
||||
images. This can be useful when we want to find which are the local image tags,
|
||||
and which image ID does the "latest" tag point to.
|
||||
"""
|
||||
return (
|
||||
subprocess.check_output(
|
||||
[
|
||||
get_runtime(),
|
||||
"image",
|
||||
"list",
|
||||
"--format",
|
||||
"{{ .Tag }}",
|
||||
CONTAINER_NAME,
|
||||
],
|
||||
text=True,
|
||||
startupinfo=get_subprocess_startupinfo(),
|
||||
)
|
||||
.strip()
|
||||
.split()
|
||||
)
|
||||
|
||||
|
||||
def delete_image_tag(tag: str) -> None:
|
||||
"""Delete a Dangerzone image tag."""
|
||||
name = CONTAINER_NAME + ":" + tag
|
||||
log.warning(f"Deleting old container image: {name}")
|
||||
try:
|
||||
subprocess.check_output(
|
||||
[get_runtime(), "rmi", "--force", name],
|
||||
startupinfo=get_subprocess_startupinfo(),
|
||||
)
|
||||
except Exception as e:
|
||||
log.warning(
|
||||
f"Couldn't delete old container image '{name}', so leaving it there."
|
||||
f" Original error: {e}"
|
||||
)
|
||||
|
||||
|
||||
def get_expected_tag() -> str:
|
||||
"""Get the tag of the Dangerzone image tarball from the image-id.txt file."""
|
||||
with open(get_resource_path("image-id.txt")) as f:
|
||||
return f.read().strip()
|
||||
|
||||
|
||||
def tag_image_by_digest(digest: str, tag: str) -> None:
|
||||
image_id = get_image_id_by_digest(digest)
|
||||
cmd = [get_runtime(), "tag", image_id, tag]
|
||||
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
|
||||
|
||||
|
||||
def get_image_id_by_digest(digest: str) -> str:
|
||||
cmd = [
|
||||
get_runtime(),
|
||||
"image",
|
||||
"tag",
|
||||
"-f",
|
||||
f'digest="{digest}"',
|
||||
"--format ",
|
||||
"{{.Id}}",
|
||||
]
|
||||
process = subprocess.run(
|
||||
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
|
||||
)
|
||||
return process.stdout.decode().strip()
|
||||
|
||||
|
||||
def load_image_tarball_in_memory(
|
||||
compressed_container_path: Optional[str] = None,
|
||||
) -> None:
|
||||
if compressed_container_path is None:
|
||||
compressed_container_path = get_resource_path("container.tar.gz")
|
||||
|
||||
log.info("Installing Dangerzone container image...")
|
||||
p = subprocess.Popen(
|
||||
[get_runtime(), "load"],
|
||||
stdin=subprocess.PIPE,
|
||||
startupinfo=get_subprocess_startupinfo(),
|
||||
)
|
||||
|
||||
chunk_size = 4 << 20
|
||||
|
||||
with gzip.open(compressed_container_path) as f:
|
||||
while True:
|
||||
chunk = f.read(chunk_size)
|
||||
if len(chunk) > 0:
|
||||
if p.stdin:
|
||||
p.stdin.write(chunk)
|
||||
else:
|
||||
break
|
||||
_, err = p.communicate()
|
||||
if p.returncode < 0:
|
||||
if err:
|
||||
error = err.decode()
|
||||
else:
|
||||
error = "No output"
|
||||
raise errors.ImageInstallationException(
|
||||
f"Could not install container image: {error}"
|
||||
)
|
||||
|
||||
log.info("Successfully installed container image from")
|
||||
|
||||
|
||||
def load_image_tarball_file(container_path: str) -> None:
|
||||
cmd = [get_runtime(), "load", "-i", container_path]
|
||||
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True)
|
||||
|
||||
log.info("Successfully installed container image from %s", container_path)
|
||||
|
||||
|
||||
def container_pull(image: str) -> bool:
|
||||
# XXX - Move to container_utils.py
|
||||
cmd = [get_runtime_name(), "pull", f"{image}"]
|
||||
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
|
||||
process.communicate()
|
||||
return process.returncode == 0
|
Loading…
Reference in a new issue