import builtins import pathlib import warnings from typing import Any, Dict, Literal, NotRequired, Optional, Required, Sequence, Tuple, Type, TypedDict __all__ = ( "Types", "SYSTEMD_SERVICE_CONFIG", "SYSTEMD_NETWORK_CONFIG", "SYSTEMD_CONFIG_ROOT", "FILE_COMMON_ARGS", "systemdbool", "AnsibleParameter", "AnsibleReturnParameter", ) SYSTEMD_CONFIG_ROOT = pathlib.Path("/etc/systemd") SYSTEMD_NETWORK_CONFIG = SYSTEMD_CONFIG_ROOT / "network" SYSTEMD_SERVICE_CONFIG = SYSTEMD_CONFIG_ROOT / "system" FILE_COMMON_ARGS = frozenset(("owner", "group", "mode", "seuser", "serole", "setype", "selevel", "unsafe_writes")) AnsibleType = Literal[ "str", "bool", "int", "float", "path", "raw", "jsonarg", "json", "bytes", "dict", "list", "bits", ] ReturnOptions = Literal["always", "changed", "success"] class AnsibleParameter(TypedDict, total=False): description: Required[str | list[str]] required: NotRequired[bool] default: NotRequired[Any] type: Required[AnsibleType] choices: NotRequired[list[Any] | tuple[Any]] elements: NotRequired["AnsibleParameter" | AnsibleType] aliases: NotRequired[list[str]] version_added: NotRequired[str] options: NotRequired[dict[str, "AnsibleParameter"]] class AnsibleReturnParameter(TypedDict, total=False): description: Required[str | list[str]] type: Required[AnsibleType] returned: NotRequired[ReturnOptions] elements: NotRequired[AnsibleType] sample: NotRequired[list[Any] | Any] version_added: NotRequired[str] contains: NotRequired[dict[str, "AnsibleReturnParameter"]] def wrap_func(func, **updates): def wrapper(*args, **kwargs): return func(*args, **kwargs) attrs = {'__module__', '__name__', '__qualname__', '__doc__', '__annotations__', '__type_params__'}.difference( updates.keys() ) for attr in attrs: try: value = getattr(func, attr) except AttributeError: pass else: setattr(wrapper, attr, value) for attr, value in updates.items(): setattr(wrapper, attr, value) wrapper.__dict__.update(func.__dict__) setattr(wrapper, "__wrapped__", func) return wrapper GENERIC_DOC = """Returns an dictionary for the Ansible {type} type.""" class TypeBase(type): def __new__(cls, clsname, bases, attrs): if "_default" not in attrs: raise TypeError( f"The class {clsname} must define an wrapper function called _default that returns the default type" ) default_type = attrs["_default"] del attrs["_default"] types = frozenset( ( "str", "bool", "int", "float", "path", "raw", "jsonarg", "json", "bytes", "dict", "list", "bits", ) ) for attr in types - set(attrs.keys()): attrs[attr] = wrap_func( default_type(attr), __doc__=GENERIC_DOC.format(type=attr), __name__=attr, __qualname__=f"{clsname}.{attr}", ) attrs["__slots__"] = () return super().__new__(cls, clsname, bases, attrs) class Types(metaclass=TypeBase): @staticmethod def _default(name: AnsibleType): def wrapped( help: str | list[str], required: bool = False, choices: Optional[Sequence] = None, default: Optional[Any] = None, ) -> AnsibleParameter: option = AnsibleParameter(type=name, required=required, description=help) if choices is not None: option["choices"] = tuple(choices) if default is not None: option["default"] = default return option return wrapped @staticmethod def list( # type: ignore[misc] elements: Type[object] | AnsibleType | AnsibleParameter, help: str | list[str], required: bool = False, default: list[Any] | None = None, ) -> AnsibleParameter: """Wrapper for the Ansible list type Args: elements: The type of the elements required: if the item is absolutly required help: an helptext for the ansible-doc default: an default value. The Value is not converted. """ if required and default: raise ValueError("required and default are not allowed") option: AnsibleParameter = AnsibleParameter( type="list", required=required, description=help, ) if not isinstance(elements, (str, dict)): option["elements"] = elements.__name__ # type:ignore[reportGeneralTypeIssue] elif isinstance(elements, dict): option["elements"] = elements["type"] if elements["type"] == "dict" and "options" in elements: option["options"] = dict() for name, value in elements["options"].items(): option["options"][name] = value if "description" not in option["options"][name]: warnings.warn( # pragma: nocover f"helptext of option {name} is unset." " Ansible requires suboptions to have an documentation" ) elif "choices" in elements: option["choices"] = elements["choices"] if default is not None: option["default"] = default if help is not None and isinstance(help, str): option["description"] = help.split("\n") elif help is not None: option["description"] = help return option @staticmethod def dict(help: str | builtins.list[str], required: bool = False, **options: AnsibleParameter) -> AnsibleParameter: # type: ignore[misc] """Wrapper for the Ansible dict type Args: required: if the item is absolutly required help: an helptext for the ansible-doc options: The individual options that this parameter has """ option: AnsibleParameter = AnsibleParameter(type="dict", description=help, required=required) option["options"] = options if help is not None and isinstance(help, str): option["description"] = help.split("\n") elif help is not None: option["description"] = help return option class ReturnTypes(metaclass=TypeBase): @staticmethod def _default(name: AnsibleType): def wrapped( help: str | list[str], returned: ReturnOptions | None = None, sample: list[Any] | None = None, version_added: str | None = None, ) -> AnsibleReturnParameter: option = AnsibleReturnParameter(type=name, description=help) if returned is not None: option["returned"] = returned if sample is not None: option["sample"] = sample if version_added is not None: option["version_added"] = version_added return option return wrapped @staticmethod def list( help: str | list[str], elements: AnsibleType | AnsibleReturnParameter | Type[object], returned: ReturnOptions | None = None, sample: builtins.list[Any] | None = None, version_added: str | None = None, ) -> AnsibleReturnParameter: option = AnsibleReturnParameter(description=help, type="list") if returned is not None: option["returned"] = returned if isinstance(elements, str): option["elements"] = elements elif isinstance(elements, dict): option["elements"] = elements["type"] if "contains" in elements: option["contains"] = elements["contains"] else: option["elements"] = elements.__name__ # type:ignore[reportGeneralTypeIssue] if sample is not None: option["sample"] = sample if version_added is not None: option["version_added"] = version_added return option @staticmethod def dict( help: str | builtins.list[str], contains: dict[str, AnsibleReturnParameter], returned: ReturnOptions | None = None, sample: builtins.list[Any] | None = None, version_added: str | None = None, ): option = AnsibleReturnParameter(description=help, type="dict", contains=contains) if returned is not None: option["contains"] = contains if sample is not None: option["sample"] = sample if version_added is not None: option["version_added"] = version_added return option def systemdbool(b: bool | str) -> str: """Converts values into things systemd can parse""" if b is True: return "yes" elif b is False: return "no" return b def modspec( argument_spec: Dict[str, Dict[str, Any]], mutually_exclusive: Sequence[Tuple[str, ...]] = (), required_together: Sequence[Tuple[str, ...]] = (), required_one_of: Sequence[Tuple[str, ...]] = (), required_if: Sequence[Tuple[str, Any, Tuple[str, ...]] | Tuple[str, Any, Tuple[str, ...], bool]] = (), required_by: Dict[str, str | Tuple[str, ...]] = {}, supports_check_mode: bool = False, add_file_common_args: bool = False, deprecated: bool = False, version_added: str | None = None, notes: list[str] | None = None, extends_documentation_fragment: list[str] | None = None, ) -> Dict[str, Any]: # pragma: nocover """Wrapper to properly Type the module specs""" return dict( argument_spec=argument_spec, mutually_exclusive=mutually_exclusive, required_together=required_together, required_one_of=required_one_of, required_if=required_if, required_by=required_by, add_file_common_args=add_file_common_args, supports_check_mode=supports_check_mode, deprecated=deprecated, version_added=version_added, notes=notes, extends_documentation_fragment=extends_documentation_fragment, ) def joindict(*items: dict) -> dict: """merges one or more dictionaries into one""" odict = dict() for item in items: for key, value in item.items(): odict[key] = value return odict