ansible-module/plugins/module_utils/module.py

397 Zeilen
14 KiB
Python

import pathlib
from copy import deepcopy
from typing import Any, Callable, ClassVar, Dict, NoReturn, Optional, Type, TypeVar, Union, overload, TypedDict
from ansible.utils.display import Display
import ansible.module_utils.basic as basic
from .generic import AnsibleParameter, Types, systemdbool
# Public names of this module; this tuple controls what a star-import exposes.
__all__ = (
"AnsibleModule",
"SystemdUnitModule",
"installable",
"SystemdReloadMixin",
)
# Generic type variable; used below to annotate the default/return type of AnsibleModule.get.
T = TypeVar("T")
class _TypedDiffRequired(TypedDict):
    """Keys every diff entry must carry."""

    # content before the change
    before: str
    # content after the change
    after: str


class TypedDiff(_TypedDiffRequired, total=False):
    """Shape of one entry of the ``diff`` return value.

    ``before`` and ``after`` are required. The two header keys are optional:
    TypedDict has no notion of per-key default values, so optionality is
    expressed through ``total=False`` instead of ``= None`` assignments.
    """

    # optional label shown for the ``before`` side
    before_header: Optional[str]
    # optional label shown for the ``after`` side
    after_header: Optional[str]
def docify(input: Union[dict, AnsibleParameter]) -> dict:
    """Convert an argument spec into the ``options`` mapping used for documentation.

    For every parameter the result contains ``type`` and ``required`` and, when
    present in the spec, ``description`` (always as a list of lines),
    ``elements``, ``default``, ``options`` (converted recursively) and
    ``choices`` (as a tuple).

    The spec passed in is only read: string descriptions are split into a new
    list instead of being written back into the caller's dictionary.

    :param input: mapping of parameter name to its specification
    :return: mapping of parameter name to its documentation entry
    :raises KeyError: if a parameter has no ``type``, or is a list without ``elements``
    """
    options = {}
    for name, spec in input.items():
        option = dict(type=spec["type"])
        if "description" in spec:
            description = spec["description"]
            if isinstance(description, str):
                # split into lines locally; the spec itself must stay untouched
                description = description.split("\n")
            option["description"] = description
        option["required"] = "required" in spec and bool(spec["required"])
        if spec["type"] == "list":
            option["elements"] = spec["elements"]
            if not option["required"]:
                # optional lists default to empty unless the spec says otherwise below
                option["default"] = []
        if "default" in spec:
            option["default"] = spec["default"]
        if "options" in spec and spec["options"] != {}:
            option["options"] = docify(spec["options"])
        if "choices" in spec and len(spec["choices"]) > 0:
            option["choices"] = tuple(spec["choices"])
        options[name] = option
    return options
