# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2020-2024 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Manages the state of packages obtained using apt."""
import logging
import os
import re
import shutil
from collections.abc import Iterable
from contextlib import ContextDecorator
from pathlib import Path
try:
import apt
import apt.cache
import apt.package
import apt.progress
import apt.progress.base
import apt_pkg
except ImportError as e:
raise ImportError(
"python-apt is needed for apt operations. "
"Check the craft-parts README for more information."
) from e
from craft_parts.utils import os_utils
from . import errors
from .base import get_pkg_name_parts
logger = logging.getLogger(__name__)
_HASHSUM_MISMATCH_PATTERN = re.compile(r"(E:Failed to fetch.+Hash Sum mismatch)+")
[docs]
class LogProgress(apt.progress.base.AcquireProgress):
"""Internal Base class for text progress classes."""
def __init__(self) -> None:
self._id = 1
[docs]
def fail(self, item: apt_pkg.AcquireItemDesc) -> None:
"""Handle failed item."""
apt.progress.base.AcquireProgress.fail(self, item)
if item.owner.status == item.owner.STAT_DONE:
logger.debug("Ign %s", item.description)
else:
logger.debug("Err %s", item.description)
logger.debug(" %s", item.owner.error_text)
[docs]
def fetch(self, item: apt_pkg.AcquireItemDesc) -> None:
"""Handle item's data is fetch."""
apt.progress.base.AcquireProgress.fetch(self, item)
# It's complete already (e.g. Hit)
if item.owner.complete:
return
item.owner.id = self._id
self._id += 1
line = f"Get: {item.owner.id} {item.description}"
if item.owner.filesize:
line += f" [{apt_pkg.size_to_str(item.owner.filesize)}B]"
logger.debug(line)
[docs]
class AptCache(ContextDecorator):
"""Transient cache for stage packages, or read-only for build packages."""
def __init__(
self,
*,
stage_cache: Path | None = None,
stage_cache_arch: str | None = None,
) -> None:
self.stage_cache = stage_cache
self.stage_cache_arch = stage_cache_arch
self.progress: LogProgress | None = None
# pylint: disable=attribute-defined-outside-init
def __enter__(self) -> "AptCache":
if self.stage_cache is not None:
self.progress = LogProgress()
self._populate_stage_cache_dir()
self.cache = apt.cache.Cache(rootdir=str(self.stage_cache), memonly=True)
else:
# Setting rootdir="/" is needed otherwise the previously set rootdir will
# be used and _deb.get_installed_packages() will return an empty list.
self.cache = apt.cache.Cache(rootdir="/")
return self
# pylint: enable=attribute-defined-outside-init
def __exit__(self, *exc: object) -> None:
self.cache.close()
def _populate_stage_cache_dir(self) -> None:
"""Create/refresh cache configuration.
(1) Delete old-style symlink cache, if symlink.
(2) Delete current-style (copied) tree.
(3) Copy current host apt configuration.
(4) Configure primary arch to target arch.
(5) Install dpkg into cache directory to support multi-arch.
"""
if self.stage_cache is None:
return
# Copy apt configuration from host.
cache_etc_apt_path = Path(self.stage_cache, "etc", "apt")
# Delete potentially outdated cache configuration.
if cache_etc_apt_path.is_symlink():
cache_etc_apt_path.unlink()
elif cache_etc_apt_path.exists():
shutil.rmtree(cache_etc_apt_path)
# Copy current cache configuration.
cache_etc_apt_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree("/etc/apt", cache_etc_apt_path, ignore=_ignore_unreadable_files)
# Specify default arch (if specified).
if self.stage_cache_arch is not None:
arch_conf_path = cache_etc_apt_path / "apt.conf.d" / "00default-arch"
arch_conf_path.write_text(f'APT::Architecture "{self.stage_cache_arch}";\n')
# dpkg also needs to be in the rootdir in order to support multiarch
# (apt calls dpkg --print-foreign-architectures).
dpkg_path = shutil.which("dpkg")
if dpkg_path:
# Symlink it into place
destination = Path(self.stage_cache, dpkg_path[1:])
if not destination.exists():
destination.parent.mkdir(parents=True, exist_ok=True)
os.symlink(dpkg_path, destination)
else:
logger.warning("Cannot find 'dpkg' command needed to support multiarch")
def _autokeep_packages(self) -> None:
# If the package has been installed automatically as a dependency
# of another package, and if no packages depend on it anymore,
# the package is no longer required.
for package in self.cache.get_changes():
if package.is_auto_removable:
package.mark_keep()
[docs]
def is_package_valid(self, package_name: str) -> bool:
"""Verify whether there is a valid package with the given name.
:param package_name: The name of the package to verify.
:return: Whether a package with the given name is valid.
"""
return package_name in self.cache or self.cache.is_virtual_package(package_name)
[docs]
def get_installed_version(
self, package_name: str, *, resolve_virtual_packages: bool = False
) -> str | None:
"""Obtain the version of the package currently installed on the system.
:param package_name: The package installed on the system.
:param resolve_virtual_packages: If the package is virtual, pick a non-virtual
package that satisfies this virtual package name.
:return: The installed package version.
"""
if resolve_virtual_packages and self.cache.is_virtual_package(package_name):
logger.warning(
"%s is a virtual package, use non-virtual packages for "
"deterministic results.",
package_name,
)
# Recurse until a "real" package is found.
return self.get_installed_version(
self.cache.get_providing_packages(package_name)[0].name,
resolve_virtual_packages=resolve_virtual_packages,
)
if package_name in self.cache:
installed = self.cache[package_name].installed
if installed is not None:
return installed.version
return None
[docs]
def fetch_archives(self, download_path: Path) -> list[tuple[str, str, Path]]:
"""Retrieve packages marked to be fetched.
:param download_path: The directory to download files to.
:return: A list of (<package-name>, <package-version>, <dl-path>) tuples.
"""
downloaded = []
for package in self.cache.get_changes():
if package.candidate is None:
continue
try:
logger.info("Downloading package: %s", package)
dl_path = package.candidate.fetch_binary(
str(download_path), progress=self.progress
)
except apt.package.FetchError as err:
raise errors.PackageFetchError(str(err)) from err
if package.candidate is None:
raise errors.PackageNotFound(package.name)
downloaded.append((package.name, package.candidate.version, Path(dl_path)))
return downloaded
[docs]
def get_installed_packages(self) -> dict[str, str]:
"""Obtain a list of all packages and versions installed on the system.
:return: A dictionary of files and installed versions.
"""
installed: dict[str, str] = {}
for package in self.cache:
if package.installed is not None:
installed[package.name] = str(package.installed.version)
return installed
[docs]
def get_packages_marked_for_installation(self) -> list[tuple[str, str]]:
"""Obtain a list of packages and versions to be installed on the system.
:return: A list of (<package-name>, <package-version>) tuples.
"""
changed_pkgs = self.cache.get_changes()
marked_install_pkgs = [p for p in changed_pkgs if p.marked_install]
missing_installation_candidate: list[str] = [
p.name for p in marked_install_pkgs if p.candidate is None
]
if missing_installation_candidate:
raise errors.PackagesNotFound(missing_installation_candidate)
return [
(p.name, p.candidate.version) for p in marked_install_pkgs if p.candidate
]
[docs]
def mark_packages(self, package_names: set[str]) -> None:
"""Mark the given package names to be fetched from the repository.
:param package_names: The set of package names to be marked.
"""
for _name in package_names:
name = _name[:-4] if _name.endswith(":any") else _name
if self.cache.is_virtual_package(name):
name = self.cache.get_providing_packages(name)[0].name
logger.debug("Marking %s (and its dependencies) to be fetched", name)
name_arch, version = get_pkg_name_parts(name)
if name_arch not in self.cache:
raise errors.PackageNotFound(name_arch)
package = self.cache[name_arch]
if version is not None:
_set_pkg_version(package, version)
logger.debug("package: %s", package)
# Disable automatic resolving of broken packages here
# because if that fails it raises a SystemError and the
# API doesn't expose enough information about the problem.
# Instead we let apt-get show a verbose error message later.
# Also, make sure this package is marked as auto-installed,
# which will propagate to its dependencies.
package.mark_install(auto_fix=False, from_user=False)
# Now mark this package as NOT automatically installed, which
# will leave its dependencies marked as auto-installed, which
# allows us to clean them up if necessary.
package.mark_auto(auto=False)
_verify_marked_install(package)
[docs]
def unmark_packages(self, unmark_names: set[str]) -> None:
"""Unmark packages and dependencies that are no longer required.
:param unmark_names: The names of the packages to unmark.
"""
skipped_essential = set()
skipped_filtered = set()
for package in self.cache.get_changes():
if package.candidate is None:
raise errors.PackageNotFound(package.name)
if package.candidate.priority == "essential":
# Filter 'essential' packages.
skipped_essential.add(package.name)
package.mark_keep()
continue
if package.name in unmark_names:
# Filter packages from given list.
skipped_filtered.add(package.name)
package.mark_keep()
continue
if skipped_essential:
logger.debug(
"Skipping priority essential packages: %s",
sorted(skipped_essential),
)
if skipped_filtered:
logger.debug(
"Skipping filtered manifest packages: %s",
sorted(skipped_filtered),
)
# Unmark dependencies that are no longer required.
self._autokeep_packages()
def _verify_marked_install(package: apt.package.Package) -> None:
if package.installed or package.marked_install:
return
if package.candidate is None:
raise errors.PackageNotFound(package.name)
broken_deps: list[str] = []
for package_dependencies in package.candidate.dependencies:
for dep in package_dependencies:
if not dep.target_versions:
broken_deps.append(dep.name) # noqa: PERF401
raise errors.PackageBroken(package.name, deps=broken_deps)
def _set_pkg_version(package: apt.package.Package, version: str) -> None:
"""Set candidate version to a specific version if available."""
pkg_version = package.versions.get(version)
if pkg_version:
package.candidate = pkg_version
else:
raise errors.PackageNotFound(f"{package.name}={version}")
def _ignore_unreadable_files(
path: str | os.PathLike[str], names: Iterable[str]
) -> list[str]:
"""Ignore unreadable files for copytree.
Pass this function as the ignore parameter for shutil.copytree to not
fail on any unreadable files.
"""
path = Path(path)
skip = []
for name in names:
child_path = path / name
if not child_path.is_file():
continue
try:
child_path.open("rb").close()
except PermissionError:
skip.append(name)
return skip