# Source code for dbusmock.templates.iio-sensors-proxy

"""sensors proxy mock template
"""

# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text
# of the license.

__author__ = "Marco Trevisan"
import re

import dbus

from dbusmock import MOCK_IFACE
# Bus name, main object path and interface names of the mocked service.
BUS_NAME = "net.hadess.SensorProxy"
MAIN_OBJ = "/net/hadess/SensorProxy"
MAIN_IFACE = "net.hadess.SensorProxy"
COMPASS_IFACE = "net.hadess.SensorProxy.Compass"
# NOTE(review): presumably tells dbusmock to run this template on the system
# bus -- confirm against the dbusmock template documentation.
SYSTEM_BUS = True

# Zero-width match in front of every upper-case letter except one at the very
# start of the string; substituting "_" and lower-casing the result turns a
# CamelCase property name into its snake_case attribute name
# ("LightLevel" -> "light_level").
CAMEL_TO_SNAKE_CASE_RE = re.compile(r"(?<!^)(?=[A-Z])")
def load(mock, parameters=None):
    """Initialise the sensor state on *mock* and publish its D-Bus properties.

    Every sensor starts out unavailable and without claiming clients. Each
    entry of *parameters* is then copied onto *mock* as an attribute, so any
    default set below can be overridden (for example
    ``{"has_accelerometer": True}``). Finally the values returned by
    ``mock.GetAll`` are registered as properties of both sensor interfaces.

    :param mock: object the template state is stored on
    :param parameters: optional mapping of attribute name to initial value
    """
    mock.has_accelerometer = False
    mock.accelerometer_owners = {}
    mock.accelerometer_orientation = "undefined"

    mock.has_ambient_light = False
    mock.ambient_light_owners = {}
    mock.light_level_unit = "lux"
    mock.light_level = 0.0

    mock.has_proximity = False
    mock.proximity_near = False
    mock.proximity_owners = {}

    mock.has_compass = False
    mock.compass_owners = {}
    mock.compass_heading = -1.0

    # Caller-supplied values win over the defaults above.
    if parameters:
        for p, v in parameters.items():
            setattr(mock, p, v)

    # Must run after the overrides so the published values reflect them.
    for iface in [MAIN_IFACE, COMPASS_IFACE]:
        mock.AddProperties(iface, mock.GetAll(iface))
def emit_signal_to_destination(mock, interface, name, signature, destination, *args):
    """Send a D-Bus signal from *mock*, optionally addressed to one client.

    :param mock: emitting object; its ``path`` becomes the signal's object
        path and its ``locations`` provide the connections to send on
    :param interface: interface the signal belongs to
    :param name: signal member name
    :param signature: D-Bus signature describing *args*
    :param destination: bus name to address the signal to; when falsy no
        destination is set on the message
    :param args: signal arguments, appended using *signature*
    """
    # We need to do this manually, could be made easier via
    # https://gitlab.freedesktop.org/dbus/dbus-python/-/merge_requests/13
    message = dbus.lowlevel.SignalMessage(mock.path, interface, name)
    if destination:
        message.set_destination(destination)
    message.append(*args, signature=signature)

    # The first item of every location entry is used as the connection.
    for location in mock.locations:
        location[0].send_message(message)
def emit_properties_changed(mock, interface=MAIN_IFACE, properties=None, destination=None):
    """Emit ``PropertiesChanged`` for *interface*, optionally to one client.

    :param mock: object whose properties changed
    :param interface: interface owning the properties
    :param properties: what to report. ``None`` reports everything returned by
        ``mock.GetAll(interface)``; a single property name or a
        list/tuple/set/frozenset of names has the current values looked up
        with ``mock.Get``; a dict is sent as given
    :param destination: bus name to address the signal to, or ``None`` to
        send it without a destination
    :raises TypeError: if *properties* is of any other type
    """
    if properties is None:
        properties = mock.GetAll(interface)
    elif isinstance(properties, str):
        # A lone name is handled like a one-element collection of names.
        properties = [properties]

    if isinstance(properties, (list, tuple, set, frozenset)):
        properties = {p: mock.Get(interface, p) for p in properties}
    elif not isinstance(properties, dict):
        raise TypeError("Unsupported properties type")

    # The trailing empty list is the "invalidated properties" argument.
    emit_signal_to_destination(
        mock,
        dbus.PROPERTIES_IFACE,
        "PropertiesChanged",
        "sa{sv}as",
        destination,
        interface,
        properties,
        [],
    )
