Compare commits

...

47 Commits

Author SHA1 Message Date
sqozz 8c372513d8 Merge branch 'master' of ssh://git.geekify.de:1421/sqozz/sem6000 2020-05-09 16:44:28 +02:00
sqozz de79c2bca5 Adjust example.py 2020-05-09 16:44:06 +02:00
sqozz 06f2cfe33e Add serial and fw version property 2020-05-09 16:44:06 +02:00
sqozz df9a355ee8 Cleanup characteristics 2020-05-09 16:44:06 +02:00
sqozz 95983d464f Merge branch 'feature/ring_led' of sqozz/sem6000 into master 2020-05-08 20:34:15 +02:00
sqozz 3755f26081 Add function to set LED visibility 2020-05-08 20:32:55 +02:00
sqozz f488869f3d Add function to set icon 2020-05-08 20:32:14 +02:00
sqozz 6cc5804017 Merge branch 'features/config' of sqozz/sem6000 into master 2020-05-08 19:50:16 +02:00
sqozz 9b49abd66e Make current config availible as attributes 2020-05-08 19:43:46 +02:00
sqozz 80c0c5dd4b Add getSN function 2020-05-07 21:17:26 +02:00
sqozz aea3cc32f5 Add function to read current plug config 2020-05-07 21:16:50 +02:00
sqozz be6528f074 Merge branch 'feature/name_icon' of sqozz/sem6000 into master 2020-05-07 21:13:03 +02:00
sqozz b80c640e61 Raise exception if name is set without login 2020-05-07 12:17:52 +02:00
sqozz 271e1bc5bf Add function to set the plug icon 2020-05-07 11:56:40 +02:00
sqozz b931fb0a41 Add function to change plug name 2020-05-07 11:54:25 +02:00
sqozz 7d16b7b33b Merge branch 'feature/return_codes' of sqozz/sem6000 into master 2020-05-06 11:04:58 +02:00
sqozz 7a9a2949ff Add proper responses to all functions 2020-05-05 22:58:04 +02:00
sqozz 353c6f64bb Merge branch 'collectd' of cfr34k/sem6000 into master 2020-05-02 19:26:19 +02:00
Thomas Kolb adf675e1f1 collectd: clarify bluepy import 2020-05-02 19:26:19 +02:00
Thomas Kolb 341b6e84b6 Improved handling of unavailable devices 2020-05-02 19:26:19 +02:00
Thomas Kolb 91d682cd97 First version of collectd plugin 2020-05-02 19:26:19 +02:00
sqozz 32fc0785d3 Merge branch 'features/time_sync' of sqozz/sem6000 into master 2020-05-02 19:23:51 +02:00
sqozz a5312baef3 Add function to sync the time with the plug 2020-05-02 19:23:51 +02:00
sqozz f656c5d516 Merge branch 'fix_changePassword' of cfr34k/sem6000 into master 2020-05-02 19:22:37 +02:00
Thomas Kolb a0874625c7 fix: use function parameter in changePassword() 2020-01-28 15:54:36 +01:00
sqozz 80e6bbaaed Merge branch 'fix/zerodivision' of sqozz/sem6000 into master 2020-01-20 21:31:30 +01:00
sqozz 6f810c949b Fix ZeroDivisionError if plug is powered off 2020-01-20 21:31:30 +01:00
sqozz 34b1e667af Merge branch 'features/total_power' of sqozz/sem6000 into master 2020-01-20 21:27:19 +01:00
sqozz 101d4f4149 Add parsing of the total consumed power 2020-01-20 21:26:22 +01:00
sqozz 1b1b5d8562 Merge branch 'gitignore' of cfr34k/sem6000 into master 2020-01-20 21:10:05 +01:00
Thomas Kolb 63b066511a Ignore some files in Git
Specifically compiled Python code and Vim swap files.
2020-01-20 21:10:05 +01:00
sqozz d91813497f Merge branch 'power_factor' of cfr34k/sem6000 into master 2020-01-20 21:09:35 +01:00
Thomas Kolb 7b82461f03 example: restart on BrokenPipeError 2020-01-19 17:48:02 +01:00
Thomas Kolb 2f13d6f6a5 example: print power factor 2020-01-19 17:46:55 +01:00
Thomas Kolb 01cc1ed9b7 example: removed invalid constructor argument 2020-01-19 17:34:57 +01:00
Thomas Kolb 8f06b28f31 Added power factor calculation 2020-01-19 17:34:18 +01:00
sqozz d824a93c5b Merge branch 'remove_reconnect' of sqozz/sem6000 into master 2020-01-17 11:43:45 +01:00
sqozz e38d6abb12 Remove automatic reconnect 2020-01-17 11:43:45 +01:00
sqozz d0a57f483e Merge branch 'shebang' of cfr34k/sem6000 into master 2020-01-16 19:08:41 +01:00
Thomas Kolb 50cb935f80 Add shebang to example script 2020-01-16 18:50:34 +01:00
sqozz 60199338bf Merge branch 'improved_reconnect' of cfr34k/sem6000 into master 2020-01-16 18:43:20 +01:00
Thomas Kolb 2378f52a7e Reliable reconnect in user script 2020-01-16 14:50:48 +01:00
sqozz a91eb91ac6 Instantiate Peripheral on class initialization 2019-11-13 18:48:46 +01:00
sqozz 8b7e88db74 Merge branch 'btledev_private' of sqozz/sem6000 into master
This cleans up some rough edges of the initial version.

