# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2015-2021,2024 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Definitions and helpers to handle filesets."""
import os
from glob import iglob
from craft_parts import errors, features
from craft_parts.utils import path_utils
class Fileset:
    """A class that represents a list of filepath strings to include or exclude.

    Filepaths to include do not begin with a hyphen.
    Filepaths to exclude begin with a hyphen.
    """

    def __init__(self, entries: list[str], *, name: str = "") -> None:
        """Initialize a fileset.

        If the partition feature is enabled, files in the default partition are
        normalized to begin with `(default)/`. For example, ["foo", "(default)/bar"]
        is normalized to ["(default)/foo", "(default)/bar"].

        :param entries: List of filepaths represented as strings.
        :param name: Name of the fileset.

        :raises FilesetError: If any entry is empty or is an absolute filepath.
        """
        # The name must be set first: validation uses it in error messages.
        self._name = name
        self._validate_entries(entries)
        self._list: list[str] = [_normalize_entry(entry) for entry in entries]

    def __repr__(self) -> str:
        return f"Fileset({self._list!r}, name={self._name!r})"

    @property
    def name(self) -> str:
        """Return the fileset name."""
        return self._name

    @property
    def entries(self) -> list[str]:
        """Return a copy of the list of entries in this fileset."""
        return self._list.copy()

    @property
    def includes(self) -> list[str]:
        """Return the list of files to be included."""
        # startswith() is safe for empty strings, unlike indexing with [0].
        return [x for x in self._list if not x.startswith("-")]

    @property
    def excludes(self) -> list[str]:
        """Return the list of files to be excluded, without the leading hyphen."""
        return [x[1:] for x in self._list if x.startswith("-")]

    def remove(self, item: str) -> None:
        """Remove this entry from the list of files.

        The item is normalized the same way entries are normalized at
        construction time before it is looked up.

        :param item: The item to remove.

        :raises ValueError: If the normalized item is not in the fileset.
        """
        self._list.remove(_normalize_entry(item))

    def combine(self, other: "Fileset") -> None:
        """Combine the entries in this fileset with entries from another fileset.

        Entries from ``other`` are merged into this fileset only if this fileset
        contains a ``*`` wildcard entry (which is removed) or consists solely of
        excludes. Otherwise this fileset is left unchanged.

        :param other: The fileset to combine with.

        :raises FilesetConflict: If ``other`` excludes a path this fileset includes.
        """
        to_combine = False

        # combine if the fileset has a wildcard
        if "*" in self.entries:
            to_combine = True
            self.remove("*")

        contradicting_set = set(other.excludes) & set(self.includes)
        if contradicting_set:
            raise errors.FilesetConflict(contradicting_set)

        # combine if the fileset is only excludes (an empty fileset does not count)
        if self._list and all(x.startswith("-") for x in self._list):
            to_combine = True

        if to_combine:
            # dict.fromkeys() removes duplicates while keeping first-seen order,
            # so the resulting entry order is deterministic across runs.
            self._list = list(dict.fromkeys(self._list + other.entries))

    def _validate_entries(self, entries: list[str]) -> None:
        """Validate that entries are non-empty and are not absolute filepaths.

        :param entries: List of entries to validate.

        :raises FilesetError: If `entries` contains an empty entry or an
            absolute filepath.
        """
        for entry in entries:
            if not entry:
                # Fail with a fileset error instead of an IndexError further down.
                raise errors.FilesetError(
                    name=self.name, message="entries must not be empty."
                )
            # Strip the exclusion marker (one leading hyphen) before checking.
            filepath = entry.removeprefix("-")
            if os.path.isabs(filepath):
                raise errors.FilesetError(
                    name=self.name, message=f"path {filepath!r} must be relative."
                )
def migratable_filesets(
    fileset: Fileset, srcdir: str, partition: str | None = None
) -> tuple[set[str], set[str]]:
    """Determine the files to migrate from a directory based on a fileset.

    :param fileset: The fileset used to filter files in the srcdir.
    :param srcdir: Directory containing files to migrate.
    :param partition: If provided, only get files to migrate to the partition.

    :return: A tuple containing the set of files and the set of directories to
        migrate, both relative to ``srcdir`` with parent paths resolved.
    """
    includes, excludes = _get_file_list(fileset, partition)

    candidates = _generate_include_set(srcdir, includes)
    excluded_paths, excluded_dirs = _generate_exclude_set(srcdir, excludes)

    # Drop excluded paths and anything located below an excluded directory.
    # An empty prefix tuple never matches, so nothing extra is dropped then.
    excluded_prefixes = tuple(excluded_dir + "/" for excluded_dir in excluded_dirs)
    selected = {
        path
        for path in candidates - excluded_paths
        if not path.startswith(excluded_prefixes)
    }

    def _is_real_directory(relative_path: str) -> bool:
        # Symlinks to directories are migrated as files, not as directories.
        full_path = os.path.join(srcdir, relative_path)
        return os.path.isdir(full_path) and not os.path.islink(full_path)

    # Separate dirs from files.
    dirs = {path for path in selected if _is_real_directory(path)}
    files = selected - dirs

    # Include every (resolved) ancestor directory of each selected file.
    for file_path in files:
        ancestor = os.path.dirname(_get_resolved_relative_path(file_path, srcdir))
        while ancestor:
            dirs.add(ancestor)
            ancestor = os.path.dirname(ancestor)

    # Resolve parent paths for dirs and files.
    resolved_dirs = {_get_resolved_relative_path(path, srcdir) for path in dirs}
    resolved_files = {_get_resolved_relative_path(path, srcdir) for path in files}

    return resolved_files, resolved_dirs