class AnsibleModule(object):
    """Simple wrapper for the basic.AnsibleModule"""

    # NOTE: class docstrings are consumed by doc() (first line becomes the
    # short description), so they are part of the generated documentation.

    #: name of the module. This is required for the generation of the Ansible documentation
    name: ClassVar[str]
    #: The AnsibleModule for this Module
    module: basic.AnsibleModule
    #: The result of this Module call. It always contains the changed key, so in any case an Module can report if it changed anything
    result: dict
    #: the specification of the arguments. Subclasses that are usable Modules must set this value.
    module_spec: ClassVar[dict]
    #: This is set by classes that define common things for their subclasses, like behaviour of the run and check methods. This is used by `SystemdUnitModule`
    _common_args: ClassVar[dict[str, Any]] = dict()
    #: For log messages
    display: Display
    #: temporary directory of the wrapped module, set in __init__
    tmpdir: pathlib.Path

    @property
    def params(self) -> Dict[str, Any]:
        """params is an wrapper for the module.params"""
        return self.module.params  # type: ignore

    def _build_specs(self) -> Dict[str, Any]:
        """Merge ``_common_args`` and ``module_spec`` into the keyword arguments for basic.AnsibleModule.

        Both class-level dictionaries are left untouched: the common arguments
        are deep-copied and the module spec is shallow-copied before its
        ``argument_spec`` is folded into the common one. Entries of
        ``module_spec`` win over entries of ``_common_args``.
        """
        specs = deepcopy(self._common_args)
        # shallow copy: popping from it must not remove the key from the class attribute
        modspec = dict(self.module_spec)
        if "argument_spec" in modspec and "argument_spec" in specs:
            specs["argument_spec"].update(modspec.pop("argument_spec"))
        specs.update(modspec)
        return specs

    def __init__(self):
        """Create the wrapped basic.AnsibleModule from the merged specs and initialise the result."""
        self.result = dict(changed=False)
        self.display = Display()
        self.module = basic.AnsibleModule(**self._build_specs())
        self.tmpdir = pathlib.Path(self.module.tmpdir)

    def set(self, key: str, value):
        """sets an value for the result"""
        self.result[key] = value

    @overload
    def diff(self, diff: TypedDiff):  # pragma: nocover
        pass

    @overload
    def diff(
        self,
        *,
        before: Optional[str] = None,
        after: Optional[str] = None,
        before_header: Optional[str] = None,
        after_header: Optional[str] = None,
    ):  # pragma: nocover
        pass

    def diff(  # type: ignore
        self,
        diff=None,
        *,
        before=None,
        after=None,
        before_header=None,
        after_header=None,
    ):
        """Append one entry to the special return value "diff".

        Either pass a ready-made ``diff`` mapping, or pass ``before`` and
        ``after`` (both required in that form) plus optional headers as
        keyword arguments. The "diff" list in the result is created on first use.

        :raises TypeError: if both forms are mixed, or neither is complete
        """
        if "diff" not in self.result:
            self.result["diff"] = list()
        if diff is not None and before is None and after is None:
            # ready-made mapping: stored as given
            pass
        elif diff is None and before is not None and after is not None:
            diff = dict(
                before=before,
                after=after,
            )
            if before_header is not None:
                diff["before_header"] = before_header
            if after_header is not None:
                diff["after_header"] = after_header
        else:
            raise TypeError("only diff or before and after can be set, not both of them")
        self.result["diff"].append(diff)

    def get(self, key: str, default: Optional[T] = None) -> T:  # type: ignore
        """Return the parameter ``key`` of the module.

        If the parameter is ``None`` the ``default`` is returned instead.

        :raises KeyError: if the parameter is ``None`` and no default was given,
            or if ``key`` is not a parameter at all
        """
        value = self.params[key]
        if value is not None:
            return value  # type:ignore
        if default is None:
            # name the missing parameter so the failure is traceable
            raise KeyError(key)
        return default

    def changed_get(self):
        """value that shows if changes were detected/made"""
        return self.result["changed"]

    def changed_set(self, value):
        """Store the truthiness of ``value`` as the changed flag of the result."""
        self.result["changed"] = bool(value)

    changed = property(changed_get, changed_set, None, changed_get.__doc__)

    def prepare(self):  # pragma: nocover
        """Hook called by __call__ before check/run. Must be implemented by subclasses."""
        raise NotImplementedError()

    def check(self):  # pragma: nocover
        """Hook called by __call__ in check mode. Must be implemented by subclasses."""
        raise NotImplementedError()

    def run(self):  # pragma: nocover
        """Hook called by __call__ outside of check mode. Must be implemented by subclasses."""
        raise NotImplementedError()

    def __call__(self) -> NoReturn:  # pragma: nocover
        """This calls the module. first prepare is called and then check or run, depending on the check mode.
        If check or run raises an exception it is caught and the module fails with the traceback as message.
        Exceptions raised by prepare are not caught here.
        """
        self.prepare()
        try:
            if self.module.check_mode:
                self.check()
            else:
                self.run()
        except Exception as exc:
            import traceback

            self.fail("".join(traceback.format_exception(type(exc), exc, exc.__traceback__)))
        self.exit()

    @classmethod
    def doc(cls) -> str:
        """Return the documentation of the module as a YAML document.

        The class docstring supplies the short description (first line) and the
        description (all lines); the options are generated from the argument
        specs of ``_common_args`` and ``module_spec``. If PyYAML is not
        importable only the document start marker is returned.
        """
        try:
            import yaml
        except ImportError:  # pragma: nocover
            return "---\n"
        doc = cls.__doc__ or ""
        specs = dict()
        # deep copies keep the class-level specs safe from whatever docify does with them
        if "argument_spec" in cls._common_args:  # pragma: nocover
            specs.update(deepcopy(cls._common_args["argument_spec"]))
        if "argument_spec" in cls.module_spec:  # pragma: nocover
            specs.update(deepcopy(cls.module_spec["argument_spec"]))
        options = docify(specs)
        docu = doc.split("\n")
        return str(
            yaml.safe_dump(
                dict(
                    module=cls.name,
                    short_description=docu[0],
                    description=docu,
                    options=options,
                ),
                stream=None,
                explicit_start=True,
            )
        )

    def fail(self, message: str) -> NoReturn:
        """Wrapper for AnsibleModule.fail_json"""
        self.module.fail_json(message, **self.result)

    def exit(self) -> NoReturn:
        """Wrapper for AnsibleModule.exit_json"""
        self.module.exit_json(**self.result)
