fetapi/phoneinterface.py

212 lines
6.9 KiB
Python

import time
import threading
import subprocess
from pylinphone.pylinphone import LinphoneCommunicationSocket
from history import History
class PhoneProxyConfiguration(object):
def __init__(self, name, proxy, identity, username, password, realm,
prefix):
self.name = name
self.proxy = proxy
self.identity = identity
self.username = username
self.password = password
self.realm = realm
self.prefix = prefix
class PhoneConfiguration(object):
def __init__(self, sound_device, incoming_timeout, linphone_config,
default_proxy, proxies, stun_server):
self.sound_device = sound_device
self.incoming_timeout = incoming_timeout
self.linphone_config = linphone_config
self.default_proxy = default_proxy
self.proxies = proxies
self.stun_server = stun_server
class PhoneEvent(object):
RegInProgress,\
RegSuccessfull,\
RegLost,\
CallIncoming,\
CallRinging,\
CallAccepted,\
CallEnded,\
CallBusy,\
CallInvalidNumber = range(9)
@classmethod
def string(cls, val):
for k, v in vars(cls).items():
if v == val:
return k
class PhoneInterface(object):
def __init__(self, config):
self.__event_cbs = []
self.__config = config
self.__core = LinphoneCommunicationSocket("/var/tmp/debian-sid/tmp/linphone")
self.__history = History()
self.__core.onLinphoneCallIncomingReceived = self.on_LinphoneCallIncomingReceived
self.__core.onLinphoneCallOutgoingRinging = self.on_LinphoneCallOutgoingRinging
self.__core.onLinphoneCallConnected = self.on_LinphoneCallConnected
self.__core.onLinphoneCallEnd = self.on_LinphoneCallEnd
# Create and add all proxy configs
for p in config.proxies:
aid = self.__core.register(p.identity, p.proxy, p.password, p.username) # sip:XXXX@hg.eventphone.de, hg.eventphone.de, MySecretPassword, XXXX
self.__audioproc = None
aplay = subprocess.Popen(['aplay', '-qD%s' % config.sound_device],
stdin=subprocess.PIPE)
self.__ttsproc = subprocess.Popen(['espeak', '-p10', '--stdout'],
stdin=subprocess.PIPE,
stdout=aplay.stdin)
self.current_call_id = None
# Set default parameters overriding the ones from the given config file
# TODO: figure out how to set at least some of these settings through the unix socket
#self.__core.set_user_agent('FeTAp 615', '0.1')
#self.__core.stun_server = config.stun_server
#self.__core.ringback = ''
#self.__core.max_calls = 1
#self.__core.inc_timeout = config.incoming_timeout
#self.__core.set_call_error_tone(linphone.Reason.Busy, '')
#self.__core.disable_chat(linphone.Reason.None)
#self.__core.echo_cancellation_enabled = False
#self.__core.video_capture_enabled = False
#self.__core.video_display_enabled = False
def run_callbacks(self, evt):
print(PhoneEvent.string(evt))
for cb in self.__event_cbs:
cb(evt)
def on_LinphoneCallIncomingReceived(self, event):
self.run_callbacks(PhoneEvent.CallIncoming)
self.current_call_id = event.call_id
self.__history.log_call(self.get_remote_number(), True)
def on_LinphoneCallOutgoingRinging(self, event):
self.run_callbacks(PhoneEvent.CallRinging)
self.__history.log_call(self.get_remote_number(), False)
def on_LinphoneCallConnected(self, event):
self.run_callbacks(PhoneEvent.CallAccepted)
def on_LinphoneCallEnd(self, event):
self.run_callbacks(PhoneEvent.CallEnded)
self.current_call_id = None
def __pollthread(self):
while self.__running:
self.__core.process_event()
time.sleep(0.2) # Value for good measure
def start(self):
self.__running = True
t = threading.Thread(target=self.__pollthread)
t.start()
def stop(self):
self.stop_playing()
if self.__ttsproc is not None:
self.__ttsproc.terminate()
self.__running = False
def add_event_cb(self, cb):
self.__event_cbs.append(cb)
def call(self, number):
if '@' not in number:
proxy = None
default_name = self.__config.default_proxy
for p in self.__config.proxies:
if p.name == default_name:
proxy = p
break
if proxy is None:
# Try to resolve prefix
for p in self.__config.proxies:
if number.startswith(p.prefix):
number = number[len(p.prefix):]
proxy = p
break
if proxy is not None:
number += '@' + proxy.realm
self.__core.call(number)
def accept_call(self):
self.__core.answer(self.current_call_id)
def decline_call(self):
self.__core.decline_call(self.__core.current_call)
def end_call(self):
self.__core.terminate()
def play_dial_tone(self):
self.stop_playing()
self.__audioproc = subprocess.Popen(
['play', '-nq', 'synth', 'sine', '425'],
env={'AUDIODRIVER': 'alsa', 'AUDIODEV': self.__config.sound_device}
)
def play_ringback_tone(self):
self.stop_playing()
self.__audioproc = subprocess.Popen(
['play', '-nq', 'synth', '1', 'sine', '425', 'pad', '4@1',
'repeat', '1000'],
env={'AUDIODRIVER': 'alsa', 'AUDIODEV': self.__config.sound_device}
)
def play_busy_tone(self):
self.stop_playing()
self.__audioproc = subprocess.Popen(
['play', '-nq', 'synth', '0.48', 'sine', '425', 'pad', '0.48@0.48',
'repeat', '1000'],
env={'AUDIODRIVER': 'alsa', 'AUDIODEV': self.__config.sound_device}
)
def stop_playing(self):
if self.__audioproc is not None:
self.__audioproc.terminate()
def read_text(self, text):
self.__ttsproc.stdin.write(text.encode('utf8') + b'\n')
self.__ttsproc.stdin.flush()
def get_remote_number(self):
if self.current_call_id != None:
status = self.__core.call_status(self.current_call_id)
return status.get("from")
if __name__ == '__main__':
def event_cb(evt):
print('Got event:', PhoneEvent.string(evt))
try:
phone = PhoneInterface('.linphonerc-foo', '.linphonerc')
phone.add_event_cb(event_cb)
phone.start()
i = 0
while True:
time.sleep(1)
i += 1
if i == 5:
phone.call('3474')
#phone.play_busy_tone()
pass
except KeyboardInterrupt:
phone.stop()