1. Reconnect support introduces better handling of missing or misbehaving wall sockets.
2. Variable visibility for the internally used ble characteristics just give the reader a slight clue about their (un)importance for them.
3. Variable access was cleaned up to use the provided setter for cmd and payload.
4. Some spaghetti code got slightly more readable.
5. Finally, graceful handling of disconnects is now possible. This allows short lived scripts (e.g. home-assistant components) to access multiple sockets with the same BLE adapter.
2019-11-13 18:13:36 +01:00
sqozz bc34c9dd8f Add proper connect and disconnect functions
This makes it possible to only open a short lived connection to the
physical SEM6000 socket. A few old artefacts referencing __reconnect do
still exist in the code.
2019-11-13 17:57:24 +01:00
sqozz 6c83bccbde Use setter for init BTLE payload data 2019-06-14 19:51:47 +02:00
sqozz 4f8b698a14 Change visibility of some properties 2019-02-27 22:53:08 +01:00
5 changed files with 479 additions and 142 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
__pycache__/
*.pyc
.*.sw?

View File

@ -1,3 +1,11 @@
# SEM6000 Python Library
SEM6000 is a energy meter and power switch with Bluetooth 4.0.
This library provides a Python module for these devices
## Run the example code
```
$ git clone … sem6000
$ cd sem6000
@ -5,4 +13,52 @@ $ virtualenv -p python3 python3_venv
$ . ./python3_venv/bin/activate
$ pip3 install -r requirements.txt
$ python3 example.py
```
```
## Collectd Plugin
You can find a Plugin for [collectd](https://collectd.org) in the `collectd`
subdirectory.
Installation procedure (the target directory may be changed of course):
```shell
# mkdir -p /usr/local/lib/collectd/python
# cp collectd/collectd_sem6000.py /usr/local/lib/collectd/python
# cp sem6000.py /usr/local/lib/collectd/python
```
Add or adjust the configuration for your collectds Python plugin as follows:
```
<Plugin python>
ModulePath "/usr/local/share/collectd/python"
LogTraces true
Interactive false
Import "collectd_sem6000"
<Module collectd_sem6000>
Address "12:34:56:78:90:ab"
SocketName "FirstSocket"
ReadTimeout 30
SuspendTime 300
</Module>
<Module collectd_sem6000>
Address "ab:cd:ef:13:37:42"
SocketName "ASecondSocket"
</Module>
# ...
</Plugin>
```
`ReadTimeout` and `SuspendTime` control whats happening when a device is
unavailable. If no value could be retrieved for `ReadTimeout` seconds, the
plugin does not retry for `SuspendTime` seconds. After that, normal operation
is resumed. This procedure ensures that an unreachable device does not block
other devices (too often) in the current single-threaded architecture.
If not specified, `ReadTimeout` is 30 seconds and `SuspendTime` is 5 minutes.
Make sure that everything listed in `requirements.txt` is available to the user
running collectd.

144
collectd/collectd_sem6000.py Executable file
View File

@ -0,0 +1,144 @@
#!/usr/bin/env python3
# coding: utf-8
# vim: noet ts=2 sw=2 sts=2
import os
import time
import collectd
from sem6000 import SEMSocket
from bluepy.btle import BTLEDisconnectError
instances = []
def init_func():
pass
def config_func(cfg):
global instances
config = {}
for node in cfg.children:
key = node.key.lower()
value = node.values[0]
if key in ['address', 'socketname']:
config[key] = value
if key == 'readtimeout':
config['readtimeout'] = int(value)
if key == 'suspendtime':
config['suspendtime'] = int(value)
if 'address' not in config.keys():
collectd.error('sem6000: address must be set')
return
if 'socketname' not in config.keys():
config['socketname'] = config['address'].replace(':', '')
if 'readtimeout' not in config.keys():
config['readtimeout'] = 30
if 'suspendtime' not in config.keys():
config['suspendtime'] = 300
instances.append( {
'config': config,
'socket': None,
'suspended': False,
'lastsuccess': 0,
'resumetime': 0
} )
def read_func():
global instances
for inst in instances:
config = inst['config']
if inst['suspended']:
if time.time() < inst['resumetime']:
continue
else:
collectd.info("sem6000: Device {} waking up.".format(config['address']))
inst['suspended'] = False
inst['lastsuccess'] = time.time()
try:
if inst['socket'] == None:
collectd.info("sem6000: Connecting to {}...".format(config['address']))
inst['socket'] = SEMSocket(config['address'])
collectd.info("sem6000: Connected.")
inst['socket'].getStatus()
except (SEMSocket.NotConnectedException, BTLEDisconnectError, BrokenPipeError) as e:
collectd.warning("sem6000: Exception caught: {}".format(e))
collectd.warning("sem6000: Restarting on next cycle...")
if inst['lastsuccess'] < time.time() - config['readtimeout']:
collectd.error("sem6000: no successful communication with {} for {:.1f} seconds. Suspending device for {:.1f} seconds.".format(
config['address'], config['readtimeout'], config['suspendtime']))
inst['suspended'] = True
inst['resumetime'] = time.time() + config['suspendtime']
if inst['socket'] != None:
inst['socket'].disconnect()
inst['socket'] = None
socket = inst['socket']
if socket != None and socket.voltage != 0:
collectd.debug("Uploading values for {}".format(socket.mac_address))
inst['lastsuccess'] = time.time()
val = collectd.Values(plugin = 'sem6000-{}'.format(config['socketname']))
val.type = 'voltage'
val.type_instance = 'grid'
val.values = [ socket.voltage ]
val.dispatch()
val.type = 'current'
val.type_instance = 'load'
val.values = [ socket.current ]
val.dispatch()
val.type = 'power'
val.type_instance = 'real_power'
val.values = [ socket.power ]
val.dispatch()
val.type = 'gauge'
val.type_instance = 'power_factor'
val.values = [ socket.power_factor ]
val.dispatch()
val.type = 'gauge'
val.type_instance = 'load_on'
val.values = [ socket.powered ]
val.dispatch()
val.type = 'frequency'
val.type_instance = 'grid'
val.values = [ socket.frequency ]
val.dispatch()
def shutdown_func():
global instances
for inst in instances:
if inst['socket'] != None:
inst['socket'].disconnect()
instances = []
collectd.register_config(config_func)
collectd.register_init(init_func)
collectd.register_read(read_func)
collectd.register_shutdown(shutdown_func)

View File

@ -1,23 +1,40 @@
#!/usr/bin/env python3
import time
from sem6000 import SEMSocket
# auto_reconnect_timeout enabled auto reconnect if sending a command fails. Valid values:
# None (default): everything that fails throws NotConnectedException's
# -1: infinite retries
# integer: seconds before exception is thrown
import bluepy
socket = SEMSocket('f0:c7:7f:0d:e7:17', auto_reconnect_timeout=None)
#socket.login("1337")
#socket.changePassword("1234")
#socket.login("1234")
socket = None
while True:
time.sleep(1)
try:
if socket == None:
print("Connecting... ", end="")
socket = SEMSocket('f0:c7:7f:0d:e7:17')
print("Success!")
print("You're now connected to: {} (Icon: {})".format(socket.name, socket.icons[0]))
if socket.login("1234") and socket.authenticated:
print("Login successful!")
socket.getSynConfig()
print()
print("=== Tariff settings ===")
print("Default charge:", socket.default_charge)
print("Night charge:", socket.night_charge)
print("Night tariff from {} to {}".format(socket.night_charge_start_time.tm_hour, socket.night_charge_end_time.tm_hour))
print()
print("=== Other settings ===")
print("Night mode:", "active" if socket.night_mode else "inactive")
print("Power protection:", socket.power_protect)
print()
socket.getStatus()
socket.setStatus(True)
print("=== {} ({}) ===".format(socket.mac_address, "on" if socket.powered else "off"))
print("\t{}V {}A → {}W@{}Hz".format(socket.voltage, socket.current, socket.power, socket.frequency))
except SEMSocket.NotConnectedException:
socket.reconnect(-1) #infinite reconnect attempts
print("\t{}V {}A → {}W@{}Hz (PF: {})".format(socket.voltage, socket.current, socket.power, socket.frequency, socket.power_factor))
except (SEMSocket.NotConnectedException, bluepy.btle.BTLEDisconnectError, BrokenPipeError):
print("Restarting...")
if socket != None:
socket.disconnect()
socket = None

View File

@ -1,35 +1,205 @@
from bluepy import btle
import struct
import time
import datetime
import uuid
class SEMSocket():
icons = ["plug", "speaker", "flatscreen", "desk lamp", "oven", "kitchen machine", "canning pot", "stanging lamp", "kettle", "mixer", "hanging lamp", "toaster", "washing machine", "fan", "fridge", "iron", "printer", "monitor", "notebook", "workstation", "video recorder", "curling iron", "heater"]
password = "0000"
auto_reconnect_timeout = None
powered = False
voltage = 0
current = 0
power = 0
power_factor = 0
total_power = 0
frequency = 0
mac_address = ""
custom_service = None
read_char = None
write_char = None
notify_char = None
connected = False
authenticated = False
_icon_idx = None
_name = None
_version_char = None
_write_char = None
_name_char = None
_btle_device = None
def __init__(self, mac, auto_reconnect_timeout = None):
def __init__(self, mac):
self.mac_address = mac
self.auto_reconnect_timeout = auto_reconnect_timeout
self._btle_device = btle.Peripheral(None ,addrType=btle.ADDR_TYPE_PUBLIC,iface=0)
try:
self.reconnect()
except self.NotConnectedException:
# initial connection may fail. It is up to the code what to do
pass
def getStatus(self):
#15, 5, 4, 0, 0, 0, 5, -1, -1
cmd = bytearray([0x04])
payload = bytearray([0x00, 0x00, 0x00])
msg = self.BTLEMessage(self, cmd, payload)
return msg.send()
def getSynConfig(self):
#15, 5, 16, 0, 0, 0, 17, -1, -1
cmd = bytearray([0x10])
payload = bytearray([0x00, 0x00, 0x00])
msg = self.BTLEMessage(self, cmd, payload)
msg.send()
def setStatus(self, status):
# 0f 06 03 00 01 00 00 05 ff ff -> on
# 0f 06 03 00 00 00 00 04 ff ff -> off
cmd = bytearray([0x03])
payload = bytearray([0x00, status, 0x00, 0x00])
msg = self.BTLEMessage(self, cmd, payload)
return msg.send()
def syncTime(self):
#15, 12, 1, 0, SECOND, MINUTE, HOUR_OF_DAY, DAY_OF_MONTH, MONTH (+1), int(YEAR/256), YEAR%256, 0, 0, CHKSUM, 255, 255
now = datetime.datetime.now()
cmd = bytearray([0x01])
payload = bytearray([0x00])
payload += bytearray([now.second, now.minute, now.hour])
payload += bytearray([now.day, now.month, int(now.year/256), now.year%256])
payload += bytearray([0x00, 0x00])
msg = self.BTLEMessage(self, cmd, payload)
return msg.send()
def login(self, password):
self.password = password
cmd = bytearray([0x17])
payload = bytearray()
payload.append(0x00)
payload.append(0x00)
for i in range(4):
payload.append(int(self.password[i]))
payload.append(0x00)
payload.append(0x00)
payload.append(0x00)
payload.append(0x00)
msg = self.BTLEMessage(self, cmd, payload)
success = msg.send()
self.authenticated = self.authenticated and success
return success
def changePassword(self, newPassword):
cmd = bytearray([0x17])
payload = bytearray()
payload.append(0x00)
payload.append(0x01)
for i in range(4):
payload.append(int(newPassword[i]))
for i in range(4):
payload.append(int(self.password[i]))
self.password = newPassword
msg = self.BTLEMessage(self, cmd, payload)
success = msg.send()
self.authenticated = self.authenticated and success
return success
def setIcon(self, iconIdx):
cmd = bytearray([0x0f])
payload = bytearray([0x00, 0x03, iconIdx, 0x00, 0x00, 0x00, 0x00])
msg = self.BTLEMessage(self, cmd, payload)
success = msg.send()
return success
def enableLED(self, status):
cmd = bytearray([0x0f])
payload = bytearray([0x00, 0x05, status, 0x00, 0x00, 0x00, 0x00])
msg = self.BTLEMessage(self, cmd, payload)
success = msg.send()
return success
@property
def name(self):
self._name = self._name_char.read().decode("UTF-8")
return self._name
@name.setter
def name(self, newName):
newNameBytes = newName.encode("UTF-8")
cmd = bytearray([0x02])
payload = bytearray()
payload.append(0x02)
for i in range(20):
if i <= (len(newNameBytes) - 1):
payload.append(newNameBytes[i])
else:
payload.append(0x00)
msg = self.BTLEMessage(self, cmd, payload)
success = msg.send()
# For some reason the original app sets the first 7 bytes of the payload to zero and sends it again.
# However, a first test showed, that this really doesn't change anything. If it becomes neccessary here's a first draft:
#for i in range(7):
# payload[i+1] = 0x00
#msg = self.BTLEMessage(self, cmd, payload)
if not success:
raise self.SendMessageFailed
if self.name != newName:
raise self.NotLoggedIn
@property
def serial(self):
# 15, 5, 17, 0, 0, 0, 18, -1, -1
cmd = bytearray([0x11])
payload = bytearray([0x00, 0x00, 0x00])
msg = self.BTLEMessage(self, cmd, payload)
success = msg.send()
if not success:
raise self.SendMessageFailed
return self._serial
@property
def firmware_version(self):
char_value = self._version_char.read()
major = char_value[11]
minor = char_value[12]
return "{}.{}".format(major, minor)
@property
def connected(self):
try:
return "conn" in self._btle_device.status().get("state")
except:
return False
def reconnect(self):
self.disconnect()
self.connect()
if not self.connected:
raise self.NotConnectedException
def connect(self):
self.disconnect()
self._btle_device.connect(self.mac_address)
self._btle_handler = self.BTLEHandler(self)
self._btle_device.setDelegate(self._btle_handler)
self._custom_service = self._btle_device.getServiceByUUID(0xfff0)
self._version_char = self._custom_service.getCharacteristics("0000fff1-0000-1000-8000-00805f9b34fb")[0] #contains firmware version info
self._write_char = self._custom_service.getCharacteristics("0000fff3-0000-1000-8000-00805f9b34fb")[0] #is used to write commands
self._name_char = self._custom_service.getCharacteristics("0000fff6-0000-1000-8000-00805f9b34fb")[0]
def disconnect(self):
if self.connected == True:
self._btle_device.disconnect()
#def ______RESET(self):
# #15, 5, 16, 0, 0, 0, 17, -1, -1 ??? maybe reset?
# pass
class NotConnectedException(Exception):
pass
class SendMessageFailed(Exception):
pass
class NotLoggedIn(Exception):
pass
class BTLEMessage():
MAGIC_START = bytearray([0x0f])
MAGIC_END = bytearray([0xff, 0xff])
@ -48,7 +218,7 @@ class SEMSocket():
@cmd.setter
def cmd(self, cmd):
self.__data = self.MAGIC_START + bytearray(1) + cmd + self.payload + bytearray(1) + self.MAGIC_END
self.__data = self.MAGIC_START + bytearray(1) + cmd + self.__payload + bytearray(1) + self.MAGIC_END
self.__cmd = cmd
self.__calc_length()
self.__calc_checksum()
@ -59,7 +229,7 @@ class SEMSocket():
@payload.setter
def payload(self, payload):
self.__data = self.MAGIC_START + bytearray(1) + self.cmd + payload + bytearray(1) + self.MAGIC_END
self.__data = self.MAGIC_START + bytearray(1) + self.__cmd + payload + bytearray(1) + self.MAGIC_END
self.__payload = payload
self.__calc_length()
self.__calc_checksum()
@ -75,141 +245,87 @@ class SEMSocket():
def send(self):
if not self.__btle_device.connected:
self.__btle_device.reconnect(self.__btle_device.auto_reconnect_timeout)
self.__btle_device.reconnect()
self.__btle_device.write_char.write(self.__data, True)
self.__btle_device.btle_device.waitForNotifications(5)
self.__btle_device._write_char.write(self.__data, True)
return self.__btle_device._btle_device.waitForNotifications(5)
class BTLEHandler(btle.DefaultDelegate):
def __init__(self, btle_device):
btle.DefaultDelegate.__init__(self)
self.btle_device = btle_device
self.__btle_device = btle_device
def handleNotification(self, cHandle, data):
if len(data) <= 3:
print("Notification data seems invalid or incomplete. Could not parse: ", end="")
print(data)
return
message_type = data[2]
if message_type == 0x00:
if data[4] == 0x01:
print("Checksum error!")
else:
print("Unknown error:", data)
elif message_type == 0x03: #switch toggle
print("Switch toggled")
self.btle_device.getStatus()
elif message_type == 0x04: #status related data
self.btle_device.voltage = data[8]
self.btle_device.current = (data[9] << 8 | data[10]) / 1000
self.btle_device.power = (data[5] << 16 | data[6] << 8 | data[7]) / 1000
self.btle_device.frequency = data[11]
self.btle_device.powered = bool(data[4])
elif message_type == 0x17:
elif message_type == 0x01: #sync time response
if not data[3:] == b'\x00\x00\x02\xff\xff':
print("Time synced failed with unknown data: ", end="")
print(data)
elif message_type == 0x02: #set name response
if not data[3:] == b'\x00\x00\x03\xff\xff':
print("Set name failed with unknown data: ", end="")
print(data)
elif message_type == 0x03: #switch toggle response
self.__btle_device.getStatus()
elif message_type == 0x04: #status related response
voltage = data[8]
current = (data[9] << 8 | data[10]) / 1000
power = (data[5] << 16 | data[6] << 8 | data[7]) / 1000
total_power = (data[14] << 24 | data[15] << 16 | data[16] << 8 | data[17]) / 1000
self.__btle_device.voltage = voltage
self.__btle_device.current = current
self.__btle_device.power = power
self.__btle_device.frequency = data[11]
self.__btle_device.powered = bool(data[4])
self.__btle_device.total_power = total_power
# calculated values
try:
self.__btle_device.power_factor = power / (voltage * current)
except ZeroDivisionError:
self.__btle_device.power_factor = None
elif message_type == 0x10: #plug settings response
self.__btle_device.default_charge = (data[5] / 100)
self.__btle_device.night_charge = (data[6] / 100)
night_charge_start_time = int((data[7] << 8 | data[8]) / 60)
night_charge_end_time = int((data[9] << 8 | data[10]) / 60)
self.__btle_device.night_charge_start_time = time.strptime(str(night_charge_start_time), "%H")
self.__btle_device.night_charge_end_time = time.strptime(str(night_charge_end_time), "%H")
self.__btle_device.night_mode = not bool(data[11])
self.__btle_device.icon_idx = data[12]
self.__btle_device.power_protect = (data[13] << 8 | data[14])
elif message_type == 0x11: #serial number response
self.__btle_device._serial = data[-16:].decode("UTF-8")
elif message_type == 0x17: #authentication related response
if data[5] == 0x00 or data[5] == 0x01:
if not data[4]:
print("Login successful")
else:
print("Login failed")
# in theory the fifth byte indicates a login attempt response (0) or a response to a password change (1)
# but since a password change requires a valid login and a successful password changes logs you in,
# we can just ignore this bit and set the authenticated flag accordingly for both responses
self.__btle_device.authenticated = not data[4]
else:
print("5th byte of login-response is > 1:", data)
elif message_type == 0x0f: #set icon response
if data[3:6] == b'\x00\x03\x00':
# LED set successfully
pass
elif data[3:6] == b'\x00\x05\x00':
# Icon set successfully
pass
else:
print("Unknown response for setting icon: ", end="")
print(data[3:])
else:
print ("Unknown message from Handle: 0x" + format(cHandle,'02X') + " Value: "+ format(data))
def getStatus(self):
#15, 5, 4, 0, 0, 0, 5, -1, -1
cmd = bytearray([0x04])
payload = bytearray([0x00, 0x00, 0x00])
msg = self.BTLEMessage(self, cmd, payload)
msg.send()
def setStatus(self, status):
# 0f 06 03 00 01 00 00 05 ff ff -> on
# 0f 06 03 00 00 00 00 04 ff ff -> off
cmd = bytearray([0x03])
payload = bytearray([0x00, status, 0x00, 0x00])
msg = self.BTLEMessage(self, cmd, payload)
msg.send()
def login(self, password):
self.password = password
cmd = bytearray([0x17])
payload = bytearray()
payload.append(0x00)
payload.append(0x00)
payload.append(int(self.password[0]))
payload.append(int(self.password[1]))
payload.append(int(self.password[2]))
payload.append(int(self.password[3]))
payload.append(0x00)
payload.append(0x00)
payload.append(0x00)
payload.append(0x00)
msg = self.BTLEMessage(self, cmd, payload)
msg.send()
def changePassword(self, newPassword):
cmd = bytearray([0x17])
payload = bytearray()
payload.append(0x00)
payload.append(0x01)
payload.append(int(newPassword[0]))
payload.append(int(newPassword[1]))
payload.append(int(newPassword[2]))
payload.append(int(newPassword[3]))
payload.append(int(self.password[0]))
payload.append(int(self.password[1]))
payload.append(int(self.password[2]))
payload.append(int(self.password[3]))
self.password = newPassword
msg = self.BTLEMessage(self, cmd, payload)
msg.send()
def __reconnect(self):
try:
self.btle_device = btle.Peripheral(self.mac_address,addrType=btle.ADDR_TYPE_PUBLIC,iface=0)
self.btle_handler = self.BTLEHandler(self)
self.custom_service = self.btle_device.getServiceByUUID(0xfff0)
self.read_char = self.custom_service.getCharacteristics("0000fff1-0000-1000-8000-00805f9b34fb")[0]
self.write_char = self.custom_service.getCharacteristics("0000fff3-0000-1000-8000-00805f9b34fb")[0]
self.notify_char = self.custom_service.getCharacteristics("0000fff4-0000-1000-8000-00805f9b34fb")[0]
self.btle_device.setDelegate(self.btle_handler)
except btle.BTLEException as e:
self.connected = False
else:
self.connected = True
def reconnect(self, timeout = None):
if timeout == None:
self.__reconnect()
else:
reconnect_start = time.time()
while abs(reconnect_start - time.time()) < timeout or timeout == -1:
self.__reconnect()
if not self.connected:
raise self.NotConnectedException
#def SynVer(self):
# print("SynVer")
# self.read_char.read_value()
#def GetSynConfig(self):
# print("GetSynConfig")
# #15, 5, 16, 0, 0, 0, 17, -1, -1
# self.write_char.write_value(bytearray(b'\x0f\x05\x10\x00\x00\x00\x11\xff\xff'))
#def ______RESET(self):
# #15, 5, 16, 0, 0, 0, 17, -1, -1 ??? maybe reset?
# pass
#def GetSN(self):
# print("GetSN")
# #15, 5, 17, 0, 0, 0, 18, -1, -1
# self.write_char.write_value(bytearray(b'\x0f\x05\x11\x00\x00\x00\x12\xff\xff'))
# self.SynVer()
# self.notify_char.enable_notifications()
# self.Login("1337")
# self.GetSynConfig()
# #self.GetSN()