some tests
This commit is contained in:
commit
ce7471d85d
2 changed files with 316 additions and 0 deletions
200
statemachinetest.py
Normal file
200
statemachinetest.py
Normal file
|
@ -0,0 +1,200 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import threading
|
||||
import queue
|
||||
|
||||
class IllegalEventError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AbstractState(object):
|
||||
def __init__(self, controller):
|
||||
pass
|
||||
|
||||
def on_gabelschalter_up(self):
|
||||
raise IllegalEventError()
|
||||
|
||||
def on_gabelschalter_down(self):
|
||||
raise IllegalEventError()
|
||||
|
||||
def on_incoming_call(self):
|
||||
raise IllegalEventError()
|
||||
|
||||
def on_call_ended(self):
|
||||
raise IllegalEventError()
|
||||
|
||||
def on_call_accepted(self):
|
||||
raise IllegalEventError()
|
||||
|
||||
def on_call_ringing(self):
|
||||
raise IllegalEventError()
|
||||
|
||||
def on_nummernschalter_input(self, num):
|
||||
raise IllegalEventError()
|
||||
|
||||
def on_timeout(self):
|
||||
raise IllegalEventError()
|
||||
|
||||
|
||||
|
||||
class IdleState(AbstractState):
|
||||
def on_incoming_call(self):
|
||||
print('incomfing call')
|
||||
|
||||
def on_gabelschalter_up(self):
|
||||
print('gabel up')
|
||||
return DailingState
|
||||
|
||||
def on_gabelschalter_down(self):
|
||||
pass
|
||||
|
||||
def on_nummernschalter_input(self, x):
|
||||
pass
|
||||
|
||||
|
||||
class SchelltState(AbstractState):
|
||||
def on_gabelschalter_up(self):
|
||||
return AcceptingState
|
||||
|
||||
def on_call_ended(self):
|
||||
return IdleState
|
||||
|
||||
class AcceptingState(AbstractState):
|
||||
def on_call_accepted(self):
|
||||
return CallRunningState
|
||||
|
||||
class CallTerminatingState(AbstractState):
|
||||
def on_call_ended(self):
|
||||
return IdleState
|
||||
|
||||
def on_call_accepted(self):
|
||||
return None
|
||||
|
||||
class ForgottenState(AbstractState):
|
||||
def on_gabelschalter_down(self):
|
||||
return IdleState
|
||||
|
||||
class BusyBeepingState(AbstractState):
|
||||
def on_timeout(self):
|
||||
return ForgottenState
|
||||
|
||||
def on_gabelschalter_down(self):
|
||||
return IdleState
|
||||
|
||||
class CallRunningState(AbstractState):
|
||||
def on_gabelschalter_down(self):
|
||||
return CallTerminatingState
|
||||
|
||||
def on_call_ended(self):
|
||||
return BusyBeepingState
|
||||
|
||||
class WecktState(AbstractState):
|
||||
def on_gabelschalter_down(self):
|
||||
return CallTerminatingState
|
||||
|
||||
def on_call_ended(self):
|
||||
return BusyBeepingState
|
||||
|
||||
def on_call_accepted(self):
|
||||
return CallRunningState
|
||||
|
||||
class ConnectingState(AbstractState):
|
||||
def on_gabelschalter_down(self):
|
||||
return CallTerminatingState
|
||||
|
||||
def on_call_ringing(self):
|
||||
return WecktState
|
||||
|
||||
class DailingState(AbstractState):
|
||||
def on_gabelschalter_down(self):
|
||||
return IdleState
|
||||
|
||||
def on_nummernschalter_input(self, num):
|
||||
print('nummernschalter: %d' % (num))
|
||||
|
||||
def on_timeout(self):
|
||||
return ConnectingState
|
||||
|
||||
|
||||
|
||||
class TelefonapparatUserInterface(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def add_gabelschalter_callback(self, cb):
|
||||
pass
|
||||
|
||||
def add_nummernschalter_callback(self, cb):
|
||||
pass
|
||||
|
||||
def set_wecker(self, enabled):
|
||||
pass
|
||||
|
||||
def set_schauzeichen(self, enabled):
|
||||
pass
|
||||
|
||||
|
||||
class StateMachineController(object):
|
||||
def __init__(self):
|
||||
self.__state = IdleState(self)
|
||||
|
||||
self.__running = True
|
||||
self.__evqueue = queue.Queue()
|
||||
self.__evthread = threading.Thread(target=self.__event_dispatcher)
|
||||
self.__evthread.start()
|
||||
|
||||
def __event_dispatcher(self):
|
||||
while self.__running:
|
||||
(evname, evargs, evkwargs) = self.__evqueue.get()
|
||||
if not evname:
|
||||
return
|
||||
|
||||
print('!!! event: %s' % (evname))
|
||||
handler = getattr(self.__state, 'on_%s' % (evname))
|
||||
try:
|
||||
newstate = handler(*evargs, **evkwargs)
|
||||
except IllegalEventError:
|
||||
print('illegal event occured!!!!!!!!!!!!!!!!!!!!')
|
||||
if not newstate:
|
||||
continue
|
||||
|
||||
oldstate = self.__state.__class__
|
||||
print('%s -> %s' % (oldstate.__name__, newstate.__name__))
|
||||
self.__state = newstate(self)
|
||||
|
||||
|
||||
def queue_event(self, evname, *evargs, **evkwargs):
|
||||
if not hasattr(AbstractState, 'on_%s' % (evname)):
|
||||
raise ValueError('Illegal event name: %s' % (evname))
|
||||
self.__evqueue.put((evname, evargs, evkwargs))
|
||||
|
||||
def stop(self, hard=False):
|
||||
if hard:
|
||||
self.__running = False
|
||||
self.__evqueue.put((None, None, None))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
c = StateMachineController()
|
||||
|
||||
c.queue_event('gabelschalter_up')
|
||||
c.queue_event('nummernschalter_input', 4)
|
||||
c.queue_event('nummernschalter_input', 2)
|
||||
#c.queue_event('gabelschalter_down')
|
||||
#c.queue_event('call_accepted')
|
||||
c.queue_event('timeout')
|
||||
c.queue_event('call_ringing')
|
||||
#c.queue_event('gabelschalter_down')
|
||||
c.queue_event('call_accepted')
|
||||
c.queue_event('call_ended')
|
||||
c.queue_event('timeout')
|
||||
c.queue_event('gabelschalter_down')
|
||||
|
||||
c.stop()
|
||||
|
||||
|
||||
|
116
tuitest.py
Normal file
116
tuitest.py
Normal file
|
@ -0,0 +1,116 @@
|
|||
|
||||
import RPi.GPIO as gpio
|
||||
import time
|
||||
import threading
|
||||
|
||||
|
||||
class TelefonapparatPinConfiguration(object):
|
||||
def __init__(self):
|
||||
self.pin_nsa = 11
|
||||
self.pin_nsi = 13
|
||||
self.pin_gabelschalter = 15
|
||||
self.pin_schauzeichen = 32
|
||||
self.pin_wecker_enable = 16
|
||||
self.pin_wecker_a = 18
|
||||
self.pin_wecker_b = 19
|
||||
|
||||
class TelefonapparatUserInterface(object):
|
||||
def __init__(self, pinconfig):
|
||||
self.__pinconfig = pinconfig
|
||||
|
||||
gpio.setup(self.__pinconfig.pin_nsa, gpio.IN, gpio.PUD_UP)
|
||||
gpio.setup(self.__pinconfig.pin_nsi, gpio.IN, gpio.PUD_UP)
|
||||
gpio.setup(self.__pinconfig.pin_gabelschalter, gpio.IN, gpio.PUD_UP)
|
||||
gpio.setup(self.__pinconfig.pin_schauzeichen, gpio.OUT)
|
||||
gpio.setup(self.__pinconfig.pin_wecker_enable, gpio.OUT)
|
||||
gpio.setup(self.__pinconfig.pin_wecker_a, gpio.OUT)
|
||||
gpio.setup(self.__pinconfig.pin_wecker_b, gpio.OUT)
|
||||
|
||||
gpio.add_event_detect(self.__pinconfig.pin_nsa, gpio.BOTH,
|
||||
self.__on_nsa_change, 20)
|
||||
gpio.add_event_detect(self.__pinconfig.pin_nsi, gpio.FALLING,
|
||||
self.__on_nsi_falling, 20)
|
||||
gpio.add_event_detect(self.__pinconfig.pin_gabelschalter, gpio.BOTH,
|
||||
self.__on_gabelschalter_change, 20)
|
||||
#self.__nsa_up_time = 0
|
||||
self.__nsi_cnt = 0
|
||||
self.__weckt = False
|
||||
|
||||
self.__nummernschalter_callbacks = []
|
||||
self.__gabelschalter_callbacks = []
|
||||
|
||||
def __on_nsa_change(self, pin):
|
||||
nsastate = gpio.input(self.__pinconfig.pin_nsa)
|
||||
if nsastate == 0:
|
||||
#self.__nsa_up_time = time.time()
|
||||
self.__nsi_cnt = 0
|
||||
else:
|
||||
for cb in self.__nummernschalter_callbacks:
|
||||
cb(self.__nsi_cnt)
|
||||
|
||||
def __on_nsi_falling(self, pin):
|
||||
#print 'nsi'
|
||||
self.__nsi_cnt += 1
|
||||
|
||||
def __on_gabelschalter_change(self, pin):
|
||||
gbstate = gpio.input(self.__pinconfig.pin_gabelschalter)
|
||||
print 'gabelschalter:', gbstate
|
||||
for cb in self.__gabelschalter_callbacks:
|
||||
cb(gbstate)
|
||||
|
||||
def __wecker_thread(self):
|
||||
while self.__weckt:
|
||||
c = 0
|
||||
gpio.output(self.__pinconfig.pin_wecker_enable, 1)
|
||||
while c < 1000:
|
||||
gpio.output(self.__pinconfig.pin_wecker_a, 1)
|
||||
gpio.output(self.__pinconfig.pin_wecker_b, 0)
|
||||
time.sleep(0.02)
|
||||
gpio.output(self.__pinconfig.pin_wecker_a, 0)
|
||||
gpio.output(self.__pinconfig.pin_wecker_b, 1)
|
||||
time.sleep(0.02)
|
||||
c += 40
|
||||
print 'ring'
|
||||
gpio.output(self.__pinconfig.pin_wecker_enable, 0)
|
||||
|
||||
print ''
|
||||
time.sleep(4)
|
||||
|
||||
|
||||
def add_gabelschalter_callback(self, cb):
|
||||
self.__gabelschalter_callbacks.append(cb)
|
||||
|
||||
def add_nummernschalter_callback(self, cb):
|
||||
self.__nummernschalter_callbacks.append(cb)
|
||||
|
||||
def set_wecker(self, enabled):
|
||||
if enabled and not self.__weckt:
|
||||
self.__weckt = True
|
||||
t = threading.Thread(target=self.__wecker_thread)
|
||||
t.start()
|
||||
elif not enabled:
|
||||
self.__weckt = False
|
||||
|
||||
def set_schauzeichen(self, enabled):
|
||||
raise Exception('Not implemented.')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
gpio.setmode(gpio.BOARD)
|
||||
pinconfig = TelefonapparatPinConfiguration()
|
||||
t = TelefonapparatUserInterface(pinconfig)
|
||||
|
||||
def dailed(num):
|
||||
print num
|
||||
|
||||
t.add_nummernschalter_callback(dailed)
|
||||
|
||||
t.set_wecker(True)
|
||||
time.sleep(20)
|
||||
t.set_wecker(False)
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in a new issue