Source code for dbusmock.templates.logind

"""systemd logind mock template

This creates the expected methods and properties of the main
org.freedesktop.login1.Manager object. You can specify D-Bus property values
like "CanSuspend" or the return value of Inhibit() in "parameters".
"""

# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text
# of the license.

[docs] __author__ = "Martin Pitt"
import os import dbus from gi.repository import GLib from dbusmock import MOCK_IFACE, mockobject
[docs] BUS_NAME = "org.freedesktop.login1"
[docs] MAIN_OBJ = "/org/freedesktop/login1"
[docs] MAIN_IFACE = "org.freedesktop.login1.Manager"
[docs] SYSTEM_BUS = True
[docs] def load(mock, parameters): mock.AddMethods( MAIN_IFACE, [ ("PowerOff", "b", "", ""), ("Reboot", "b", "", ""), ("Suspend", "b", "", ""), ("Hibernate", "b", "", ""), ("HybridSleep", "b", "", ""), ("SuspendThenHibernate", "b", "", ""), ("CanPowerOff", "", "s", f'ret = "{parameters.get("CanPowerOff", "yes")}"'), ("CanReboot", "", "s", f'ret = "{parameters.get("CanReboot", "yes")}"'), ("CanSuspend", "", "s", f'ret = "{parameters.get("CanSuspend", "yes")}"'), ("CanHibernate", "", "s", f'ret = "{parameters.get("CanHibernate", "yes")}"'), ("CanHybridSleep", "", "s", f'ret = "{parameters.get("CanHybridSleep", "yes")}"'), ("CanSuspendThenHibernate", "", "s", f'ret = "{parameters.get("CanSuspendThenHibernate", "yes")}"'), ("GetSession", "s", "o", 'ret = "/org/freedesktop/login1/session/" + args[0]'), ("ActivateSession", "s", "", ""), ("ActivateSessionOnSeat", "ss", "", ""), ("KillSession", "sss", "", ""), ("LockSession", "s", "", ""), ("LockSessions", "", "", ""), ("ReleaseSession", "s", "", ""), ("TerminateSession", "s", "", ""), ("UnlockSession", "s", "", ""), ("UnlockSessions", "", "", ""), ("GetSeat", "s", "o", 'ret = "/org/freedesktop/login1/seat/" + args[0]'), ("ListSeats", "", "a(so)", 'ret = [(k.split("/")[-1], k) for k in objects.keys() if "/seat/" in k]'), ("TerminateSeat", "s", "", ""), ("GetUser", "u", "o", 'ret = "/org/freedesktop/login1/user/" + args[0]'), ("KillUser", "us", "", ""), ("TerminateUser", "u", "", ""), ], ) mock.AddProperties( MAIN_IFACE, dbus.Dictionary( { "IdleHint": parameters.get("IdleHint", False), "IdleAction": parameters.get("IdleAction", "ignore"), "IdleSinceHint": dbus.UInt64(parameters.get("IdleSinceHint", 0)), "IdleSinceHintMonotonic": dbus.UInt64(parameters.get("IdleSinceHintMonotonic", 0)), "IdleActionUSec": dbus.UInt64(parameters.get("IdleActionUSec", 1)), "PreparingForShutdown": parameters.get("PreparingForShutdown", False), "PreparingForSleep": parameters.get("PreparingForSleep", False), }, signature="sv", ), )
# # logind methods which are too big for squeezing into AddMethod() # @dbus.service.method(MAIN_IFACE, in_signature="", out_signature="a(uso)")
[docs] def ListUsers(_): users = [] for k, obj in mockobject.objects.items(): if "/user/" in k: uid = dbus.UInt32(int(k.split("/")[-1])) users.append((uid, obj.Get("org.freedesktop.login1.User", "Name"), k)) return users
@dbus.service.method(MAIN_IFACE, in_signature="", out_signature="a(susso)")
[docs] def ListSessions(_): sessions = [] for k, obj in mockobject.objects.items(): if "/session/" in k: session_id = k.split("/")[-1] uid = obj.Get("org.freedesktop.login1.Session", "User")[0] username = obj.Get("org.freedesktop.login1.Session", "Name") seat = obj.Get("org.freedesktop.login1.Session", "Seat")[0] sessions.append((session_id, uid, username, seat, k)) return sessions
@dbus.service.method(MAIN_IFACE, in_signature="ssss", out_signature="h")
[docs] def Inhibit(_, what, who, why, mode): if not hasattr(mockobject, "inhibitors"): mockobject.inhibitors = [] fd_r, fd_w = os.pipe() inhibitor = (what, who, why, mode, 1000, 123456) mockobject.inhibitors.append(inhibitor) def inhibitor_dropped(fd, cond): # pylint: disable=unused-argument os.close(fd) mockobject.inhibitors.remove(inhibitor) return False GLib.unix_fd_add_full(GLib.PRIORITY_HIGH, fd_r, GLib.IO_HUP, inhibitor_dropped) GLib.idle_add(os.close, fd_w) return fd_w
@dbus.service.method(MAIN_IFACE, in_signature="", out_signature="a(ssssuu)")
[docs] def ListInhibitors(_): if not hasattr(mockobject, "inhibitors"): mockobject.inhibitors = [] return mockobject.inhibitors
# # Convenience methods on the mock # @dbus.service.method(MOCK_IFACE, in_signature="s", out_signature="s")
[docs] def AddSeat(self, seat): """Convenience method to add a seat. Return the object path of the new seat. """ seat_path = "/org/freedesktop/login1/seat/" + seat if seat_path in mockobject.objects: raise dbus.exceptions.DBusException(f"Seat {seat} already exists", name=MOCK_IFACE + ".SeatExists") self.AddObject( seat_path, "org.freedesktop.login1.Seat", { "Sessions": dbus.Array([], signature="(so)"), "CanGraphical": False, "CanMultiSession": True, "CanTTY": False, "IdleHint": False, "ActiveSession": ("", dbus.ObjectPath("/")), "Id": seat, "IdleSinceHint": dbus.UInt64(0), "IdleSinceHintMonotonic": dbus.UInt64(0), }, [("ActivateSession", "s", "", ""), ("Terminate", "", "", "")], ) return seat_path
@dbus.service.method(MOCK_IFACE, in_signature="usb", out_signature="s")
[docs] def AddUser(self, uid, username, active): """Convenience method to add a user. Return the object path of the new user. """ user_path = f"/org/freedesktop/login1/user/{uid}" if user_path in mockobject.objects: raise dbus.exceptions.DBusException(f"User {uid} already exists", name=MOCK_IFACE + ".UserExists") self.AddObject( user_path, "org.freedesktop.login1.User", { "DefaultControlGroup": "systemd:/user/" + username, "Display": ("", dbus.ObjectPath("/")), "GID": dbus.UInt32(uid), "IdleHint": False, "IdleSinceHint": dbus.UInt64(0), "IdleSinceHintMonotonic": dbus.UInt64(0), "Linger": False, "Name": username, "RuntimePath": f"/run/user/{uid}", "Service": "", "Sessions": dbus.Array([], signature="(so)"), "Slice": f"user-{uid}.slice", "State": "active" if active else "online", "Timestamp": dbus.UInt64(42), "TimestampMonotonic": dbus.UInt64(42), "UID": dbus.UInt32(uid), }, [ ("Kill", "s", "", ""), ("Terminate", "", "", ""), ], ) return user_path
@dbus.service.method(MOCK_IFACE, in_signature="ssusb", out_signature="s")
[docs] def AddSession(self, session_id, seat, uid, username, active): """Convenience method to add a session. If the given seat and/or user do not exit, they will be created. Return the object path of the new session. """ seat_path = dbus.ObjectPath(f"/org/freedesktop/login1/seat/{seat}") if seat_path not in mockobject.objects: self.AddSeat(seat) user_path = dbus.ObjectPath(f"/org/freedesktop/login1/user/{uid}") if user_path not in mockobject.objects: self.AddUser(uid, username, active) session_path = dbus.ObjectPath(f"/org/freedesktop/login1/session/{session_id}") if session_path in mockobject.objects: raise dbus.exceptions.DBusException( f"Session {session_id} already exists", name=MOCK_IFACE + ".SessionExists" ) self.AddObject( session_path, "org.freedesktop.login1.Session", { "Controllers": dbus.Array([], signature="s"), "ResetControllers": dbus.Array([], signature="s"), "Active": active, "IdleHint": False, "LockedHint": False, "KillProcesses": False, "Remote": False, "Class": "user", "DefaultControlGroup": f"systemd:/user/{username}/{session_id}", "Display": os.getenv("DISPLAY", ""), "Id": session_id, "Name": username, "RemoteHost": "", "RemoteUser": "", "Service": "dbusmock", "State": "active" if active else "online", "TTY": "", "Type": "test", "Seat": (seat, seat_path), "User": (dbus.UInt32(uid), user_path), "Audit": dbus.UInt32(0), "Leader": dbus.UInt32(1), "VTNr": dbus.UInt32(1), "IdleSinceHint": dbus.UInt64(0), "IdleSinceHintMonotonic": dbus.UInt64(0), "Timestamp": dbus.UInt64(42), "TimestampMonotonic": dbus.UInt64(42), }, [ ("Activate", "", "", ""), ("Kill", "ss", "", ""), ("Lock", "", "", 'self.EmitSignal("", "Lock", "", [])'), ("SetIdleHint", "b", "", ""), ("SetLockedHint", "b", "", 'self.UpdateProperties("", {"LockedHint": args[0]})'), ("Terminate", "", "", ""), ("Unlock", "", "", 'self.EmitSignal("", "Unlock", "", [])'), ], ) # add session to seat obj_seat = mockobject.objects[seat_path] cur_sessions = obj_seat.Get("org.freedesktop.login1.Seat", "Sessions") cur_sessions.append((session_id, session_path)) obj_seat.Set("org.freedesktop.login1.Seat", "Sessions", cur_sessions) obj_seat.Set("org.freedesktop.login1.Seat", "ActiveSession", (session_id, session_path)) # add session to user obj_user = mockobject.objects[user_path] cur_sessions = obj_user.Get("org.freedesktop.login1.User", "Sessions") cur_sessions.append((session_id, session_path)) obj_user.Set("org.freedesktop.login1.User", "Sessions", cur_sessions) return session_path