Merge branch 'btledev_private' of sqozz/sem6000 into master

This cleans up some rough edges of the initial version.

1. Reconnect support introduces better handling of missing or misbehaving wall sockets.
2. Variable visibility for the internally used ble characteristics just give the reader a slight clue about their (un)importance for them.
3. Variable access was cleaned up to use the provided setter for cmd and payload.
4. Some spaghetti code got slightly more readable.
5. Finally, graceful handling of disconnects is now possible. This allows short lived scripts (e.g. home-assistant components) to access multiple sockets with the same BLE adapter.
This commit is contained in:
sqozz 2019-11-13 18:13:36 +01:00 committed by Gitea
commit 8b7e88db74
2 changed files with 161 additions and 122 deletions

View file

@ -1,16 +1,23 @@
from sem6000 import SEMSocket
import time
socket = SEMSocket('f0:c7:7f:0d:e7:17')
from sem6000 import SEMSocket
socket.login("1337")
socket.changePassword("1234")
socket.login("1234")
#socket.changePassword("1337")
# auto_reconnect_timeout enabled auto reconnect if sending a command fails. Valid values:
# None (default): everything that fails throws NotConnectedException's
# -1: infinite retries
# integer: seconds before exception is thrown
socket = SEMSocket('f0:c7:7f:0d:e7:17', auto_reconnect_timeout=None)
#socket.login("1337")
#socket.changePassword("1234")
#socket.login("1234")
while True:
break;
time.sleep(1)
try:
socket.getStatus()
socket.setStatus(True)
print("=== {} ({}) ===".format(socket.mac_address, "on" if socket.powered else "off"))
print("\t{}V {}A → {}W@{}Hz".format(socket.voltage, socket.current, socket.power, socket.frequency))
except SEMSocket.NotConnectedException:
socket.reconnect(-1) #infinite reconnect attempts

View file

