Source code for craft_parts.overlays.chroot

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2021 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Execute a callable in a chroot environment."""

import logging
import multiprocessing
import os
import sys
from collections.abc import Callable
from multiprocessing.connection import Connection
from pathlib import Path
from typing import Any, NamedTuple

from craft_parts.utils import os_utils

from . import errors

logger = logging.getLogger(__name__)


[docs] def chroot( path: Path, target: Callable, *args: Any, **kwargs: Any ) -> Any: # noqa: ANN401 """Execute a callable in a chroot environment. :param path: The new filesystem root. :param target: The callable to run in the chroot environment. :param args: Arguments for target. :param kwargs: Keyword arguments for target. :returns: The target function return value. """ logger.debug("[pid=%d] parent process", os.getpid()) parent_conn, child_conn = multiprocessing.Pipe() child = multiprocessing.Process( target=_runner, args=(Path(path), child_conn, target, args, kwargs) ) logger.debug("[pid=%d] set up chroot", os.getpid()) _setup_chroot(path) try: child.start() res, err = parent_conn.recv() child.join() finally: logger.debug("[pid=%d] clean up chroot", os.getpid()) _cleanup_chroot(path) if isinstance(err, str): raise errors.OverlayChrootExecutionError(err) return res
def _runner( path: Path, conn: Connection, target: Callable, args: tuple, kwargs: dict, ) -> None: """Chroot to the execution directory and call the target function.""" logger.debug("[pid=%d] child process: target=%r", os.getpid(), target) try: logger.debug("[pid=%d] chroot to %r", os.getpid(), path) os.chdir(path) os.chroot(path) res = target(*args, **kwargs) except Exception as exc: # noqa: BLE001 conn.send((None, str(exc))) return conn.send((res, None)) def _setup_chroot(path: Path) -> None: """Prepare the chroot environment before executing the target function.""" logger.debug("setup chroot: %r", path) if sys.platform == "linux": _setup_chroot_linux(path) def _cleanup_chroot(path: Path) -> None: """Clean the chroot environment after executing the target function.""" logger.debug("cleanup chroot: %r", path) if sys.platform == "linux": _cleanup_chroot_linux(path) class _Mount(NamedTuple): """Mount entry for chroot setup.""" fstype: str | None src: str mountpoint: str options: list[str] | None # Essential filesystems to mount in order to have basic utilities and # name resolution working inside the chroot environment. _linux_mounts: list[_Mount] = [ _Mount(None, "/etc/resolv.conf", "/etc/resolv.conf", ["--bind"]), _Mount("proc", "proc", "/proc", None), _Mount("sysfs", "sysfs", "/sys", None), # Device nodes require MS_REC to be bind mounted inside a container. _Mount(None, "/dev", "/dev", ["--rbind", "--make-rprivate"]), ] def _setup_chroot_linux(path: Path) -> None: """Linux-specific chroot environment preparation.""" # Some images (such as cloudimgs) symlink ``/etc/resolv.conf`` to # ``/run/systemd/resolve/stub-resolv.conf``. We want resolv.conf to be # a regular file to bind-mount the host resolver configuration on. # # There's no need to restore the file to its original condition because # this operation happens on a temporary filesystem layer. resolv_conf = path / "etc/resolv.conf" if resolv_conf.is_symlink(): resolv_conf.unlink() resolv_conf.touch() elif not resolv_conf.exists() and resolv_conf.parent.is_dir(): resolv_conf.touch() pid = os.getpid() for entry in _linux_mounts: args = [] if entry.options: args.extend(entry.options) if entry.fstype: args.append(f"-t{entry.fstype}") mountpoint = path / entry.mountpoint.lstrip("/") # Only mount if mountpoint exists. if mountpoint.exists(): logger.debug("[pid=%d] mount %r on chroot", pid, str(mountpoint)) os_utils.mount(entry.src, str(mountpoint), *args) else: logger.debug("[pid=%d] mountpoint %r does not exist", pid, str(mountpoint)) logger.debug("chroot setup complete") def _cleanup_chroot_linux(path: Path) -> None: """Linux-specific chroot environment cleanup.""" pid = os.getpid() for entry in reversed(_linux_mounts): mountpoint = path / entry.mountpoint.lstrip("/") if mountpoint.exists(): logger.debug("[pid=%d] umount: %r", pid, str(mountpoint)) if entry.options and "--rbind" in entry.options: # Mount points under /dev may be in use and make the bind mount # unmountable. This may happen in destructive mode depending on # the host environment, so use MNT_DETACH to defer unmounting. os_utils.umount(str(mountpoint), "--recursive", "--lazy") else: os_utils.umount(str(mountpoint))