Introduce a subprocess_run utility function

This is done to avoid forgetting windows specific arguments when calling `subprocess.run`.
This commit is contained in:
Alexis Métaireau 2025-02-26 16:09:58 +01:00
parent a797a2b6f8
commit d37630d37b
No known key found for this signature in database
GPG key ID: C65C7A89A8FFC56E

View file

@ -56,6 +56,11 @@ class Runtime(object):
return "podman" if platform.system() == "Linux" else "docker" return "podman" if platform.system() == "Linux" else "docker"
def subprocess_run(*args, **kwargs) -> subprocess.CompletedProcess:
"""subprocess.run with the correct startupinfo for Windows."""
return subprocess.run(*args, startupinfo=get_subprocess_startupinfo(), **kwargs)
def get_runtime_version(runtime: Optional[Runtime] = None) -> Tuple[int, int]: def get_runtime_version(runtime: Optional[Runtime] = None) -> Tuple[int, int]:
"""Get the major/minor parts of the Docker/Podman version. """Get the major/minor parts of the Docker/Podman version.
@ -75,9 +80,8 @@ def get_runtime_version(runtime: Optional[Runtime] = None) -> Tuple[int, int]:
cmd = [str(runtime.path), "version", "-f", query] cmd = [str(runtime.path), "version", "-f", query]
try: try:
version = subprocess.run( version = subprocess_run(
cmd, cmd,
startupinfo=get_subprocess_startupinfo(),
capture_output=True, capture_output=True,
check=True, check=True,
).stdout.decode() ).stdout.decode()
@ -193,8 +197,6 @@ def load_image_tarball() -> None:
add_image_tag(bad_tag, good_tag) add_image_tag(bad_tag, good_tag)
delete_image_tag(bad_tag) delete_image_tag(bad_tag)
log.info("Successfully installed container image")
def tag_image_by_digest(digest: str, tag: str) -> None: def tag_image_by_digest(digest: str, tag: str) -> None:
"""Tag a container image by digest. """Tag a container image by digest.
@ -204,7 +206,7 @@ def tag_image_by_digest(digest: str, tag: str) -> None:
image_id = get_image_id_by_digest(digest) image_id = get_image_id_by_digest(digest)
cmd = [str(runtime.path), "tag", image_id, tag] cmd = [str(runtime.path), "tag", image_id, tag]
log.debug(" ".join(cmd)) log.debug(" ".join(cmd))
subprocess.run(cmd, startupinfo=get_subprocess_startupinfo(), check=True) subprocess_run(cmd, check=True)
def get_image_id_by_digest(digest: str) -> str: def get_image_id_by_digest(digest: str) -> str:
@ -221,9 +223,7 @@ def get_image_id_by_digest(digest: str) -> str:
"{{.Id}}", "{{.Id}}",
] ]
log.debug(" ".join(cmd)) log.debug(" ".join(cmd))
process = subprocess.run( process = subprocess_run(cmd, check=True, capture_output=True)
cmd, startupinfo=get_subprocess_startupinfo(), check=True, capture_output=True
)
# In case we have multiple lines, we only want the first one. # In case we have multiple lines, we only want the first one.
return process.stdout.decode().strip().split("\n")[0] return process.stdout.decode().strip().split("\n")[0]
@ -232,10 +232,12 @@ def container_pull(image: str, manifest_digest: str):
"""Pull a container image from a registry.""" """Pull a container image from a registry."""
runtime = Runtime() runtime = Runtime()
cmd = [str(runtime.path), "pull", f"{image}@sha256:{manifest_digest}"] cmd = [str(runtime.path), "pull", f"{image}@sha256:{manifest_digest}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE) try:
process.communicate() subprocess_run(cmd, check=True)
if process.returncode != 0: except subprocess.CalledProcessError as e:
raise errors.ContainerPullException(f"Could not pull the container image: {e}") raise errors.ContainerPullException(
f"Could not pull the container image: {e}"
) from e
def get_local_image_digest(image: str) -> str: def get_local_image_digest(image: str) -> str:
@ -249,7 +251,11 @@ def get_local_image_digest(image: str) -> str:
cmd = [str(runtime.path), "images", image, "--format", "{{.Digest}}"] cmd = [str(runtime.path), "images", image, "--format", "{{.Digest}}"]
log.debug(" ".join(cmd)) log.debug(" ".join(cmd))
try: try:
result = subprocess.run(cmd, capture_output=True, check=True) result = subprocess_run(
cmd,
capture_output=True,
check=True,
)
lines = result.stdout.decode().strip().split("\n") lines = result.stdout.decode().strip().split("\n")
if len(lines) != 1: if len(lines) != 1:
raise errors.MultipleImagesFoundException( raise errors.MultipleImagesFoundException(