Source code for dbusmock.templates.systemd

"""systemd mock template
"""

# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text
# of the license.

[docs] __author__ = "Jonas Ã…dahl"
import dbus from gi.repository import GLib from dbusmock import MOCK_IFACE, mockobject
[docs] BUS_PREFIX = "org.freedesktop.systemd1"
[docs] PATH_PREFIX = "/org/freedesktop/systemd1"
[docs] BUS_NAME = BUS_PREFIX
[docs] MAIN_OBJ = PATH_PREFIX
[docs] MAIN_IFACE = BUS_PREFIX + ".Manager"
[docs] UNIT_IFACE = BUS_PREFIX + ".Unit"
[docs] SYSTEM_BUS = True
[docs] def load(mock, _parameters): mock.next_job_id = 1 mock.units = {} mock.AddProperties(MAIN_IFACE, {"Version": "v246"})
[docs] def escape_unit_name(name): for s in [".", "-"]: name = name.replace(s, "_") return name
[docs] def emit_job_new_remove(mock, job_id, job_path, name): mock.EmitSignal(MAIN_IFACE, "JobNew", "uos", [job_id, job_path, name]) mock.EmitSignal(MAIN_IFACE, "JobRemoved", "uoss", [job_id, job_path, name, "done"])
@dbus.service.method(MAIN_IFACE, in_signature="ss", out_signature="o")
[docs] def StartUnit(self, name, _mode): job_id = self.next_job_id self.next_job_id += 1 job_path = f"{PATH_PREFIX}/Job/{job_id}" GLib.idle_add(lambda: emit_job_new_remove(self, job_id, job_path, name)) unit_path = self.units[str(name)] unit = mockobject.objects[unit_path] unit.UpdateProperties(UNIT_IFACE, {"ActiveState": "active"}) return job_path
@dbus.service.method(MAIN_IFACE, in_signature="ssa(sv)a(sa(sv))", out_signature="o")
[docs] def StartTransientUnit(self, name, _mode, _properties, _aux): job_id = self.next_job_id self.next_job_id += 1 job_path = f"{PATH_PREFIX}/Job/{job_id}" GLib.idle_add(lambda: emit_job_new_remove(self, job_id, job_path, name)) return job_path
@dbus.service.method(MAIN_IFACE, in_signature="ss", out_signature="o")
[docs] def StopUnit(self, name, _mode): job_id = self.next_job_id self.next_job_id += 1 job_path = f"{PATH_PREFIX}/Job/{job_id}" GLib.idle_add(lambda: emit_job_new_remove(self, job_id, job_path, name)) unit_path = self.units[str(name)] unit = mockobject.objects[unit_path] unit.UpdateProperties(UNIT_IFACE, {"ActiveState": "inactive"}) return job_path
@dbus.service.method(MAIN_IFACE, in_signature="s", out_signature="o")
[docs] def GetUnit(self, name): return self.units[str(name)]
@dbus.service.method(MOCK_IFACE, in_signature="s")
[docs] def AddMockUnit(self, name): unit_path = f"{PATH_PREFIX}/unit/{escape_unit_name(name)}" self.units[str(name)] = unit_path self.AddObject( unit_path, UNIT_IFACE, { "Id": name, "Names": [name], "LoadState": "loaded", "ActiveState": "inactive", }, [], )