Source code for craft_parts.executor.step_handler

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2016-2022,2024 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Handle the execution of built-in or user specified step commands."""

import dataclasses
import functools
import json
import logging
import os
import subprocess
import tempfile
import textwrap
import time
from pathlib import Path
from typing import TextIO

from craft_parts import errors, packages
from craft_parts.infos import StepInfo
from craft_parts.parts import Part
from craft_parts.plugins import Plugin
from craft_parts.sources.local_source import SourceHandler
from craft_parts.steps import Step
from craft_parts.utils import file_utils, process

from . import filesets
from .filesets import Fileset
from .migration import migrate_files

logger = logging.getLogger(__name__)

Stream = TextIO | int | None


[docs] @dataclasses.dataclass(frozen=True) class StepContents: """Files and directories to be added to the step's state.""" files: set[str] = dataclasses.field(default_factory=set) dirs: set[str] = dataclasses.field(default_factory=set)
[docs] class StepHandler: """Executes built-in or user-specified step commands. The step handler takes care of the execution of a step, using either a built-in set of actions to be taken, or executing a user-defined script defined in the part specification User-defined scripts may also call the built-in handler for a step by invoking a control utility. This class implements the built-in handlers and a FIFO-based mechanism and API to be used by the external control utility to communicate with the running instance. """ def __init__( self, part: Part, *, step_info: StepInfo, plugin: Plugin, source_handler: SourceHandler | None, env: str, stdout: Stream = None, stderr: Stream = None, partitions: set[str] | None = None, ) -> None: self._part = part self._step_info = step_info self._plugin = plugin self._source_handler = source_handler self._env = env self._stdout = stdout self._stderr = stderr self._partitions = partitions
[docs] def run_builtin(self) -> StepContents: """Run the built-in commands for the current step.""" step = self._step_info.step if step == Step.PULL: handler = self._builtin_pull elif step == Step.OVERLAY: handler = self._builtin_overlay elif step == Step.BUILD: handler = self._builtin_build elif step == Step.STAGE: handler = self._builtin_stage elif step == Step.PRIME: handler = self._builtin_prime else: raise RuntimeError( "Request to run the built-in handler for an invalid step." ) return handler()
def _builtin_pull(self) -> StepContents: if self._source_handler: self._source_handler.pull() pull_commands = self._plugin.get_pull_commands() if pull_commands: try: _create_and_run_script( pull_commands, script_path=self._part.part_run_dir.absolute() / "pull.sh", cwd=self._part.part_src_subdir, stdout=self._stdout, stderr=self._stderr, ) except process.ProcessError as process_error: raise ( errors.PluginPullError(part_name=self._part.name) ) from process_error return StepContents() @staticmethod def _builtin_overlay() -> StepContents: return StepContents() def _builtin_build(self) -> StepContents: # Plugin commands. build_commands = self._plugin.get_build_commands() # save script to set the build environment build_environment_script_path = ( self._part.part_run_dir.absolute() / "environment.sh" ) build_environment_script_path.write_text(self._env) build_environment_script_path.chmod(0o644) try: _create_and_run_script( build_commands, script_path=self._part.part_run_dir.absolute() / "build.sh", build_environment_script_path=build_environment_script_path, cwd=self._part.part_build_subdir, stdout=self._stdout, stderr=self._stderr, ) except process.ProcessError as process_error: raise errors.PluginBuildError( part_name=self._part.name, plugin_name=self._part.plugin_name, stderr=process_error.result.stderr, ) from process_error return StepContents() def _builtin_stage(self) -> StepContents: stage_fileset = Fileset(self._part.spec.stage_files, name="stage") def pkgconfig_fixup(file_path: str) -> None: if os.path.islink(file_path): return if not file_path.endswith(".pc"): return packages.fix_pkg_config( prefix_prepend=self._part.stage_dir, pkg_config_file=Path(file_path), prefix_trim=self._part.part_install_dir, ) if self._partitions: files: set[str] = set() dirs: set[str] = set() for partition in self._partitions: partition_files, partition_dirs = filesets.migratable_filesets( stage_fileset, str(self._part.part_install_dirs[partition]), partition, ) partition_files, partition_dirs = migrate_files( files=partition_files, dirs=partition_dirs, srcdir=self._part.part_install_dirs[partition], destdir=self._part.dirs.get_stage_dir(partition), fixup_func=pkgconfig_fixup, ) files.update(partition_files) dirs.update(partition_dirs) else: files, dirs = filesets.migratable_filesets( stage_fileset, str(self._part.part_install_dir) ) files, dirs = migrate_files( files=files, dirs=dirs, srcdir=self._part.part_install_dir, destdir=self._part.stage_dir, fixup_func=pkgconfig_fixup, ) return StepContents(files, dirs) def _builtin_prime(self) -> StepContents: prime_fileset = Fileset(self._part.spec.prime_files, name="prime") # If we're priming and we don't have an explicit set of files to prime # include the files from the stage step if prime_fileset.entries == ["*"] or len(prime_fileset.includes) == 0: stage_fileset = Fileset(self._part.spec.stage_files, name="stage") prime_fileset.combine(stage_fileset) if self._partitions: files: set[str] = set() dirs: set[str] = set() for partition in self._partitions: partition_files, partition_dirs = filesets.migratable_filesets( prime_fileset, str(self._part.part_install_dirs[partition]), partition, ) srcdir = self._part.dirs.get_stage_dir(partition) destdir = self._part.dirs.get_prime_dir(partition) partition_files, partition_dirs = migrate_files( files=partition_files, dirs=partition_dirs, srcdir=srcdir, destdir=destdir, permissions=self._part.spec.permissions, ) files.update(partition_files) dirs.update(partition_dirs) else: files, dirs = filesets.migratable_filesets( prime_fileset, str(self._part.part_install_dir) ) files, dirs = migrate_files( files=files, dirs=dirs, srcdir=self._part.stage_dir, destdir=self._part.prime_dir, permissions=self._part.spec.permissions, ) return StepContents(files, dirs)
[docs] def run_scriptlet( self, scriptlet: str, *, scriptlet_name: str, step: Step, work_dir: Path, ) -> None: """Execute a scriptlet. :param scriptlet: the scriptlet to run. :param work_dir: the directory where the script will be executed. """ with tempfile.TemporaryDirectory() as tempdir: call_fifo = file_utils.NonBlockingRWFifo( os.path.join(tempdir, "function_call") ) feedback_fifo = file_utils.NonBlockingRWFifo( os.path.join(tempdir, "call_feedback") ) script = textwrap.dedent( f"""\ set -euo pipefail export PARTS_CALL_FIFO={call_fifo.path} export PARTS_FEEDBACK_FIFO={feedback_fifo.path} {self._env} set -x {scriptlet}""" ) # FIXME: refactor ctl protocol server with tempfile.TemporaryFile(mode="w+") as script_file: print(script, file=script_file) script_file.flush() script_file.seek(0) process = subprocess.Popen( # pylint: disable=consider-using-with ["/bin/bash"], stdin=script_file, cwd=work_dir, stdout=self._stdout, stderr=self._stderr, ) status = None try: while status is None: function_call = call_fifo.read() if not function_call: status = process.poll() time.sleep(0.1) # Don't loop TOO busily continue # Handle the function and send feedback to caller. try: retval = self._handle_control_api( step, scriptlet_name, function_call.strip() ) feedback_fifo.write(f"OK {retval!s}\n" if retval else "OK\n") except errors.PartsError as error: feedback_fifo.write(f"ERR {error!s}\n") except Exception as error: logger.debug("scriptlet execution failed: %s", error) raise finally: call_fifo.close() feedback_fifo.close() if process.returncode != 0: raise errors.ScriptletRunError( part_name=self._part.name, scriptlet_name=scriptlet_name, exit_code=status, )
def _handle_control_api( self, step: Step, scriptlet_name: str, function_call: str ) -> str: """Parse the command message received from the client.""" try: function_json = json.loads(function_call) except json.decoder.JSONDecodeError as err: raise RuntimeError( f"{scriptlet_name!r} scriptlet called a function with invalid json: " f"{function_call}" ) from err for attr in ["function", "args"]: if attr not in function_json: raise RuntimeError( f"{scriptlet_name!r} control call missing attribute {attr!r}" ) cmd_name = function_json["function"] cmd_args = function_json["args"] return self._process_api_commands( cmd_name, cmd_args, step=step, scriptlet_name=scriptlet_name ) def _process_api_commands( self, cmd_name: str, cmd_args: list[str], *, step: Step, scriptlet_name: str ) -> str: """Invoke API command actions.""" retval = "" invalid_control_api_call = functools.partial( errors.InvalidControlAPICall, part_name=self._part.name, scriptlet_name=scriptlet_name, ) if cmd_name == "default": if len(cmd_args) > 0: raise invalid_control_api_call( message=f"invalid arguments to command {cmd_name!r}", ) self._execute_builtin_handler(step) elif cmd_name == "set": if len(cmd_args) != 1: raise invalid_control_api_call( message=(f"invalid arguments to command {cmd_name!r}"), ) if "=" not in cmd_args[0]: raise invalid_control_api_call( message=( f"invalid arguments to command {cmd_name!r} (want key=value)" ), ) name, value = cmd_args[0].split("=") try: self._step_info.set_project_var(name, value) except (ValueError, RuntimeError) as err: raise errors.InvalidControlAPICall( part_name=self._part.name, scriptlet_name=scriptlet_name, message=str(err), ) from err elif cmd_name == "get": if len(cmd_args) != 1: raise invalid_control_api_call( message=(f"invalid number of arguments to command {cmd_name!r}"), ) (name,) = cmd_args try: retval = self._step_info.get_project_var(name, raw_read=True) except ValueError as err: raise errors.InvalidControlAPICall( part_name=self._part.name, scriptlet_name=scriptlet_name, message=str(err), ) from err else: raise invalid_control_api_call( message=f"invalid command {cmd_name!r}", ) return retval def _execute_builtin_handler(self, step: Step) -> None: if step == Step.PULL: self._builtin_pull() elif step == Step.OVERLAY: self._builtin_overlay() elif step == Step.BUILD: self._builtin_build() elif step == Step.STAGE: self._builtin_stage() elif step == Step.PRIME: self._builtin_prime()
def _create_and_run_script( commands: list[str], script_path: Path, cwd: Path, stdout: Stream, stderr: Stream, build_environment_script_path: Path | None = None, ) -> None: """Create a script with step-specific commands and execute it.""" with script_path.open("w") as run_file: print("#!/bin/bash", file=run_file) print("set -euo pipefail", file=run_file) if build_environment_script_path: print(f"source {build_environment_script_path}", file=run_file) print("set -x", file=run_file) for cmd in commands: print(cmd, file=run_file) script_path.chmod(0o755) logger.debug("Executing %r", script_path) process.run([script_path], cwd=cwd, stdout=stdout, stderr=stderr, check=True)