0001 #!/usr/bin/env python 0002 0003 # This file is double licensed as the BitPim License and the Python 0004 # licenses. It incorporates code from the standard Python library 0005 # which was then modified to work properly. 0006 0007 # My own implementation of xmlrpc (both server and client) 0008 0009 # It has a silly name so it doesn't class with standard Python 0010 # and library module names 0011 0012 # This code initially tried to do XML-RPC over HTTP/1.1 persistent 0013 # connections over SSL (provided by M2Crypto). That turned out into a 0014 # big nightmare of trying to mash libraries to work together that 0015 # didn't really want to. 0016 0017 # This new version uses SSH (provided by paramiko). 0018 0019 # Server design 0020 # 0021 # Main thread (which could be a daemon thread for the rest of the program) 0022 # creates the listening socket, and starts the connection handler threads. 0023 # They all sit in a loop. They call accept, work on the lifetime of 0024 # a connection, and when it closes go back to accept. When they get a 0025 # request, it is dumped into a queue for the main thread to deal with, 0026 # who then dumps the results back into a queue for the connection thread. 0027 # Consequently we get the benefits of threading for dealing with event 0028 # stuff, but the actual request handling still seems single threaded. 0029 0030 0031 TRACE=False 0032 0033 0034 # standard modules 0035 import threading 0036 import Queue 0037 import time 0038 import sys 0039 import xmlrpclib 0040 import base64 0041 import string 0042 import logging 0043 import os 0044 import socket 0045 0046 # required add ons 0047 import paramiko 0048 0049 # my modules 0050 if TRACE: import guihelper # to format exceptions 0051 import common 0052 0053 class ServerChannel(paramiko.Channel): 0054 0055 def __init__(self, chanid, peeraddr, username): 0056 self.chan=chanid 0057 self.peeraddr=peeraddr 0058 self.username=username 0059 paramiko.Channel.__init__(self, chanid) 0060 0061 def readall(self, amount): 0062 result="" 0063 while amount: 0064 l=self.chan.recv(amount) 0065 if len(l)==0: 0066 return result # eof 0067 result+=l 0068 amount-=len(l) 0069 return result 0070 0071 def XMLRPCLoop(self, conn): 0072 """Main loop that deals with the XML-RPC data 0073 0074 @param conn: The connectionthread object we are working for 0075 """ 0076 while True: 0077 try: 0078 length=int(self.readall(8)) 0079 except ValueError: 0080 return 0081 xml=self.readall(length) 0082 response=conn.processxmlrpcrequest(xml, self.peeraddr, 0083 self.username) 0084 self.chan.sendall( ("%08d" % (len(response),))+response) 0085 0086 class myServer(paramiko.ServerInterface): 0087 0088 def __init__(self, peeraddr, onbehalfof): 0089 self.event = threading.Event() 0090 self.peeraddr=peeraddr 0091 self.onbehalfof=onbehalfof 0092 0093 def get_allowed_auths(self, username): 0094 return "password" 0095 0096 def check_auth_password(self, username, password): 0097 # we borrow the connection's queue object for this 0098 self.bf_auth_username="<invalid user>" 0099 conn=self.onbehalfof 0100 conn.log("Checking authentication for user "+`username`) 0101 msg=Server.Message(Server.Message.CMD_NEW_USER_REQUEST, conn.responsequeue, 0102 self.peeraddr, data=(username,password)) 0103 conn.requestqueue.put(msg) 0104 resp=conn.responsequeue.get() 0105 assert resp.cmd==resp.CMD_NEW_USER_RESPONSE 0106 if hasattr(resp, 'exception'): 0107 conn.logexception("Exception while checking authentication",resp.exception) 0108 return paramiko.AUTH_FAILED 0109 if resp.data: 0110 conn.log("Credentials ok for user "+`username`) 0111 self.bf_auth_username=username 0112 return paramiko.AUTH_SUCCESSFUL 0113 conn.log("Credentials not accepted for user "+`username`) 0114 return paramiko.AUTH_FAILED 0115 0116 def check_channel_request(self, kind, chanid): 0117 if kind == 'bitfling': 0118 return paramiko.OPEN_SUCCEEDED 0119 return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED 0120 0121 0122 class Server(threading.Thread): 0123 0124 class Message: 0125 """A message between a connection thread and the server object, or vice versa""" 0126 # These are in the order things happen. Logging happens all the time, and we 0127 # cycle XMLRPC requests and responses for the lifetime of a connection 0128 CMD_LOG=0 # data is log message 0129 CMD_LOG_EXCEPTION=1 # data is sys.exc_info() 0130 CMD_NEW_ACCEPT_REQUEST=2 0131 CMD_NEW_ACCEPT_RESPONSE=3 0132 CMD_NEW_USER_REQUEST=4 0133 CMD_NEW_USER_RESPONSE=5 0134 CMD_XMLRPC_REQUEST=6 0135 CMD_XMLRPC_RESPONSE=7 0136 CMD_CONNECTION_CLOSE=8 0137 0138 def __init__(self, cmd, respondqueue=None, clientaddr=None, data=None): 0139 self.cmd=cmd 0140 self.respondqueue=respondqueue 0141 self.clientaddr=clientaddr 0142 self.data=data 0143 0144 def __repr__(self): 0145 d=`self.data` 0146 if len(d)>40: 0147 d=d[:40] 0148 str=`self.cmd` 0149 for i in dir(self): 0150 if i.startswith("CMD_") and getattr(self, i)==self.cmd: 0151 str=i 0152 break 0153 return "Message: cmd=%s data=%s" % (str, d) 0154 0155 class ConnectionThread(threading.Thread): 0156 0157 def __init__(self, server, listen, queue, name): 0158 """Constructor 0159 0160 @param server: reference to server object 0161 @param listen: socket object that is in listening state 0162 @param queue: the queue object to send messages to 0163 @param name: name of this thread""" 0164 threading.Thread.__init__(self) 0165 self.setDaemon(True) 0166 self.setName(name) 0167 self.responsequeue=Queue.Queue() 0168 self.server=server 0169 self.requestqueue=queue 0170 self.listen=listen 0171 0172 def log(self, str): 0173 now=time.time() 0174 t=time.localtime(now) 0175 timestr="&%d:%02d:%02d.%03d" % ( t[3], t[4], t[5], int((now-int(now))*1000)) 0176 msg=Server.Message(Server.Message.CMD_LOG, data="%s: %s: %s" % (timestr, self.getName(), str)) 0177 self.requestqueue.put(msg) 0178 0179 def logexception(self, str, excinfo): 0180 if __debug__ and TRACE: print "exception %s\n%s" % (str, guihelper.formatexception(excinfo)) 0181 self.log(str) 0182 msg=Server.Message(Server.Message.CMD_LOG_EXCEPTION, data=excinfo) 0183 self.requestqueue.put(msg) 0184 0185 def run(self): 0186 event=threading.Event() 0187 while not self.server.wantshutdown: 0188 if __debug__ and TRACE: print self.getName()+": About to call accept" 0189 try: 0190 transport=None 0191 event.clear() 0192 # blocking wait for new connection to come in 0193 sock, peeraddr = self.listen.accept() 0194 # ask if we allow this connection 0195 msg=Server.Message(Server.Message.CMD_NEW_ACCEPT_REQUEST, self.responsequeue, peeraddr) 0196 self.requestqueue.put(msg) 0197 resp=self.responsequeue.get() 0198 assert resp.cmd==resp.CMD_NEW_ACCEPT_RESPONSE 0199 ok=resp.data 0200 if not ok: 0201 self.log("Connection from "+`peeraddr`+" not accepted") 0202 sock.close() 0203 continue 0204 # startup ssh stuff 0205 self.log("Connection from "+`peeraddr`+" accepted") 0206 transport=paramiko.Transport(sock) 0207 transport.add_server_key(self.server.ssh_server_key) 0208 transport.setDaemon(True) 0209 srvr=myServer(peeraddr, self) 0210 transport.start_server(event,srvr) 0211 0212 except: 0213 if __debug__ and TRACE: print self.getName()+": Exception in accept block\n"+guihelper.formatexception() 0214 self.logexception("Exception in accept", sys.exc_info()) 0215 if transport is not None: del transport 0216 sock.close() 0217 continue 0218 0219 # wait for it to become an SSH connection 0220 event.wait() 0221 if not event.isSet() or not transport.is_active(): 0222 self.log("Connection from "+`peeraddr`+" didn't do SSH negotiation") 0223 transport.close() 0224 sock.close() 0225 continue 0226 if __debug__ and TRACE: print self.getName()+": SSH connection from "+`peeraddr` 0227 0228 self.log("SSH negotiated from "+`peeraddr`) 0229 0230 chan=None 0231 try: 0232 chan=transport.accept() 0233 serverchan=ServerChannel(chan,peeraddr,srvr.bf_auth_username) 0234 serverchan.XMLRPCLoop(self) 0235 except: 0236 self.logexception("Exception in XMLRPCLoop", sys.exc_info()) 0237 0238 if chan is not None: 0239 chan.close() 0240 del chan 0241 0242 if transport is not None: 0243 transport.close() 0244 del transport 0245 0246 msg=Server.Message(Server.Message.CMD_CONNECTION_CLOSE, None, peeraddr) 0247 self.requestqueue.put(msg) 0248 self.log("Connection from "+`peeraddr`+" closed") 0249 0250 def processxmlrpcrequest(self, data, client_addr, username): 0251 msg=Server.Message(Server.Message.CMD_XMLRPC_REQUEST, self.responsequeue, client_addr, data=(data, username)) 0252 if __debug__ and TRACE: 0253 self.log("%s: req %s" % (username, `data`)) 0254 self.requestqueue.put(msg) 0255 resp=self.responsequeue.get() 0256 assert resp.cmd==resp.CMD_XMLRPC_RESPONSE 0257 if hasattr(resp, "exception"): 0258 raise resp.exception 0259 return resp.data 0260 0261 0262 def __init__(self, host, port, servercert, connectionthreadcount=5, timecheck=60, connectionidlebreak=240): 0263 """Creates the listening thread and infrastructure. Don't forget to call start() if you 0264 want anything to be processed! You probably also want to call setDaemon(). Remember to 0265 load a certificate into the sslcontext. 0266 0267 @param connectionthreadcount: How many threads are being used. If new connections 0268 arrive while the existing threads are busy in connections, then they will be ignored 0269 @param timecheck: How often shutdown requests are checked for in the main thread (only valid on Python 2.3+) 0270 @param connectionidlebreak: If an SSH connection is idle for this amount of time then it is closed 0271 """ 0272 threading.Thread.__init__(self) 0273 self.setName("Threading SSH server controller for %s:%d" % (host, port)) 0274 # setup logging 0275 l=logging.getLogger("paramiko") 0276 l.setLevel(logging.INFO) 0277 lh=FunctionLogHandler(self.OnLog) 0278 lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] %(name)s: %(message)s', '%Y%m%d:%H%M%S')) 0279 l.addHandler(lh) 0280 self.ssh_server_key=servercert 0281 connection=socket.socket(socket.AF_INET, socket.SOCK_STREAM) 0282 connection.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 0283 if not host: 0284 host="0.0.0.0" 0285 if __debug__ and TRACE: print "Binding to host %s port %d" % (host, port) 0286 connection.bind( (host, port) ) 0287 connection.listen(connectionthreadcount+5) 0288 self.timecheck=timecheck 0289 self.connectionidlebreak=connectionidlebreak 0290 self.wantshutdown=False 0291 self.workqueue=Queue.Queue() 0292 self.threadlist=[] 0293 for count in range(connectionthreadcount): 0294 conthread=self.ConnectionThread(self, connection, self.workqueue, "SSH worker thread %d/%d" % (count+1, connectionthreadcount)) 0295 conthread.start() 0296 self.threadlist.append(conthread) 0297 0298 def shutdown(self): 0299 """Requests a shutdown of all threads""" 0300 self.wantshutdown=True 0301 0302 def run23(self): 0303 while not self.wantshutdown: 0304 try: 0305 msg=self.workqueue.get(True, self.timecheck) 0306 except Queue.Empty: 0307 continue 0308 try: 0309 self.processmessage(msg) 0310 except: 0311 sys.excepthook(*sys.exc_info()) 0312 0313 def run22(self): 0314 while not self.wantshutdown: 0315 try: 0316 msg=self.workqueue.get(True) 0317 except Queue.Empty: 0318 continue 0319 try: 0320 self.processmessage(msg) 0321 except: 0322 sys.excepthook(*sys.exc_info()) 0323 0324 0325 if sys.version_info>=(2,3): 0326 run=run23 0327 else: 0328 run=run22 0329 0330 def processmessage(self, msg): 0331 if not isinstance(msg, Server.Message): 0332 self.OnUserMessage(msg) 0333 return 0334 if __debug__ and TRACE: 0335 if not msg.cmd in (msg.CMD_LOG, msg.CMD_LOG_EXCEPTION): 0336 print "Processing message "+`msg` 0337 resp=None 0338 if msg.cmd==msg.CMD_LOG: 0339 self.OnLog(msg.data) 0340 return 0341 elif msg.cmd==msg.CMD_LOG_EXCEPTION: 0342 self.OnLogException(msg.data) 0343 return 0344 elif msg.cmd==msg.CMD_NEW_ACCEPT_REQUEST: 0345 ok=self.OnNewAccept(msg.clientaddr) 0346 resp=Server.Message(Server.Message.CMD_NEW_ACCEPT_RESPONSE, data=ok) 0347 elif msg.cmd==msg.CMD_NEW_USER_REQUEST: 0348 ok=self.OnNewUser(msg.clientaddr, msg.data[0], msg.data[1]) 0349 resp=Server.Message(Server.Message.CMD_NEW_USER_RESPONSE, data=ok) 0350 elif msg.cmd==msg.CMD_XMLRPC_REQUEST: 0351 data=self.OnXmlRpcRequest(* (msg.data+(msg.clientaddr,))) 0352 resp=Server.Message(Server.Message.CMD_XMLRPC_RESPONSE, data=data) 0353 elif msg.cmd==msg.CMD_CONNECTION_CLOSE: 0354 self.OnConnectionClose(msg.clientaddr) 0355 else: 0356 assert False, "Unknown message command "+`msg.cmd` 0357 raise Exception("Internal processing error") 0358 if resp is not None: 0359 msg.respondqueue.put(resp) 0360 0361 def OnLog(self, str): 0362 """Process a log message""" 0363 print str 0364 0365 def OnLogException(self, exc): 0366 """Process an exception message""" 0367 print exc[:2] 0368 0369 0370 def OnNewAccept(self, clientaddr): 0371 """Decide if we accept a new new connection""" 0372 return True 0373 0374 def OnNewUser(self, clientaddr, username, password): 0375 """Decide if a user is allowed to authenticate""" 0376 return True 0377 0378 def OnConnectionClose(self, clientaddr): 0379 """Called when a connection closes""" 0380 if __debug__ and TRACE: print "Closed connection from "+`clientaddr` 0381 0382 def OnUserMessage(self, msg): 0383 """Called when a message arrives in the workqueue""" 0384 0385 def OnXmlRpcRequest(self, xmldata, username, clientaddr): 0386 """Called when an XML-RPC request arrives, but before the XML is parsed""" 0387 params, method = xmlrpclib.loads(xmldata) 0388 # call method 0389 try: 0390 response=self.OnMethodDispatch(method, params, username, clientaddr) 0391 # wrap response in a singleton tuple 0392 response = (response,) 0393 response = xmlrpclib.dumps(response, methodresponse=1) 0394 except xmlrpclib.Fault, fault: 0395 response = xmlrpclib.dumps(fault) 0396 except: 0397 self.OnLog("Exception processing method "+`method`) 0398 self.OnLogException(sys.exc_info()) 0399 # report exception back to server, with class name first 0400 # and then `object`. The client end may be able to 0401 # re-raise it 0402 obj=sys.exc_info()[1] 0403 try: 0404 klass="%s.%s" % (obj.__module__, obj.__name__) 0405 except: 0406 klass="%s.%s" % (obj.__class__.__module__, obj.__class__.__name__) 0407 response = xmlrpclib.dumps(xmlrpclib.Fault(17, "%s:%s" % (klass, obj))) 0408 0409 return response 0410 0411 def OnMethodDispatch(self, method, params, username, clientaddr): 0412 """Called once the XML-RPC request is parsed""" 0413 if __debug__ and TRACE: print "%s %s (user=%s, client=%s)" % (method, `tuple(params)`, username, `clientaddr`) 0414 if method=='add' and len(params)==2: 0415 return params[0]+params[1] 0416 raise xmlrpclib.Fault(10, "Unknown method "+method) 0417 0418 # Copied from xmlrpclib. This version is slightly modified to be derived from 0419 # object. The reason is that if you print a _Method object, then the __str__ 0420 # method is called, which tries to do it over XML-RPC! Deriving from object 0421 # causes that and many other methods to be already present so it isn't a 0422 # problem. 0423 0424 class _Method(object): 0425 # some magic to bind an XML-RPC method to an RPC server. 0426 # supports "nested" methods (e.g. examples.getStateName) 0427 def __init__(self, send, name): 0428 self.__send = send 0429 self.__name = name 0430 def __getattr__(self, name): 0431 return _Method(self.__send, "%s.%s" % (self.__name, name)) 0432 def __call__(self, *args): 0433 return self.__send(self.__name, args) 0434 0435 class CertificateNotAcceptedException(Exception): 0436 pass 0437 0438 class ServerProxy: 0439 logsetup=False 0440 def __init__(self, username, password, host, port, certverifier=None): 0441 if not self.logsetup: 0442 # paramiko.util.log_to_file('serverproxy.log') 0443 self.logsetup=True 0444 self.__username=username 0445 self.__password=password 0446 self.__host=host 0447 self.__port=port 0448 self.__channel=None 0449 self.__certverifier=certverifier 0450 0451 def __str__(self): 0452 return "<XML-RPC over SSH proxy for %s @ %s:%d>" % (self.__username, self.__host, self.__port) 0453 0454 def __repr__(self): 0455 return "<XML-RPC over SSH proxy for %s @ %s:%d>" % (self.__username, self.__host, self.__port) 0456 0457 def __makeconnection(self): 0458 self.__channel=None 0459 sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM) 0460 sock.connect( (self.__host, self.__port) ) 0461 t=paramiko.Transport(sock) 0462 t.setDaemon(True) 0463 event=threading.Event() 0464 t.start_client(event) 0465 event.wait(15) 0466 if not t.is_active(): 0467 raise Exception("No SSH on the other end: %s/%d" % (self.__host, self.__port) ) 0468 key=t.get_remote_server_key() 0469 0470 if self.__certverifier is not None: 0471 res=self.__certverifier( (self.__host, self.__port), key) 0472 if not res: 0473 raise CertificateNotAcceptedException("Certificate not accepted for %s @ %s:%d" % (self.__username, self.__host, self.__port)) 0474 if not t.is_active(): 0475 raise Exception("Session has failed while waiting for certificate to be verified") 0476 0477 event=threading.Event() 0478 t.auth_password(self.__username, self.__password, event) 0479 event.wait() 0480 if not t.is_active(): 0481 raise Exception("Authentication to %s failed: Username %s, password %s" % (self.__host, `self.__username`, `self.__password`)) 0482 self.__channel=t.open_channel("bitfling") 0483 0484 def __ensure_channel(self): 0485 if self.__channel is None: 0486 self.__makeconnection() 0487 if self.__channel is None: 0488 raise Exception("Unable to properly connect") 0489 0490 def __getattr__(self, name): 0491 if name.startswith("__"): 0492 raise Exception("Bad method "+`name`) 0493 return _Method(self.__send, name) 0494 0495 def __recvall(self, channel, amount): 0496 result="" 0497 while amount: 0498 l=channel.recv(amount) 0499 if len(l)==0: 0500 return result # eof 0501 result+=l 0502 amount-=len(l) 0503 return result 0504 0505 def __send(self, methodname, args): 0506 self.__ensure_channel() 0507 request=xmlrpclib.dumps(args, methodname, encoding=None) # allow_none=False (allow_none is py2.3+) 0508 self.__channel.sendall( ("%08d" % (len(request),))+request) 0509 resplen=self.__recvall(self.__channel, 8) 0510 resplen=int(resplen) 0511 response=self.__recvall(self.__channel, resplen) 0512 p, u = xmlrpclib.getparser() 0513 p.feed(response) 0514 p.close() 0515 # if the response was a Fault, then it is raised by u.close() 0516 try: 0517 response=u.close() 0518 except xmlrpclib.Fault,e: 0519 if e.faultCode!=17: 0520 raise e 0521 klass,str=e.faultString.split(':', 1) 0522 raise common.getfullname(klass)(str) 0523 0524 if len(response)==1: 0525 response=response[0] 0526 return response 0527 0528 class FunctionLogHandler(logging.Handler): 0529 "Log handler that calls a specified function" 0530 def __init__(self, function, level=logging.NOTSET): 0531 logging.Handler.__init__(self, level) 0532 self.function=function 0533 0534 def emit(self, record): 0535 self.function(record.getMessage()) 0536 0537 0538 if __name__=='__main__': 0539 if len(sys.argv)<2: 0540 print "You must supply arguments - one of" 0541 print " server" 0542 print " client" 0543 sys.exit(1) 0544 0545 if sys.argv[1]=="server": 0546 #cert=paramiko.DSSKey() 0547 #cert.read_private_key_file(os.path.expanduser("~/.bitfling.key")) 0548 cert=paramiko.DSSKey.from_private_key_file("~/.bitfling.key") 0549 server=Server('', 12652, cert) 0550 server.setDaemon(True) 0551 server.start() 0552 0553 time.sleep(1120) 0554 0555 if sys.argv[1]=="client": 0556 server=ServerProxy('username', 'password', 'localhost', 12652) 0557 0558 print server.add(3,4) 0559 print server.add("one", "two") 0560 0561
Generated by PyXR 0.9.4