Source code for craft_parts.sources.git_source

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2015-2021 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Implement the git source handler."""

import logging
import re
import subprocess
import sys
from pathlib import Path
from typing import Any, Literal, cast

import pydantic
from typing_extensions import Self, override

from craft_parts.utils.git import get_git_command

from . import errors
from .base import (
    BaseSourceModel,
    SourceHandler,
    get_json_extra_schema,
    get_model_config,
)

logger = logging.getLogger(__name__)


MAX_COMMIT_LENGTH = 40


[docs] class GitSourceModel(BaseSourceModel, frozen=True): # type: ignore[misc] """Pydantic model for a git-based source.""" pattern = r"(^git(\+.+:|[@:])|\.git$)" model_config = get_model_config(get_json_extra_schema(r"(^git[+@:]|\.git$)")) source_type: Literal["git"] = "git" source: str source_tag: str | None = None source_commit: str | None = None source_branch: str | None = None source_depth: int = 0 source_submodules: list[str] | None = None @pydantic.model_validator(mode="after") def _validate_mutually_exclusive_fields(self) -> Self: if self.source_tag and self.source_branch: raise errors.IncompatibleSourceOptions( type(self).model_fields["source_type"].default, ["source-tag", "source-branch"], ) if self.source_tag and self.source_commit: raise errors.IncompatibleSourceOptions( type(self).model_fields["source_type"].default, ["source-tag", "source-commit"], ) if self.source_branch and self.source_commit: raise errors.IncompatibleSourceOptions( type(self).model_fields["source_type"].default, ["source-branch", "source-commit"], ) return self
[docs] class GitSource(SourceHandler): """The git source handler. Retrieve part sources from a git repository. Branch, depth, commit and tag can be specified using part properties ``source-branch``, ``source-depth``, `source-commit``, ``source-tag``, and ``source-submodules``. """ source_model = GitSourceModel
[docs] @classmethod def version(cls) -> str: """Get git version information.""" return subprocess.check_output( [get_git_command(), "version"], universal_newlines=True, stderr=subprocess.DEVNULL, ).strip()
[docs] @classmethod def check_command_installed(cls) -> bool: """Check if git is installed.""" try: cls.version() except FileNotFoundError: return False return True
[docs] @classmethod def generate_version(cls, *, part_src_dir: Path | None = None) -> str: """Return the latest git tag from PWD or defined part_src_dir. The output depends on the use of annotated tags and will return something like: '2.28+git.10.abcdef' where '2.28 is the tag, '+git' indicates there are commits ahead of the tag, in this case it is '10' and the latest commit hash begins with 'abcdef'. If there are no tags or the revision cannot be determined, this will return 0 as the tag and only the commit hash of the latest commit. """ if not part_src_dir: part_src_dir = Path.cwd() encoding = sys.getfilesystemencoding() try: output = ( subprocess.check_output( [get_git_command(), "-C", str(part_src_dir), "describe", "--dirty"], stderr=subprocess.DEVNULL, ) .decode(encoding) .strip() ) except subprocess.CalledProcessError as err: # If we fall into this exception it is because the repo is not # tagged at all. proc = subprocess.Popen( # pylint: disable=consider-using-with [ get_git_command(), "-C", str(part_src_dir), "describe", "--dirty", "--always", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = proc.communicate() if proc.returncode != 0: # This most likely means the project we are in is not driven # by git. raise errors.VCSError(message=stderr.decode(encoding).strip()) from err return f"0+git.{stdout.decode(encoding).strip()}" match = re.search( r"^(?P<tag>[a-zA-Z0-9.+~-]+)-" r"(?P<revs_ahead>\d+)-" r"g(?P<commit>[0-9a-fA-F]+(?:-dirty)?)$", output, ) if not match: # This means we have a pure tag return output tag = match.group("tag") revs_ahead = match.group("revs_ahead") commit = match.group("commit") return f"{tag}+git{revs_ahead}.{commit}"
def __init__( # pylint: disable=too-many-arguments self, source: str, part_src_dir: Path, **kwargs: Any, ) -> None: super().__init__(source, part_src_dir, **kwargs) def _expand_commit(self, commit: str) -> str: """Expand a commit hash to full length.""" # short-circuit if the commit is already full length if len(commit) == MAX_COMMIT_LENGTH: return commit try: # 'git rev-parse' expands non-ambiguous commits to full length (40 characters). return subprocess.check_output( [ get_git_command(), "-C", str(self.part_src_dir), "rev-parse", commit, ], text=True, ).strip() except subprocess.CalledProcessError as err: # 'rev-parse' will fail if the commit is out of the source-depth range, so give a helpful error if self.source_depth: raise errors.VCSError( message=f"Failed to parse commit {commit!r}.", resolution=( f"Ensure 'source-commit' is correct, provide a full-length ({MAX_COMMIT_LENGTH} character) commit, " "or remove the 'source-depth' key from the part." ), ) from err raise errors.VCSError( message=f"Failed to parse commit {commit!r}.", resolution="Ensure 'source-commit' is correct or provide a full-length (40 character) commit.", ) from err def _fetch_origin_commit(self) -> None: """Fetch from origin, using source-commit if defined.""" command = [get_git_command(), "-C", str(self.part_src_dir), "fetch", "origin"] if self.source_commit: command.append(self._expand_commit(self.source_commit)) self._run(command) def _get_current_branch(self) -> str: """Get current git branch.""" command = [ get_git_command(), "-C", str(self.part_src_dir), "branch", "--show-current", ] return self._run_output(command) def _pull_existing(self) -> None: """Pull data for an existing repository. For an existing (initialized) local git repository, pull from origin using branch, tag, or commit if defined. `git reset --hard` is preferred over `git pull` to avoid merge and rebase conflicts. If no branch, tag, or commit is defined, then pull from origin/<current-branch>. """ refspec = "HEAD" if self.source_branch: refspec = "refs/remotes/origin/" + self.source_branch elif self.source_tag: refspec = "refs/tags/" + self.source_tag elif self.source_commit: self._fetch_origin_commit() refspec = self._expand_commit(self.source_commit) else: refspec = "refs/remotes/origin/" + self._get_current_branch() command_prefix = [get_git_command(), "-C", str(self.part_src_dir)] command = [*command_prefix, "fetch", "--prune"] if self.source_submodules is None or len(self.source_submodules) > 0: command.append("--recurse-submodules=yes") self._run(command) command = [*command_prefix, "reset", "--hard", refspec] self._run(command) if self.source_submodules is None or len(self.source_submodules) > 0: command = [*command_prefix, "submodule", "update", "--recursive", "--force"] if self.source_submodules: for submodule in self.source_submodules: command.append(submodule) self._run(command) def _clone_new(self) -> None: """Clone a git repository, using submodules, branch, and depth if defined.""" git_version = self._get_git_version() # Attempt a shallow fetch for source_commits # This breaks on older versions of git. 2.34.1 (the default on jammy) is the first # version shipped on LTS that supports this. # In the case that this does not succeed, the only impact will be that a deep fetch # must be performed instead - nothing will break, it will just be less efficient if self.source_commit and git_version >= (2, 34, 1): try: self._clone_at_commit() except ShallowFetchError: pass else: return command = [ get_git_command(), "-c", "advice.detachedHead=false", "clone", ] if self.source_submodules is None: command.append("--recursive") else: command.extend( ["--recursive=" + submodule for submodule in self.source_submodules] ) if self.source_tag or self.source_branch: command.extend( ["--branch", cast(str, self.source_tag or self.source_branch)] ) if self.source_depth: command.extend(["--depth", str(self.source_depth)]) # reformat source string command.append(self._format_source()) logger.debug("Executing: %s", " ".join([str(i) for i in command])) self._run([*command, str(self.part_src_dir)]) # Checkout to a specific commit when shallow cloning wasn't possible if self.source_commit: self._fetch_origin_commit() full_commit = self._expand_commit(self.source_commit) command = [ get_git_command(), "-C", str(self.part_src_dir), "checkout", full_commit, ] logger.debug("Executing: %s", " ".join([str(i) for i in command])) self._run(command) def _clone_at_commit(self) -> None: """Load a repository at a specific commit. :raises ShallowFetchError: When a short commit is given, as it is not possible for git to parse it to a full commit hash without cloning the full repository anyways. """ try: full_commit = self._expand_commit(self.source_commit) except errors.VCSError: if self.source_depth: logger.warning( "A shallow clone is not possible with a short hash. " "To get a shallow clone, provide a full-length " f"({MAX_COMMIT_LENGTH}-character) hash instead." ) raise ShallowFetchError command_prefix = [get_git_command(), "-C", str(self.part_src_dir)] if not self.part_src_dir.exists(): self.part_src_dir.mkdir() if not (self.part_src_dir / ".git").exists(): self._run([*command_prefix, "init"]) self._run( [ *command_prefix, "remote", "add", "origin", self._format_source(), ] ) # Fetch the specific commit command = [*command_prefix, "fetch", "origin", full_commit] if self.source_depth: command.extend(["--depth", str(self.source_depth)]) logger.debug("Executing: %s", " ".join([str(i) for i in command])) self._run(command) # Checkout to that commit command = [*command_prefix, "checkout", full_commit] self._run(command) # source_submodules can either be None or [], which behave differently. # submodule update doesn't have the `--recursive=` syntax that clone does, # so instead we just only run this command for the default behavior # (None = all modules) or for explicitly requested modules. if self.source_submodules is None or self.source_submodules: command = [ *command_prefix, "submodule", "update", "--init", "--recursive", ] if self.source_submodules: command.extend(self.source_submodules) logger.debug("Executing: %s", " ".join([str(i) for i in command])) self._run(command)
[docs] def is_local(self) -> bool: """Verify whether the git repository is on the local filesystem.""" return Path(self.part_src_dir, ".git").exists()
def _format_source(self) -> str: """Format source to a git-compatible format. Protocols for git are http[s]://, ftp[s]://, git://, ssh://, git+ssh://, and file://. Additionally, scp-style syntax is also valid: [user@]host:path/to/repo) Local sources are formatted as file:///absolute/path/to/local/source This is done because `git clone --depth=1 path/to/local/source` is invalid """ protocol_pattern = re.compile(r"^[\w\-.@+]+:") if protocol_pattern.search(self.source): return self.source return f"file://{Path(self.source).resolve()}"
[docs] @override def pull(self) -> None: """Retrieve the local or remote source files.""" if self.is_local(): self._pull_existing() else: self._clone_new() self.source_details = self._get_source_details()
def _get_source_details(self) -> dict[str, str | None]: """Return a dictionary containing current source parameters.""" tag = self.source_tag commit = self.source_commit branch = self.source_branch source = self.source if not tag and not branch and not commit: commit = self._run_output( [get_git_command(), "-C", str(self.part_src_dir), "rev-parse", "HEAD"] ) return { "source-commit": commit, "source-branch": branch, "source": source, "source-tag": tag, } @classmethod def _get_git_version(cls) -> tuple[int, int, int]: response = cls._run_output([get_git_command(), "version"]) pat = re.compile(r"git version ((?:\d+\.){2}\d+)") match = re.match(pat, response) # Default if the version can't be recognized if not match: return (0, 0, 0) version = match.group(1) # Convert to a tuple of integers for comparison components = version.split(".", maxsplit=2) # Ignore the type, the regex already asserts that it will be a 3-piece tuple # but mypy believes this is `tuple[int, ...]` return tuple(int(component) for component in components) # type: ignore[return-value]
[docs] class ShallowFetchError(Exception): """A shallow fetch was not possible."""