fetapi/phoneinterface.py

241 lines
8 KiB
Python
Raw Normal View History

2015-05-18 14:34:45 +02:00
import linphone
import time
import threading
import subprocess
2015-05-18 14:34:45 +02:00
2017-04-28 18:57:43 +02:00
class PhoneProxyConfiguration(object):
    """Settings for one SIP proxy/account consumed by PhoneInterface.

    Attributes:
        name: Label matched against PhoneConfiguration.default_proxy to
            select the default proxy.
        proxy: Value assigned to the linphone proxy config's server address.
        identity: SIP identity used for the proxy config.
        username: User name used for the authentication info.
        password: Password used for the authentication info.
        realm: Authentication realm; also appended after '@' when a dialled
            number is resolved through ``prefix``.
        prefix: Dial prefix; PhoneInterface.call() strips it from numbers
            that start with it.
    """

    def __init__(self, name, proxy, identity, username, password, realm,
                 prefix):
        # Where and as whom to register.
        self.name, self.proxy, self.identity = name, proxy, identity
        # Credentials.
        self.username, self.password, self.realm = username, password, realm
        # Dial prefix routed to this proxy.
        self.prefix = prefix
2017-04-28 18:57:43 +02:00
class PhoneConfiguration(object):
    """Top-level settings handed to PhoneInterface.

    Attributes:
        sound_device: Audio device name passed to ``aplay -D`` and exported
            as ``AUDIODEV`` for the tone player.
        incoming_timeout: Value assigned to the linphone core's
            ``inc_timeout``.
        linphone_config: Path handed to ``linphone.Core.new`` as its third
            argument.
        default_proxy: ``name`` of the proxy that becomes the core's default
            proxy config.
        proxies: Iterable of PhoneProxyConfiguration objects.
        stun_server: Value assigned to the linphone core's ``stun_server``.
    """

    def __init__(self, sound_device, incoming_timeout, linphone_config,
                 default_proxy, proxies, stun_server):
        # Audio / core behaviour.
        self.sound_device, self.incoming_timeout = \
            sound_device, incoming_timeout
        self.linphone_config = linphone_config
        # Account setup.
        self.default_proxy, self.proxies = default_proxy, proxies
        self.stun_server = stun_server
2017-04-28 18:57:43 +02:00
2015-05-18 14:34:45 +02:00
class PhoneEvent(object):
    """Integer event codes delivered to PhoneInterface event callbacks."""

    RegInProgress, \
        RegSuccessfull, \
        RegLost, \
        CallIncoming, \
        CallRinging, \
        CallAccepted, \
        CallEnded, \
        CallBusy, \
        CallInvalidNumber = range(9)

    @classmethod
    def string(cls, val):
        """Return the name of the event code ``val``.

        Args:
            val: One of the integer codes defined on this class.

        Returns:
            The attribute name of the matching event, or None when ``val``
            is not a known event code.
        """
        # items() works on both Python 2 and 3 (iteritems() is Python 2 only).
        for name, code in vars(cls).items():
            # Skip class machinery such as __doc__/__module__ so that e.g.
            # string(None) cannot accidentally resolve to '__doc__'.
            if name.startswith('_'):
                continue
            if code == val:
                return name
        return None
