diff --git a/CHANGES.md b/CHANGES.md index a917964f..4c5fc9d3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,7 @@ ### 3.33.0 (2024-0x-xx xx:xx:00 UTC) * Update filelock 3.14.0 (8556141) to 3.15.4 (9a979df) +* Update package resource API 68.2.2 (8ad627d) to 70.1.1 (222ebf9) * Update urllib3 2.2.1 (54d6edf) to 2.2.2 (27e2a5c) diff --git a/lib/pkg_resources/__init__.py b/lib/pkg_resources/__init__.py index c10a88e5..c4ace5aa 100644 --- a/lib/pkg_resources/__init__.py +++ b/lib/pkg_resources/__init__.py @@ -20,9 +20,11 @@ This module is deprecated. Users are directed to :mod:`importlib.resources`, :mod:`importlib.metadata` and :pypi:`packaging` instead. """ +from __future__ import annotations + import sys -if sys.version_info < (3, 8): +if sys.version_info < (3, 8): # noqa: UP036 # Check for unsupported versions raise RuntimeError("Python 3.8 or later is required") import os @@ -32,23 +34,21 @@ import re import types from typing import ( Any, + Literal, + Dict, + Iterator, Mapping, MutableSequence, NamedTuple, NoReturn, - Sequence, - Set, Tuple, - Type, Union, TYPE_CHECKING, - List, Protocol, Callable, - Dict, Iterable, - Optional, TypeVar, + overload, ) import zipfile import zipimport @@ -99,7 +99,8 @@ from pkg_resources.extern.packaging import version as _packaging_version from pkg_resources.extern.platformdirs import user_cache_dir as _user_cache_dir if TYPE_CHECKING: - from _typeshed import StrPath + from _typeshed import BytesPath, StrPath, StrOrBytesPath + from typing_extensions import Self warnings.warn( "pkg_resources is deprecated as an API. " @@ -110,17 +111,22 @@ warnings.warn( _T = TypeVar("_T") +_DistributionT = TypeVar("_DistributionT", bound="Distribution") # Type aliases _NestedStr = Union[str, Iterable[Union[str, Iterable["_NestedStr"]]]] -_InstallerType = Callable[["Requirement"], Optional["Distribution"]] +_InstallerTypeT = Callable[["Requirement"], "_DistributionT"] +_InstallerType = Callable[["Requirement"], Union["Distribution", None]] _PkgReqType = Union[str, "Requirement"] _EPDistType = Union["Distribution", _PkgReqType] -_MetadataType = Optional["IResourceProvider"] +_MetadataType = Union["IResourceProvider", None] +_ResolvedEntryPoint = Any # Can be any attribute in the module +_ResourceStream = Any # TODO / Incomplete: A readable file-like object # Any object works, but let's indicate we expect something like a module (optionally has __loader__ or __file__) _ModuleLike = Union[object, types.ModuleType] -_ProviderFactoryType = Callable[[_ModuleLike], "IResourceProvider"] +# Any: Should be _ModuleLike but we end up with issues where _ModuleLike doesn't have _ZipLoaderModule's __loader__ +_ProviderFactoryType = Callable[[Any], "IResourceProvider"] _DistFinderType = Callable[[_T, str, bool], Iterable["Distribution"]] -_NSHandlerType = Callable[[_T, str, str, types.ModuleType], Optional[str]] +_NSHandlerType = Callable[[_T, str, str, types.ModuleType], Union[str, None]] _AdapterT = TypeVar( "_AdapterT", _DistFinderType[Any], _ProviderFactoryType, _NSHandlerType[Any] ) @@ -131,6 +137,10 @@ class _LoaderProtocol(Protocol): def load_module(self, fullname: str, /) -> types.ModuleType: ... 
+class _ZipLoaderModule(Protocol):
+    __loader__: zipimport.zipimporter
+
+
 _PEP440_FALLBACK = re.compile(r"^v?(?P<safe_version>(?:[0-9]+!)?[0-9]+(?:\.[0-9]+)*)", re.I)


@@ -144,7 +154,7 @@ class PEP440Warning(RuntimeWarning):
 parse_version = _packaging_version.Version


-_state_vars: Dict[str, str] = {}
+_state_vars: dict[str, str] = {}


 def _declare_state(vartype: str, varname: str, initial_value: _T) -> _T:
@@ -152,7 +162,7 @@ def _declare_state(vartype: str, varname: str, initial_value: _T) -> _T:
     return initial_value


-def __getstate__():
+def __getstate__() -> dict[str, Any]:
     state = {}
     g = globals()
     for k, v in _state_vars.items():
@@ -160,7 +170,7 @@ def __getstate__():
     return state


-def __setstate__(state):
+def __setstate__(state: dict[str, Any]) -> dict[str, Any]:
     g = globals()
     for k, v in state.items():
         g['_sset_' + _state_vars[k]](k, g[k], v)
@@ -315,17 +325,17 @@ class VersionConflict(ResolutionError):
     _template = "{self.dist} is installed but {self.req} is required"

     @property
-    def dist(self):
+    def dist(self) -> Distribution:
         return self.args[0]

     @property
-    def req(self):
+    def req(self) -> Requirement:
         return self.args[1]

     def report(self):
         return self._template.format(**locals())

-    def with_context(self, required_by: Set[Union["Distribution", str]]):
+    def with_context(self, required_by: set[Distribution | str]):
         """
         If required_by is non-empty, return a version of self that is a
         ContextualVersionConflict.
@@ -345,7 +355,7 @@ class ContextualVersionConflict(VersionConflict):
     _template = VersionConflict._template + ' by {self.required_by}'

     @property
-    def required_by(self):
+    def required_by(self) -> set[str]:
         return self.args[2]


@@ -358,11 +368,11 @@ class DistributionNotFound(ResolutionError):
     )

     @property
-    def req(self):
+    def req(self) -> Requirement:
         return self.args[0]

     @property
-    def requirers(self):
+    def requirers(self) -> set[str] | None:
         return self.args[1]

     @property
@@ -382,7 +392,7 @@ class UnknownExtra(ResolutionError):
     """Distribution doesn't have an "extra feature" of the given name"""


-_provider_factories: Dict[Type[_ModuleLike], _ProviderFactoryType] = {}
+_provider_factories: dict[type[_ModuleLike], _ProviderFactoryType] = {}

 PY_MAJOR = '{}.{}'.format(*sys.version_info)
 EGG_DIST = 3