class SystemdUnitModule(AnsibleModule):
    #: path of the unitfile managed by this module. It is only read in this
    #: class, so it is expected to be assigned elsewhere (presumably by subclasses).
    unitfile: pathlib.Path
    #: subclasses of this always support the file common args and the check mode
    _common_args = dict(
        supports_check_mode=True,
        add_file_common_args=True,
        argument_spec=dict(
            description=Types.str(help="An description for programs that access systemd"),
            documentation=Types.list(str, help="Paths where documentation can be found"),
            requires=Types.list(
                str,
                help="list of units that this unit requires. If it fails or can't be started this unit fails. without before/after this is started at the same time",
            ),
            wants=Types.list(
                str,
                help="list of units that this unit wants. If it fails or can't be started it does not affect this unit",
            ),
            partof=Types.list(
                str,
                help="list of units that this unit is part of.\nIf the restart this unit does it too, but if this restarts it does not affect the other units.",
            ),
            before=Types.list(
                str,
                help="list of units that this unit needs to be started before this unit.",
            ),
            after=Types.list(
                str,
                help="list of units that this unit wants to be started after this unit",
            ),
        ),
    )
    #: if defined it will be called after run has changed the unitfile
    post: Optional[Callable[[], None]] = None
    #: generates the install section if the unit is installable
    install: ClassVar[Optional[Callable[["SystemdUnitModule"], Optional[str]]]] = None

    def unit(self) -> str:  # pragma: nocover
        """Return the complete content of the unit file. Must be implemented by subclasses."""
        raise NotImplementedError()

    def header(self) -> Optional[str]:
        """Return the ``[Unit]`` section built from the common parameters, or None if none of them is set."""
        parts = self.map_param(
            description="Description",
            documentation="Documentation",
            requires="Requires",
            wants="Wants",
            partof="PartOf",
            before="Before",
            after="After",
        )
        if not parts:
            return None
        return "[Unit]\n" + "".join(parts)

    def _unit(self, *parts: Optional[str]) -> str:
        """Join the given sections with a newline, skipping sections that are None."""
        return "\n".join(part for part in parts if part is not None)

    def map_param(self, **parammap: str) -> list[str]:
        """Render parameters as ``Key=value`` lines of a unit file section.

        The keyword name is the module parameter, its value the key in the
        unit file. Parameters that are None are skipped; a list, tuple or set
        produces one line per element. Every value is passed through ``systemdbool``.
        """
        output: list[str] = []
        for param, key in parammap.items():
            value = self.params[param]
            if value is None:
                continue
            if isinstance(value, (list, tuple, set)):
                output.extend(f"{key}={systemdbool(item)}\n" for item in value)
            else:
                output.append(f"{key}={systemdbool(value)}\n")
        return output

    def unitfile_gen(self) -> None:  # pragma: nocover
        """Write the new unit to ``tmpdir/newunit`` and align ownership and mode.

        The staged file is set to root:root with mode 0644. If the managed unit
        file already exists, the same owner, group and mode are requested for
        it; the changed flag is updated from the result and every non-empty
        attribute diff is recorded.
        """
        staged = self.tmpdir / "newunit"
        with open(staged, "w") as unit:
            unit.write(self.unit())
        staged_path = staged.as_posix()
        self.module.set_owner_if_different(staged_path, "root", False)
        self.module.set_group_if_different(staged_path, "root", False)
        self.module.set_mode_if_different(staged_path, "0644", False)
        if not self.unitfile.exists():
            return
        target = self.unitfile.as_posix()
        setters = (
            (self.module.set_owner_if_different, "root"),
            (self.module.set_group_if_different, "root"),
            (self.module.set_mode_if_different, "0644"),
        )
        for setter, wanted in setters:
            attr_diff: dict = dict()
            # the setter must always run, so call it before combining with the current flag
            altered = setter(target, wanted, self.changed, attr_diff)
            self.changed = self.changed or altered
            if attr_diff:
                # only record a diff when the setter actually filled one in
                self.diff(attr_diff)

    def check(self):  # pragma: nocover
        """Generate the new unit and report whether it differs from the managed unit file.

        Sets the ``unitfile`` result value. A missing unit file, or one whose
        sha256 differs from the staged file, marks the module as changed and
        records a content diff.
        """
        self.set("unitfile", self.unitfile.as_posix())
        self.unitfile_gen()
        if not self.unitfile.exists():
            self.diff(before="", after=self.unit(), before_header=self.unitfile.as_posix())
            self.changed = True
            return
        current_sum = self.module.sha256(self.unitfile.as_posix())
        staged_sum = self.module.sha256((self.tmpdir / "newunit").as_posix())
        if current_sum != staged_sum:
            self.changed = True
            self.diff(
                before=self.unitfile.read_text(),
                after=self.unit(),
                before_header=self.unitfile.as_posix(),
            )

    def run(self):  # pragma: nocover
        """Run check() and, if anything changed, move the staged unit into place and call ``post``."""
        self.check()
        if not self.changed:
            return
        self.module.atomic_move(
            src=(self.tmpdir / "newunit").as_posix(),
            dest=self.unitfile.as_posix(),
        )
        # ``post`` always exists as a class attribute; None means no hook is defined
        if self.post is not None:
            self.post()
