diff --git a/example.py b/example.py index 1777856..ea3e874 100644 --- a/example.py +++ b/example.py @@ -1,16 +1,23 @@ -from sem6000 import SEMSocket import time -socket = SEMSocket('f0:c7:7f:0d:e7:17') +from sem6000 import SEMSocket -socket.login("1337") -socket.changePassword("1234") -socket.login("1234") -#socket.changePassword("1337") +# auto_reconnect_timeout enabled auto reconnect if sending a command fails. Valid values: +# None (default): everything that fails throws NotConnectedException's +# -1: infinite retries +# integer: seconds before exception is thrown + +socket = SEMSocket('f0:c7:7f:0d:e7:17', auto_reconnect_timeout=None) + +#socket.login("1337") +#socket.changePassword("1234") +#socket.login("1234") while True: - break; time.sleep(1) - socket.getStatus() - socket.setStatus(True) - print("=== {} ({}) ===".format(socket.mac_address, "on" if socket.powered else "off")) - print("\t{}V {}A → {}W@{}Hz".format(socket.voltage, socket.current, socket.power, socket.frequency)) + try: + socket.getStatus() + socket.setStatus(True) + print("=== {} ({}) ===".format(socket.mac_address, "on" if socket.powered else "off")) + print("\t{}V {}A → {}W@{}Hz".format(socket.voltage, socket.current, socket.power, socket.frequency)) + except SEMSocket.NotConnectedException: + socket.reconnect(-1) #infinite reconnect attempts diff --git a/sem6000.py b/sem6000.py index 4e9b803..63c5283 100644 --- a/sem6000.py +++ b/sem6000.py @@ -1,11 +1,10 @@ from bluepy import btle -import struct import time -import pdb import uuid class SEMSocket(): password = "0000" + auto_reconnect_timeout = None powered = False voltage = 0 current = 0 @@ -13,104 +12,21 @@ class SEMSocket(): frequency = 0 mac_address = "" custom_service = None - read_char = None - write_char = None - notify_char = None + _read_char = None + _write_char = None + _notify_char = None + _btle_device = None - def __init__(self, mac): + def __init__(self, mac, auto_reconnect_timeout = None): self.mac_address = mac - self.btle_device = btle.Peripheral(self.mac_address,addrType=btle.ADDR_TYPE_PUBLIC,iface=0) - self.btle_handler = self.BTLEHandler(self) - - self.custom_service = self.btle_device.getServiceByUUID(0xfff0) - self.read_char = self.custom_service.getCharacteristics("0000fff1-0000-1000-8000-00805f9b34fb")[0] - print(self.read_char.read()) - self.write_char = self.custom_service.getCharacteristics("0000fff3-0000-1000-8000-00805f9b34fb")[0] - self.notify_char = self.custom_service.getCharacteristics("0000fff4-0000-1000-8000-00805f9b34fb")[0] - self.btle_device.setDelegate(self.btle_handler) - - class BTLEMessage(): - MAGIC_START = bytearray([0x0f]) - MAGIC_END = bytearray([0xff, 0xff]) - __data = bytearray() - __cmd = bytearray(1) # cmd cannot be empty - __payload = bytearray() - - def __init__(self, btle_device, cmd=bytearray(), payload=bytearray()): - self.__btle_device = btle_device - self.cmd = cmd - self.payload = payload - - @property - def cmd(self): - return self.__cmd - - @cmd.setter - def cmd(self, cmd): - self.__data = self.MAGIC_START + bytearray(1) + cmd + self.payload + bytearray(1) + self.MAGIC_END - self.__cmd = cmd - self.__calc_length() - self.__calc_checksum() - - @property - def payload(self): - return self.__payload - - @payload.setter - def payload(self, payload): - self.__data = self.MAGIC_START + bytearray(1) + self.cmd + payload + bytearray(1) + self.MAGIC_END - self.__payload = payload - self.__calc_length() - self.__calc_checksum() - - def __calc_checksum(self): - checksum = 1 - for i in range(2, self.__data[1] + 2): - checksum += self.__data[i] - self.__data[-3] = checksum & 0xff - - def __calc_length(self): - self.__data[1] = 1 + len(self.__payload) + 1 # cmd + payload + checksum - - def send(self): - self.__btle_device.write_char.write(self.__data, True) - self.__btle_device.btle_device.waitForNotifications(5) - - - class BTLEHandler(btle.DefaultDelegate): - def __init__(self, btle_device): - btle.DefaultDelegate.__init__(self) - self.btle_device = btle_device - - def handleNotification(self, cHandle, data): - message_type = data[2] - if message_type == 0x00: - if data[4] == 0x01: - print("Checksum error!") - else: - print("Unknown error:", data) - elif message_type == 0x03: #switch toggle - print("Switch toggled") - self.btle_device.getStatus() - elif message_type == 0x04: #status related data - self.btle_device.voltage = data[8] - self.btle_device.current = (data[9] << 8 | data[10]) / 1000 - self.btle_device.power = (data[5] << 16 | data[6] << 8 | data[7]) / 1000 - self.btle_device.frequency = data[11] - self.btle_device.powered = bool(data[4]) - elif message_type == 0x17: - if data[5] == 0x00 or data[5] == 0x01: - if not data[4]: - print("Login successful") - else: - print("Login failed") - else: - print("5th byte of login-response is > 1:", data) - else: - print ("Unknown message from Handle: 0x" + format(cHandle,'02X') + " Value: "+ format(data)) + self.auto_reconnect_timeout = auto_reconnect_timeout + try: + self.reconnect() + except self.NotConnectedException: + # initial connection may fail. It is up to the code what to do + pass def getStatus(self): - print("GetStatus") #15, 5, 4, 0, 0, 0, 5, -1, -1 cmd = bytearray([0x04]) payload = bytearray([0x00, 0x00, 0x00]) @@ -118,7 +34,6 @@ class SEMSocket(): msg.send() def setStatus(self, status): - print("SetStatus:", status) # 0f 06 03 00 01 00 00 05 ff ff -> on # 0f 06 03 00 00 00 00 04 ff ff -> off cmd = bytearray([0x03]) @@ -127,16 +42,13 @@ class SEMSocket(): msg.send() def login(self, password): - print("Login") self.password = password cmd = bytearray([0x17]) payload = bytearray() payload.append(0x00) payload.append(0x00) - payload.append(int(self.password[0])) - payload.append(int(self.password[1])) - payload.append(int(self.password[2])) - payload.append(int(self.password[3])) + for i in range(4): + payload.append(int(self.password[i])) payload.append(0x00) payload.append(0x00) payload.append(0x00) @@ -145,23 +57,61 @@ class SEMSocket(): msg.send() def changePassword(self, newPassword): - print("Change Password") cmd = bytearray([0x17]) payload = bytearray() payload.append(0x00) payload.append(0x01) - payload.append(int(newPassword[0])) - payload.append(int(newPassword[1])) - payload.append(int(newPassword[2])) - payload.append(int(newPassword[3])) - payload.append(int(self.password[0])) - payload.append(int(self.password[1])) - payload.append(int(self.password[2])) - payload.append(int(self.password[3])) + for i in range(4): + payload.append(int(self.newPassword[i])) + for i in range(4): + payload.append(int(self.password[i])) self.password = newPassword msg = self.BTLEMessage(self, cmd, payload) msg.send() + @property + def connected(self): + try: + if "conn" in self._btle_device.status().get("state"): + return True + else: + return False + except: + return False + + def __reconnect(self): + self.disconnect() + self.connect() + + def reconnect(self, timeout = None): + if timeout == None: + self.__reconnect() + else: + reconnect_start = time.time() + while abs(reconnect_start - time.time()) < timeout or timeout == -1: + self.__reconnect() + + if not self.connected: + raise self.NotConnectedException + + def connect(self): + self.disconnect() + if not self._btle_device: + self._btle_device = btle.Peripheral(self.mac_address,addrType=btle.ADDR_TYPE_PUBLIC,iface=0) + else: + self._btle_device.connect(self.mac_address) + self._btle_handler = self.BTLEHandler(self) + + self._custom_service = self._btle_device.getServiceByUUID(0xfff0) + self._read_char = self._custom_service.getCharacteristics("0000fff1-0000-1000-8000-00805f9b34fb")[0] + self._write_char = self._custom_service.getCharacteristics("0000fff3-0000-1000-8000-00805f9b34fb")[0] + self._notify_char = self._custom_service.getCharacteristics("0000fff4-0000-1000-8000-00805f9b34fb")[0] + self._btle_device.setDelegate(self._btle_handler) + + def disconnect(self): + if self.connected == True: + self._btle_device.disconnect() + #def SynVer(self): # print("SynVer") # self.read_char.read_value() @@ -186,4 +136,86 @@ class SEMSocket(): # self.GetSynConfig() # #self.GetSN() + class NotConnectedException(Exception): + pass + class BTLEMessage(): + MAGIC_START = bytearray([0x0f]) + MAGIC_END = bytearray([0xff, 0xff]) + __data = bytearray() + __cmd = bytearray(1) # cmd cannot be empty + __payload = bytearray() + + def __init__(self, btle_device, cmd=bytearray(), payload=bytearray()): + self.__btle_device = btle_device + self.cmd = cmd + self.payload = payload + + @property + def cmd(self): + return self.__cmd + + @cmd.setter + def cmd(self, cmd): + self.__data = self.MAGIC_START + bytearray(1) + cmd + self.__payload + bytearray(1) + self.MAGIC_END + self.__cmd = cmd + self.__calc_length() + self.__calc_checksum() + + @property + def payload(self): + return self.__payload + + @payload.setter + def payload(self, payload): + self.__data = self.MAGIC_START + bytearray(1) + self.__cmd + payload + bytearray(1) + self.MAGIC_END + self.__payload = payload + self.__calc_length() + self.__calc_checksum() + + def __calc_checksum(self): + checksum = 1 + for i in range(2, self.__data[1] + 2): + checksum += self.__data[i] + self.__data[-3] = checksum & 0xff + + def __calc_length(self): + self.__data[1] = 1 + len(self.__payload) + 1 # cmd + payload + checksum + + def send(self): + if not self.__btle_device.connected: + self.__btle_device.reconnect(self.__btle_device.auto_reconnect_timeout) + + self.__btle_device._write_char.write(self.__data, True) + self.__btle_device._btle_device.waitForNotifications(5) + + + class BTLEHandler(btle.DefaultDelegate): + def __init__(self, btle_device): + btle.DefaultDelegate.__init__(self) + self.__btle_device = btle_device + + def handleNotification(self, cHandle, data): + message_type = data[2] + if message_type == 0x00: + if data[4] == 0x01: + print("Checksum error!") + else: + print("Unknown error:", data) + elif message_type == 0x03: #switch toggle + print("Switch toggled") + self.__btle_device.getStatus() + elif message_type == 0x04: #status related data + self.__btle_device.voltage = data[8] + self.__btle_device.current = (data[9] << 8 | data[10]) / 1000 + self.__btle_device.power = (data[5] << 16 | data[6] << 8 | data[7]) / 1000 + self.__btle_device.frequency = data[11] + self.__btle_device.powered = bool(data[4]) + elif message_type == 0x17: + if data[5] == 0x00 or data[5] == 0x01: + if data[4]: + print("Login failed") + else: + print("5th byte of login-response is > 1:", data) + else: + print ("Unknown message from Handle: 0x" + format(cHandle,'02X') + " Value: "+ format(data))