@@ -393,7 +403,7 @@ DEVELOP_DIST = -1


 def register_loader_type(
-    loader_type: Type[_ModuleLike], provider_factory: _ProviderFactoryType
+    loader_type: type[_ModuleLike], provider_factory: _ProviderFactoryType
 ):
     """Register `provider_factory` to make providers for `loader_type`

@@ -404,7 +414,11 @@ def register_loader_type(
     _provider_factories[loader_type] = provider_factory


-def get_provider(moduleOrReq: Union[str, "Requirement"]):
+@overload
+def get_provider(moduleOrReq: str) -> IResourceProvider: ...
+@overload
+def get_provider(moduleOrReq: Requirement) -> Distribution: ...
+def get_provider(moduleOrReq: str | Requirement) -> IResourceProvider | Distribution:
     """Return an IResourceProvider for the named module or requirement"""
     if isinstance(moduleOrReq, Requirement):
         return working_set.find(moduleOrReq) or require(str(moduleOrReq))[0]
@@ -466,7 +480,7 @@ darwinVersionString = re.compile(r"darwin-(\d+)\.(\d+)\.(\d+)-(.*)")
 get_platform = get_build_platform


-def compatible_platforms(provided: Optional[str], required: Optional[str]):
+def compatible_platforms(provided: str | None, required: str | None):
     """Can code for the `provided` platform run on the `required` platform?

     Returns true if either platform is ``None``, or the platforms are equal.