@dbus.service.method(dbus.PROPERTIES_IFACE, in_signature="s", out_signature="a{sv}")
def GetAll(self, interface):
    """Return the current property values of *interface*.

    The mapping is rebuilt on every call from the internal state attributes
    (``has_accelerometer``, ``light_level`` ...), each wrapped in an explicit
    D-Bus type.

    :param interface: name of the D-Bus interface being queried
    :returns: mapping of property name to value; an empty ``a{sv}``
        dictionary for any interface other than the main and compass ones
    """
    if interface == MAIN_IFACE:
        return {
            "HasAccelerometer": dbus.Boolean(self.has_accelerometer),
            "AccelerometerOrientation": dbus.String(self.accelerometer_orientation),
            "HasAmbientLight": dbus.Boolean(self.has_ambient_light),
            "LightLevelUnit": dbus.String(self.light_level_unit),
            "LightLevel": dbus.Double(self.light_level),
            "HasProximity": dbus.Boolean(self.has_proximity),
            "ProximityNear": dbus.Boolean(self.proximity_near),
        }

    if interface == COMPASS_IFACE:
        return {
            "HasCompass": dbus.Boolean(self.has_compass),
            "CompassHeading": dbus.Double(self.compass_heading),
        }

    # Unknown interface: empty dictionary, explicitly typed as a{sv}.
    return dbus.Dictionary({}, signature="sv")
def register_owner(self, owners_dict, name):
    """Record *name* as an owner of a sensor and watch it on the bus.

    Does nothing if *name* is already registered. Otherwise a name-owner
    watch is installed through ``self.connection.watch_name_owner`` and stored
    in *owners_dict* under *name*; once the watch reports that the name has no
    owner any more, the entry is dropped and the watch cancelled.

    :param owners_dict: mapping of bus name to watcher, modified in place
    :param name: bus name of the client claiming the sensor
    """
    if name in owners_dict:
        return

    def name_cb(unique_name):
        # A non-empty owner means the client is still on the bus.
        if unique_name:
            return
        # The entry may already have been removed (e.g. the client released
        # the sensor first), so do not assume it is still present.
        watcher = owners_dict.pop(name, None)
        if watcher is not None:
            watcher.cancel()

    owners_dict[name] = self.connection.watch_name_owner(name, name_cb)
def unregister_owner(owners_dict, name):
    """Forget *name* as an owner of a sensor and cancel its watcher.

    Unknown names are ignored.

    :param owners_dict: mapping of bus name to watcher, modified in place
    :param name: bus name of the client releasing the sensor
    """
    watcher = owners_dict.pop(name, None)
    # Compare against None rather than relying on the watcher's truthiness.
    if watcher is not None:
        watcher.cancel()
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ClaimAccelerometer(self, sender):
    """Register the calling client as an owner of the accelerometer.

    :param sender: bus name of the caller, passed in through the
        ``sender_keyword`` of the method decorator
    """
    register_owner(self, self.accelerometer_owners, sender)
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ReleaseAccelerometer(self, sender):
    """Remove the calling client from the owners of the accelerometer.

    :param sender: bus name of the caller, passed in through the
        ``sender_keyword`` of the method decorator
    """
    unregister_owner(self.accelerometer_owners, sender)
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ClaimLight(self, sender):
    """Register the calling client as an owner of the ambient light sensor.

    :param sender: bus name of the caller, passed in through the
        ``sender_keyword`` of the method decorator
    """
    register_owner(self, self.ambient_light_owners, sender)
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ReleaseLight(self, sender):
    """Remove the calling client from the owners of the ambient light sensor.

    :param sender: bus name of the caller, passed in through the
        ``sender_keyword`` of the method decorator
    """
    unregister_owner(self.ambient_light_owners, sender)
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ClaimProximity(self, sender):
    """Register the calling client as an owner of the proximity sensor.

    :param sender: bus name of the caller, passed in through the
        ``sender_keyword`` of the method decorator
    """
    register_owner(self, self.proximity_owners, sender)
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ReleaseProximity(self, sender):
    """Remove the calling client from the owners of the proximity sensor.

    :param sender: bus name of the caller, passed in through the
        ``sender_keyword`` of the method decorator
    """
    unregister_owner(self.proximity_owners, sender)
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ClaimCompass(self, sender):
    """Register the calling client as an owner of the compass.

    :param sender: bus name of the caller, passed in through the
        ``sender_keyword`` of the method decorator
    """
    register_owner(self, self.compass_owners, sender)
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ReleaseCompass(self, sender):
    """Remove the calling client from the owners of the compass.

    :param sender: bus name of the caller, passed in through the
        ``sender_keyword`` of the method decorator
    """
    unregister_owner(self.compass_owners, sender)
