import pathlib
from copy import deepcopy
from typing import Any, Callable, ClassVar, Dict, NoReturn, Optional, Type, TypeVar, Union, overload, TypedDict

from ansible.utils.display import Display
import ansible.module_utils.basic as basic

from .generic import AnsibleParameter, Types, systemdbool

__all__ = (
    "AnsibleModule",
    "SystemdUnitModule",
    "installable",
    "SystemdReloadMixin",
)


T = TypeVar("T")


class _TypedDiffRequired(TypedDict):
    # keys that every diff entry must provide
    before: str
    after: str


class TypedDiff(_TypedDiffRequired, total=False):
    # TypedDict fields cannot carry default values; keys that may be left out
    # are declared in a non-total subclass instead.
    before_header: Optional[str]
    after_header: Optional[str]


def docify(input: Union[dict, AnsibleParameter]) -> dict:
    """Convert an argument specification into the ``options`` mapping of the module documentation.

    For every parameter the type, the description (split into lines), the
    required flag, list elements, default, suboptions and choices are copied
    as far as they are present. The passed specification is not modified.

    :param input: mapping of parameter name to its specification
    :return: mapping of parameter name to its documentation entry
    """
    options: dict = {}
    for name, spec in input.items():
        entry: dict = {"type": spec["type"]}
        if "description" in spec:
            description = spec["description"]
            # build a new list instead of writing it back into the spec, because
            # the spec is usually the class level argument_spec of a module
            if isinstance(description, str):
                description = description.split("\n")
            entry["description"] = description
        entry["required"] = bool("required" in spec and spec["required"])
        if spec["type"] == "list":
            entry["elements"] = spec["elements"]
            # an optional list is documented with an empty list as default
            if not entry["required"]:
                entry["default"] = []
        if "default" in spec:
            entry["default"] = spec["default"]
        if "options" in spec and spec["options"] != {}:
            entry["options"] = docify(spec["options"])
        if "choices" in spec and len(spec["choices"]) > 0:
            entry["choices"] = tuple(spec["choices"])
        options[name] = entry
    return options


class AnsibleModule(object):
    """Simple wrapper for the basic.AnsibleModule"""

    #: name of the module. This is required for the generation of the Ansible documentation
    name: ClassVar[str]
    #: The AnsibleModule for this Module
    module: basic.AnsibleModule
    #: The result of this Module call. It always contains the changed key, so in any case an Module can report if it changed anything
    result: dict
    #: the specification of the arguments. Subclasses that are usable Modules must set this value.
    module_spec: ClassVar[dict]
    #: This is set by classes that define common things for their subclasses, like behaviour of the run and check methods. This is used by `SystemdUnitModule`
    _common_args: ClassVar[dict[str, Any]] = dict()
    #: For log messages
    display: Display

    @property
    def params(self) -> Dict[str, Any]:
        """params is an wrapper for the module.params"""
        return self.module.params  # type: ignore

    def __init__(self):
        """Merge ``_common_args`` with ``module_spec`` and create the wrapped ``basic.AnsibleModule``.

        If both define an ``argument_spec`` the arguments are merged, with the
        ones of ``module_spec`` taking precedence. All other keys of
        ``module_spec`` override the ones of ``_common_args``.
        """
        self.result = dict(changed=False)
        specs = deepcopy(self._common_args)
        # work on copies: the specs are class attributes and must stay intact
        # for further instances and for doc()
        modspec = deepcopy(self.module_spec)
        if "argument_spec" in modspec and "argument_spec" in specs:
            # remove the key so the update below does not replace the merged arguments
            specs["argument_spec"].update(modspec.pop("argument_spec"))
        specs.update(modspec)
        self.display = Display()
        self.module = basic.AnsibleModule(**specs)
        self.tmpdir = pathlib.Path(self.module.tmpdir)

    def set(self, key: str, value):
        """sets an value for the result"""
        self.result[key] = value

    @overload
    def diff(self, diff: TypedDiff):  # pragma: nocover
        pass

    @overload
    def diff(
        self,
        *,
        before: Optional[str] = None,
        after: Optional[str] = None,
        before_header: Optional[str] = None,
        after_header: Optional[str] = None,
    ):  # pragma: nocover
        pass

    def diff(  # type: ignore
        self,
        diff=None,
        *,
        before=None,
        after=None,
        before_header=None,
        after_header=None,
    ):
        """adds the special return value "diff". This allows Modules to present the changes of files to the caller.

        Either an already assembled ``diff`` dictionary or both ``before`` and
        ``after`` must be given. The headers are only used when the entry is
        built from ``before`` and ``after``.

        :raises TypeError: if neither or both variants are used, or if only one of ``before`` and ``after`` is given
        """
        # the list is created before the arguments are validated, so the key
        # exists even if the call fails
        entries = self.result.setdefault("diff", [])
        if diff is not None and before is None and after is None:
            pass
        elif diff is None and before is not None and after is not None:
            diff = dict(
                before=before,
                after=after,
            )
            if before_header is not None:
                diff["before_header"] = before_header
            if after_header is not None:
                diff["after_header"] = after_header
        else:
            raise TypeError("only diff or before and after can be set, not both of them")
        entries.append(diff)

    def get(self, key: str, default: Optional[T] = None) -> T:  # type: ignore
        """returns an Parameter of the Module.

        :param key: name of the parameter
        :param default: value that is returned if the parameter is ``None``
        :raises KeyError: if the parameter is unknown, or if it is ``None`` and no default was given
        """
        value = self.params[key]
        if value is not None:
            return value  # type:ignore
        if default is None:
            raise KeyError(key)
        return default

    def changed_get(self):
        """value that shows if changes were detected/made"""
        return self.result["changed"]

    def changed_set(self, value):
        self.result["changed"] = bool(value)

    changed = property(changed_get, changed_set, None, changed_get.__doc__)

    def prepare(self):  # pragma: nocover
        raise NotImplementedError()

    def check(self):  # pragma: nocover
        raise NotImplementedError()

    def run(self):  # pragma: nocover
        raise NotImplementedError()

    def __call__(self) -> NoReturn:  # pragma: nocover
        """This calls the module.

        First prepare is called and then check or run, depending on the check
        mode. If check or run raises an exception, it is caught and the module
        fails with the traceback as message. prepare is called outside of
        this handling.
        """
        self.prepare()
        try:
            if self.module.check_mode:
                self.check()
            else:
                self.run()
        except Exception as exc:
            import traceback

            self.fail("".join(traceback.format_exception(type(exc), exc, exc.__traceback__)))
        self.exit()

    @classmethod
    def doc(cls) -> str:
        """this returns the documentation string of the module.

        If the help arguments of an Types method was given, it adds this as an
        helptext of this parameter. The docstring of the class is used as
        description and its first line as short description. If yaml can not
        be imported, only the document start marker is returned.
        """
        try:
            import yaml
        except ImportError:  # pragma: nocover
            return "---\n"
        doc = cls.__doc__
        if doc is None:
            doc = ""
        specs = dict()
        if "argument_spec" in cls._common_args:  # pragma: nocover
            specs.update(cls._common_args["argument_spec"])
        if "argument_spec" in cls.module_spec:  # pragma: nocover
            specs.update(cls.module_spec["argument_spec"])
        options = docify(specs)
        docu = doc.split("\n")
        return str(
            yaml.safe_dump(
                dict(
                    module=cls.name,
                    short_description=docu[0],
                    description=docu,
                    options=options,
                ),
                stream=None,
                explicit_start=True,
            )
        )

    def fail(self, message: str) -> NoReturn:
        """Wrapper for AnsibleModule.fail_json"""
        self.module.fail_json(message, **self.result)

    def exit(self) -> NoReturn:
        """Wrapper for AnsibleModule.exit_json"""
        self.module.exit_json(**self.result)


