Module: rtclite.vnd.adobe.rtmpt
# Copyright (c) 2011, Kundan Singh. All rights reserved. see README for details.

This is a simple tunnel application that receives connection on RTMPT and forwards on RTMP.

I have tested this with Flash VideoIO on Flash Player 11 and rtmplite's server. To test it yourself, first start an RTMP server, e.g.,
  $ python -m rtclite.vnd.adobe.rtmp -d
This listens on default TCP port 1935. The -d option enabled debug trace during development.
Now start this tunnel in debug mode listening on port 8080 (RTMPT/HTTP) and forwarding to localhost:1935 (RTMP).
  $ python -m rtclite.vnd.adobe.rtmpt -l -t -d
By default listens on port 8080 on RTMPT and forward to localhost:1935 on RTMP, so the -l and -t options above are unnecessary.
Now point your browser to for the Flash videoIO test page.
Set the "src" property to rtmpt://localhost:8080/myapp?publish=live to start publishing using RTMPT.
To play the stream, open another browser instance or tab to the same test page, and set the "src"
property to rtmpt://localhost:8080/myapp?play=live
Note that the RTMP server is doing actual media conferencing, whereas this tunnel application just forwards between RTMPT/HTTP and RTMP.
You can have some participants on "rtmp" and others on "rtmpt" as long as both connect to the same back end RTMP server under the same
connection scope.

Known issues: this tunnel software is in alpha with known issues:
1. Disconnection of publisher when player disconencts.

import random, socket, traceback, SocketServer, logging

logger = logging.getLogger('rtmpt')

class Session(object):
    def __init__(self): = str(random.randint(1000000000, 9999999999))
        self._sock, self._timeout, self._pending, self._next = None, 0.020, [], 0
    def connect(self, target_address):
        sock = self._sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
    def close(self):
        if self._sock is not None:
            self._sock = None
    def sendrecv(self, seq, data):
        if seq == self._next:
            self._next += 1
            logger.debug('=>%r=>   (%s)', seq,
            if data: self._sock.send(data)
            while self._pending:
                found = [(i, x[0], x[1]) for i, x in enumerate(self._pending) if x[0] == self._next]
                if not found: break
                index, seq, data = found[0]
                del self._pending[index]
                self._next += 1
                logger.debug('  %r=>   (%s)', seq,
                if data: self._sock.send(data)
            try: response = self._sock.recv(8192)
            except socket.timeout: response = ''
            logger.debug('=>%r     (%s)', seq,
            self._pending.append((seq, data))
            response = '' # no need to respond with data in this case
        return response
class tunnel(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    allow_reuse_address = True 

class handler(SocketServer.StreamRequestHandler):
    intervals = ('\x01', '\x03', '\x05', '\x09', '\x11', '\x21')
    def handle(self):
        logger.debug('created new handler for %r', self.request)
        interval, failed = self.intervals[0], 0
            while True:
                firstline = body = response = None
                headers = {}
                firstline = self.readline()
                if firstline is None: raise ValueError, 'connection closed in reading first line'
                logger.debug('%s', firstline)
                method, path, protocol = firstline.split(' ')
                if method != 'POST': raise ValueError, 'invalid method ' + method
                while True:
                    line = self.readline()
                    if line is None: raise ValueError, 'connection closed in reading headers'
                    logger.debug('%s', line)
                    if not line: break
                    name, value = line.split(':', 1)
                    headers[name.lower().strip()] = value.strip()
                ctype, clen, conn = [headers.get(name.lower(), None) for name in ('content-type', 'content-length', 'connection')]
                if ctype != 'application/x-fcs': raise ValueError, 'invalid content-type ' + ctype
                if clen: clen = int(clen)
                if clen > 0: body =
                if path == '/fcs/ident2':
                    self.send_error(404, 'Not Found')
                elif path == '/open/1':
                    while True:
                        session = Session()
                        if not in self.server.sessions:
                        self.server.sessions[] = session
                        self.send_response( + '\n')
                    except socket.error:
                        self.send_error(500, 'Cannot Connect to Server')
                    parts = path.split('/')
                    if len(parts) == 4 and parts[1] in ('idle', 'send', 'close'):
                        ignore, command, sessionId, seq = parts
                        session, seq = self.server.sessions.get(sessionId, None), int(seq)
                        if not session:
                            self.send_error(500, 'Invalid session ' + sessionId)
                        elif command == 'idle' or command == 'send':
                            response = session.sendrecv(seq, body if command == 'send' else None)
                            if response:
                                interval, failed = self.intervals[0], 0
                                failed += 1
                                if failed >= 10:
                                    index = self.intervals.index(interval)
                                    if index < len(self.intervals) - 1: index += 1
                                    interval, failed = self.intervals[index], 0
                                    logger.debug('changed interval to 0x%x', ord(interval))
                            self.send_response(interval + response)
                        elif command == 'close':
                            del self.server.sessions[]
                        raise ValueError, 'invalid path ' + path
                if conn == 'close':
            if _debug: traceback.print_exc()
    def send_error(self, code, reason):
        self.write('HTTP/1.1 %d %s'%(code, reason), 'Content-Length: 0')
    def send_response(self, body):
        self.write('HTTP/1.1 200 OK', 'Content-Type: application/x-fcs', 'Content-Length: %d'%(len(body) if body else 0), body=body)
    def write(self, *args, **kwargs):
        data = '\r\n'.join(args) + '\r\n\r\n'
        logger.debug('%r', data[:-2])
        if 'body' in kwargs and kwargs.get('body'): data += kwargs.get('body')
    def read(self, length):
    def readline(self):
        value = self.rfile.readline()
        return None if not value else value.rstrip() if value[-1] == '\n' else value

def run(server_address = ('', 8080), target_address = ('', 1935),
        server_class=tunnel, handler_class=handler):
    logger.debug('starting HTTP server on %r target %r', server_address, target_address)
    server = server_class(server_address, handler_class)
    server.target_address = target_address
    server.sessions = {} # map from to session
    except KeyboardInterrupt:

# The main routine to start, run and stop the service
if __name__ == '__main__':
    import sys
    from optparse import OptionParser
    parser = OptionParser()
    parser.add_option('-l', '--listen',  dest='listen',  default='', help="listening transport address. Default ''")
    parser.add_option('-t', '--target',  dest='target',  default="", help="target server address. Default is ''")
    parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
    parser.add_option('--test', dest='test', default=False, action='store_true', help='test this module and exit')
    (options, args) = parser.parse_args()
    if options.test: sys.exit() # no tests
    logger.setLevel(logging.DEBUG if options.verbose else logging.INFO)
    listen, target = [(x.partition(':')[0], int(x.partition(':')[2])) for x in (options.listen,]
    run(server_address=listen, target_address=target)