def sensor_to_attribute(sensor):
    """Map a sensor word taken from a property name to its attribute prefix.

    ``"light"`` becomes ``"ambient_light"``; every other value is returned
    unchanged.

    :param sensor: sensor word, e.g. ``"light"`` or ``"compass"``
    :returns: the prefix used by the ``has_<prefix>`` / ``<prefix>_owners``
        attributes
    """
    return "ambient_light" if sensor == "light" else sensor
def is_valid_sensor_for_interface(sensor, interface):
    """Tell whether *sensor* is exposed on the D-Bus *interface*.

    :param sensor: sensor attribute prefix (``"accelerometer"``,
        ``"ambient_light"``, ``"proximity"`` or ``"compass"``)
    :param interface: D-Bus interface name
    :returns: ``True`` for the accelerometer, ambient light and proximity
        sensors on ``net.hadess.SensorProxy`` and for the compass on
        ``net.hadess.SensorProxy.Compass``; ``False`` for anything else
    """
    if interface == "net.hadess.SensorProxy":
        return sensor in ("accelerometer", "ambient_light", "proximity")
    if interface == "net.hadess.SensorProxy.Compass":
        return sensor == "compass"
    return False
@dbus.service.method(MOCK_IFACE, in_signature="ssv")
def SetInternalProperty(self, interface, property_name, value):
    """Set the internal attribute behind *property_name* and notify clients.

    *property_name* is converted from CamelCase to the snake_case attribute
    name, and the first word of that name is taken as the sensor it belongs
    to (``"LightLevel"`` -> ``light_level`` -> ``ambient_light``). If that is
    a sensor of *interface*, the sensor must be available and the change is
    announced only to the clients that claimed it; when nobody claimed it no
    signal is sent. For any other property (e.g. ``HasAccelerometer``, whose
    first word is ``has``) the change is announced without a destination.
    Nothing is written or emitted when *value* equals the current value.

    :param interface: D-Bus interface the property belongs to
    :param property_name: CamelCase D-Bus property name
    :param value: new value
    :raises dbus.exceptions.DBusException: if the property belongs to a
        sensor whose ``has_<sensor>`` attribute is false
    """
    property_attribute = CAMEL_TO_SNAKE_CASE_RE.sub("_", property_name).lower()
    sensor = sensor_to_attribute(property_attribute.split("_")[0])
    # None means "not a per-sensor property"; an empty dict means "sensor
    # property that nobody has claimed". The two are treated differently below.
    owners = None

    if is_valid_sensor_for_interface(sensor, interface):
        if not getattr(self, f"has_{sensor}"):
            raise dbus.exceptions.DBusException(
                f"No {sensor} sensor available", name="org.freedesktop.DBus.Mock.Error"
            )
        owners = getattr(self, f"{sensor}_owners")

    # We allow setting a property from any client here, even if not claiming
    # but only owners, if any, will be notified about sensors changes
    pre_value = getattr(self, property_attribute)
    if pre_value != value:
        setattr(self, property_attribute, value)
        if owners:
            for owner in owners:
                emit_properties_changed(self, interface, property_name, owner)
        elif owners is None:
            emit_properties_changed(self, interface, property_name, None)
@dbus.service.method(MOCK_IFACE, in_signature="s")
def GetInternalProperty(self, property_name):
    """Return the internal attribute behind *property_name*.

    *property_name* is converted from CamelCase to the snake_case attribute
    name (``"AccelerometerOwners"`` -> ``accelerometer_owners``).

    :param property_name: CamelCase name of the internal property
    :returns: the attribute value; for names ending in ``Owners`` a string
        array holding the keys of the owners mapping
    """
    property_attribute = CAMEL_TO_SNAKE_CASE_RE.sub("_", property_name).lower()
    value = getattr(self, property_attribute)
    if property_name.endswith("Owners"):
        # Only the keys of the owners mapping are returned, not the watchers.
        return dbus.Array(value.keys(), signature="s")
    return value