# Copyright 2025 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Utilities for Maven projects and settings."""
import logging
import os
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from pathlib import Path
from typing import cast
from urllib.parse import urlparse
from typing_extensions import Self, override
from craft_parts import infos
from ._xml import (
CRAFT_REPO_TEMPLATE,
DISTRIBUTION_REPO_TEMPLATE,
LOCAL_REPO_TEMPLATE,
MIRROR_REPO,
PROXIES_TEMPLATE,
PROXY_CREDENTIALS_TEMPLATE,
PROXY_TEMPLATE,
SETTINGS_TEMPLATE,
)
logger = logging.getLogger(__name__)
ArtifactDict = dict[str, set[str]]
GroupDict = dict[str, ArtifactDict]
[docs]
def create_maven_settings(
*, part_info: infos.PartInfo, set_mirror: bool
) -> Path | None:
"""Create a Maven configuration file.
The settings file contains additional configuration for Maven, such
as proxy parameters.
If it detects that no configuration is necessary, it will return None
and do nothing.
:param part_info: The part info for the part invoking Maven.
:param set_mirror: Whether to configure for a local build.
:return: Returns a Path object to the settings file if one is created,
otherwise None.
"""
# Short-circuit exit if no config is needed
if not (_needs_proxy_config() or set_mirror):
return None
settings_path = part_info.part_build_subdir / ".parts/.m2/settings.xml"
settings_path.parent.mkdir(parents=True, exist_ok=True)
proxies_element = _get_proxy_config() if _needs_proxy_config() else ""
if set_mirror:
local_repo = part_info.part_build_subdir / ".parts/.m2/repository"
backstage_repo = cast("Path", part_info.backstage_dir) / "maven-use"
if backstage_repo.is_dir():
# This is the shared repository in the backstage
craft_element = CRAFT_REPO_TEMPLATE.format(repo_uri=backstage_repo.as_uri())
else:
craft_element = ""
local_element = LOCAL_REPO_TEMPLATE.format(repo_dir=local_repo)
mirror_element = MIRROR_REPO
else:
craft_element = local_element = mirror_element = ""
settings_xml = SETTINGS_TEMPLATE.format(
local_repository_element=local_element,
craft_repository_element=craft_element,
mirror_repository_element=mirror_element,
proxies_element=proxies_element,
)
settings_path.write_text(settings_xml)
return settings_path
def _get_proxy_config() -> str:
"""Generate an XML string for proxy configurations.
Reads the environment for information on desired proxy settings and
transforms those variables into Maven XML settings entries.
"""
# Transform all environment variables to their lowercase form to support HTTPS_PROXY
# vs. https_proxy and such
case_insensitive_env = {item[0].lower(): item[1] for item in os.environ.items()}
proxies: list[str] = []
for protocol in ["http", "https"]:
env_name = f"{protocol}_proxy"
if env_name not in case_insensitive_env:
continue
proxy_url = urlparse(case_insensitive_env[env_name])
if proxy_url.username is not None and proxy_url.password is not None:
credentials = PROXY_CREDENTIALS_TEMPLATE.format(
username=proxy_url.username, password=proxy_url.password
)
else:
credentials = ""
proxy_element = PROXY_TEMPLATE.format(
id=env_name,
protocol=protocol,
host=proxy_url.hostname,
port=proxy_url.port,
credentials=credentials,
non_proxy_hosts=_get_no_proxy_string(),
)
proxies.append(proxy_element)
return PROXIES_TEMPLATE.format(proxies="\n".join(proxies))
def _needs_proxy_config() -> bool:
"""Determine whether or not proxy configuration is necessary for Maven."""
proxy_vars = ["http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY"]
return any(key in os.environ for key in proxy_vars)
def _get_no_proxy_string() -> str:
no_proxy = [k.strip() for k in os.environ.get("no_proxy", "localhost").split(",")]
return "|".join(no_proxy)
[docs]
def update_pom(
*, part_info: infos.PartInfo, add_distribution: bool, self_contained: bool
) -> None:
"""Update the POM file of a Maven project.
:param part_info: Information about the invoking part.
:param add_distribution: Whether or not to configure the `mvn deploy` location.
:param self_contained: Whether or not to patch version numbers with what is
actually available.
"""
pom_xml = part_info.part_build_subdir / "pom.xml"
if not pom_xml.is_file():
raise MavenXMLError("'pom.xml' does not exist")
tree = ET.parse(pom_xml) # noqa: S314, unsafe parsing with xml
project = tree.getroot()
namespace = re.search("{(.*)}", project.tag)
# Older pom files may not contain a namespace
namespaces = {"": namespace.group(1)} if namespace else {}
for prefix, uri in namespaces.items():
ET.register_namespace(prefix, uri)
if add_distribution:
# Add a distributionManagement element, to tell "maven deploy" to deploy the
# artifacts (jars, poms, etc) to the export dir.
distribution_dir = part_info.part_export_dir / "maven-use"
distribution_element = ET.fromstring( # noqa: S314, unsafe parsing with xml
DISTRIBUTION_REPO_TEMPLATE.format(repo_uri=distribution_dir.as_uri())
)
project.append(distribution_element)
existing = _get_existing_artifacts(part_info)
if self_contained:
MavenArtifact.update_versions(project, namespaces, existing)
MavenPlugin.update_versions(project, namespaces, existing)
tree.write(pom_xml)
[docs]
@dataclass
class MavenArtifact:
"""A dataclass for Maven artifacts."""
group_id: str
artifact_id: str
version: str
field_name: str = "Dependency"
[docs]
@classmethod
def from_element(cls, element: ET.Element, namespaces: dict[str, str]) -> Self:
"""Create a MavenArtifact from an XML artifact element."""
group_id = _get_element_text(_find_element(element, "groupId", namespaces))
artifact_id = _get_element_text(
_find_element(element, "artifactId", namespaces)
)
version = _get_element_text(_find_element(element, "version", namespaces))
return cls(group_id, artifact_id, version)
[docs]
@classmethod
def from_pom(cls, pom: Path) -> Self:
"""Create a MavenArtifact from a pom file."""
tree = ET.parse(pom) # noqa: S314, unsafe parsing with xml
project = tree.getroot()
namespaces = {}
if match := re.search("{(.*)}", project.tag):
namespace = match.group(1)
namespaces = {"": namespace}
return cls.from_element(project, namespaces)
@classmethod
def _collect_elements(
cls, project: ET.Element, namespaces: dict[str, str]
) -> list[ET.Element]:
dependencies = project.find("dependencies", namespaces)
if dependencies is None:
return []
return dependencies.findall("dependency", namespaces)
[docs]
@classmethod
def update_versions(
cls, project: ET.Element, namespaces: dict[str, str], existing: GroupDict
) -> None:
"""Update all of the versions for this project as necessary."""
for dependency in cls._collect_elements(project, namespaces):
dep = cls.from_element(dependency, namespaces)
if new_version := _get_available_version(existing, dep):
_set_version(dependency, namespaces, new_version)
else:
logger.debug(
f"{cls.field_name} {dep.artifact_id} has no available version, skipping."
)
[docs]
@dataclass
class MavenPlugin(MavenArtifact):
"""A dataclass for Maven plugins.
These are different because plugins have a default groupId.
"""
field_name: str = "Plugin"
[docs]
@classmethod
@override
def from_element(cls, element: ET.Element, namespaces: dict[str, str]) -> Self:
"""Create a MavenPlugin from an XML plugin element.
If no groupId is found, 'org.apache.maven.plugins' will be used.
For more information on the default plugin group, see:
https://maven.apache.org/guides/mini/guide-configuring-plugins.html
"""
group_id_element = element.find("groupId", namespaces)
if group_id_element is not None:
group_id = _get_element_text(group_id_element)
else:
group_id = "org.apache.maven.plugins"
artifact_id = _get_element_text(
_find_element(element, "artifactId", namespaces)
)
version = _get_element_text(_find_element(element, "version", namespaces))
return cls(group_id, artifact_id, version)
@classmethod
@override
def _collect_elements(
cls, project: ET.Element, namespaces: dict[str, str]
) -> list[ET.Element]:
build = project.find("build", namespaces)
if build is None:
return []
plugins = build.find("plugins", namespaces)
if plugins is None:
return []
return plugins.findall("plugin", namespaces)
def _get_existing_artifacts(part_info: infos.PartInfo) -> GroupDict:
result: GroupDict = GroupDict()
search_locations = [
part_info.backstage_dir / "maven-use",
Path("/usr/share/maven-repo"),
]
for loc in search_locations:
if not loc.is_dir():
continue
for pom in loc.glob("**/*.pom"):
art = MavenArtifact.from_pom(pom)
group_artifacts = result.setdefault(art.group_id, {})
versions = group_artifacts.setdefault(art.artifact_id, set())
versions.add(art.version)
return result
def _get_available_version(
existing: GroupDict, dependency: MavenArtifact
) -> str | None:
if versions := existing.get(dependency.group_id, {}).get(
dependency.artifact_id, set()
):
return next(iter(versions))
return None
def _set_version(
element: ET.Element, namespaces: dict[str, str], new_version: str
) -> None:
group_id = _get_element_text(_find_element(element, "groupId", namespaces))
artifact_id = _get_element_text(_find_element(element, "artifactId", namespaces))
version_element = element.find("version", namespaces)
# If no version is specified at all, always set it
if version_element is None:
new_version_element = ET.Element("version")
new_version_element.text = new_version
element.append(new_version_element)
comment = ET.Comment(f"Version set by craft-parts to '{new_version}'")
element.append(comment)
logger.debug(
"Setting version of '%s.%s' to '%s'",
group_id,
artifact_id,
new_version,
)
return
current_version = _get_element_text(version_element)
if current_version == new_version:
return
version_element.text = new_version
comment = ET.Comment(
f"Version updated by craft-parts from '{current_version}' to '{new_version}'"
)
logger.debug(
"Updating version of '%s.%s' from '%s' to '%s'",
group_id,
artifact_id,
current_version,
new_version,
)
element.append(comment)
[docs]
@dataclass
class MavenXMLError(BaseException):
"""An error encountered while parsing XML for Maven projects."""
message: str
def __str__(self) -> str:
return self.message
def _find_element(
element: ET.Element, path: str, namespaces: dict[str, str]
) -> ET.Element:
"""Find a field within an element.
This is equivalent to `element.find(path, namespaces)`, except that
an exception is raised if the needle isn't found to reduce boilerplate.
:param element: The haystack to search.
:param path: The needle to find in the haystack.
:param namespaces: A mapping of namespaces to use during the search.
:raises _MavenXMLError: if the needle can't be found.
:return: The discovered element.
"""
if (needle := element.find(path, namespaces)) is not None:
return needle
raise MavenXMLError(message=f"Could not parse {path}.")
def _get_element_text(element: ET.Element) -> str:
"""Extract the text field from an element.
This is equivalent to `element.text`, except that an exception is
raised if the text field is empty to reduce boilerplate.
:param element: The element to read from.
:raises _MavenXMLError: if there is no text field.
:return: The content of the text field.
"""
if (text := element.text) is not None:
return text
raise MavenXMLError(message=f"No text field found on {element.tag}.")