# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2015-2021 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Implement the git source handler."""
import logging
import re
import subprocess
import sys
from pathlib import Path
from typing import Any, Literal, cast
import pydantic
from typing_extensions import Self, override
from craft_parts.utils.git import get_git_command
from . import errors
from .base import (
BaseSourceModel,
SourceHandler,
get_json_extra_schema,
get_model_config,
)
logger = logging.getLogger(__name__)
MAX_COMMIT_LENGTH = 40
[docs]
class GitSourceModel(BaseSourceModel, frozen=True): # type: ignore[misc]
"""Pydantic model for a git-based source."""
pattern = r"(^git(\+.+:|[@:])|\.git$)"
model_config = get_model_config(get_json_extra_schema(r"(^git[+@:]|\.git$)"))
source_type: Literal["git"] = "git"
source: str
source_tag: str | None = None
source_commit: str | None = None
source_branch: str | None = None
source_depth: int = 0
source_submodules: list[str] | None = None
@pydantic.model_validator(mode="after")
def _validate_mutually_exclusive_fields(self) -> Self:
if self.source_tag and self.source_branch:
raise errors.IncompatibleSourceOptions(
type(self).model_fields["source_type"].default,
["source-tag", "source-branch"],
)
if self.source_tag and self.source_commit:
raise errors.IncompatibleSourceOptions(
type(self).model_fields["source_type"].default,
["source-tag", "source-commit"],
)
if self.source_branch and self.source_commit:
raise errors.IncompatibleSourceOptions(
type(self).model_fields["source_type"].default,
["source-branch", "source-commit"],
)
return self
[docs]
class GitSource(SourceHandler):
"""The git source handler.
Retrieve part sources from a git repository. Branch, depth, commit
and tag can be specified using part properties ``source-branch``,
``source-depth``, `source-commit``, ``source-tag``, and ``source-submodules``.
"""
source_model = GitSourceModel
[docs]
@classmethod
def version(cls) -> str:
"""Get git version information."""
return subprocess.check_output(
[get_git_command(), "version"],
universal_newlines=True,
stderr=subprocess.DEVNULL,
).strip()
[docs]
@classmethod
def check_command_installed(cls) -> bool:
"""Check if git is installed."""
try:
cls.version()
except FileNotFoundError:
return False
return True
[docs]
@classmethod
def generate_version(cls, *, part_src_dir: Path | None = None) -> str:
"""Return the latest git tag from PWD or defined part_src_dir.
The output depends on the use of annotated tags and will return
something like: '2.28+git.10.abcdef' where '2.28 is the
tag, '+git' indicates there are commits ahead of the tag, in
this case it is '10' and the latest commit hash begins with
'abcdef'. If there are no tags or the revision cannot be
determined, this will return 0 as the tag and only the commit
hash of the latest commit.
"""
if not part_src_dir:
part_src_dir = Path.cwd()
encoding = sys.getfilesystemencoding()
try:
output = (
subprocess.check_output(
[get_git_command(), "-C", str(part_src_dir), "describe", "--dirty"],
stderr=subprocess.DEVNULL,
)
.decode(encoding)
.strip()
)
except subprocess.CalledProcessError as err:
# If we fall into this exception it is because the repo is not
# tagged at all.
proc = subprocess.Popen( # pylint: disable=consider-using-with
[
get_git_command(),
"-C",
str(part_src_dir),
"describe",
"--dirty",
"--always",
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
# This most likely means the project we are in is not driven
# by git.
raise errors.VCSError(message=stderr.decode(encoding).strip()) from err
return f"0+git.{stdout.decode(encoding).strip()}"
match = re.search(
r"^(?P<tag>[a-zA-Z0-9.+~-]+)-"
r"(?P<revs_ahead>\d+)-"
r"g(?P<commit>[0-9a-fA-F]+(?:-dirty)?)$",
output,
)
if not match:
# This means we have a pure tag
return output
tag = match.group("tag")
revs_ahead = match.group("revs_ahead")
commit = match.group("commit")
return f"{tag}+git{revs_ahead}.{commit}"
def __init__( # pylint: disable=too-many-arguments
self,
source: str,
part_src_dir: Path,
**kwargs: Any,
) -> None:
super().__init__(source, part_src_dir, **kwargs)
def _expand_commit(self, commit: str) -> str:
"""Expand a commit hash to full length."""
# short-circuit if the commit is already full length
if len(commit) == MAX_COMMIT_LENGTH:
return commit
try:
# 'git rev-parse' expands non-ambiguous commits to full length (40 characters).
return subprocess.check_output(
[
get_git_command(),
"-C",
str(self.part_src_dir),
"rev-parse",
commit,
],
text=True,
).strip()
except subprocess.CalledProcessError as err:
# 'rev-parse' will fail if the commit is out of the source-depth range, so give a helpful error
if self.source_depth:
raise errors.VCSError(
message=f"Failed to parse commit {commit!r}.",
resolution=(
f"Ensure 'source-commit' is correct, provide a full-length ({MAX_COMMIT_LENGTH} character) commit, "
"or remove the 'source-depth' key from the part."
),
) from err
raise errors.VCSError(
message=f"Failed to parse commit {commit!r}.",
resolution="Ensure 'source-commit' is correct or provide a full-length (40 character) commit.",
) from err
def _fetch_origin_commit(self) -> None:
"""Fetch from origin, using source-commit if defined."""
command = [get_git_command(), "-C", str(self.part_src_dir), "fetch", "origin"]
if self.source_commit:
command.append(self._expand_commit(self.source_commit))
self._run(command)
def _get_current_branch(self) -> str:
"""Get current git branch."""
command = [
get_git_command(),
"-C",
str(self.part_src_dir),
"branch",
"--show-current",
]
return self._run_output(command)
def _pull_existing(self) -> None:
"""Pull data for an existing repository.
For an existing (initialized) local git repository, pull from origin
using branch, tag, or commit if defined.
`git reset --hard` is preferred over `git pull` to avoid
merge and rebase conflicts.
If no branch, tag, or commit is defined, then pull from origin/<current-branch>.
"""
refspec = "HEAD"
if self.source_branch:
refspec = "refs/remotes/origin/" + self.source_branch
elif self.source_tag:
refspec = "refs/tags/" + self.source_tag
elif self.source_commit:
self._fetch_origin_commit()
refspec = self._expand_commit(self.source_commit)
else:
refspec = "refs/remotes/origin/" + self._get_current_branch()
command_prefix = [get_git_command(), "-C", str(self.part_src_dir)]
command = [*command_prefix, "fetch", "--prune"]
if self.source_submodules is None or len(self.source_submodules) > 0:
command.append("--recurse-submodules=yes")
self._run(command)
command = [*command_prefix, "reset", "--hard", refspec]
self._run(command)
if self.source_submodules is None or len(self.source_submodules) > 0:
command = [*command_prefix, "submodule", "update", "--recursive", "--force"]
if self.source_submodules:
for submodule in self.source_submodules:
command.append(submodule)
self._run(command)
def _clone_new(self) -> None:
"""Clone a git repository, using submodules, branch, and depth if defined."""
git_version = self._get_git_version()
# Attempt a shallow fetch for source_commits
# This breaks on older versions of git. 2.34.1 (the default on jammy) is the first
# version shipped on LTS that supports this.
# In the case that this does not succeed, the only impact will be that a deep fetch
# must be performed instead - nothing will break, it will just be less efficient
if self.source_commit and git_version >= (2, 34, 1):
try:
self._clone_at_commit()
except ShallowFetchError:
pass
else:
return
command = [
get_git_command(),
"-c",
"advice.detachedHead=false",
"clone",
]
if self.source_submodules is None:
command.append("--recursive")
else:
command.extend(
["--recursive=" + submodule for submodule in self.source_submodules]
)
if self.source_tag or self.source_branch:
command.extend(
["--branch", cast(str, self.source_tag or self.source_branch)]
)
if self.source_depth:
command.extend(["--depth", str(self.source_depth)])
# reformat source string
command.append(self._format_source())
logger.debug("Executing: %s", " ".join([str(i) for i in command]))
self._run([*command, str(self.part_src_dir)])
# Checkout to a specific commit when shallow cloning wasn't possible
if self.source_commit:
self._fetch_origin_commit()
full_commit = self._expand_commit(self.source_commit)
command = [
get_git_command(),
"-C",
str(self.part_src_dir),
"checkout",
full_commit,
]
logger.debug("Executing: %s", " ".join([str(i) for i in command]))
self._run(command)
def _clone_at_commit(self) -> None:
"""Load a repository at a specific commit.
:raises ShallowFetchError: When a short commit is given, as it is not possible
for git to parse it to a full commit hash without cloning the full repository
anyways.
"""
try:
full_commit = self._expand_commit(self.source_commit)
except errors.VCSError:
if self.source_depth:
logger.warning(
"A shallow clone is not possible with a short hash. "
"To get a shallow clone, provide a full-length "
f"({MAX_COMMIT_LENGTH}-character) hash instead."
)
raise ShallowFetchError
command_prefix = [get_git_command(), "-C", str(self.part_src_dir)]
if not self.part_src_dir.exists():
self.part_src_dir.mkdir()
if not (self.part_src_dir / ".git").exists():
self._run([*command_prefix, "init"])
self._run(
[
*command_prefix,
"remote",
"add",
"origin",
self._format_source(),
]
)
# Fetch the specific commit
command = [*command_prefix, "fetch", "origin", full_commit]
if self.source_depth:
command.extend(["--depth", str(self.source_depth)])
logger.debug("Executing: %s", " ".join([str(i) for i in command]))
self._run(command)
# Checkout to that commit
command = [*command_prefix, "checkout", full_commit]
self._run(command)
# source_submodules can either be None or [], which behave differently.
# submodule update doesn't have the `--recursive=` syntax that clone does,
# so instead we just only run this command for the default behavior
# (None = all modules) or for explicitly requested modules.
if self.source_submodules is None or self.source_submodules:
command = [
*command_prefix,
"submodule",
"update",
"--init",
"--recursive",
]
if self.source_submodules:
command.extend(self.source_submodules)
logger.debug("Executing: %s", " ".join([str(i) for i in command]))
self._run(command)
[docs]
def is_local(self) -> bool:
"""Verify whether the git repository is on the local filesystem."""
return Path(self.part_src_dir, ".git").exists()
def _format_source(self) -> str:
"""Format source to a git-compatible format.
Protocols for git are http[s]://, ftp[s]://, git://, ssh://, git+ssh://, and
file://. Additionally, scp-style syntax is also valid: [user@]host:path/to/repo)
Local sources are formatted as file:///absolute/path/to/local/source
This is done because `git clone --depth=1 path/to/local/source` is invalid
"""
protocol_pattern = re.compile(r"^[\w\-.@+]+:")
if protocol_pattern.search(self.source):
return self.source
return f"file://{Path(self.source).resolve()}"
[docs]
@override
def pull(self) -> None:
"""Retrieve the local or remote source files."""
if self.is_local():
self._pull_existing()
else:
self._clone_new()
self.source_details = self._get_source_details()
def _get_source_details(self) -> dict[str, str | None]:
"""Return a dictionary containing current source parameters."""
tag = self.source_tag
commit = self.source_commit
branch = self.source_branch
source = self.source
if not tag and not branch and not commit:
commit = self._run_output(
[get_git_command(), "-C", str(self.part_src_dir), "rev-parse", "HEAD"]
)
return {
"source-commit": commit,
"source-branch": branch,
"source": source,
"source-tag": tag,
}
@classmethod
def _get_git_version(cls) -> tuple[int, int, int]:
response = cls._run_output([get_git_command(), "version"])
pat = re.compile(r"git version ((?:\d+\.){2}\d+)")
match = re.match(pat, response)
# Default if the version can't be recognized
if not match:
return (0, 0, 0)
version = match.group(1)
# Convert to a tuple of integers for comparison
components = version.split(".", maxsplit=2)
# Ignore the type, the regex already asserts that it will be a 3-piece tuple
# but mypy believes this is `tuple[int, ...]`
return tuple(int(component) for component in components) # type: ignore[return-value]
[docs]
class ShallowFetchError(Exception):
"""A shallow fetch was not possible."""