Source code for craft_parts.sources.git_source

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2015-2021 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Implement the git source handler."""

import logging
import re
import subprocess
import sys
from pathlib import Path
from typing import Any, Literal, cast

import pydantic
from overrides import overrides
from typing_extensions import Self

from craft_parts.utils.git import get_git_command

from . import errors
from .base import (
    BaseSourceModel,
    SourceHandler,
    get_json_extra_schema,
    get_model_config,
)

logger = logging.getLogger(__name__)


[docs] class GitSourceModel(BaseSourceModel, frozen=True): # type: ignore[misc] """Pydantic model for a git-based source.""" pattern = r"(^git(\+.+:|[@:])|\.git$)" model_config = get_model_config(get_json_extra_schema(r"(^git[+@:]|\.git$)")) source_type: Literal["git"] = "git" source: str source_tag: str | None = None source_commit: str | None = None source_branch: str | None = None source_depth: int = 0 source_submodules: list[str] | None = None # TODO: make these mutually exclusive fields declarative with a jsonschema too. @pydantic.model_validator(mode="after") def _validate_mutually_exclusive_fields(self) -> Self: if self.source_tag and self.source_branch: raise errors.IncompatibleSourceOptions( self.model_fields["source_type"].default, ["source-tag", "source-branch"], ) if self.source_tag and self.source_commit: raise errors.IncompatibleSourceOptions( self.model_fields["source_type"].default, ["source-tag", "source-commit"], ) if self.source_branch and self.source_commit: raise errors.IncompatibleSourceOptions( self.model_fields["source_type"].default, ["source-branch", "source-commit"], ) return self
[docs] class GitSource(SourceHandler): """The git source handler. Retrieve part sources from a git repository. Branch, depth, commit and tag can be specified using part properties ``source-branch``, ``source-depth``, `source-commit``, ``source-tag``, and ``source-submodules``. """ source_model = GitSourceModel
[docs] @classmethod def version(cls) -> str: """Get git version information.""" return subprocess.check_output( [get_git_command(), "version"], universal_newlines=True, stderr=subprocess.DEVNULL, ).strip()
[docs] @classmethod def check_command_installed(cls) -> bool: """Check if git is installed.""" try: cls.version() except FileNotFoundError: return False return True
[docs] @classmethod def generate_version(cls, *, part_src_dir: Path | None = None) -> str: """Return the latest git tag from PWD or defined part_src_dir. The output depends on the use of annotated tags and will return something like: '2.28+git.10.abcdef' where '2.28 is the tag, '+git' indicates there are commits ahead of the tag, in this case it is '10' and the latest commit hash begins with 'abcdef'. If there are no tags or the revision cannot be determined, this will return 0 as the tag and only the commit hash of the latest commit. """ if not part_src_dir: part_src_dir = Path.cwd() encoding = sys.getfilesystemencoding() try: output = ( subprocess.check_output( [get_git_command(), "-C", str(part_src_dir), "describe", "--dirty"], stderr=subprocess.DEVNULL, ) .decode(encoding) .strip() ) except subprocess.CalledProcessError as err: # If we fall into this exception it is because the repo is not # tagged at all. proc = subprocess.Popen( # pylint: disable=consider-using-with [ get_git_command(), "-C", str(part_src_dir), "describe", "--dirty", "--always", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = proc.communicate() if proc.returncode != 0: # This most likely means the project we are in is not driven # by git. raise errors.VCSError(message=stderr.decode(encoding).strip()) from err return f"0+git.{stdout.decode(encoding).strip()}" match = re.search( r"^(?P<tag>[a-zA-Z0-9.+~-]+)-" r"(?P<revs_ahead>\d+)-" r"g(?P<commit>[0-9a-fA-F]+(?:-dirty)?)$", output, ) if not match: # This means we have a pure tag return output tag = match.group("tag") revs_ahead = match.group("revs_ahead") commit = match.group("commit") return f"{tag}+git{revs_ahead}.{commit}"
def __init__( # pylint: disable=too-many-arguments self, source: str, part_src_dir: Path, **kwargs: Any, ) -> None: super().__init__(source, part_src_dir, **kwargs) def _fetch_origin_commit(self) -> None: """Fetch from origin, using source-commit if defined.""" command = [get_git_command(), "-C", str(self.part_src_dir), "fetch", "origin"] if self.source_commit: command.append(self.source_commit) self._run(command) def _get_current_branch(self) -> str: """Get current git branch.""" command = [ get_git_command(), "-C", str(self.part_src_dir), "branch", "--show-current", ] return self._run_output(command) def _pull_existing(self) -> None: """Pull data for an existing repository. For an existing (initialized) local git repository, pull from origin using branch, tag, or commit if defined. `git reset --hard` is preferred over `git pull` to avoid merge and rebase conflicts. If no branch, tag, or commit is defined, then pull from origin/<current-branch>. """ refspec = "HEAD" if self.source_branch: refspec = "refs/remotes/origin/" + self.source_branch elif self.source_tag: refspec = "refs/tags/" + self.source_tag elif self.source_commit: refspec = self.source_commit self._fetch_origin_commit() else: refspec = "refs/remotes/origin/" + self._get_current_branch() command_prefix = [get_git_command(), "-C", str(self.part_src_dir)] command = [*command_prefix, "fetch", "--prune"] if self.source_submodules is None or len(self.source_submodules) > 0: command.append("--recurse-submodules=yes") self._run(command) command = [*command_prefix, "reset", "--hard", refspec] self._run(command) if self.source_submodules is None or len(self.source_submodules) > 0: command = [*command_prefix, "submodule", "update", "--recursive", "--force"] if self.source_submodules: for submodule in self.source_submodules: command.append(submodule) self._run(command) def _clone_new(self) -> None: """Clone a git repository, using submodules, branch, and depth if defined.""" command = [get_git_command(), "clone"] if self.source_submodules is None: command.append("--recursive") else: command.extend( ["--recursive=" + submodule for submodule in self.source_submodules] ) if self.source_tag or self.source_branch: command.extend( ["--branch", cast(str, self.source_tag or self.source_branch)] ) if self.source_depth: command.extend(["--depth", str(self.source_depth)]) # reformat source string command.append(self._format_source()) logger.debug("Executing: %s", " ".join([str(i) for i in command])) self._run([*command, str(self.part_src_dir)]) if self.source_commit: self._fetch_origin_commit() command = [ get_git_command(), "-C", str(self.part_src_dir), "checkout", self.source_commit, ] logger.debug("Executing: %s", " ".join([str(i) for i in command])) self._run(command)
[docs] def is_local(self) -> bool: """Verify whether the git repository is on the local filesystem.""" return Path(self.part_src_dir, ".git").exists()
def _format_source(self) -> str: """Format source to a git-compatible format. Protocols for git are http[s]://, ftp[s]://, git://, ssh://, git+ssh://, and file://. Additionally, scp-style syntax is also valid: [user@]host:path/to/repo) Local sources are formatted as file:///absolute/path/to/local/source This is done because `git clone --depth=1 path/to/local/source` is invalid """ protocol_pattern = re.compile(r"^[\w\-.@+]+:") if protocol_pattern.search(self.source): return self.source return f"file://{Path(self.source).resolve()}"
[docs] @overrides def pull(self) -> None: """Retrieve the local or remote source files.""" if self.is_local(): self._pull_existing() else: self._clone_new() self.source_details = self._get_source_details()
def _get_source_details(self) -> dict[str, str | None]: """Return a dictionary containing current source parameters.""" tag = self.source_tag commit = self.source_commit branch = self.source_branch source = self.source if not tag and not branch and not commit: commit = self._run_output( [get_git_command(), "-C", str(self.part_src_dir), "rev-parse", "HEAD"] ) return { "source-commit": commit, "source-branch": branch, "source": source, "source-tag": tag, }