class SystemdUnitModule(AnsibleModule):
    #: path of the unitfile managed by this module
    unitfile: pathlib.Path
    #: subclasses of this always support the file common args and the check mode
    _common_args = dict(
        supports_check_mode=True,
        add_file_common_args=True,
        argument_spec=dict(
            description=Types.str(help="An description for programs that access systemd"),
            documentation=Types.list(str, help="Paths where documentation can be found"),
            requires=Types.list(
                str,
                help="list of units that this unit requires. If it fails or can't be started this unit fails. without before/after this is started at the same time",
            ),
            wants=Types.list(
                str,
                help="list of units that this unit wants. If it fails or can't be started it does not affect this unit",
            ),
            partof=Types.list(
                str,
                help="list of units that this unit is part of.\nIf the restart this unit does it too, but if this restarts it does not affect the other units.",
            ),
            before=Types.list(
                str,
                help="list of units that this unit needs to be started before this unit.",
            ),
            after=Types.list(
                str,
                help="list of units that this unit wants to be started after this unit",
            ),
        ),
    )
    #: if defined it will be called after run has changed the unitfile
    post: Optional[Callable[[], None]] = None
    #: generates the install section if the unit is installable
    install: ClassVar[Optional[Callable[["SystemdUnitModule"], Optional[str]]]] = None

    def unit(self) -> str:  # pragma: nocover
        raise NotImplementedError()

    def header(self) -> Optional[str]:
        """Return the ``[Unit]`` section built from the common parameters, or None if none of them is set."""
        parts = self.map_param(
            description="Description",
            documentation="Documentation",
            requires="Requires",
            wants="Wants",
            partof="PartOf",
            before="Before",
            after="After",
        )
        if not parts:
            return None
        return "[Unit]\n" + "".join(parts)

    def _unit(self, *parts: Optional[str]) -> str:
        """Join the given sections with newlines, skipping sections that are None."""
        return "\n".join(part for part in parts if part is not None)

    def map_param(self, **parammap: str) -> list[str]:
        """maps an dict with keys for an section with given params.

        The key of the dictionary is the parameter and the value is the key in
        the unitfile. If an parameter has multiple values it adds multiple
        entries. Parameters that are None are skipped.
        """
        output: list[str] = []
        for param, key in parammap.items():
            value = self.params[param]
            if value is None:
                continue
            if isinstance(value, (list, tuple, set)):
                output.extend(f"{key}={systemdbool(item)}\n" for item in value)
            else:
                output.append(f"{key}={systemdbool(value)}\n")
        return output

    def unitfile_gen(self) -> None:  # pragma: nocover
        """Write the generated unit to ``newunit`` in the tmpdir and compare the attributes of the unitfile.

        Owner, group and mode of the new file are set to root, root and 0644.
        If the unitfile already exists, the same attributes are checked on it
        and the outcome is merged into ``changed``; one diff entry per
        attribute is appended to the result.
        """
        path = self.tmpdir / "newunit"
        with open(path, "w") as unit:
            unit.write(self.unit())
        self.module.set_owner_if_different(path.as_posix(), "root", False)
        self.module.set_group_if_different(path.as_posix(), "root", False)
        self.module.set_mode_if_different(path.as_posix(), "0644", False)
        if not self.unitfile.exists():
            return
        attribute_checks = (
            (self.module.set_owner_if_different, "root"),
            (self.module.set_group_if_different, "root"),
            (self.module.set_mode_if_different, "0644"),
        )
        for set_if_different, wanted in attribute_checks:
            # a fresh dict for each attribute, it is handed to diff() afterwards
            diff: dict[str, Any] = dict()
            self.changed = self.changed | set_if_different(
                self.unitfile.as_posix(),
                wanted,
                self.result["changed"],
                diff,
            )
            self.diff(diff)

    def check(self):  # pragma: nocover
        """Generate the new unit and report whether it differs from the unitfile.

        A missing unitfile or a differing sha256 checksum marks the result as
        changed and adds a diff of the content.
        """
        self.set("unitfile", self.unitfile.as_posix())
        self.unitfile_gen()
        if not self.unitfile.exists():
            self.diff(before="", after=self.unit(), before_header=self.unitfile.as_posix())
            self.changed = True
            return
        current = self.module.sha256(self.unitfile.as_posix())
        generated = self.module.sha256((self.tmpdir / "newunit").as_posix())
        if current != generated:
            self.changed = True
            self.diff(
                before=self.unitfile.read_text(),
                after=self.unit(),
                before_header=self.unitfile.as_posix(),
            )

    def run(self):  # pragma: nocover
        """Run check and, if something changed, move the new unit into place and call post if it is set."""
        self.check()
        if not self.changed:
            return
        self.module.atomic_move(
            src=(self.tmpdir / "newunit").as_posix(),
            dest=self.unitfile.as_posix(),
        )
        if hasattr(self, "post") and self.post is not None:
            self.post()


