From ce7471d85d1073d953d61cb0039a588c84a3ab38 Mon Sep 17 00:00:00 2001 From: Frederic Date: Fri, 15 May 2015 23:38:44 +0200 Subject: [PATCH] some tests --- statemachinetest.py | 200 ++++++++++++++++++++++++++++++++++++++++++++ tuitest.py | 116 +++++++++++++++++++++++++ 2 files changed, 316 insertions(+) create mode 100644 statemachinetest.py create mode 100644 tuitest.py diff --git a/statemachinetest.py b/statemachinetest.py new file mode 100644 index 0000000..576d2e4 --- /dev/null +++ b/statemachinetest.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 + +import threading +import queue + +class IllegalEventError(Exception): + pass + + +class AbstractState(object): + def __init__(self, controller): + pass + + def on_gabelschalter_up(self): + raise IllegalEventError() + + def on_gabelschalter_down(self): + raise IllegalEventError() + + def on_incoming_call(self): + raise IllegalEventError() + + def on_call_ended(self): + raise IllegalEventError() + + def on_call_accepted(self): + raise IllegalEventError() + + def on_call_ringing(self): + raise IllegalEventError() + + def on_nummernschalter_input(self, num): + raise IllegalEventError() + + def on_timeout(self): + raise IllegalEventError() + + + +class IdleState(AbstractState): + def on_incoming_call(self): + print('incomfing call') + + def on_gabelschalter_up(self): + print('gabel up') + return DailingState + + def on_gabelschalter_down(self): + pass + + def on_nummernschalter_input(self, x): + pass + + +class SchelltState(AbstractState): + def on_gabelschalter_up(self): + return AcceptingState + + def on_call_ended(self): + return IdleState + +class AcceptingState(AbstractState): + def on_call_accepted(self): + return CallRunningState + +class CallTerminatingState(AbstractState): + def on_call_ended(self): + return IdleState + + def on_call_accepted(self): + return None + +class ForgottenState(AbstractState): + def on_gabelschalter_down(self): + return IdleState + +class BusyBeepingState(AbstractState): + def on_timeout(self): + return ForgottenState + + def on_gabelschalter_down(self): + return IdleState + +class CallRunningState(AbstractState): + def on_gabelschalter_down(self): + return CallTerminatingState + + def on_call_ended(self): + return BusyBeepingState + +class WecktState(AbstractState): + def on_gabelschalter_down(self): + return CallTerminatingState + + def on_call_ended(self): + return BusyBeepingState + + def on_call_accepted(self): + return CallRunningState + +class ConnectingState(AbstractState): + def on_gabelschalter_down(self): + return CallTerminatingState + + def on_call_ringing(self): + return WecktState + +class DailingState(AbstractState): + def on_gabelschalter_down(self): + return IdleState + + def on_nummernschalter_input(self, num): + print('nummernschalter: %d' % (num)) + + def on_timeout(self): + return ConnectingState + + + +class TelefonapparatUserInterface(object): + def __init__(self): + pass + + def add_gabelschalter_callback(self, cb): + pass + + def add_nummernschalter_callback(self, cb): + pass + + def set_wecker(self, enabled): + pass + + def set_schauzeichen(self, enabled): + pass + + +class StateMachineController(object): + def __init__(self): + self.__state = IdleState(self) + + self.__running = True + self.__evqueue = queue.Queue() + self.__evthread = threading.Thread(target=self.__event_dispatcher) + self.__evthread.start() + + def __event_dispatcher(self): + while self.__running: + (evname, evargs, evkwargs) = self.__evqueue.get() + if not evname: + return + + print('!!! event: %s' % (evname)) + handler = getattr(self.__state, 'on_%s' % (evname)) + try: + newstate = handler(*evargs, **evkwargs) + except IllegalEventError: + print('illegal event occured!!!!!!!!!!!!!!!!!!!!') + if not newstate: + continue + + oldstate = self.__state.__class__ + print('%s -> %s' % (oldstate.__name__, newstate.__name__)) + self.__state = newstate(self) + + + def queue_event(self, evname, *evargs, **evkwargs): + if not hasattr(AbstractState, 'on_%s' % (evname)): + raise ValueError('Illegal event name: %s' % (evname)) + self.__evqueue.put((evname, evargs, evkwargs)) + + def stop(self, hard=False): + if hard: + self.__running = False + self.__evqueue.put((None, None, None)) + + + + + + +if __name__ == '__main__': + c = StateMachineController() + + c.queue_event('gabelschalter_up') + c.queue_event('nummernschalter_input', 4) + c.queue_event('nummernschalter_input', 2) + #c.queue_event('gabelschalter_down') + #c.queue_event('call_accepted') + c.queue_event('timeout') + c.queue_event('call_ringing') + #c.queue_event('gabelschalter_down') + c.queue_event('call_accepted') + c.queue_event('call_ended') + c.queue_event('timeout') + c.queue_event('gabelschalter_down') + + c.stop() + + + diff --git a/tuitest.py b/tuitest.py new file mode 100644 index 0000000..2fecc05 --- /dev/null +++ b/tuitest.py @@ -0,0 +1,116 @@ + +import RPi.GPIO as gpio +import time +import threading + + +class TelefonapparatPinConfiguration(object): + def __init__(self): + self.pin_nsa = 11 + self.pin_nsi = 13 + self.pin_gabelschalter = 15 + self.pin_schauzeichen = 32 + self.pin_wecker_enable = 16 + self.pin_wecker_a = 18 + self.pin_wecker_b = 19 + +class TelefonapparatUserInterface(object): + def __init__(self, pinconfig): + self.__pinconfig = pinconfig + + gpio.setup(self.__pinconfig.pin_nsa, gpio.IN, gpio.PUD_UP) + gpio.setup(self.__pinconfig.pin_nsi, gpio.IN, gpio.PUD_UP) + gpio.setup(self.__pinconfig.pin_gabelschalter, gpio.IN, gpio.PUD_UP) + gpio.setup(self.__pinconfig.pin_schauzeichen, gpio.OUT) + gpio.setup(self.__pinconfig.pin_wecker_enable, gpio.OUT) + gpio.setup(self.__pinconfig.pin_wecker_a, gpio.OUT) + gpio.setup(self.__pinconfig.pin_wecker_b, gpio.OUT) + + gpio.add_event_detect(self.__pinconfig.pin_nsa, gpio.BOTH, + self.__on_nsa_change, 20) + gpio.add_event_detect(self.__pinconfig.pin_nsi, gpio.FALLING, + self.__on_nsi_falling, 20) + gpio.add_event_detect(self.__pinconfig.pin_gabelschalter, gpio.BOTH, + self.__on_gabelschalter_change, 20) + #self.__nsa_up_time = 0 + self.__nsi_cnt = 0 + self.__weckt = False + + self.__nummernschalter_callbacks = [] + self.__gabelschalter_callbacks = [] + + def __on_nsa_change(self, pin): + nsastate = gpio.input(self.__pinconfig.pin_nsa) + if nsastate == 0: + #self.__nsa_up_time = time.time() + self.__nsi_cnt = 0 + else: + for cb in self.__nummernschalter_callbacks: + cb(self.__nsi_cnt) + + def __on_nsi_falling(self, pin): + #print 'nsi' + self.__nsi_cnt += 1 + + def __on_gabelschalter_change(self, pin): + gbstate = gpio.input(self.__pinconfig.pin_gabelschalter) + print 'gabelschalter:', gbstate + for cb in self.__gabelschalter_callbacks: + cb(gbstate) + + def __wecker_thread(self): + while self.__weckt: + c = 0 + gpio.output(self.__pinconfig.pin_wecker_enable, 1) + while c < 1000: + gpio.output(self.__pinconfig.pin_wecker_a, 1) + gpio.output(self.__pinconfig.pin_wecker_b, 0) + time.sleep(0.02) + gpio.output(self.__pinconfig.pin_wecker_a, 0) + gpio.output(self.__pinconfig.pin_wecker_b, 1) + time.sleep(0.02) + c += 40 + print 'ring' + gpio.output(self.__pinconfig.pin_wecker_enable, 0) + + print '' + time.sleep(4) + + + def add_gabelschalter_callback(self, cb): + self.__gabelschalter_callbacks.append(cb) + + def add_nummernschalter_callback(self, cb): + self.__nummernschalter_callbacks.append(cb) + + def set_wecker(self, enabled): + if enabled and not self.__weckt: + self.__weckt = True + t = threading.Thread(target=self.__wecker_thread) + t.start() + elif not enabled: + self.__weckt = False + + def set_schauzeichen(self, enabled): + raise Exception('Not implemented.') + + +if __name__ == '__main__': + gpio.setmode(gpio.BOARD) + pinconfig = TelefonapparatPinConfiguration() + t = TelefonapparatUserInterface(pinconfig) + + def dailed(num): + print num + + t.add_nummernschalter_callback(dailed) + + t.set_wecker(True) + time.sleep(20) + t.set_wecker(False) + while True: + time.sleep(1) + + + +