def _get_file_list(
    fileset: Fileset, partition: str | None
) -> tuple[list[str], list[str]]:
    """Split a fileset to obtain include and exclude file filters.

    If the fileset does not include any files, then the include list will contain a
    single wildcard: ["*"].

    :param fileset: The fileset to split.
    :param partition: If provided, only get the file list for files in the partition.

    :return: A tuple containing the include and exclude lists.

    :raises FeatureError: If the partition feature is enabled but no partition is
        provided or if a partition is provided but the partition feature is not enabled.
    """
    if features.Features().enable_partitions and not partition:
        raise errors.FeatureError(
            message=(
                "A partition must be provided if the partition feature is enabled."
            )
        )

    if not features.Features().enable_partitions and partition:
        raise errors.FeatureError(
            message=(
                "The partition feature must be enabled if a partition is provided."
            )
        )

    def _only_in_partition(paths: list[str]) -> list[str]:
        # Keep the partition-relative part of each path that belongs to
        # the requested partition; paths in other partitions are dropped.
        kept: list[str] = []
        for path in paths:
            path_partition, inner_path = path_utils.get_partition_and_path(path)
            if path_partition == partition:
                kept.append(str(inner_path))
        return kept

    include_filters: list[str] = []
    exclude_filters: list[str] = []

    for entry in fileset.entries:
        if entry.startswith("-"):
            # Exclusions are stored without their leading hyphen.
            exclude_filters.append(entry[1:])
        else:
            # A single leading backslash is stripped from includes
            # (presumably an escape for the first character; confirm with callers).
            include_filters.append(entry[1:] if entry.startswith("\\") else entry)

    # Without a partition the filters are used unchanged.
    if partition:
        include_filters = _only_in_partition(include_filters)
        exclude_filters = _only_in_partition(exclude_filters)

    # the default behavior is to include everything
    return include_filters or ["*"], exclude_filters
def _generate_include_set(directory: str, includes: list[str]) -> set[str]:
    """Obtain the set of files to include based on include file filters.

    Filters containing ``*`` are expanded with a recursive glob; any other
    filter is taken literally, whether or not the path exists. Every real
    (non-symlink) directory that was selected is then walked so that all of
    its contents are part of the result too.

    :param directory: The path to the tree containing the files to filter.
    :param includes: The include filters, relative to ``directory``.

    :return: The set of files to include, relative to ``directory``.
    """
    selected: set[str] = set()
    for entry in includes:
        full_path = os.path.join(directory, entry)
        if "*" in entry:
            selected.update(iglob(full_path, recursive=True))
        else:
            selected.add(full_path)

    # Remember the real directories before the paths are made relative.
    real_dirs = [
        path for path in selected if os.path.isdir(path) and not os.path.islink(path)
    ]

    relative_paths = {os.path.relpath(path, directory) for path in selected}

    # Expand included directories, so that an exclude like '*/*.so' will still
    # match files from an include like 'lib'.
    for real_dir in real_dirs:
        for root, subdirs, filenames in os.walk(real_dir):
            relative_paths.update(
                os.path.relpath(os.path.join(root, name), directory)
                for name in (*subdirs, *filenames)
            )

    return relative_paths
def _generate_exclude_set(
    directory: str, excludes: list[str]
) -> tuple[set[str], set[str]]:
    """Obtain the sets of paths to exclude based on exclude file filters.

    Every filter is expanded with a recursive glob, so only paths that exist
    under ``directory`` are returned.

    :param directory: The path to the tree containing the files to filter.
    :param excludes: The exclude filters, relative to ``directory``.

    :return: A tuple with the set of all excluded paths and the subset of those
        paths that are directories, both relative to ``directory``.
    """
    matched: set[str] = set()
    for entry in excludes:
        matched.update(iglob(os.path.join(directory, entry), recursive=True))

    excluded_paths: set[str] = set()
    excluded_dirs: set[str] = set()
    for path in matched:
        relative = os.path.relpath(path, directory)
        excluded_paths.add(relative)
        # isdir() follows symlinks, so links to directories count as directories.
        if os.path.isdir(path):
            excluded_dirs.add(relative)

    return excluded_paths, excluded_dirs
def _get_resolved_relative_path(relative_path: str, base_directory: str) -> str:
    """Resolve path components against target base_directory.

    Only the parent of the target is resolved (symlinks followed) against
    base_directory. The final path component is kept as is, so a target that
    is itself a symlink is not followed.

    :param relative_path: Path of target, relative to base_directory.
    :param base_directory: Base path of target.

    :return: Resolved path, relative to base_directory.
    """
    parent = os.path.dirname(relative_path)
    leaf = os.path.basename(relative_path)

    # realpath() is applied to the parent only, never to the leaf.
    resolved_parent = os.path.realpath(os.path.join(base_directory, parent))

    return os.path.relpath(os.path.join(resolved_parent, leaf), base_directory)
def _normalize_entry(entry: str) -> str:
    """Normalize an entry to begin with a partition, if partitions are enabled.

    If partitions are enabled, `foo` will be normalized to `(default)/foo`.
    If partitions are not enabled, `foo` will be left as `foo`.
    A leading hyphen (the exclusion marker) is kept in front of the result.

    :param entry: Entry to normalize.

    :returns: Normalized entry.
    """
    # Split the entry into an optional prefix (a hyphen character) and the path.
    # startswith()/removeprefix() also handle an empty entry, where indexing
    # with [0] would raise an IndexError.
    prefix = "-" if entry.startswith("-") else ""
    path = entry.removeprefix("-")

    partition, inner_path = path_utils.get_partition_and_path(path)

    # No partition is reported for the path: leave the entry untouched.
    if not partition:
        return entry

    return f"{prefix}({partition})/{inner_path}"