_INSTALL_MAPPING = dict(
    required_by="RequiredBy",
    wanted_by="WantedBy",
)


def installable(_class: Type[SystemdUnitModule]):  # pragma: nocover
    """adds the required arguments to the spec and adds the install method for the unit method

    The ``module_spec`` of the class is replaced by an extended copy, so a
    spec dictionary that is shared with other classes is left untouched.
    """
    arguments = dict(
        required_by=Types.list(elements=str, help="systemd units that require this mount"),
        wanted_by=Types.list(
            elements=str,
            help="systemd units that want the mount, but not explicitly require it. Commonly used for target if not service explicitly require it.",
        ),
    )
    specs = dict(_class.module_spec)
    # also works if the class did not define any arguments on its own
    specs["argument_spec"] = {**specs.get("argument_spec", {}), **arguments}

    def install(self: SystemdUnitModule) -> Optional[str]:
        """Return the ``[Install]`` section, or None if no install parameter is set."""
        parts = []
        for argument, key in _INSTALL_MAPPING.items():
            # False is returned for an unset parameter, an empty list is skipped as well
            units = self.get(argument, False)
            if units:
                parts.extend("{}={}\n".format(key, unit) for unit in units)  # type: ignore
        if not parts:
            return None
        return "[Install]\n" + "".join(parts)

    _class.install = install
    _class.module_spec = specs
    return _class


class SystemdReloadMixin:
    module: basic.AnsibleModule
    unitfile: pathlib.Path
    restartable: bool = True
    changed: bool

    def post(self):  # pragma: nocover
        """Reload systemd after a change and restart the unit if it is enabled.

        Nothing is restarted if the stem of the unitfile name ends with ``@``
        or if ``restartable`` is false.
        """
        if not self.changed:
            return
        systemctl = self.module.get_bin_path("systemctl", required=True)
        self.module.run_command([systemctl, "daemon-reload"], check_rc=True)
        if self.unitfile.stem.endswith("@") or not self.restartable:
            return
        # is-enabled is allowed to fail, its return code decides about the restart
        (rc, _, _) = self.module.run_command([systemctl, "is-enabled", self.unitfile.name], check_rc=False)
        if rc == 0:
            self.module.run_command([systemctl, "restart", self.unitfile.name], check_rc=True)