import pathlib from copy import deepcopy from typing import Any, Callable, ClassVar, Dict, NoReturn, Optional, Type, TypeVar, Union, overload import ansible.module_utils.basic as basic try: from ansible_collections.sebastian.systemd.plugins.module_utils.generic import AnsibleParameter, Types except ImportError: from plugins.module_utils.generic import AnsibleParameter, Types __all__ = ( "AnsibleModule", "SystemdUnitModule", "installable", "SystemdReloadMixin", ) T = TypeVar("T") def docify(input: Union[dict, AnsibleParameter]) -> dict: options = dict() for name, help in input.items(): options[name] = dict(type=help["type"]) if "description" in help: if isinstance(help["description"], str): help["description"] = help["description"].split("\n") options[name]["description"] = help["description"] if "required" in help and help["required"]: options[name]["required"] = True else: options[name]["required"] = False if help["type"] == "list": options[name]["elements"] = help["elements"] if not options[name]["required"]: options[name]["default"] = [] if "default" in help: options[name]["default"] = help["default"] if "options" in help: options[name]["options"] = docify(help["options"]) if "choices" in help: options[name]["choices"] = tuple(help["choices"]) return options class AnsibleModule(object): """Simple wrapper for the basic.AnsibleModule""" #: name of the module. This is required for the generation of the Ansible documentation name: ClassVar[str] #: The AnsibleModule for this Module module: basic.AnsibleModule #: The result of this Module call. It always contains the changed key, so in any case an Module can report if it changed anything result: dict #: the specification of the arguments. Subclasses that are usable Modules must set this value. module_spec: ClassVar[dict] #: This is set by classes that define common things for their subclasses, like behaviour of the run and check methods. This is used by `SystemdUnitModule` _common_args = dict() @property def params(self) -> Dict[str, Any]: """params is an wrapper for the module.params""" return self.module.params # type: ignore def __init__(self): self.result = dict(changed=False) specs = dict() specs.update(deepcopy(self._common_args)) modspec = self.module_spec if "argument_spec" in modspec and "argument_spec" in self._common_args: specs["argument_spec"].update(modspec["argument_spec"]) del modspec["argument_spec"] specs.update(modspec) self.module = basic.AnsibleModule(**specs) self.tmpdir = pathlib.Path(self.module.tmpdir) def set(self, key: str, value): """sets an value for the result""" self.result[key] = value @overload def diff(self, diff: Dict[str, str]): pass @overload def diff( self, before: Optional[str] = None, after: Optional[str] = None, before_header: Optional[str] = None, after_header: Optional[str] = None, ): pass def diff( # type: ignore self, diff=None, *, before=None, after=None, before_header=None, after_header=None, ): """adds the special return value "diff". This allows Modules to present the changes of files to the caller. it takes care of the special semantics of the return value""" if "diff" not in self.result: self.result["diff"] = list() if diff is not None and not any((before is not None, after is not None)): pass elif all((before is not None, after is not None, diff is None)): diff = dict( before=before, after=after, ) if before_header is not None: diff["before_header"] = before_header if after_header is not None: diff["after_header"] = after_header else: raise TypeError("only diff or before and after can be set, not both of them") self.result["diff"].append(diff) def get(self, key: str, default: T = None) -> T: """returns an Parameter of the Module.""" if self.params[key] is None and default is not None: return default elif self.params[key] is None and default is None: raise KeyError() return self.params[key] def changed_get(self): """value that shows if changes were detected/made""" return self.result["changed"] def changed_set(self, value): self.result["changed"] = not not value changed = property(changed_get, changed_set, None, changed_get.__doc__) def prepare(self): raise NotImplementedError() def check(self): raise NotImplementedError() def run(self): raise NotImplementedError() def __call__(self) -> NoReturn: """This calls the module. first prepare is called and then check or run, depending on the check mode. If an exception is raised this is catched and the module automatically fails with an traceback """ self.prepare() try: if self.module.check_mode: self.check() else: self.run() except Exception as exc: import traceback self.module.fail_json( "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)), **self.result, ) self.module.exit_json(**self.result) @classmethod def doc(cls) -> str: """this returns the documentation string of the module. If the help arguments of an Types method was given, it adds this as an helptext of this parameter""" try: import yaml except ImportError: return "---\n" doc = cls.__doc__ if doc is None: doc = "" help: _sdict specs = dict() if "argument_spec" in cls._common_args: specs.update(cls._common_args["argument_spec"]) if "argument_spec" in cls.module_spec: specs.update(cls.module_spec["argument_spec"]) options = docify(specs) docu = doc.split("\n") return str( yaml.safe_dump( dict( module=cls.name, short_description=docu[0], description=docu, options=options, ), stream=None, explicit_start="---", ) ) class SystemdUnitModule(AnsibleModule): #: path of the unitfile managed by this module unitfile: pathlib.Path #: subclasses of this always support the file common args and the check mode _common_args = dict( supports_check_mode=True, add_file_common_args=True, argument_spec=dict( description=Types.str(help="An description for programs that access systemd"), documentation=Types.list(str, help="Paths where documentation can be found"), requires=Types.list( str, help="list of units that this unit requires. If it fails or can't be started this unit fails. without before/after this is started at the same time", ), wants=Types.list(str, help="list of units that this unit wants. If it fails or can't be started it does not affect this unit"), partof=Types.list( str, help="list of units that this unit is part of.\nIf the restart this unit does it too, but if this restarts it does not affect the other units.", ), before=Types.list(str, help="list of units that this unit needs to be started before this unit."), after=Types.list(str, help="list of units that this unit wants to be started after this unit"), ), ) #: if defined it will be called after run has changed the unitfile post: Optional[Callable[[], None]] def unit(self) -> str: raise NotImplementedError() def header(self) -> str: header = "[Unit]\n" header += "\n".join( self.map_param( description="Description", documentation="Documentation", requires="Requires", wants="Wants", partof="PartOf", before="Before", after="After", ) ) return header def map_param(self, **parammap: str): """maps an dict with keys for an section with given params. The key of the dictionary is the parameter and the value is the key in the unitfile. If an parameter has multiple values it adds multiple entries""" output: list[str] = [] for param, key in parammap.items(): if self.params[param] is not None: params = self.params[param] if isinstance(params, list): output.extend(("{}={}\n".format(key, p) for p in params)) else: output.append("{}={}\n".format(key, self.params[param])) return output def unitfile_gen(self): path = self.tmpdir / "newunit" with open(path, "w") as unit: unit.write(self.unit()) self.module.set_owner_if_different(path.as_posix(), "root", False) self.module.set_group_if_different(path.as_posix(), "root", False) self.module.set_mode_if_different(path.as_posix(), "0644", False) if self.unitfile.exists(): diff = dict() self.changed = self.changed | self.module.set_owner_if_different( self.unitfile.as_posix(), "root", self.result["changed"], diff, ) self.diff(diff) diff = dict() self.changed = self.changed | self.module.set_group_if_different( self.unitfile.as_posix(), "root", self.result["changed"], diff, ) self.diff(diff) diff = dict() self.changed = self.changed | self.module.set_mode_if_different( self.unitfile.as_posix(), "0644", self.result["changed"], diff, ) self.diff(diff) def check(self): self.set("unitfile", self.unitfile.as_posix()) self.unitfile_gen() if not self.unitfile.exists(): self.diff(before="", after=self.unit(), before_header=self.unitfile.as_posix()) self.changed = True else: if self.module.sha256(self.unitfile.as_posix()) != self.module.sha256((self.tmpdir / "newunit").as_posix()): self.changed = True self.diff( before=self.unitfile.read_text(), after=self.unit(), before_header=self.unitfile.as_posix(), ) def run(self): self.check() if not self.changed: return self.module.atomic_move( src=(self.tmpdir / "newunit").as_posix(), dest=self.unitfile.as_posix(), ) if hasattr(self, "post") and self.post is not None: self.post() _INSTALL_MAPPING = dict( required_by="RequiredBy", wanted_by="WantedBy", ) def installable(_class: Type[SystemdUnitModule]): """adds the required arguments to the spec and adds the install method for the unit method""" specs = _class.module_spec arguments = dict( required_by=Types.list(elements=str, help="systemd units that require this mount"), wanted_by=Types.list( elements=str, help="systemd units that want the mount, but not explicitly require it. Commonly used for target if not service explicitly require it.", ), ) specs["argument_spec"].update(arguments) def install(self: SystemdUnitModule) -> str: output = "[Install]\n" for argument, key in _INSTALL_MAPPING.items(): if self.get(argument, False): for unit in self.get(argument): output += "{}={}\n".format(key, unit) return output _class.install = install _class.module_spec = specs return _class class SystemdReloadMixin: def post(self): if not self.changed: return systemctl = self.module.get_bin_path("systemctl", required=True) self.module.run_command([systemctl, "daemon-reload"], check_rc=True) if self.unitfile.stem.endswith("@"): return (rc, _, _) = self.module.run_command([systemctl, "is-enabled", self.unitfile.name], check_rc=False) if rc == 0: self.module.run_command([systemctl, "restart", self.unitfile.name], check_rc=True)