"""sensors proxy mock template
"""
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version. See http://www.gnu.org/copyleft/lgpl.html for the full text
# of the license.
[docs]
__author__ = "Marco Trevisan"
[docs]
__copyright__ = """
(c) 2021 Canonical Ltd.
(c) 2017 - 2022 Martin Pitt <martin@piware.de>
"""
import re
import dbus
from dbusmock import MOCK_IFACE
[docs]
# Identifiers of the mocked service. MAIN_IFACE and COMPASS_IFACE are used
# throughout this template; BUS_NAME and MAIN_OBJ are presumably read by the
# dbusmock template loader (not referenced in this file; confirm against dbusmock).
BUS_NAME = "net.hadess.SensorProxy"
MAIN_OBJ = "/net/hadess/SensorProxy"
# Interface carrying the accelerometer, ambient-light and proximity properties.
MAIN_IFACE = "net.hadess.SensorProxy"
# Interface carrying the compass properties.
COMPASS_IFACE = "net.hadess.SensorProxy.Compass"

# Zero-width match in front of every upper-case letter except one at the start
# of the string; substituting "_" there and lower-casing the result converts a
# D-Bus property name ("LightLevel") into the attribute name ("light_level").
CAMEL_TO_SNAKE_CASE_RE = re.compile(r"(?<!^)(?=[A-Z])")
[docs]
def load(mock, parameters=None):
    """Initialise the mock's sensor state and publish its properties.

    Every sensor starts out absent, with an empty owners registry and a
    neutral reading. Entries of *parameters* (attribute name -> value), when
    given, are applied on top of those defaults with ``setattr``. Finally the
    properties reported by ``mock.GetAll`` for both interfaces are registered
    through ``mock.AddProperties``.
    """
    initial_state = (
        ("has_accelerometer", False),
        ("accelerometer_owners", {}),
        ("accelerometer_orientation", "undefined"),
        ("has_ambient_light", False),
        ("ambient_light_owners", {}),
        ("light_level_unit", "lux"),
        ("light_level", 0.0),
        ("has_proximity", False),
        ("proximity_near", False),
        ("proximity_owners", {}),
        ("has_compass", False),
        ("compass_owners", {}),
        ("compass_heading", -1.0),
    )
    for attribute, default in initial_state:
        setattr(mock, attribute, default)

    # Caller-supplied overrides win over the defaults set above.
    for attribute, override in (parameters or {}).items():
        setattr(mock, attribute, override)

    for iface in (MAIN_IFACE, COMPASS_IFACE):
        mock.AddProperties(iface, mock.GetAll(iface))
[docs]
def emit_signal_to_destination(mock, interface, name, signature, destination, *args):
    """Send signal *name* of *interface* from the mock's path, optionally unicast.

    The message is built by hand so that a destination can be set on it: a
    truthy *destination* addresses the signal to that bus name only, otherwise
    no destination is set. *args* are appended using *signature*, and the
    message is sent on the first element of every entry in ``mock.locations``.
    """
    # We need to do this manually, could be made easier via
    # https://gitlab.freedesktop.org/dbus/dbus-python/-/merge_requests/13
    signal_message = dbus.lowlevel.SignalMessage(mock.path, interface, name)
    if destination:
        signal_message.set_destination(destination)
    signal_message.append(*args, signature=signature)

    for entry in mock.locations:
        bus_connection = entry[0]
        bus_connection.send_message(signal_message)
[docs]
def emit_properties_changed(mock, interface=MAIN_IFACE, properties=None, destination=None):
    """Emit ``PropertiesChanged`` for *interface*, optionally to one destination.

    *properties* selects what is reported:

    * ``None`` - everything returned by ``mock.GetAll(interface)``;
    * a ``str`` - that single property, looked up with ``mock.Get``;
    * a ``list`` or ``set`` of names - each looked up with ``mock.Get``;
    * a ``dict`` - used as the name -> value payload as is.

    Raises ``TypeError`` for any other type. The list of invalidated
    properties sent with the signal is always empty.
    """
    if properties is None:
        changed = mock.GetAll(interface)
    elif isinstance(properties, str):
        changed = [properties]
    else:
        changed = properties

    if isinstance(changed, (list, set)):
        # Only names were supplied: resolve their current values.
        changed = {prop: mock.Get(interface, prop) for prop in changed}
    elif not isinstance(changed, dict):
        raise TypeError("Unsupported properties type")

    emit_signal_to_destination(
        mock,
        dbus.PROPERTIES_IFACE,
        "PropertiesChanged",
        "sa{sv}as",
        destination,
        interface,
        changed,
        [],
    )
@dbus.service.method(dbus.PROPERTIES_IFACE, in_signature="s", out_signature="a{sv}")
def GetAll(self, interface):
    """Return the properties of *interface* as a name -> D-Bus value mapping.

    The values are built from the mock's current sensor attributes. Any
    interface other than the main or the compass one yields an empty
    ``a{sv}`` dictionary.
    """
    if interface == MAIN_IFACE:
        exported = {
            "HasAccelerometer": dbus.Boolean(self.has_accelerometer),
            "AccelerometerOrientation": dbus.String(self.accelerometer_orientation),
            "HasAmbientLight": dbus.Boolean(self.has_ambient_light),
            "LightLevelUnit": dbus.String(self.light_level_unit),
            "LightLevel": dbus.Double(self.light_level),
            "HasProximity": dbus.Boolean(self.has_proximity),
            "ProximityNear": dbus.Boolean(self.proximity_near),
        }
    elif interface == COMPASS_IFACE:
        exported = {
            "HasCompass": dbus.Boolean(self.has_compass),
            "CompassHeading": dbus.Double(self.compass_heading),
        }
    else:
        exported = dbus.Dictionary({}, signature="sv")
    return exported