@@ -515,23 +529,34 @@ def compatible_platforms(provided: Optional[str], required: Optional[str]): return False -def get_distribution(dist: _EPDistType): +@overload +def get_distribution(dist: _DistributionT) -> _DistributionT: ... +@overload +def get_distribution(dist: _PkgReqType) -> Distribution: ... +def get_distribution(dist: Distribution | _PkgReqType) -> Distribution: """Return a current distribution object for a Requirement or string""" if isinstance(dist, str): dist = Requirement.parse(dist) if isinstance(dist, Requirement): - dist = get_provider(dist) + # Bad type narrowing, dist has to be a Requirement here, so get_provider has to return Distribution + dist = get_provider(dist) # type: ignore[assignment] if not isinstance(dist, Distribution): - raise TypeError("Expected string, Requirement, or Distribution", dist) + raise TypeError("Expected str, Requirement, or Distribution", dist) return dist -def load_entry_point(dist: _EPDistType, group: str, name: str): +def load_entry_point(dist: _EPDistType, group: str, name: str) -> _ResolvedEntryPoint: """Return `name` entry point of `group` for `dist` or raise ImportError""" return get_distribution(dist).load_entry_point(group, name) -def get_entry_map(dist: _EPDistType, group: Optional[str] = None): +@overload +def get_entry_map( + dist: _EPDistType, group: None = None +) -> dict[str, dict[str, EntryPoint]]: ... +@overload +def get_entry_map(dist: _EPDistType, group: str) -> dict[str, EntryPoint]: ... +def get_entry_map(dist: _EPDistType, group: str | None = None): """Return the entry point map for `group`, or the full entry map""" return get_distribution(dist).get_entry_map(group) @@ -545,10 +570,10 @@ class IMetadataProvider(Protocol): def has_metadata(self, name: str) -> bool: """Does the package's distribution contain the named metadata?""" - def get_metadata(self, name: str): + def get_metadata(self, name: str) -> str: """The named metadata resource as a string""" - def get_metadata_lines(self, name: str): + def get_metadata_lines(self, name: str) -> Iterator[str]: """Yield named metadata resource as list of non-blank non-comment lines Leading and trailing whitespace is stripped from each line, and lines @@ -557,49 +582,53 @@ class IMetadataProvider(Protocol): def metadata_isdir(self, name: str) -> bool: """Is the named metadata a directory? 
(like ``os.path.isdir()``)""" - def metadata_listdir(self, name: str): + def metadata_listdir(self, name: str) -> list[str]: """List of metadata names in the directory (like ``os.listdir()``)""" - def run_script(self, script_name: str, namespace: Dict[str, Any]): + def run_script(self, script_name: str, namespace: dict[str, Any]) -> None: """Execute the named script in the supplied namespace dictionary""" class IResourceProvider(IMetadataProvider, Protocol): """An object that provides access to package resources""" - def get_resource_filename(self, manager: "ResourceManager", resource_name: str): + def get_resource_filename( + self, manager: ResourceManager, resource_name: str + ) -> str: """Return a true filesystem path for `resource_name` `manager` must be a ``ResourceManager``""" - def get_resource_stream(self, manager: "ResourceManager", resource_name: str): + def get_resource_stream( + self, manager: ResourceManager, resource_name: str + ) -> _ResourceStream: """Return a readable file-like object for `resource_name` `manager` must be a ``ResourceManager``""" def get_resource_string( - self, manager: "ResourceManager", resource_name: str + self, manager: ResourceManager, resource_name: str ) -> bytes: """Return the contents of `resource_name` as :obj:`bytes` `manager` must be a ``ResourceManager``""" - def has_resource(self, resource_name: str): + def has_resource(self, resource_name: str) -> bool: """Does the package contain the named resource?""" - def resource_isdir(self, resource_name: str): + def resource_isdir(self, resource_name: str) -> bool: """Is the named resource a directory? (like ``os.path.isdir()``)""" - def resource_listdir(self, resource_name: str): + def resource_listdir(self, resource_name: str) -> list[str]: """List of resource names in the directory (like ``os.listdir()``)""" class WorkingSet: """A collection of active distributions on sys.path (or a similar list)""" - def __init__(self, entries: Optional[Iterable[str]] = None): + def __init__(self, entries: Iterable[str] | None = None): """Create working set from list of path entries (default=sys.path)""" - self.entries: List[str] = [] + self.entries: list[str] = [] self.entry_keys = {} self.by_key = {} self.normalized_to_canonical_keys = {} @@ -668,11 +697,11 @@ class WorkingSet: for dist in find_distributions(entry, True): self.add(dist, entry, False) - def __contains__(self, dist: "Distribution"): + def __contains__(self, dist: Distribution) -> bool: """True if `dist` is the active distribution for its project""" return self.by_key.get(dist.key) == dist - def find(self, req: "Requirement"): + def find(self, req: Requirement) -> Distribution | None: """Find a distribution matching requirement `req` If there is an active distribution for the requested project, this @@ -696,7 +725,7 @@ class WorkingSet: raise VersionConflict(dist, req) return dist - def iter_entry_points(self, group: str, name: Optional[str] = None): + def iter_entry_points(self, group: str, name: str | None = None): """Yield entry point objects from `group` matching `name` If `name` is None, yields all entry points in `group` from all @@ -718,13 +747,13 @@ class WorkingSet: ns['__name__'] = name self.require(requires)[0].run_script(script_name, ns) - def __iter__(self): + def __iter__(self) -> Iterator[Distribution]: """Yield distributions for non-duplicate projects in the working set The yield order is the order in which the items' path entries were added to the working set. 
""" - seen = {} + seen = set() for item in self.entries: if item not in self.entry_keys: # workaround a cache issue @@ -732,13 +761,13 @@ class WorkingSet: for key in self.entry_keys[item]: if key not in seen: - seen[key] = 1 + seen.add(key) yield self.by_key[key] def add( self, - dist: "Distribution", - entry: Optional[str] = None, + dist: Distribution, + entry: str | None = None, insert: bool = True, replace: bool = False, ): @@ -773,14 +802,42 @@ class WorkingSet: keys2.append(dist.key) self._added_new(dist) + @overload def resolve( self, - requirements: Iterable["Requirement"], - env: Optional["Environment"] = None, - installer: Optional[_InstallerType] = None, + requirements: Iterable[Requirement], + env: Environment | None, + installer: _InstallerTypeT[_DistributionT], replace_conflicting: bool = False, - extras: Optional[Tuple[str, ...]] = None, - ): + extras: tuple[str, ...] | None = None, + ) -> list[_DistributionT]: ... + @overload + def resolve( + self, + requirements: Iterable[Requirement], + env: Environment | None = None, + *, + installer: _InstallerTypeT[_DistributionT], + replace_conflicting: bool = False, + extras: tuple[str, ...] | None = None, + ) -> list[_DistributionT]: ... + @overload + def resolve( + self, + requirements: Iterable[Requirement], + env: Environment | None = None, + installer: _InstallerType | None = None, + replace_conflicting: bool = False, + extras: tuple[str, ...] | None = None, + ) -> list[Distribution]: ... + def resolve( + self, + requirements: Iterable[Requirement], + env: Environment | None = None, + installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None, + replace_conflicting: bool = False, + extras: tuple[str, ...] | None = None, + ) -> list[Distribution] | list[_DistributionT]: """List all distributions needed to (recursively) meet `requirements` `requirements` must be a sequence of ``Requirement`` objects. `env`, @@ -808,7 +865,7 @@ class WorkingSet: # set up the stack requirements = list(requirements)[::-1] # set of processed requirements - processed = {} + processed = set() # key -> dist best = {} to_activate = [] @@ -842,14 +899,14 @@ class WorkingSet: required_by[new_requirement].add(req.project_name) req_extras[new_requirement] = req.extras - processed[req] = True + processed.add(req) # return list of distros to activate return to_activate def _resolve_dist( self, req, best, replace_conflicting, env, installer, required_by, to_activate - ) -> "Distribution": + ) -> Distribution: dist = best.get(req.key) if dist is None: # Find the best distribution and add it to the map @@ -878,13 +935,41 @@ class WorkingSet: raise VersionConflict(dist, req).with_context(dependent_req) return dist + @overload def find_plugins( self, - plugin_env: "Environment", - full_env: Optional["Environment"] = None, - installer: Optional[_InstallerType] = None, + plugin_env: Environment, + full_env: Environment | None, + installer: _InstallerTypeT[_DistributionT], fallback: bool = True, - ): + ) -> tuple[list[_DistributionT], dict[Distribution, Exception]]: ... + @overload + def find_plugins( + self, + plugin_env: Environment, + full_env: Environment | None = None, + *, + installer: _InstallerTypeT[_DistributionT], + fallback: bool = True, + ) -> tuple[list[_DistributionT], dict[Distribution, Exception]]: ... 
+ @overload + def find_plugins( + self, + plugin_env: Environment, + full_env: Environment | None = None, + installer: _InstallerType | None = None, + fallback: bool = True, + ) -> tuple[list[Distribution], dict[Distribution, Exception]]: ... + def find_plugins( + self, + plugin_env: Environment, + full_env: Environment | None = None, + installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None, + fallback: bool = True, + ) -> tuple[ + list[Distribution] | list[_DistributionT], + dict[Distribution, Exception], + ]: """Find all activatable distributions in `plugin_env` Example usage:: @@ -923,8 +1008,8 @@ class WorkingSet: # scan project names in alphabetic order plugin_projects.sort() - error_info = {} - distributions = {} + error_info: dict[Distribution, Exception] = {} + distributions: dict[Distribution, Exception | None] = {} if full_env is None: env = Environment(self.entries) @@ -982,7 +1067,7 @@ class WorkingSet: return needed def subscribe( - self, callback: Callable[["Distribution"], object], existing: bool = True + self, callback: Callable[[Distribution], object], existing: bool = True ): """Invoke `callback` for all distributions @@ -1024,9 +1109,7 @@ class _ReqExtras(Dict["Requirement", Tuple[str, ...]]): Map each requirement to the extras that demanded it. """ - def markers_pass( - self, req: "Requirement", extras: Optional[Tuple[str, ...]] = None - ): + def markers_pass(self, req: Requirement, extras: tuple[str, ...] | None = None): """ Evaluate markers for req against each extra that demanded it. @@ -1046,9 +1129,9 @@ class Environment: def __init__( self, - search_path: Optional[Sequence[str]] = None, - platform: Optional[str] = get_supported_platform(), - python: Optional[str] = PY_MAJOR, + search_path: Iterable[str] | None = None, + platform: str | None = get_supported_platform(), + python: str | None = PY_MAJOR, ): """Snapshot distributions available on a search path @@ -1071,7 +1154,7 @@ class Environment: self.python = python self.scan(search_path) - def can_add(self, dist: "Distribution"): + def can_add(self, dist: Distribution): """Is distribution `dist` acceptable for this environment? The distribution must match the platform and python version @@ -1085,11 +1168,11 @@ class Environment: ) return py_compat and compatible_platforms(dist.platform, self.platform) - def remove(self, dist: "Distribution"): + def remove(self, dist: Distribution): """Remove `dist` from the environment""" self._distmap[dist.key].remove(dist) - def scan(self, search_path: Optional[Sequence[str]] = None): + def scan(self, search_path: Iterable[str] | None = None): """Scan `search_path` for distributions usable in this environment Any distributions found are added to the environment. 
@@ -1104,7 +1187,7 @@ class Environment: for dist in find_distributions(item): self.add(dist) - def __getitem__(self, project_name: str): + def __getitem__(self, project_name: str) -> list[Distribution]: """Return a newest-to-oldest list of distributions for `project_name` Uses case-insensitive `project_name` comparison, assuming all the @@ -1115,7 +1198,7 @@ class Environment: distribution_key = project_name.lower() return self._distmap.get(distribution_key, []) - def add(self, dist: "Distribution"): + def add(self, dist: Distribution): """Add `dist` if we ``can_add()`` it and it has not already been added""" if self.can_add(dist) and dist.has_version(): dists = self._distmap.setdefault(dist.key, []) @@ -1123,13 +1206,29 @@ class Environment: dists.append(dist) dists.sort(key=operator.attrgetter('hashcmp'), reverse=True) + @overload def best_match( self, - req: "Requirement", + req: Requirement, working_set: WorkingSet, - installer: Optional[Callable[["Requirement"], Any]] = None, + installer: _InstallerTypeT[_DistributionT], replace_conflicting: bool = False, - ): + ) -> _DistributionT: ... + @overload + def best_match( + self, + req: Requirement, + working_set: WorkingSet, + installer: _InstallerType | None = None, + replace_conflicting: bool = False, + ) -> Distribution | None: ... + def best_match( + self, + req: Requirement, + working_set: WorkingSet, + installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None, + replace_conflicting: bool = False, + ) -> Distribution | None: """Find distribution best matching `req` and usable on `working_set` This calls the ``find(req)`` method of the `working_set` to see if a @@ -1156,11 +1255,32 @@ class Environment: # try to download/install return self.obtain(req, installer) + @overload def obtain( self, - requirement: "Requirement", - installer: Optional[Callable[["Requirement"], Any]] = None, - ): + requirement: Requirement, + installer: _InstallerTypeT[_DistributionT], + ) -> _DistributionT: ... + @overload + def obtain( + self, + requirement: Requirement, + installer: Callable[[Requirement], None] | None = None, + ) -> None: ... + @overload + def obtain( + self, + requirement: Requirement, + installer: _InstallerType | None = None, + ) -> Distribution | None: ... + def obtain( + self, + requirement: Requirement, + installer: Callable[[Requirement], None] + | _InstallerType + | None + | _InstallerTypeT[_DistributionT] = None, + ) -> Distribution | None: """Obtain a distribution matching `requirement` (e.g. via download) Obtain a distro that matches requirement (e.g. via download). 
In the @@ -1171,13 +1291,13 @@ class Environment: to the `installer` argument.""" return installer(requirement) if installer else None - def __iter__(self): + def __iter__(self) -> Iterator[str]: """Yield the unique project names of the available distributions""" for key in self._distmap.keys(): if self[key]: yield key - def __iadd__(self, other: Union["Distribution", "Environment"]): + def __iadd__(self, other: Distribution | Environment): """In-place addition of a distribution or environment""" if isinstance(other, Distribution): self.add(other) @@ -1189,7 +1309,7 @@ class Environment: raise TypeError("Can't add %r to environment" % (other,)) return self - def __add__(self, other: Union["Distribution", "Environment"]): + def __add__(self, other: Distribution | Environment): """Add an environment or distribution to an environment""" new = self.__class__([], platform=None, python=None) for env in self, other: @@ -1216,15 +1336,15 @@ class ExtractionError(RuntimeError): The exception instance that caused extraction to fail """ - manager: "ResourceManager" + manager: ResourceManager cache_path: str - original_error: Optional[BaseException] + original_error: BaseException | None class ResourceManager: """Manage resource extraction and packages""" - extraction_path: Optional[str] = None + extraction_path: str | None = None def __init__(self): self.cached_files = {} @@ -1293,7 +1413,7 @@ class ResourceManager: err.original_error = old_exc raise err - def get_cache_path(self, archive_name: str, names: Iterable[str] = ()): + def get_cache_path(self, archive_name: str, names: Iterable[StrPath] = ()): """Return absolute location in cache for `archive_name` and `names` The parent directory of the resulting path will be created if it does @@ -1315,7 +1435,7 @@ class ResourceManager: self._warn_unsafe_extraction_path(extract_path) - self.cached_files[target_path] = 1 + self.cached_files[target_path] = True return target_path @staticmethod @@ -1345,7 +1465,7 @@ class ResourceManager: ).format(**locals()) warnings.warn(msg, UserWarning) - def postprocess(self, tempname: str, filename: str): + def postprocess(self, tempname: StrOrBytesPath, filename: StrOrBytesPath): """Perform any platform-specific postprocessing of `tempname` This is where Mac header rewrites should be done; other platforms don't @@ -1389,7 +1509,7 @@ class ResourceManager: self.extraction_path = path - def cleanup_resources(self, force: bool = False) -> List[str]: + def cleanup_resources(self, force: bool = False) -> list[str]: """ Delete all extracted resource files and directories, returning a list of the file and directory names that could not be successfully removed. @@ -1404,7 +1524,7 @@ class ResourceManager: return [] -def get_default_cache(): +def get_default_cache() -> str: """ Return the ``PYTHON_EGG_CACHE`` environment variable or a platform-relevant user cache dir for an app @@ -1496,7 +1616,7 @@ def invalid_marker(text: str): return False -def evaluate_marker(text: str, extra: Optional[str] = None): +def evaluate_marker(text: str, extra: str | None = None) -> bool: """ Evaluate a PEP 508 environment marker. Return a boolean indicating the marker result in this environment. 
@@ -1514,10 +1634,9 @@ def evaluate_marker(text: str, extra: Optional[str] = None): class NullProvider: """Try to implement resources and metadata for arbitrary PEP 302 loaders""" - egg_name: Optional[str] = None - egg_info: Optional[str] = None - loader: Optional[_LoaderProtocol] = None - module_path: Optional[str] # Some subclasses can have a None module_path + egg_name: str | None = None + egg_info: str | None = None + loader: _LoaderProtocol | None = None def __init__(self, module: _ModuleLike): self.loader = getattr(module, '__loader__', None) @@ -1560,7 +1679,7 @@ class NullProvider: exc.reason += ' in {} file at path: {}'.format(name, path) raise - def get_metadata_lines(self, name: str): + def get_metadata_lines(self, name: str) -> Iterator[str]: return yield_lines(self.get_metadata(name)) def resource_isdir(self, resource_name: str): @@ -1572,12 +1691,12 @@ class NullProvider: def resource_listdir(self, resource_name: str): return self._listdir(self._fn(self.module_path, resource_name)) - def metadata_listdir(self, name: str): + def metadata_listdir(self, name: str) -> list[str]: if self.egg_info: return self._listdir(self._fn(self.egg_info, name)) return [] - def run_script(self, script_name: str, namespace: Dict[str, Any]): + def run_script(self, script_name: str, namespace: dict[str, Any]): script = 'scripts/' + script_name if not self.has_metadata(script): raise ResolutionError( @@ -1585,6 +1704,7 @@ class NullProvider: **locals() ), ) + script_text = self.get_metadata(script).replace('\r\n', '\n') script_text = script_text.replace('\r', '\n') script_filename = self._fn(self.egg_info, script) @@ -1615,12 +1735,16 @@ class NullProvider: "Can't perform this operation for unregistered loader type" ) - def _listdir(self, path): + def _listdir(self, path) -> list[str]: raise NotImplementedError( "Can't perform this operation for unregistered loader type" ) - def _fn(self, base, resource_name: str): + def _fn(self, base: str | None, resource_name: str): + if base is None: + raise TypeError( + "`base` parameter in `_fn` is `None`. Either override this method or check the parameter first." + ) self._validate_resource_path(resource_name) if resource_name: return os.path.join(base, *resource_name.split('/')) @@ -1780,7 +1904,8 @@ DefaultProvider._register() class EmptyProvider(NullProvider): """Provider that returns nothing for all requests""" - module_path = None + # A special case, we don't want all Providers inheriting from NullProvider to have a potentially None module_path + module_path: str | None = None # type: ignore[assignment] _isdir = _has = lambda self, path: False @@ -1802,7 +1927,7 @@ class ZipManifests(Dict[str, "MemoizedZipManifests.manifest_mod"]): zip manifest builder """ - # `path` could be `Union["StrPath", IO[bytes]]` but that violates the LSP for `MemoizedZipManifests.load` + # `path` could be `StrPath | IO[bytes]` but that violates the LSP for `MemoizedZipManifests.load` @classmethod def build(cls, path: str): """ @@ -1831,10 +1956,10 @@ class MemoizedZipManifests(ZipManifests): """ class manifest_mod(NamedTuple): - manifest: Dict[str, zipfile.ZipInfo] + manifest: dict[str, zipfile.ZipInfo] mtime: float - def load(self, path: str): # type: ignore[override] # ZipManifests.load is a classmethod + def load(self, path: str) -> dict[str, zipfile.ZipInfo]: # type: ignore[override] # ZipManifests.load is a classmethod """ Load a manifest at path or return a suitable manifest already loaded. 
""" @@ -1851,12 +1976,12 @@ class MemoizedZipManifests(ZipManifests): class ZipProvider(EggProvider): """Resource support for zips and eggs""" - eagers: Optional[List[str]] = None + eagers: list[str] | None = None _zip_manifests = MemoizedZipManifests() # ZipProvider's loader should always be a zipimporter or equivalent loader: zipimport.zipimporter - def __init__(self, module: _ModuleLike): + def __init__(self, module: _ZipLoaderModule): super().__init__(module) self.zip_pre = self.loader.archive + os.sep @@ -1905,7 +2030,7 @@ class ZipProvider(EggProvider): return timestamp, size # FIXME: 'ZipProvider._extract_resource' is too complex (12) - def _extract_resource(self, manager: ResourceManager, zip_path): # noqa: C901 + def _extract_resource(self, manager: ResourceManager, zip_path) -> str: # noqa: C901 if zip_path in self._index(): for name in self._index()[zip_path]: last = self._extract_resource(manager, os.path.join(zip_path, name)) @@ -2033,7 +2158,7 @@ class FileMetadata(EmptyProvider): the provided location. """ - def __init__(self, path: "StrPath"): + def __init__(self, path: StrPath): self.path = path def _get_metadata_path(self, name): @@ -2042,7 +2167,7 @@ class FileMetadata(EmptyProvider): def has_metadata(self, name: str) -> bool: return name == 'PKG-INFO' and os.path.isfile(self.path) - def get_metadata(self, name): + def get_metadata(self, name: str): if name != 'PKG-INFO': raise KeyError("No metadata except PKG-INFO is available") @@ -2058,7 +2183,7 @@ class FileMetadata(EmptyProvider): msg = tmpl.format(**locals()) warnings.warn(msg) - def get_metadata_lines(self, name): + def get_metadata_lines(self, name: str) -> Iterator[str]: return yield_lines(self.get_metadata(name)) @@ -2102,12 +2227,12 @@ class EggMetadata(ZipProvider): self._setup_prefix() -_distribution_finders: Dict[type, _DistFinderType[Any]] = _declare_state( +_distribution_finders: dict[type, _DistFinderType[Any]] = _declare_state( 'dict', '_distribution_finders', {} ) -def register_finder(importer_type: Type[_T], distribution_finder: _DistFinderType[_T]): +def register_finder(importer_type: type[_T], distribution_finder: _DistFinderType[_T]): """Register `distribution_finder` to find distributions in sys.path items `importer_type` is the type or class of a PEP 302 "Importer" (sys.path item @@ -2126,7 +2251,7 @@ def find_distributions(path_item: str, only: bool = False): def find_eggs_in_zip( importer: zipimport.zipimporter, path_item: str, only: bool = False -): +) -> Iterator[Distribution]: """ Find eggs in zip files; possibly multiple nested eggs. """ @@ -2156,7 +2281,7 @@ register_finder(zipimport.zipimporter, find_eggs_in_zip) def find_nothing( - importer: Optional[object], path_item: Optional[str], only: Optional[bool] = False + importer: object | None, path_item: str | None, only: bool | None = False ): return () @@ -2164,7 +2289,7 @@ def find_nothing( register_finder(object, find_nothing) -def find_on_path(importer: Optional[object], path_item, only=False): +def find_on_path(importer: object | None, path_item, only=False): """Yield distributions accessible on a sys.path directory""" path_item = _normalize_cached(path_item) @@ -2219,7 +2344,7 @@ class NoDists: return iter(()) -def safe_listdir(path): +def safe_listdir(path: StrOrBytesPath): """ Attempt to list contents of path, but suppress some exceptions. 
""" @@ -2235,13 +2360,13 @@ def safe_listdir(path): return () -def distributions_from_metadata(path): +def distributions_from_metadata(path: str): root = os.path.dirname(path) if os.path.isdir(path): if len(os.listdir(path)) == 0: # empty metadata dir; skip return - metadata = PathMetadata(root, path) + metadata: _MetadataType = PathMetadata(root, path) else: metadata = FileMetadata(path) entry = os.path.basename(path) @@ -2281,16 +2406,16 @@ if hasattr(pkgutil, 'ImpImporter'): register_finder(importlib.machinery.FileFinder, find_on_path) -_namespace_handlers: Dict[type, _NSHandlerType[Any]] = _declare_state( +_namespace_handlers: dict[type, _NSHandlerType[Any]] = _declare_state( 'dict', '_namespace_handlers', {} ) -_namespace_packages: Dict[Optional[str], List[str]] = _declare_state( +_namespace_packages: dict[str | None, list[str]] = _declare_state( 'dict', '_namespace_packages', {} ) def register_namespace_handler( - importer_type: Type[_T], namespace_handler: _NSHandlerType[_T] + importer_type: type[_T], namespace_handler: _NSHandlerType[_T] ): """Register `namespace_handler` to declare namespace packages @@ -2423,7 +2548,7 @@ def declare_namespace(packageName: str): _imp.release_lock() -def fixup_namespace_packages(path_item: str, parent: Optional[str] = None): +def fixup_namespace_packages(path_item: str, parent: str | None = None): """Ensure that previously-declared namespace packages include path_item""" _imp.acquire_lock() try: @@ -2437,7 +2562,7 @@ def fixup_namespace_packages(path_item: str, parent: Optional[str] = None): def file_ns_handler( importer: object, - path_item: "StrPath", + path_item: StrPath, packageName: str, module: types.ModuleType, ): @@ -2462,9 +2587,9 @@ register_namespace_handler(importlib.machinery.FileFinder, file_ns_handler) def null_ns_handler( importer: object, - path_item: Optional[str], - packageName: Optional[str], - module: Optional[_ModuleLike], + path_item: str | None, + packageName: str | None, + module: _ModuleLike | None, ): return None @@ -2472,12 +2597,16 @@ def null_ns_handler( register_namespace_handler(object, null_ns_handler) -def normalize_path(filename: "StrPath"): +@overload +def normalize_path(filename: StrPath) -> str: ... +@overload +def normalize_path(filename: BytesPath) -> bytes: ... +def normalize_path(filename: StrOrBytesPath): """Normalize a file/dir name for comparison purposes""" return os.path.normcase(os.path.realpath(os.path.normpath(_cygwin_patch(filename)))) -def _cygwin_patch(filename: "StrPath"): # pragma: nocover +def _cygwin_patch(filename: StrOrBytesPath): # pragma: nocover """ Contrary to POSIX 2008, on Cygwin, getcwd (3) contains symlink components. Using @@ -2488,9 +2617,19 @@ def _cygwin_patch(filename: "StrPath"): # pragma: nocover return os.path.abspath(filename) if sys.platform == 'cygwin' else filename -@functools.lru_cache(maxsize=None) -def _normalize_cached(filename): - return normalize_path(filename) +if TYPE_CHECKING: + # https://github.com/python/mypy/issues/16261 + # https://github.com/python/typeshed/issues/6347 + @overload + def _normalize_cached(filename: StrPath) -> str: ... + @overload + def _normalize_cached(filename: BytesPath) -> bytes: ... + def _normalize_cached(filename: StrOrBytesPath) -> str | bytes: ... 
+else: + + @functools.lru_cache(maxsize=None) + def _normalize_cached(filename): + return normalize_path(filename) def _is_egg_path(path): @@ -2549,7 +2688,7 @@ class EntryPoint: module_name: str, attrs: Iterable[str] = (), extras: Iterable[str] = (), - dist: Optional["Distribution"] = None, + dist: Distribution | None = None, ): if not MODULE(module_name): raise ValueError("Invalid module name", module_name) @@ -2570,12 +2709,26 @@ class EntryPoint: def __repr__(self): return "EntryPoint.parse(%r)" % str(self) + @overload + def load( + self, + require: Literal[True] = True, + env: Environment | None = None, + installer: _InstallerType | None = None, + ) -> _ResolvedEntryPoint: ... + @overload + def load( + self, + require: Literal[False], + *args: Any, + **kwargs: Any, + ) -> _ResolvedEntryPoint: ... def load( self, require: bool = True, - *args: Optional[Union[Environment, _InstallerType]], - **kwargs: Optional[Union[Environment, _InstallerType]], - ): + *args: Environment | _InstallerType | None, + **kwargs: Environment | _InstallerType | None, + ) -> _ResolvedEntryPoint: """ Require packages for this EntryPoint, then resolve it. """ @@ -2592,7 +2745,7 @@ class EntryPoint: self.require(*args, **kwargs) # type: ignore return self.resolve() - def resolve(self): + def resolve(self) -> _ResolvedEntryPoint: """ Resolve the entry point from its module and attrs. """ @@ -2604,8 +2757,8 @@ class EntryPoint: def require( self, - env: Optional[Environment] = None, - installer: Optional[_InstallerType] = None, + env: Environment | None = None, + installer: _InstallerType | None = None, ): if not self.dist: error_cls = UnknownExtra if self.extras else AttributeError @@ -2630,7 +2783,7 @@ class EntryPoint: ) @classmethod - def parse(cls, src: str, dist: Optional["Distribution"] = None): + def parse(cls, src: str, dist: Distribution | None = None): """Parse a single entry point from string `src` Entry point syntax follows the form:: @@ -2663,12 +2816,12 @@ class EntryPoint: cls, group: str, lines: _NestedStr, - dist: Optional["Distribution"] = None, + dist: Distribution | None = None, ): """Parse an entry point group""" if not MODULE(group): raise ValueError("Invalid group name", group) - this = {} + this: dict[str, Self] = {} for line in yield_lines(lines): ep = cls.parse(line, dist) if ep.name in this: @@ -2679,15 +2832,16 @@ class EntryPoint: @classmethod def parse_map( cls, - data: Union[str, Iterable[str], Dict[str, Union[str, Iterable[str]]]], - dist: Optional["Distribution"] = None, + data: str | Iterable[str] | dict[str, str | Iterable[str]], + dist: Distribution | None = None, ): """Parse a map of entry point groups""" + _data: Iterable[tuple[str | None, str | Iterable[str]]] if isinstance(data, dict): _data = data.items() else: _data = split_sections(data) - maps: Dict[str, Dict[str, EntryPoint]] = {} + maps: dict[str, dict[str, Self]] = {} for group, lines in _data: if group is None: if not lines: @@ -2722,12 +2876,12 @@ class Distribution: def __init__( self, - location: Optional[str] = None, + location: str | None = None, metadata: _MetadataType = None, - project_name: Optional[str] = None, - version: Optional[str] = None, - py_version: Optional[str] = PY_MAJOR, - platform: Optional[str] = None, + project_name: str | None = None, + version: str | None = None, + py_version: str | None = PY_MAJOR, + platform: str | None = None, precedence: int = EGG_DIST, ): self.project_name = safe_name(project_name or 'Unknown') @@ -2743,10 +2897,10 @@ class Distribution: def from_location( cls, 
location: str, - basename: str, + basename: StrPath, metadata: _MetadataType = None, **kw: int, # We could set `precedence` explicitly, but keeping this as `**kw` for full backwards and subclassing compatibility - ): + ) -> Distribution: project_name, version, py_version, platform = [None] * 4 basename, ext = os.path.splitext(basename) if ext.lower() in _distributionImpl: @@ -2784,16 +2938,16 @@ class Distribution: def __hash__(self): return hash(self.hashcmp) - def __lt__(self, other: "Distribution"): + def __lt__(self, other: Distribution): return self.hashcmp < other.hashcmp - def __le__(self, other: "Distribution"): + def __le__(self, other: Distribution): return self.hashcmp <= other.hashcmp - def __gt__(self, other: "Distribution"): + def __gt__(self, other: Distribution): return self.hashcmp > other.hashcmp - def __ge__(self, other: "Distribution"): + def __ge__(self, other: Distribution): return self.hashcmp >= other.hashcmp def __eq__(self, other: object): @@ -2885,14 +3039,14 @@ class Distribution: return self.__dep_map @staticmethod - def _filter_extras(dm): + def _filter_extras(dm: dict[str | None, list[Requirement]]): """ Given a mapping of extras to dependencies, strip off environment markers and filter out any dependencies not matching the markers. """ for extra in list(filter(None, dm)): - new_extra = extra + new_extra: str | None = extra reqs = dm.pop(extra) new_extra, _, marker = extra.partition(':') fails_marker = marker and ( @@ -2915,7 +3069,7 @@ class Distribution: def requires(self, extras: Iterable[str] = ()): """List of Requirements needed for this distro if `extras` are used""" dm = self._dep_map - deps = [] + deps: list[Requirement] = [] deps.extend(dm.get(None, ())) for ext in extras: try: @@ -2951,7 +3105,7 @@ class Distribution: lines = self._get_metadata(self.PKG_INFO) return _version_from_file(lines) - def activate(self, path: Optional[List[str]] = None, replace: bool = False): + def activate(self, path: list[str] | None = None, replace: bool = False): """Ensure distribution is importable on `path` (default=sys.path)""" if path is None: path = sys.path @@ -3003,7 +3157,7 @@ class Distribution: @classmethod def from_filename( cls, - filename: str, + filename: StrPath, metadata: _MetadataType = None, **kw: int, # We could set `precedence` explicitly, but keeping this as `**kw` for full backwards and subclassing compatibility ): @@ -3020,14 +3174,18 @@ class Distribution: return Requirement.parse(spec) - def load_entry_point(self, group: str, name: str): + def load_entry_point(self, group: str, name: str) -> _ResolvedEntryPoint: """Return the `name` entry point of `group` or raise ImportError""" ep = self.get_entry_info(group, name) if ep is None: raise ImportError("Entry point %r not found" % ((group, name),)) return ep.load() - def get_entry_map(self, group: Optional[str] = None): + @overload + def get_entry_map(self, group: None = None) -> dict[str, dict[str, EntryPoint]]: ... + @overload + def get_entry_map(self, group: str) -> dict[str, EntryPoint]: ... 
+ def get_entry_map(self, group: str | None = None): """Return the entry point map for `group`, or the full entry map""" if not hasattr(self, "_ep_map"): self._ep_map = EntryPoint.parse_map( @@ -3044,7 +3202,7 @@ class Distribution: # FIXME: 'Distribution.insert_on' is too complex (13) def insert_on( # noqa: C901 self, - path: List[str], + path: list[str], loc=None, replace: bool = False, ): @@ -3152,7 +3310,7 @@ class Distribution: return False return True - def clone(self, **kw: Optional[Union[str, int, IResourceProvider]]): + def clone(self, **kw: str | int | IResourceProvider | None): """Copy this distribution, substituting in any changed keyword args""" names = 'project_name version py_version platform location precedence' for attr in names.split(): @@ -3212,11 +3370,11 @@ class DistInfoDistribution(Distribution): self.__dep_map = self._compute_dependencies() return self.__dep_map - def _compute_dependencies(self): + def _compute_dependencies(self) -> dict[str | None, list[Requirement]]: """Recompute this distribution's dependencies.""" - dm = self.__dep_map = {None: []} + self.__dep_map: dict[str | None, list[Requirement]] = {None: []} - reqs = [] + reqs: list[Requirement] = [] # Including any condition expressions for req in self._parsed_pkg_info.get_all('Requires-Dist') or []: reqs.extend(parse_requirements(req)) @@ -3227,13 +3385,15 @@ class DistInfoDistribution(Distribution): yield req common = types.MappingProxyType(dict.fromkeys(reqs_for_extra(None))) - dm[None].extend(common) + self.__dep_map[None].extend(common) for extra in self._parsed_pkg_info.get_all('Provides-Extra') or []: s_extra = safe_extra(extra.strip()) - dm[s_extra] = [r for r in reqs_for_extra(extra) if r not in common] + self.__dep_map[s_extra] = [ + r for r in reqs_for_extra(extra) if r not in common + ] - return dm + return self.__dep_map _distributionImpl = { @@ -3278,7 +3438,7 @@ class Requirement(_packaging_requirements.Requirement): self.project_name, self.key = project_name, project_name.lower() self.specs = [(spec.operator, spec.version) for spec in self.specifier] # packaging.requirements.Requirement uses a set for its extras. 
We use a variable-length tuple - self.extras: Tuple[str] = tuple(map(safe_extra, self.extras)) + self.extras: tuple[str] = tuple(map(safe_extra, self.extras)) self.hashCmp = ( self.key, self.url, @@ -3294,7 +3454,7 @@ class Requirement(_packaging_requirements.Requirement): def __ne__(self, other): return not self == other - def __contains__(self, item: Union[Distribution, str, Tuple[str, ...]]): + def __contains__(self, item: Distribution | str | tuple[str, ...]) -> bool: if isinstance(item, Distribution): if item.key != self.key: return False @@ -3313,7 +3473,7 @@ class Requirement(_packaging_requirements.Requirement): return "Requirement.parse(%r)" % str(self) @staticmethod - def parse(s: Union[str, Iterable[str]]): + def parse(s: str | Iterable[str]): (req,) = parse_requirements(s) return req @@ -3339,7 +3499,7 @@ def _find_adapter(registry: Mapping[type, _AdapterT], ob: object) -> _AdapterT: raise TypeError(f"Could not find adapter for {registry} and {ob}") -def ensure_directory(path: str): +def ensure_directory(path: StrOrBytesPath): """Ensure that the parent directory of `path` exists""" dirname = os.path.dirname(path) os.makedirs(dirname, exist_ok=True) @@ -3358,7 +3518,7 @@ def _bypass_ensure_directory(path): pass -def split_sections(s: _NestedStr): +def split_sections(s: _NestedStr) -> Iterator[tuple[str | None, list[str]]]: """Split a string or iterable thereof into (section, content) pairs Each ``section`` is a stripped version of the section header ("[section]") @@ -3411,6 +3571,38 @@ class PkgResourcesDeprecationWarning(Warning): """ +# Ported from ``setuptools`` to avoid introducing an import inter-dependency: +_LOCALE_ENCODING = "locale" if sys.version_info >= (3, 10) else None + + +def _read_utf8_with_fallback(file: str, fallback_encoding=_LOCALE_ENCODING) -> str: + """See setuptools.unicode_utils._read_utf8_with_fallback""" + try: + with open(file, "r", encoding="utf-8") as f: + return f.read() + except UnicodeDecodeError: # pragma: no cover + msg = f"""\ + ******************************************************************************** + `encoding="utf-8"` fails with {file!r}, trying `encoding={fallback_encoding!r}`. + + This fallback behaviour is considered **deprecated** and future versions of + `setuptools/pkg_resources` may not implement it. + + Please encode {file!r} with "utf-8" to ensure future builds will succeed. + + If this file was produced by `setuptools` itself, cleaning up the cached files + and re-building/re-installing the package with a newer version of `setuptools` + (e.g. by updating `build-system.requires` in its `pyproject.toml`) + might solve the problem. + ******************************************************************************** + """ + # TODO: Add a deadline? 
+ # See comment in setuptools.unicode_utils._Utf8EncodingNeeded + warnings.warn(msg, PkgResourcesDeprecationWarning, stacklevel=2) + with open(file, "r", encoding=fallback_encoding) as f: + return f.read() + + # from jaraco.functools 1.3 def _call_aside(f, *args, **kwargs): f(*args, **kwargs) @@ -3483,35 +3675,3 @@ if TYPE_CHECKING: add_activation_listener = working_set.subscribe run_script = working_set.run_script run_main = run_script - - -# ---- Ported from ``setuptools`` to avoid introducing an import inter-dependency ---- -LOCALE_ENCODING = "locale" if sys.version_info >= (3, 10) else None - - -def _read_utf8_with_fallback(file: str, fallback_encoding=LOCALE_ENCODING) -> str: - """See setuptools.unicode_utils._read_utf8_with_fallback""" - try: - with open(file, "r", encoding="utf-8") as f: - return f.read() - except UnicodeDecodeError: # pragma: no cover - msg = f"""\ - ******************************************************************************** - `encoding="utf-8"` fails with {file!r}, trying `encoding={fallback_encoding!r}`. - - This fallback behaviour is considered **deprecated** and future versions of - `setuptools/pkg_resources` may not implement it. - - Please encode {file!r} with "utf-8" to ensure future builds will succeed. - - If this file was produced by `setuptools` itself, cleaning up the cached files - and re-building/re-installing the package with a newer version of `setuptools` - (e.g. by updating `build-system.requires` in its `pyproject.toml`) - might solve the problem. - ******************************************************************************** - """ - # TODO: Add a deadline? - # See comment in setuptools.unicode_utils._Utf8EncodingNeeded - warnings.warn(msg, PkgResourcesDeprecationWarning, stacklevel=2) - with open(file, "r", encoding=fallback_encoding) as f: - return f.read() diff --git a/lib/pkg_resources/extern/__init__.py b/lib/pkg_resources/extern/__init__.py index a1b7490d..9b9ac10a 100644 --- a/lib/pkg_resources/extern/__init__.py +++ b/lib/pkg_resources/extern/__init__.py @@ -1,8 +1,9 @@ +from __future__ import annotations from importlib.machinery import ModuleSpec import importlib.util import sys from types import ModuleType -from typing import Iterable, Optional, Sequence +from typing import Iterable, Sequence class VendorImporter: @@ -15,7 +16,7 @@ class VendorImporter: self, root_name: str, vendored_names: Iterable[str] = (), - vendor_pkg: Optional[str] = None, + vendor_pkg: str | None = None, ): self.root_name = root_name self.vendored_names = set(vendored_names) @@ -65,8 +66,8 @@ class VendorImporter: def find_spec( self, fullname: str, - path: Optional[Sequence[str]] = None, - target: Optional[ModuleType] = None, + path: Sequence[str] | None = None, + target: ModuleType | None = None, ): """Return a module spec for vendored names.""" return (