@ -1,11 +1,10 @@
from bluepy import btle
import struct
import time
import pdb
import uuid
class SEMSocket():
password = "0000"
auto_reconnect_timeout = None
powered = False
voltage = 0
current = 0
@ -13,104 +12,21 @@ class SEMSocket():
frequency = 0
mac_address = ""
custom_service = None
read_char = None
write_char = None
notify_char = None
_read_char = None
_write_char = None
_notify_char = None
_btle_device = None
def __init__(self, mac):
def __init__(self, mac, auto_reconnect_timeout = None):
self.mac_address = mac
self.btle_device = btle.Peripheral(self.mac_address,addrType=btle.ADDR_TYPE_PUBLIC,iface=0)
self.btle_handler = self.BTLEHandler(self)
self.custom_service = self.btle_device.getServiceByUUID(0xfff0)
self.read_char = self.custom_service.getCharacteristics("0000fff1-0000-1000-8000-00805f9b34fb")[0]
print(self.read_char.read())
self.write_char = self.custom_service.getCharacteristics("0000fff3-0000-1000-8000-00805f9b34fb")[0]
self.notify_char = self.custom_service.getCharacteristics("0000fff4-0000-1000-8000-00805f9b34fb")[0]
self.btle_device.setDelegate(self.btle_handler)
class BTLEMessage():
MAGIC_START = bytearray([0x0f])
MAGIC_END = bytearray([0xff, 0xff])
__data = bytearray()
__cmd = bytearray(1) # cmd cannot be empty
__payload = bytearray()
def __init__(self, btle_device, cmd=bytearray(), payload=bytearray()):
self.__btle_device = btle_device
self.cmd = cmd
self.payload = payload
@property
def cmd(self):
return self.__cmd
@cmd.setter
def cmd(self, cmd):
self.__data = self.MAGIC_START + bytearray(1) + cmd + self.payload + bytearray(1) + self.MAGIC_END
self.__cmd = cmd
self.__calc_length()
self.__calc_checksum()
@property
def payload(self):
return self.__payload
@payload.setter
def payload(self, payload):
self.__data = self.MAGIC_START + bytearray(1) + self.cmd + payload + bytearray(1) + self.MAGIC_END
self.__payload = payload
self.__calc_length()
self.__calc_checksum()
def __calc_checksum(self):
checksum = 1
for i in range(2, self.__data[1] + 2):
checksum += self.__data[i]
self.__data[-3] = checksum & 0xff
def __calc_length(self):
self.__data[1] = 1 + len(self.__payload) + 1 # cmd + payload + checksum
def send(self):
self.__btle_device.write_char.write(self.__data, True)
self.__btle_device.btle_device.waitForNotifications(5)
class BTLEHandler(btle.DefaultDelegate):
def __init__(self, btle_device):
btle.DefaultDelegate.__init__(self)
self.btle_device = btle_device
def handleNotification(self, cHandle, data):
message_type = data[2]
if message_type == 0x00:
if data[4] == 0x01:
print("Checksum error!")
else:
print("Unknown error:", data)
elif message_type == 0x03: #switch toggle
print("Switch toggled")
self.btle_device.getStatus()
elif message_type == 0x04: #status related data
self.btle_device.voltage = data[8]
self.btle_device.current = (data[9] << 8 | data[10]) / 1000
self.btle_device.power = (data[5] << 16 | data[6] << 8 | data[7]) / 1000
self.btle_device.frequency = data[11]
self.btle_device.powered = bool(data[4])
elif message_type == 0x17:
if data[5] == 0x00 or data[5] == 0x01:
if not data[4]:
print("Login successful")
else:
print("Login failed")
else:
print("5th byte of login-response is > 1:", data)
else:
print ("Unknown message from Handle: 0x" + format(cHandle,'02X') + " Value: "+ format(data))
self.auto_reconnect_timeout = auto_reconnect_timeout
try:
self.reconnect()
except self.NotConnectedException:
# initial connection may fail. It is up to the code what to do
pass
def getStatus(self):
print("GetStatus")
#15, 5, 4, 0, 0, 0, 5, -1, -1
cmd = bytearray([0x04])
payload = bytearray([0x00, 0x00, 0x00])
@ -118,7 +34,6 @@ class SEMSocket():
msg.send()
def setStatus(self, status):
print("SetStatus:", status)
# 0f 06 03 00 01 00 00 05 ff ff -> on
# 0f 06 03 00 00 00 00 04 ff ff -> off
cmd = bytearray([0x03])
@ -127,16 +42,13 @@ class SEMSocket():
msg.send()
def login(self, password):
print("Login")
self.password = password
cmd = bytearray([0x17])
payload = bytearray()
payload.append(0x00)
payload.append(0x00)
payload.append(int(self.password[0]))
payload.append(int(self.password[1]))
payload.append(int(self.password[2]))
payload.append(int(self.password[3]))
for i in range(4):
payload.append(int(self.password[i]))
payload.append(0x00)
payload.append(0x00)
payload.append(0x00)
@ -145,23 +57,61 @@ class SEMSocket():
msg.send()
def changePassword(self, newPassword):
print("Change Password")
cmd = bytearray([0x17])
payload = bytearray()
payload.append(0x00)
payload.append(0x01)
payload.append(int(newPassword[0]))
payload.append(int(newPassword[1]))
payload.append(int(newPassword[2]))
payload.append(int(newPassword[3]))
payload.append(int(self.password[0]))
payload.append(int(self.password[1]))
payload.append(int(self.password[2]))
payload.append(int(self.password[3]))
for i in range(4):
payload.append(int(self.newPassword[i]))
for i in range(4):
payload.append(int(self.password[i]))
self.password = newPassword
msg = self.BTLEMessage(self, cmd, payload)
msg.send()
@property
def connected(self):
try:
if "conn" in self._btle_device.status().get("state"):
return True
else:
return False
except:
return False
def __reconnect(self):
self.disconnect()
self.connect()
def reconnect(self, timeout = None):
if timeout == None:
self.__reconnect()
else:
reconnect_start = time.time()
while abs(reconnect_start - time.time()) < timeout or timeout == -1:
self.__reconnect()
if not self.connected:
raise self.NotConnectedException
def connect(self):
self.disconnect()
if not self._btle_device:
self._btle_device = btle.Peripheral(self.mac_address,addrType=btle.ADDR_TYPE_PUBLIC,iface=0)
else:
self._btle_device.connect(self.mac_address)
self._btle_handler = self.BTLEHandler(self)
self._custom_service = self._btle_device.getServiceByUUID(0xfff0)
self._read_char = self._custom_service.getCharacteristics("0000fff1-0000-1000-8000-00805f9b34fb")[0]
self._write_char = self._custom_service.getCharacteristics("0000fff3-0000-1000-8000-00805f9b34fb")[0]
self._notify_char = self._custom_service.getCharacteristics("0000fff4-0000-1000-8000-00805f9b34fb")[0]
self._btle_device.setDelegate(self._btle_handler)
def disconnect(self):
if self.connected == True:
self._btle_device.disconnect()
#def SynVer(self):
# print("SynVer")
# self.read_char.read_value()
@ -186,4 +136,86 @@ class SEMSocket():
# self.GetSynConfig()
# #self.GetSN()
class NotConnectedException(Exception):
pass
class BTLEMessage():
MAGIC_START = bytearray([0x0f])
MAGIC_END = bytearray([0xff, 0xff])
__data = bytearray()
__cmd = bytearray(1) # cmd cannot be empty
__payload = bytearray()
def __init__(self, btle_device, cmd=bytearray(), payload=bytearray()):
self.__btle_device = btle_device
self.cmd = cmd
self.payload = payload
@property
def cmd(self):
return self.__cmd
@cmd.setter
def cmd(self, cmd):
self.__data = self.MAGIC_START + bytearray(1) + cmd + self.__payload + bytearray(1) + self.MAGIC_END
self.__cmd = cmd
self.__calc_length()
self.__calc_checksum()
@property
def payload(self):
return self.__payload
@payload.setter
def payload(self, payload):
self.__data = self.MAGIC_START + bytearray(1) + self.__cmd + payload + bytearray(1) + self.MAGIC_END
self.__payload = payload
self.__calc_length()
self.__calc_checksum()
def __calc_checksum(self):
checksum = 1
for i in range(2, self.__data[1] + 2):
checksum += self.__data[i]
self.__data[-3] = checksum & 0xff
def __calc_length(self):
self.__data[1] = 1 + len(self.__payload) + 1 # cmd + payload + checksum
def send(self):
if not self.__btle_device.connected:
self.__btle_device.reconnect(self.__btle_device.auto_reconnect_timeout)
self.__btle_device._write_char.write(self.__data, True)
self.__btle_device._btle_device.waitForNotifications(5)
class BTLEHandler(btle.DefaultDelegate):
def __init__(self, btle_device):
btle.DefaultDelegate.__init__(self)
self.__btle_device = btle_device
def handleNotification(self, cHandle, data):
message_type = data[2]
if message_type == 0x00:
if data[4] == 0x01:
print("Checksum error!")
else:
print("Unknown error:", data)
elif message_type == 0x03: #switch toggle
print("Switch toggled")
self.__btle_device.getStatus()
elif message_type == 0x04: #status related data
self.__btle_device.voltage = data[8]
self.__btle_device.current = (data[9] << 8 | data[10]) / 1000
self.__btle_device.power = (data[5] << 16 | data[6] << 8 | data[7]) / 1000
self.__btle_device.frequency = data[11]
self.__btle_device.powered = bool(data[4])
elif message_type == 0x17:
if data[5] == 0x00 or data[5] == 0x01:
if data[4]:
print("Login failed")
else:
print("5th byte of login-response is > 1:", data)
else:
print ("Unknown message from Handle: 0x" + format(cHandle,'02X') + " Value: "+ format(data))