2017-04-28 18:57:43 +02:00
2015-05-18 14:34:45 +02:00
class PhoneInterface(object):
    """Drive a linphone core and the helper audio processes of the phone.

    The instance owns:
      * a linphone core configured from a PhoneConfiguration,
      * an ``espeak`` process piped into ``aplay`` for text-to-speech,
      * at most one ``play`` (sox) process generating a signalling tone,
      * a polling thread (after start()) that iterates the core.

    State changes are reported to callbacks registered with add_event_cb()
    as PhoneEvent codes.
    """

    def __init__(self, config):
        """Create the linphone core, register proxies and start the TTS pipe.

        Args:
            config: PhoneConfiguration with the settings to apply.
        """
        cbs = {
            'global_state_changed': self.__global_state_changed,
            'registration_state_changed': self.__registration_state_changed,
            'call_state_changed': self.__call_state_changed
        }
        self.__event_cbs = []
        self.__config = config
        # Defined up front so stop() and the poll loop never hit a missing
        # attribute, regardless of call order.
        self.__running = False
        self.__audioproc = None
        self.__core = linphone.Core.new(cbs, None, config.linphone_config)

        # Compare versions numerically: as plain strings '3.10.0' would sort
        # before '3.9.0' and select the wrong branch.
        legacy_identity = \
            self._version_tuple(self.__core.version) < (3, 9, 0)

        # Create and add all proxy configs
        for p in config.proxies:
            ainfo = self.__core.create_auth_info(p.username, p.username,
                                                 p.password, None, p.realm,
                                                 None)
            self.__core.add_auth_info(ainfo)

            pconf = self.__core.create_proxy_config()
            pconf.edit()
            if legacy_identity:
                pconf.identity = p.identity
            else:
                pconf.identity_address = pconf.normalize_sip_uri(p.identity)
            pconf.publish_enabled = False
            pconf.realm = p.realm
            pconf.register_enabled = True
            pconf.server_addr = p.proxy
            self.__core.add_proxy_config(pconf)
            pconf.done()

            if p.name == config.default_proxy:
                self.__core.default_proxy_config = pconf

        # Text-to-speech pipeline: espeak writes audio to aplay's stdin.
        # Keep a handle on aplay as well so stop() can shut it down.
        self.__aplayproc = subprocess.Popen(
            ['aplay', '-qD%s' % config.sound_device],
            stdin=subprocess.PIPE)
        self.__ttsproc = subprocess.Popen(['espeak', '-p10', '--stdout'],
                                          stdin=subprocess.PIPE,
                                          stdout=self.__aplayproc.stdin)

        # Set default parameters overriding the ones from the given config file
        self.__core.set_user_agent('FeTAp 615', '0.1')
        self.__core.stun_server = config.stun_server
        self.__core.ringback = ''
        self.__core.max_calls = 1
        self.__core.inc_timeout = config.incoming_timeout
        self.__core.set_call_error_tone(linphone.Reason.Busy, '')
        # The enum member is literally named "None"; getattr() keeps this
        # line valid syntax on Python 3 as well.
        self.__core.disable_chat(getattr(linphone.Reason, 'None'))
        self.__core.echo_cancellation_enabled = False
        self.__core.video_capture_enabled = False
        self.__core.video_display_enabled = False

    @staticmethod
    def _version_tuple(version):
        """Parse a dotted version string into a tuple of ints.

        Parsing stops at the first non-numeric character, so a suffix such
        as '-123-gabc' is ignored. The result is padded with zeros to at
        least three components.

        Args:
            version: Version string such as '3.9.1'.

        Returns:
            Tuple of ints, e.g. (3, 9, 1).
        """
        parts = []
        for piece in str(version).split('.'):
            digits = ''
            for ch in piece:
                if not ch.isdigit():
                    break
                digits += ch
            if not digits:
                break
            parts.append(int(digits))
            if len(digits) != len(piece):
                # Non-numeric suffix reached; nothing after it is a version
                # component.
                break
        while len(parts) < 3:
            parts.append(0)
        return tuple(parts)

    @staticmethod
    def _terminate(proc):
        """Terminate and reap ``proc`` if it is a still-running process.

        Args:
            proc: subprocess.Popen instance or None.
        """
        # poll() reaps a child that already exited; only signal live ones.
        if proc is not None and proc.poll() is None:
            proc.terminate()
            proc.wait()

    def __emit(self, evt):
        """Pass ``evt`` to every registered event callback."""
        for cb in self.__event_cbs:
            cb(evt)

    def __global_state_changed(self, core, state, msg):
        """linphone callback: log global state changes."""
        print('Global state changed: %s %s' % (state, msg))
        # TODO: Do we need events emitted here?

    def __registration_state_changed(self, core, proxyconf, state, msg):
        """linphone callback: translate registration states to PhoneEvents."""
        print('Registration state changed: %s %s %s'
              % (proxyconf, state, msg))
        reg_state = linphone.RegistrationState
        evt = None
        if state == reg_state.Progress:
            evt = PhoneEvent.RegInProgress
        elif state == reg_state.Ok:
            evt = PhoneEvent.RegSuccessfull
        elif state == getattr(reg_state, 'None'):
            evt = PhoneEvent.RegLost

        if evt is not None:
            self.__emit(evt)
        else:
            print('Unhandled registration state: %s'
                  % linphone.RegistrationState.string(state))

    def __call_state_changed(self, core, call, state, msg):
        """linphone callback: translate call states to PhoneEvents."""
        print('Call state changed: %s %s %s' % (call, state, msg))
        evt = None
        if state == linphone.CallState.IncomingReceived:
            evt = PhoneEvent.CallIncoming
        elif state == linphone.CallState.OutgoingRinging:
            evt = PhoneEvent.CallRinging
        elif state == linphone.CallState.Connected:
            evt = PhoneEvent.CallAccepted
        elif state == linphone.CallState.End:
            evt = PhoneEvent.CallEnded
        elif state == linphone.CallState.Error:
            # Map the failure reason to a more specific event where known.
            error = call.error_info.reason
            if error == linphone.Reason.Busy:
                evt = PhoneEvent.CallBusy
            elif error == linphone.Reason.NotFound:
                evt = PhoneEvent.CallInvalidNumber
            else:
                evt = PhoneEvent.CallEnded

        if evt is not None:
            self.__emit(evt)
        else:
            print('Unhandled call state: %s'
                  % linphone.CallState.string(state))

    def __pollthread(self):
        """Iterate the linphone core until stop() clears the running flag."""
        while self.__running:
            self.__core.iterate()
            time.sleep(0.02)  # Value from example code

    def start(self):
        """Start the background thread that iterates the linphone core."""
        self.__running = True
        t = threading.Thread(target=self.__pollthread)
        t.start()

    def stop(self):
        """Stop tone playback, the polling loop and the TTS processes."""
        self.stop_playing()
        self.__running = False
        self._terminate(self.__ttsproc)
        if self.__aplayproc is not None:
            # Close our end of the pipe as well; otherwise aplay never sees
            # EOF and the descriptor stays open in this process.
            self.__aplayproc.stdin.close()
            self._terminate(self.__aplayproc)

    def add_event_cb(self, cb):
        """Register ``cb`` to be called with a PhoneEvent code per event."""
        self.__event_cbs.append(cb)

    def call(self, number):
        """Invite ``number``, resolving a configured dial prefix if possible.

        Args:
            number: Number or SIP address to call.
        """
        # NOTE(review): prefix resolution only runs when the core has no
        # default proxy config -- confirm this condition is intended.
        if '@' not in number and self.__core.default_proxy_config is None:
            # Try to resolve prefix
            for p in self.__config.proxies:
                if number.startswith(p.prefix):
                    number = number[len(p.prefix):]
                    number += '@' + p.realm
                    break
        self.__core.invite(number)

    def accept_call(self):
        """Accept the core's current call."""
        self.__core.accept_call(self.__core.current_call)

    def decline_call(self):
        """Decline the core's current call with reason Busy."""
        self.__core.decline_call(self.__core.current_call,
                                 linphone.Reason.Busy)

    def end_call(self):
        """Terminate the core's current call."""
        self.__core.terminate_call(self.__core.current_call)

    def __play_tone(self, effect_args):
        """Replace any running tone with ``play -nq`` plus ``effect_args``."""
        self.stop_playing()
        # NOTE(review): env replaces the whole environment of the child
        # (kept as in the original); PATH/HOME are not inherited.
        self.__audioproc = subprocess.Popen(
            ['play', '-nq'] + effect_args,
            env={'AUDIODRIVER': 'alsa',
                 'AUDIODEV': self.__config.sound_device}
        )

    def play_dial_tone(self):
        """Play a continuous 425 Hz sine tone."""
        self.__play_tone(['synth', 'sine', '425'])

    def play_ringback_tone(self):
        """Play the repeating ringback pattern (425 Hz sine, padded)."""
        self.__play_tone(['synth', '1', 'sine', '425', 'pad', '4@1',
                          'repeat', '1000'])

    def play_busy_tone(self):
        """Play the repeating busy pattern (425 Hz sine, padded)."""
        self.__play_tone(['synth', '0.48', 'sine', '425', 'pad', '0.48@0.48',
                          'repeat', '1000'])

    def stop_playing(self):
        """Stop the currently playing tone, if any."""
        # Reap the child and drop the handle so it is neither left as a
        # zombie nor signalled again on the next call.
        self._terminate(self.__audioproc)
        self.__audioproc = None

    def read_text(self, text):
        """Send ``text`` as one line to the text-to-speech process."""
        self.__ttsproc.stdin.write(text + '\n')
        self.__ttsproc.stdin.flush()

    def get_remote_number(self):
        """Return the user part of the current call's remote address.

        Returns:
            The username of the remote address, or None when the core
            reports no remote address (e.g. no current call).
        """
        remote = self.__core.current_call_remote_address
        if remote is None:
            return None
        return remote.username
2015-05-18 14:34:45 +02:00
if __name__ == '__main__':
    # Manual smoke test: start the phone, place one call after five seconds
    # and log every event until interrupted with Ctrl-C.
    def event_cb(evt):
        print('Got event: %s' % PhoneEvent.string(evt))

    # PhoneInterface takes a single PhoneConfiguration object. Apart from
    # the linphone config path these are placeholder values for a local
    # test run -- adjust them to the target setup.
    demo_config = PhoneConfiguration(
        sound_device='default',
        incoming_timeout=30,
        linphone_config='.linphonerc',
        default_proxy=None,
        proxies=[],
        stun_server=None,
    )

    # Construct outside the try block so `phone` is always bound when the
    # cleanup below runs.
    phone = PhoneInterface(demo_config)
    phone.add_event_cb(event_cb)
    phone.start()
    try:
        elapsed = 0
        while True:
            time.sleep(1)
            elapsed += 1
            if elapsed == 5:
                phone.call('3474')
                #phone.play_busy_tone()
    except KeyboardInterrupt:
        pass
    finally:
        # Always stop, otherwise the polling thread keeps the process alive.
        phone.stop()