# maps the install parameters of a module to the keys of the [Install] section
_INSTALL_MAPPING = dict(
    required_by="RequiredBy",
    wanted_by="WantedBy",
)


def installable(_class: Type[SystemdUnitModule]):  # pragma: nocover
    """Class decorator that makes a unit module installable.

    It adds the ``required_by`` and ``wanted_by`` arguments to the argument
    spec of the class and attaches an ``install`` method that renders the
    ``[Install]`` section.

    The spec is extended on copies, so a ``module_spec`` dictionary that is
    shared with or inherited from another class is not modified, and a spec
    without ``argument_spec`` gets one.

    :param _class: the module class to extend
    :return: the same class, with ``install`` and ``module_spec`` replaced
    """
    arguments = dict(
        required_by=Types.list(elements=str, help="systemd units that require this mount"),
        wanted_by=Types.list(
            elements=str,
            help="systemd units that want the mount, but not explicitly require it. Commonly used for target if not service explicitly require it.",
        ),
    )
    specs = dict(_class.module_spec)
    # the install arguments override same-named arguments of the class
    specs["argument_spec"] = {**specs.get("argument_spec", {}), **arguments}

    def install(self: SystemdUnitModule) -> Optional[str]:
        """Return the ``[Install]`` section, or None if no install parameter is set."""
        parts = []
        for argument, key in _INSTALL_MAPPING.items():
            # an unset parameter yields the empty default and contributes no lines
            for unit in self.get(argument, ()):
                parts.append(f"{key}={unit}\n")
        if not parts:
            return None
        return "[Install]\n" + "".join(parts)

    _class.install = install
    _class.module_spec = specs
    return _class
class SystemdReloadMixin:
    # Mixin supplying a ``post`` hook: reload the systemd manager configuration
    # and restart the unit if systemctl reports it as enabled.

    module: basic.AnsibleModule
    unitfile: pathlib.Path
    restartable: bool = True
    changed: bool

    def post(self):  # pragma: nocover
        """Reload systemd after a change and restart the unit when appropriate.

        Nothing happens unless ``changed`` is true. After ``daemon-reload`` the
        unit is restarted only if it is restartable, its file stem does not end
        in "@", and ``systemctl is-enabled`` exits with 0.
        """
        if self.changed:
            binary = self.module.get_bin_path("systemctl", required=True)
            self.module.run_command([binary, "daemon-reload"], check_rc=True)
            ends_with_at = self.unitfile.stem.endswith("@")
            if not ends_with_at and self.restartable:
                enabled_rc, _out, _err = self.module.run_command(
                    [binary, "is-enabled", self.unitfile.name],
                    check_rc=False,
                )
                if enabled_rc == 0:
                    self.module.run_command(
                        [binary, "restart", self.unitfile.name],
                        check_rc=True,
                    )