'''
Implements a P2P-SIP adaptor and related components. The basic idea is derived from my earlier
work on SIPpeer and P2P-SIP as documented in the following papers, however the software is
completely independent:
[1] K.Singh and H.Schulzrinne, Peer-to-peer Internet telephony using SIP, NOSSDAV, 2005
http://www1.cs.columbia.edu/~kns10/publication/sip-p2p-short.pdf
[2] K.Singh and H.Schulzrinne, SIPpeer: a session initiation protocol (SIP)-based peer-to-peer
Internet telephony client adaptor, Columbia University Implementation Report 2004,
http://www1.cs.columbia.edu/~kns10/publication/sip-p2p-design.pdf
[3] K.Singh and H.Schulzrinne, Using an external DHT as a SIP location service, Columbia University
Technical Report, 2006. http://mice.cs.columbia.edu/getTechreport.php?techreportID=388
[4] K.Singh, Hello2web: a web-based Internet telephony client, a class project done in 2000.
http://www1.cs.columbia.edu/~kns10/software/helloweb/
The high-level overview is as follows: the adaptor acts as a SIP-to-SIP gateway, and allows
you to use your favorite SIP user agent to connect to the locally running adaptor. The adaptor
then uses P2P module to perform lookup and storage of contact information, along with other
information such as cryptographic credentials. An incoming REGISTER request from your UA
is translated into a P2P put operation that stores your contact information. Any other SIP request
from your UA is translated into a P2P get (lookup) operation for the destination, and then the
request is routed to that destination.
The implementation takes care of any NAT and firewall traversal issues in the signaling and
media path. In particular, it uses existing P2P nodes as relays if needed and performs a
combination of various NAT traversal protocols such as STUN, TURN and ICE.
For security reasons, the default implementation can accept local user agent only from localhost
i.e., IP address of 127.0.0.1. The implementation allows both TCP and UDP for the local user agent
but uses only UDP on the Internet side. This is to facilitate easy NAT traversal on the Internet.
The implementation also allows a broken SIP message from the local user agent. This allows writing
simple clients, e.g., from web, without involving the complete SIP stack. The simplification
is as follows:
If the local user agent connects to the adaptor over TCP, then the user agent does not need to do
any request retransmission. The TCP connection can remain persistent for the duration of the
registration. All request and response are exchanged on this TCP connection. If there is no
Expires header in the REGISTER request, then the adaptor assumes the registration
to be valid as long as the TCP connection is valid. If there is no Contact header in the
REGISTER request, then the adaptor picks one based on local IP, else it may acts as a stateless
or stateful SIP proxy.
If the local user agent connects to the adaptor using UDP, then the user agent does need to do
request retransmissions as per SIP. However, the adaptor performs periodic keep-alive using
OPTIONS to check the status of the user agent, and unregisters the user if the user agent
is found to be not alive, unless the original REGISTER was done with Expires of a day or more,
in which case the registration is assumed to be persistent for that duration.
Because of the design, the adaptor can be readily used as a P2P-SIP proxy as described in my
paper [1]. However, you must remove the restriction of only local user agent connection to allow
it to be a general purpose proxy used in a server farm.
'''
import os, sys, socket, time, traceback, types, logging
from ... import multitask
from ...std.ietf import rfc3261
from ...std.ietf.rfc2396 import Address
from ...std.ietf.rfc3261 import Stack, Message, Header, UserAgent, Proxy, TransportInfo
from ...std.ietf.rfc2617 import createAuthenticate
from ...common import getlocaladdr, multitask_Timer as Timer
from ..net.p2p.dht import randomNonce
from ..net.p2p.pipe import ServerSocket, H
logger = logging.getLogger('sip.p2p')
class AbstractAgent(object):
'''This is an abstract base class to connect SIP and location lookup. In particular, it creates listening
SIP ports, and uses a location dictionary to store user contacts. The actual sub-class must override the
self.location property to modify the storage layer (to P2P or database) and the onREGISTER, onINVITE,
onRequest methods to modify the call routing.'''
def __init__(self, sipaddr=('127.0.0.1', 5062), stack=Stack):
'''Construct a new Agent. sipaddr argument indicates the listening address for incoming
local SIP connections and messages on both UDP and TCP. It initializes the local members.'''
logger.debug('starting agent on %r', sipaddr)
self.conn = dict()
self.location = dict()
sock = socket.socket(type=socket.SOCK_DGRAM); sock.bind(sipaddr)
self.udp = stack(self, TransportInfo(sock)); self.udp.sock = sock
sock = socket.socket(type=socket.SOCK_STREAM); sock.bind(sipaddr); sock.listen(5)
self.tcp = stack(self, TransportInfo(sock)); self.tcp.sock = sock
self._gens = []
def __del__(self):
'''Delete the object and internal member references.'''
try:
self.udp.sock.close(); self.tcp.sock.close();
del self.p2p, self.udp, self.tcp, self._gens
except: pass
def start(self):
'''Start the agent.'''
for gen in [self._sipreceiver(self.udp), self._sipreceiver(self.tcp)]: self._gens.append(gen); multitask.add(gen)
return self
def stop(self):
'''Stop the agent.'''
for gen in self._gens: gen.close();
self._gens[:] = []
return self
def _sipreceiver(self, stack, maxsize=16386):
'''Handle the messages or connections on the given SIP stack's socket, and pass it to the stack
so that stack can invoke appropriate callback on this object such as receivedRequest.'''
sock = stack.sock
def tcpreceiver(sock, remote):
while True:
data = yield multitask.recv(sock, maxsize)
logger.debug('%r=>%r on type=%r\n%s', remote, sock.getsockname(), sock.type, data)
if data: stack.received(data, remote)
while True:
if sock.type == socket.SOCK_DGRAM:
data, remote = yield multitask.recvfrom(sock, maxsize)
logger.debug('%r=>%r on type=%r\n%s', remote, sock.getsockname(), sock.type, data)
if data: stack.received(data, remote)
elif sock.type == socket.SOCK_STREAM:
conn, remote = yield multitask.accept(sock)
if conn:
self.conn[remote] = conn
multitask.add(tcpreceiver(conn, remote))
else: raise ValueError, 'invalid socket type'
def send(self, data, remote, stack):
'''Send a given data to remote for the SIP stack.'''
def _send(self, data, remote, stack):
logger.debug('%r=>%r on type=%r\n%s', stack.sock.getsockname(), remote, stack.sock.type, data)
if stack.sock.type == socket.SOCK_STREAM:
if remote in self.conn:
yield multitask.send(self.conn[remote], data)
else:
yield multitask.sendto(stack.sock, data, remote)
multitask.add(_send(self, data, remote, stack))
def createServer(self, request, uri, stack):
'''Create a Proxy UAS for all requests except CANCEL.'''
return (request.method != 'CANCEL') and Proxy(stack, request) or None
def sending(self, ua, message, stack): pass
def receivedRequest(self, ua, request, stack):
logger.debug('received request from stack %r', request.method)
handlerName = 'on' + request.method
try:
if hasattr(self, handlerName) and callable(eval('self.' + handlerName)):
result = getattr(self, handlerName)(ua, request, stack)
else:
result = self.onRequest(ua, request, stack)
if result is not None and type(result) == types.GeneratorType:
logger.debug('result type %r', type(result))
multitask.add(result)
except:
logger.exception('exception in %r', handlerName)
ua.sendResponse(500, 'Internal server error')
def receivedResponse(self, ua, response, stack): pass
def cancelled(self, ua, request, stack): pass
def dialogCreated(self, dialog, ua, stack): pass
def authenticate(self, ua, header, stack): return True
def createTimer(self, app, stack): return Timer(app)
def onREGISTER(self, ua, request, stack):
if request.To.value.uri != request.From.value.uri:
ua.sendResponse(400, 'Third-party registration not supported')
return
auth = self.authorize(request)
if auth == 200:
saved = yield self.save(msg=request, uri=str(request.To.value.uri).lower())
logger.debug('saved=%r', saved)
if not saved:
ua.sendResponse(500, 'Internal server error')
else:
response = ua.createResponse(200, 'OK');
locations = yield self.locate(str(request.To.value.uri))
for h in map(lambda x: Header(str(x), 'Contact'), locations):
response.insert(h, append=True)
response.Expires = request.Expires if request.Expires else Header('3600', 'Expires')
ua.sendResponse(response)
elif auth == 404:
ua.sendResponse(404, 'Not found')
else:
response = ua.createResponse(401, 'Unauthorized')
response.insert(Header(createAuthenticate(realm='localhost', domain=str(request.uri), stale=('FALSE' if auth==401 else 'TRUE')), 'WWW-Authenticate'), append=True)
ua.sendResponse(response)
def onPUBLISH(self, ua, request, stack):
return self.onREGISTER(ua, request, stack)
def onRequest(self, ua, request, stack):
if request.Route:
proxied = ua.createRequest(request.method, dest=request.uri, recordRoute=(request.method=='INVITE'))
ua.sendRequest(proxied)
return
if request.had_lr and not ua.isLocal(request.uri):
logger.debug('proxying routed non-local request %r', request.uri)
proxied = ua.createRequest(request.method, dest=request.uri)
ua.sendRequest(proxied)
return
dest = yield self.locate(str(request.uri))
logger.debug('locations=%r', dest)
if dest:
if self.isProxy(request):
for c in dest:
ua.sendRequest(ua.createRequest(request.method, c.value.uri, recordRoute=True))
else:
response = ua.createResponse(302, 'Moved Temporarily')
for c in dest: response.insert(c, append=True)
ua.sendResponse(response)
else:
ua.sendResponse(480, 'Temporarily unavailable')
def isProxy(self, request):
return request['user-agent'] and request['user-agent'].value.find('X-Lite') >= 0
def authorize(self, request, realm='localhost'):
'''Server side of authentication. Returns 200 on success, 401 on failure, 0 if missing or invalid
nonce, and 404 if no password/user information available.'''
auths = filter(lambda x: x['realm']==realm, request.all('Authorization', 'Proxy-Authorization'))
if not auths: return 0
uri = request.From.value.uri
return 200
def save(self, msg, uri, defaultExpires=3600):
'''Save the contacts from REGISTER or PUBLISH msg.'''
expires = int(msg.Expires.value if msg.Expires else defaultExpires)
if uri in self.location: existing = self.location[uri]
else: existing = self.location[uri] = []
if msg.Contact and msg.first('Contact').value == '*':
if msg.Expires and msg.Expires.value == '0':
del self.location[uri]
else:
now = time.time()
for c in msg.all('Contact'):
e = now + (expires if 'expires' not in c else int(c.expires))
existing[:] = filter(lambda x: x[0].value.uri!=c.value.uri, existing)
existing.insert(0, (c, e))
existing[:] = filter(lambda x: x[1]>now, existing)
if not existing:
del self.location[uri]
logger.debug('save %r returning True', self.location)
yield
raise StopIteration(True)
def locate(self, uri):
'''Return all saved contacts for the given uri.'''
logger.debug('locate %r %r', uri, self.location)
existing = self.location.get(str(uri), [])
now = time.time()
existing[:] = filter(lambda x: x[1]>now, existing)
for c in existing: c[0]['expires'] = str(int(c[1]-now))
result = map(lambda x: x[0], existing)
yield
raise StopIteration(result)
class Agent(AbstractAgent):
'''An adaptor for P2P-SIP, that maps between local SIP user agent and Internet SIP network,
and uses P2P module for lookup and storage. This is based on the data mode. A similar class
can be implemented that does service mode, with advanced features such as presence aggregation
and dynamic call routing.'''
def __init__(self, server=False, sipaddr=('127.0.0.1', 5062), port=0):
'''Initialize the P2P-SIP agent'''
AbstractAgent.__init__(self, sipaddr=sipaddr)
self.p2p = ServerSocket(server=server, port=port)
self.location = None
def start(self, servers=None):
'''Start the Agent'''
self.p2p.start(servers=servers); AbstractAgent.start(self); return self
def stop(self):
'''Stop the Agent'''
AbstractAgent.stop(self); self.p2p.stop(); return self
def p2preceiver(self, p2p):
'''Receive packets or connections from p2p socket server.'''
def p2phandler(self, sock):
while True:
data = yield sock.recv()
while True:
sock = yield self.p2p.accept()
if hasattr(self, 'identity') and self.identity: multitask.add(p2phandler(sock))
def save(self, msg, uri, defaultExpires=3600):
'''Save the contacts from REGISTER or PUBLISH msg to P2P storage.'''
expires = int(msg.Expires.value if msg.Expires else defaultExpires)
existing = yield self.p2p.get(H(uri))
if msg.Contact and msg.first('Contact').value == '*':
if msg.Expires and msg.Expires.value == '0':
if existing:
for value, nonce, Kp, expires in existing:
yield self.p2p.remove(H(uri), value, nonce, expires+1)
existing = []
else:
existing = dict([(str(Header(str(x[0]), 'Contact').value.uri), x) for x in existing])
now, remove, update = time.time(), [], []
for c in msg.all('Contact'):
e = now + (expires if 'expires' not in c else int(c.expires))
if e<=now: remove.append(c)
else: update.insert(0, (c, e))
for c in remove:
if c.value.uri in existing:
value, nonce, Kp, expires = existing[c.value.uri]
yield self.p2p.remove(H(uri), value, nonce, expires+1)
for c, e in update:
if c.value.uri in existing:
value, nonce, Kp, expires = existing[c.value.uri]
yield self.p2p.put(H(uri), value, nonce, now+e)
else:
yield self.p2p.put(H(uri), str(c.value), randomNonce(), now+e)
logger.debug('save %r returning True', self.location)
raise StopIteration(True)
def locate(self, uri):
'''Return all saved contacts for the given uri from P2P storage.'''
existing = yield self.p2p.get(H(uri))
now = time.time()
result = []
for value, nonce, Kp, expires in existing:
c = Header(value, 'Contact')
c['expires'] = str(int(expires - now))
result.append(c)
logger.debug('locate %r %r', uri, result)
raise StopIteration(result)
_apps = dict()
def start(app=None, options=None):
global _apps
if app not in _apps:
agent = _apps[app] = Agent().start()
def stop(app=None):
global _apps
if app in _apps:
_apps[app].stop(); del _apps[app]
if __name__ == '__main__':
from optparse import OptionParser
parser = OptionParser()
parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
parser.add_option('-s', '--server', dest='server', default=False, action="store_true", help='start as bootstrap node')
parser.add_option('-p', '--sip-port',dest='sip_port', default=5062, type="int", help='SIP port number to listen. default is 5062.')
parser.add_option('-P', '--p2p-port',dest='p2p_port', default=0, type="int", help='Preferred P2P port number to listen. If supplied, disables multicast.')
parser.add_option('-S', '--servers', dest='servers', default=None, help='list of bootstrap nodes as "ip:port,ip:port,..."')
parser.add_option('--test', dest='test', default=False, action='store_true', help='test this module and exit')
(options, args) = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if options.verbose else logging.INFO)
if options.test: sys.exit()
servers = [(x.split(':', 1)[0], int(x.split(':', 1)[1])) for x in options.servers.split(',')] if options.servers else None
agent = Agent(server=options.server, sipaddr=('0.0.0.0', options.sip_port), port=options.p2p_port).start(servers)
try: multitask.run()
except KeyboardInterrupt: pass
agent.stop()