[docs]
def register_owner(self, owners_dict, name):
    """Start tracking bus name *name* in *owners_dict*, unless already tracked.

    A name-owner watch is installed through ``self.connection`` and stored in
    *owners_dict* under *name*. When the watch later reports an empty owner,
    the entry is removed from *owners_dict* and its watch is cancelled.
    """
    if name not in owners_dict:

        def on_owner_changed(unique_name):
            # A non-empty owner means the client is still there: nothing to do.
            if not unique_name:
                owners_dict.pop(name).cancel()

        owners_dict[name] = self.connection.watch_name_owner(name, on_owner_changed)
[docs]
def unregister_owner(owners_dict, name):
    """Stop tracking *name*: drop it from *owners_dict* and cancel its watch.

    Does nothing when *name* is not present in *owners_dict*.
    """
    watch = owners_dict.pop(name, None)
    if not watch:
        return
    watch.cancel()
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ClaimAccelerometer(self, sender):
    """Register the calling client (*sender*) as an accelerometer owner."""
    register_owner(self, owners_dict=self.accelerometer_owners, name=sender)
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ReleaseAccelerometer(self, sender):
    """Remove the calling client (*sender*) from the accelerometer owners."""
    unregister_owner(owners_dict=self.accelerometer_owners, name=sender)
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ClaimLight(self, sender):
    """Register the calling client (*sender*) as an ambient-light owner."""
    register_owner(self, owners_dict=self.ambient_light_owners, name=sender)
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ReleaseLight(self, sender):
    """Remove the calling client (*sender*) from the ambient-light owners."""
    unregister_owner(owners_dict=self.ambient_light_owners, name=sender)
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ClaimProximity(self, sender):
    """Register the calling client (*sender*) as a proximity-sensor owner."""
    register_owner(self, owners_dict=self.proximity_owners, name=sender)
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ReleaseProximity(self, sender):
    """Remove the calling client (*sender*) from the proximity-sensor owners."""
    unregister_owner(owners_dict=self.proximity_owners, name=sender)
# NOTE(review): exported on MAIN_IFACE although GetAll serves the compass
# properties under COMPASS_IFACE - confirm this is intended.
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ClaimCompass(self, sender):
    """Register the calling client (*sender*) as a compass owner."""
    register_owner(self, owners_dict=self.compass_owners, name=sender)
# NOTE(review): exported on MAIN_IFACE although GetAll serves the compass
# properties under COMPASS_IFACE - confirm this is intended.
@dbus.service.method(MAIN_IFACE, sender_keyword="sender")
def ReleaseCompass(self, sender):
    """Remove the calling client (*sender*) from the compass owners."""
    unregister_owner(owners_dict=self.compass_owners, name=sender)
[docs]
def sensor_to_attribute(sensor):
    """Map a sensor name to the prefix used by the mock's attributes.

    ``"light"`` becomes ``"ambient_light"``; any other value is returned
    unchanged.
    """
    return "ambient_light" if sensor == "light" else sensor
[docs]
def is_valid_sensor_for_interface(sensor, interface):
    """Tell whether *sensor* (an attribute prefix) belongs to *interface*.

    The compass interface only knows ``"compass"``; the main interface knows
    ``"accelerometer"``, ``"ambient_light"`` and ``"proximity"``. Every other
    interface has no sensors.
    """
    if interface == "net.hadess.SensorProxy.Compass":
        return sensor == "compass"
    if interface != "net.hadess.SensorProxy":
        return False
    return sensor in ("accelerometer", "ambient_light", "proximity")
@dbus.service.method(MOCK_IFACE, in_signature="ssv")
def SetInternalProperty(self, interface, property_name, value):
    """Store *value* for *property_name* and notify clients if it changed.

    The CamelCase *property_name* is converted to the snake_case attribute
    that backs it. When its first word names a sensor of *interface*, that
    sensor must be available, otherwise a ``DBusException`` named
    ``org.freedesktop.DBus.Mock.Error`` is raised. On a change,
    ``PropertiesChanged`` is sent to each owner of that sensor (to nobody if
    it has none). Properties not tied to a sensor are announced without a
    destination.
    """
    attribute = CAMEL_TO_SNAKE_CASE_RE.sub("_", property_name).lower()
    # The leading word of the attribute identifies the sensor, if any.
    sensor = sensor_to_attribute(attribute.split("_")[0])

    claimants = None
    if is_valid_sensor_for_interface(sensor, interface):
        if not getattr(self, f"has_{sensor}"):
            raise dbus.exceptions.DBusException(
                f"No {sensor} sensor available", name="org.freedesktop.DBus.Mock.Error"
            )
        # Any client may set a sensor property here, even without claiming
        # the sensor, but only its owners (if any) are notified of changes.
        claimants = getattr(self, f"{sensor}_owners")

    changed = getattr(self, attribute) != value
    if not changed:
        return
    setattr(self, attribute, value)

    if claimants is None:
        # Not a sensor reading: announce without a specific destination.
        emit_properties_changed(self, interface, property_name, None)
    elif claimants:
        for claimant in claimants:
            emit_properties_changed(self, interface, property_name, claimant)
@dbus.service.method(MOCK_IFACE, in_signature="s")
def GetInternalProperty(self, property_name):
    """Return the mock attribute backing the CamelCase *property_name*.

    Names ending in ``"Owners"`` return a D-Bus string array built from the
    keys of the stored mapping; every other attribute is returned as stored.
    """
    attribute = CAMEL_TO_SNAKE_CASE_RE.sub("_", property_name).lower()
    stored = getattr(self, attribute)
    if not property_name.endswith("Owners"):
        return stored
    # Only the keys of an owners mapping are exposed, not its values.
    return dbus.Array(stored.keys(), signature="s")