PyXR

c:\projects\bitpim\src \ bitfling \ xmlrpcstuff.py



0001 #!/usr/bin/env python
0002 
0003 # This file is double licensed as the BitPim License and the Python
0004 # licenses.  It incorporates code from the standard Python library
0005 # which was then modified to work properly.
0006 
0007 # My own implementation of xmlrpc (both server and client)
0008 
0009 # It has a silly name so it doesn't class with standard Python
0010 # and library module names
0011 
0012 # This code initially tried to do XML-RPC over HTTP/1.1 persistent
0013 # connections over SSL (provided by M2Crypto).  That turned out into a
0014 # big nightmare of trying to mash libraries to work together that
0015 # didn't really want to.
0016 
0017 # This new version uses SSH (provided by paramiko).
0018 
0019 # Server design
0020 #
0021 # Main thread (which could be a daemon thread for the rest of the program)
0022 # creates the listening socket, and starts the connection handler threads.
0023 # They all sit in a loop.  They call accept, work on the lifetime of
0024 # a connection, and when it closes go back to accept.  When they get a
0025 # request, it is dumped into a queue for the main thread to deal with,
0026 # who then dumps the results back into a queue for the connection thread.
0027 # Consequently we get the benefits of threading for dealing with event
0028 # stuff, but the actual request handling still seems single threaded.
0029 
0030 
0031 TRACE=False
0032 
0033 
0034 # standard modules
0035 import threading
0036 import Queue
0037 import time
0038 import sys
0039 import xmlrpclib
0040 import base64
0041 import string
0042 import logging
0043 import os
0044 import socket
0045 
0046 # required add ons
0047 import paramiko
0048 
0049 # my modules
0050 if TRACE: import guihelper # to format exceptions
0051 import common
0052 
0053 class ServerChannel(paramiko.Channel):
0054 
0055     def __init__(self, chanid, peeraddr, username):
0056         self.chan=chanid
0057         self.peeraddr=peeraddr
0058         self.username=username
0059         paramiko.Channel.__init__(self, chanid)
0060         
0061     def readall(self, amount):
0062         result=""
0063         while amount:
0064             l=self.chan.recv(amount)
0065             if len(l)==0:
0066                 return result # eof
0067             result+=l
0068             amount-=len(l)
0069         return result
0070 
0071     def XMLRPCLoop(self, conn):
0072         """Main loop that deals with the XML-RPC data
0073 
0074         @param conn:  The connectionthread object we are working for
0075         """
0076         while True:
0077             try:
0078                 length=int(self.readall(8))
0079             except ValueError:
0080                 return
0081             xml=self.readall(length)
0082             response=conn.processxmlrpcrequest(xml, self.peeraddr,
0083                                            self.username)
0084             self.chan.sendall( ("%08d" % (len(response),))+response)
0085 
0086 class myServer(paramiko.ServerInterface):
0087 
0088     def __init__(self, peeraddr, onbehalfof):
0089         self.event = threading.Event()
0090         self.peeraddr=peeraddr
0091         self.onbehalfof=onbehalfof
0092         
0093     def get_allowed_auths(self, username):
0094         return "password"
0095 
0096     def check_auth_password(self, username, password):
0097         # we borrow the connection's queue object for this
0098         self.bf_auth_username="<invalid user>"
0099         conn=self.onbehalfof
0100         conn.log("Checking authentication for user "+`username`)
0101         msg=Server.Message(Server.Message.CMD_NEW_USER_REQUEST, conn.responsequeue,
0102                            self.peeraddr, data=(username,password))
0103         conn.requestqueue.put(msg)
0104         resp=conn.responsequeue.get()
0105         assert resp.cmd==resp.CMD_NEW_USER_RESPONSE
0106         if hasattr(resp, 'exception'):
0107             conn.logexception("Exception while checking authentication",resp.exception)
0108             return paramiko.AUTH_FAILED
0109         if resp.data:
0110             conn.log("Credentials ok for user "+`username`)
0111             self.bf_auth_username=username
0112             return paramiko.AUTH_SUCCESSFUL
0113         conn.log("Credentials not accepted for user "+`username`)
0114         return paramiko.AUTH_FAILED
0115 
0116     def check_channel_request(self, kind, chanid):
0117         if kind == 'bitfling':
0118             return paramiko.OPEN_SUCCEEDED
0119         return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
0120 
0121     
0122 class Server(threading.Thread):
0123 
0124     class Message:
0125         """A message between a connection thread and the server object, or vice versa"""
0126         # These are in the order things happen.  Logging happens all the time, and we
0127         # cycle XMLRPC requests and responses for the lifetime of a connection
0128         CMD_LOG=0                            # data is log message
0129         CMD_LOG_EXCEPTION=1                   # data is sys.exc_info()
0130         CMD_NEW_ACCEPT_REQUEST=2
0131         CMD_NEW_ACCEPT_RESPONSE=3
0132         CMD_NEW_USER_REQUEST=4
0133         CMD_NEW_USER_RESPONSE=5
0134         CMD_XMLRPC_REQUEST=6
0135         CMD_XMLRPC_RESPONSE=7
0136         CMD_CONNECTION_CLOSE=8
0137        
0138         def __init__(self, cmd, respondqueue=None, clientaddr=None, data=None):
0139             self.cmd=cmd
0140             self.respondqueue=respondqueue
0141             self.clientaddr=clientaddr
0142             self.data=data
0143 
0144         def __repr__(self):
0145             d=`self.data`
0146             if len(d)>40:
0147                 d=d[:40]
0148             str=`self.cmd`
0149             for i in dir(self):
0150                 if i.startswith("CMD_") and getattr(self, i)==self.cmd:
0151                     str=i
0152                     break
0153             return "Message: cmd=%s data=%s" % (str, d)
0154     
0155     class ConnectionThread(threading.Thread):
0156 
0157         def __init__(self, server, listen, queue, name):
0158             """Constructor
0159             
0160             @param server: reference to server object
0161             @param listen: socket object that is in listening state
0162             @param queue:  the queue object to send messages to
0163             @param name: name of this thread"""
0164             threading.Thread.__init__(self)
0165             self.setDaemon(True)
0166             self.setName(name)
0167             self.responsequeue=Queue.Queue()
0168             self.server=server
0169             self.requestqueue=queue
0170             self.listen=listen
0171 
0172         def log(self, str):
0173             now=time.time()
0174             t=time.localtime(now)
0175             timestr="&%d:%02d:%02d.%03d"  % ( t[3], t[4], t[5],  int((now-int(now))*1000))
0176             msg=Server.Message(Server.Message.CMD_LOG, data="%s: %s: %s" % (timestr, self.getName(), str))
0177             self.requestqueue.put(msg)
0178 
0179         def logexception(self, str, excinfo):
0180             if __debug__ and TRACE: print "exception %s\n%s" % (str, guihelper.formatexception(excinfo))
0181             self.log(str)
0182             msg=Server.Message(Server.Message.CMD_LOG_EXCEPTION, data=excinfo)
0183             self.requestqueue.put(msg)
0184                             
0185         def run(self):
0186             event=threading.Event()
0187             while not self.server.wantshutdown:
0188                 if __debug__ and TRACE: print self.getName()+": About to call accept"
0189                 try:
0190                     transport=None
0191                     event.clear()
0192                     # blocking wait for new connection to come in
0193                     sock, peeraddr = self.listen.accept()
0194                     # ask if we allow this connection
0195                     msg=Server.Message(Server.Message.CMD_NEW_ACCEPT_REQUEST, self.responsequeue, peeraddr)
0196                     self.requestqueue.put(msg)
0197                     resp=self.responsequeue.get()
0198                     assert resp.cmd==resp.CMD_NEW_ACCEPT_RESPONSE
0199                     ok=resp.data
0200                     if not ok:
0201                         self.log("Connection from "+`peeraddr`+" not accepted")
0202                         sock.close()
0203                         continue
0204                     # startup ssh stuff
0205                     self.log("Connection from "+`peeraddr`+" accepted")
0206                     transport=paramiko.Transport(sock)
0207                     transport.add_server_key(self.server.ssh_server_key)
0208                     transport.setDaemon(True)
0209                     srvr=myServer(peeraddr, self)
0210                     transport.start_server(event,srvr)
0211                                          
0212                 except:
0213                     if __debug__ and TRACE: print self.getName()+": Exception in accept block\n"+guihelper.formatexception()
0214                     self.logexception("Exception in accept", sys.exc_info())
0215                     if transport is not None: del transport
0216                     sock.close()
0217                     continue
0218 
0219                 # wait for it to become an SSH connection
0220                 event.wait()
0221                 if not event.isSet() or not transport.is_active():
0222                     self.log("Connection from "+`peeraddr`+" didn't do SSH negotiation")
0223                     transport.close()
0224                     sock.close()
0225                     continue
0226                 if __debug__ and TRACE: print self.getName()+": SSH connection from "+`peeraddr`
0227 
0228                 self.log("SSH negotiated from "+`peeraddr`)
0229 
0230                 chan=None
0231                 try:
0232                     chan=transport.accept()
0233                     serverchan=ServerChannel(chan,peeraddr,srvr.bf_auth_username)
0234                     serverchan.XMLRPCLoop(self)
0235                 except:
0236                     self.logexception("Exception in XMLRPCLoop", sys.exc_info())
0237 
0238                 if chan is not None:
0239                     chan.close()
0240                     del chan
0241 
0242                 if transport is not None:
0243                     transport.close()
0244                     del transport
0245 
0246                 msg=Server.Message(Server.Message.CMD_CONNECTION_CLOSE,  None, peeraddr)
0247                 self.requestqueue.put(msg)
0248                 self.log("Connection from "+`peeraddr`+" closed")
0249 
0250         def processxmlrpcrequest(self, data, client_addr, username):
0251             msg=Server.Message(Server.Message.CMD_XMLRPC_REQUEST, self.responsequeue, client_addr, data=(data, username))
0252             if __debug__ and TRACE:
0253                 self.log("%s: req %s" % (username, `data`))
0254             self.requestqueue.put(msg)
0255             resp=self.responsequeue.get()
0256             assert resp.cmd==resp.CMD_XMLRPC_RESPONSE
0257             if hasattr(resp, "exception"):
0258                 raise resp.exception
0259             return resp.data
0260             
0261 
0262     def __init__(self, host, port, servercert, connectionthreadcount=5, timecheck=60, connectionidlebreak=240):
0263         """Creates the listening thread and infrastructure.  Don't forget to call start() if you
0264         want anything to be processed!  You probably also want to call setDaemon().  Remember to
0265         load a certificate into the sslcontext.
0266 
0267         @param connectionthreadcount:  How many threads are being used.  If new connections
0268                             arrive while the existing threads are busy in connections, then they will be ignored
0269         @param timecheck:  How often shutdown requests are checked for in the main thread (only valid on Python 2.3+)
0270         @param connectionidlebreak: If an SSH connection is idle for this amount of time then it is closed
0271         """
0272         threading.Thread.__init__(self)
0273         self.setName("Threading SSH server controller for %s:%d" % (host, port))
0274         # setup logging
0275         l=logging.getLogger("paramiko")
0276         l.setLevel(logging.INFO)
0277         lh=FunctionLogHandler(self.OnLog)
0278         lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] %(name)s: %(message)s', '%Y%m%d:%H%M%S'))
0279         l.addHandler(lh)
0280         self.ssh_server_key=servercert
0281         connection=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
0282         connection.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
0283         if not host:
0284             host="0.0.0.0"
0285         if __debug__ and TRACE: print "Binding to host %s port %d" % (host, port)
0286         connection.bind( (host, port) )
0287         connection.listen(connectionthreadcount+5)
0288         self.timecheck=timecheck
0289         self.connectionidlebreak=connectionidlebreak
0290         self.wantshutdown=False
0291         self.workqueue=Queue.Queue()
0292         self.threadlist=[]
0293         for count in range(connectionthreadcount):
0294             conthread=self.ConnectionThread(self, connection, self.workqueue, "SSH worker thread %d/%d" % (count+1, connectionthreadcount))
0295             conthread.start()
0296             self.threadlist.append(conthread)
0297 
0298     def shutdown(self):
0299         """Requests a shutdown of all threads"""
0300         self.wantshutdown=True
0301 
0302     def run23(self):
0303         while not self.wantshutdown:
0304             try:
0305                 msg=self.workqueue.get(True, self.timecheck)
0306             except Queue.Empty:
0307                 continue
0308             try:
0309                 self.processmessage(msg)
0310             except:
0311                 sys.excepthook(*sys.exc_info())
0312         
0313     def run22(self):
0314         while not self.wantshutdown:
0315             try:
0316                 msg=self.workqueue.get(True)
0317             except Queue.Empty:
0318                 continue
0319             try:
0320                 self.processmessage(msg)
0321             except:
0322                 sys.excepthook(*sys.exc_info())
0323                 
0324 
0325     if sys.version_info>=(2,3):
0326         run=run23
0327     else:
0328         run=run22
0329 
0330     def processmessage(self, msg):
0331         if not isinstance(msg, Server.Message):
0332             self.OnUserMessage(msg)
0333             return
0334         if __debug__ and TRACE:
0335             if not msg.cmd in (msg.CMD_LOG, msg.CMD_LOG_EXCEPTION):
0336                 print "Processing message "+`msg`
0337         resp=None
0338         if msg.cmd==msg.CMD_LOG:
0339             self.OnLog(msg.data)
0340             return
0341         elif msg.cmd==msg.CMD_LOG_EXCEPTION:
0342             self.OnLogException(msg.data)
0343             return
0344         elif msg.cmd==msg.CMD_NEW_ACCEPT_REQUEST:
0345             ok=self.OnNewAccept(msg.clientaddr)
0346             resp=Server.Message(Server.Message.CMD_NEW_ACCEPT_RESPONSE, data=ok)
0347         elif msg.cmd==msg.CMD_NEW_USER_REQUEST:
0348             ok=self.OnNewUser(msg.clientaddr, msg.data[0], msg.data[1])
0349             resp=Server.Message(Server.Message.CMD_NEW_USER_RESPONSE, data=ok)
0350         elif msg.cmd==msg.CMD_XMLRPC_REQUEST:
0351             data=self.OnXmlRpcRequest(* (msg.data+(msg.clientaddr,)))
0352             resp=Server.Message(Server.Message.CMD_XMLRPC_RESPONSE, data=data)
0353         elif msg.cmd==msg.CMD_CONNECTION_CLOSE:
0354             self.OnConnectionClose(msg.clientaddr)
0355         else:
0356             assert False, "Unknown message command "+`msg.cmd`
0357             raise Exception("Internal processing error")
0358         if resp is not None:
0359             msg.respondqueue.put(resp)
0360 
0361     def OnLog(self, str):
0362         """Process a log message"""
0363         print str
0364 
0365     def OnLogException(self, exc):
0366         """Process an exception message"""
0367         print exc[:2]
0368 
0369 
0370     def OnNewAccept(self, clientaddr):
0371         """Decide if we accept a new new connection"""
0372         return True
0373 
0374     def OnNewUser(self, clientaddr, username, password):
0375         """Decide if a user is allowed to authenticate"""
0376         return True
0377 
0378     def OnConnectionClose(self, clientaddr):
0379         """Called when a connection closes"""
0380         if __debug__ and TRACE: print "Closed connection from "+`clientaddr`
0381 
0382     def OnUserMessage(self, msg):
0383         """Called when a message arrives in the workqueue"""
0384 
0385     def OnXmlRpcRequest(self, xmldata, username, clientaddr):
0386         """Called when an XML-RPC request arrives, but before the XML is parsed"""
0387         params, method = xmlrpclib.loads(xmldata)
0388         # call method
0389         try:
0390             response=self.OnMethodDispatch(method, params, username, clientaddr)
0391             # wrap response in a singleton tuple
0392             response = (response,)
0393             response = xmlrpclib.dumps(response, methodresponse=1)
0394         except xmlrpclib.Fault, fault:
0395             response = xmlrpclib.dumps(fault)
0396         except:
0397             self.OnLog("Exception processing method "+`method`)
0398             self.OnLogException(sys.exc_info())
0399             # report exception back to server, with class name first
0400             # and then `object`.  The client end may be able to
0401             # re-raise it
0402             obj=sys.exc_info()[1]
0403             try:
0404                 klass="%s.%s" % (obj.__module__, obj.__name__)
0405             except:
0406                 klass="%s.%s" % (obj.__class__.__module__, obj.__class__.__name__)
0407             response = xmlrpclib.dumps(xmlrpclib.Fault(17, "%s:%s" % (klass, obj)))
0408 
0409         return response            
0410 
0411     def OnMethodDispatch(self, method, params, username, clientaddr):
0412         """Called once the XML-RPC request is parsed"""
0413         if __debug__ and TRACE: print "%s %s (user=%s, client=%s)" % (method, `tuple(params)`, username, `clientaddr`)
0414         if method=='add' and len(params)==2:
0415             return params[0]+params[1]
0416         raise xmlrpclib.Fault(10, "Unknown method "+method)
0417 
0418 # Copied from xmlrpclib.  This version is slightly modified to be derived from
0419 # object.  The reason is that if you print a _Method object, then the __str__
0420 # method is called, which tries to do it over XML-RPC!  Deriving from object
0421 # causes that and many other methods to be already present so it isn't a
0422 # problem.
0423 
0424 class _Method(object):
0425     # some magic to bind an XML-RPC method to an RPC server.
0426     # supports "nested" methods (e.g. examples.getStateName)
0427     def __init__(self, send, name):
0428         self.__send = send
0429         self.__name = name
0430     def __getattr__(self, name):
0431         return _Method(self.__send, "%s.%s" % (self.__name, name))
0432     def __call__(self, *args):
0433         return self.__send(self.__name, args)
0434 
0435 class CertificateNotAcceptedException(Exception):
0436     pass
0437 
0438 class ServerProxy:
0439     logsetup=False
0440     def __init__(self, username, password, host, port, certverifier=None):
0441         if not self.logsetup:
0442             # paramiko.util.log_to_file('serverproxy.log')
0443             self.logsetup=True
0444         self.__username=username
0445         self.__password=password
0446         self.__host=host
0447         self.__port=port
0448         self.__channel=None
0449         self.__certverifier=certverifier
0450 
0451     def __str__(self):
0452         return "<XML-RPC over SSH proxy for %s @ %s:%d>" % (self.__username, self.__host, self.__port)
0453 
0454     def __repr__(self):
0455         return "<XML-RPC over SSH proxy for %s @ %s:%d>" % (self.__username, self.__host, self.__port)
0456 
0457     def __makeconnection(self):
0458         self.__channel=None
0459         sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
0460         sock.connect( (self.__host, self.__port) )
0461         t=paramiko.Transport(sock)
0462         t.setDaemon(True)
0463         event=threading.Event()
0464         t.start_client(event)
0465         event.wait(15)
0466         if not t.is_active():
0467             raise Exception("No SSH on the other end: %s/%d" % (self.__host, self.__port) )
0468         key=t.get_remote_server_key()
0469         
0470         if self.__certverifier is not None:
0471             res=self.__certverifier( (self.__host, self.__port), key)
0472             if not res:
0473                 raise CertificateNotAcceptedException("Certificate not accepted for  %s @ %s:%d" % (self.__username, self.__host, self.__port))
0474             if not t.is_active():
0475                 raise Exception("Session has failed while waiting for certificate to be verified")
0476 
0477         event=threading.Event()
0478         t.auth_password(self.__username, self.__password, event)
0479         event.wait()
0480         if not t.is_active():
0481             raise Exception("Authentication to %s failed:  Username %s, password %s" % (self.__host, `self.__username`, `self.__password`))
0482         self.__channel=t.open_channel("bitfling")
0483 
0484     def __ensure_channel(self):
0485         if self.__channel is None:
0486             self.__makeconnection()
0487         if self.__channel is None:
0488             raise Exception("Unable to properly connect")
0489 
0490     def __getattr__(self, name):
0491         if name.startswith("__"):
0492             raise Exception("Bad method "+`name`)
0493         return _Method(self.__send, name)
0494 
0495     def __recvall(self, channel, amount):
0496         result=""
0497         while amount:
0498             l=channel.recv(amount)
0499             if len(l)==0:
0500                 return result # eof
0501             result+=l
0502             amount-=len(l)
0503         return result
0504 
0505     def __send(self, methodname, args):
0506         self.__ensure_channel()
0507         request=xmlrpclib.dumps(args, methodname, encoding=None) #  allow_none=False (allow_none is py2.3+)
0508         self.__channel.sendall( ("%08d" % (len(request),))+request)
0509         resplen=self.__recvall(self.__channel, 8)
0510         resplen=int(resplen)
0511         response=self.__recvall(self.__channel, resplen)
0512         p, u = xmlrpclib.getparser()
0513         p.feed(response)
0514         p.close()
0515         # if the response was a Fault, then it is raised by u.close()
0516         try:
0517             response=u.close()
0518         except xmlrpclib.Fault,e:
0519             if e.faultCode!=17:
0520                 raise e
0521             klass,str=e.faultString.split(':', 1)
0522             raise common.getfullname(klass)(str)
0523             
0524         if len(response)==1:
0525             response=response[0]
0526         return response
0527         
0528 class FunctionLogHandler(logging.Handler):
0529     "Log handler that calls a specified function"
0530     def __init__(self, function, level=logging.NOTSET):
0531         logging.Handler.__init__(self, level)
0532         self.function=function
0533 
0534     def emit(self, record):
0535         self.function(record.getMessage())
0536 
0537 
0538 if __name__=='__main__':
0539     if len(sys.argv)<2:
0540         print "You must supply arguments - one of"
0541         print "  server"
0542         print "  client"
0543         sys.exit(1)
0544 
0545     if sys.argv[1]=="server":
0546         #cert=paramiko.DSSKey()
0547         #cert.read_private_key_file(os.path.expanduser("~/.bitfling.key"))
0548         cert=paramiko.DSSKey.from_private_key_file("~/.bitfling.key")
0549         server=Server('', 12652, cert)
0550         server.setDaemon(True)
0551         server.start()
0552 
0553         time.sleep(1120)
0554 
0555     if sys.argv[1]=="client":
0556         server=ServerProxy('username', 'password', 'localhost', 12652)
0557 
0558         print server.add(3,4)
0559         print server.add("one", "two")
0560 
0561 

Generated by PyXR 0.9.4