Package bitfling :: Module xmlrpcstuff
[hide private]
[frames] | no frames]

Source Code for Module bitfling.xmlrpcstuff

  1  #!/usr/bin/env python 
  2   
  3  # This file is double licensed as the BitPim License and the Python 
  4  # licenses.  It incorporates code from the standard Python library 
  5  # which was then modified to work properly. 
  6   
  7  # My own implementation of xmlrpc (both server and client) 
  8   
  9  # It has a silly name so it doesn't class with standard Python 
 10  # and library module names 
 11   
 12  # This code initially tried to do XML-RPC over HTTP/1.1 persistent 
 13  # connections over SSL (provided by M2Crypto).  That turned out into a 
 14  # big nightmare of trying to mash libraries to work together that 
 15  # didn't really want to. 
 16   
 17  # This new version uses SSH (provided by paramiko). 
 18   
 19  # Server design 
 20  # 
 21  # Main thread (which could be a daemon thread for the rest of the program) 
 22  # creates the listening socket, and starts the connection handler threads. 
 23  # They all sit in a loop.  They call accept, work on the lifetime of 
 24  # a connection, and when it closes go back to accept.  When they get a 
 25  # request, it is dumped into a queue for the main thread to deal with, 
 26  # who then dumps the results back into a queue for the connection thread. 
 27  # Consequently we get the benefits of threading for dealing with event 
 28  # stuff, but the actual request handling still seems single threaded. 
 29   
 30   
 31  TRACE=False 
 32   
 33   
 34  # standard modules 
 35  import threading 
 36  import Queue 
 37  import time 
 38  import sys 
 39  import xmlrpclib 
 40  import base64 
 41  import string 
 42  import logging 
 43  import os 
 44  import socket 
 45   
 46  # required add ons 
 47  import paramiko 
 48   
 49  # my modules 
 50  if TRACE: import guihelper # to format exceptions 
 51  import common 
 52   
53 -class ServerChannel(paramiko.Channel):
54
55 - def __init__(self, chanid, peeraddr, username):
56 self.chan=chanid 57 self.peeraddr=peeraddr 58 self.username=username 59 paramiko.Channel.__init__(self, chanid)
60
61 - def readall(self, amount):
62 result="" 63 while amount: 64 l=self.chan.recv(amount) 65 if len(l)==0: 66 return result # eof 67 result+=l 68 amount-=len(l) 69 return result
70
71 - def XMLRPCLoop(self, conn):
72 """Main loop that deals with the XML-RPC data 73 74 @param conn: The connectionthread object we are working for 75 """ 76 while True: 77 try: 78 length=int(self.readall(8)) 79 except ValueError: 80 return 81 xml=self.readall(length) 82 response=conn.processxmlrpcrequest(xml, self.peeraddr, 83 self.username) 84 self.chan.sendall( ("%08d" % (len(response),))+response)
85
86 -class myServer(paramiko.ServerInterface):
87
88 - def __init__(self, peeraddr, onbehalfof):
89 self.event = threading.Event() 90 self.peeraddr=peeraddr 91 self.onbehalfof=onbehalfof
92
93 - def get_allowed_auths(self, username):
94 return "password"
95
96 - def check_auth_password(self, username, password):
97 # we borrow the connection's queue object for this 98 self.bf_auth_username="<invalid user>" 99 conn=self.onbehalfof 100 conn.log("Checking authentication for user "+`username`) 101 msg=Server.Message(Server.Message.CMD_NEW_USER_REQUEST, conn.responsequeue, 102 self.peeraddr, data=(username,password)) 103 conn.requestqueue.put(msg) 104 resp=conn.responsequeue.get() 105 assert resp.cmd==resp.CMD_NEW_USER_RESPONSE 106 if hasattr(resp, 'exception'): 107 conn.logexception("Exception while checking authentication",resp.exception) 108 return paramiko.AUTH_FAILED 109 if resp.data: 110 conn.log("Credentials ok for user "+`username`) 111 self.bf_auth_username=username 112 return paramiko.AUTH_SUCCESSFUL 113 conn.log("Credentials not accepted for user "+`username`) 114 return paramiko.AUTH_FAILED
115
116 - def check_channel_request(self, kind, chanid):
117 if kind == 'bitfling': 118 return paramiko.OPEN_SUCCEEDED 119 return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
120 121
122 -class Server(threading.Thread):
123
124 - class Message:
125 """A message between a connection thread and the server object, or vice versa""" 126 # These are in the order things happen. Logging happens all the time, and we 127 # cycle XMLRPC requests and responses for the lifetime of a connection 128 CMD_LOG=0 # data is log message 129 CMD_LOG_EXCEPTION=1 # data is sys.exc_info() 130 CMD_NEW_ACCEPT_REQUEST=2 131 CMD_NEW_ACCEPT_RESPONSE=3 132 CMD_NEW_USER_REQUEST=4 133 CMD_NEW_USER_RESPONSE=5 134 CMD_XMLRPC_REQUEST=6 135 CMD_XMLRPC_RESPONSE=7 136 CMD_CONNECTION_CLOSE=8 137
138 - def __init__(self, cmd, respondqueue=None, clientaddr=None, data=None):
139 self.cmd=cmd 140 self.respondqueue=respondqueue 141 self.clientaddr=clientaddr 142 self.data=data
143
144 - def __repr__(self):
145 d=`self.data` 146 if len(d)>40: 147 d=d[:40] 148 str=`self.cmd` 149 for i in dir(self): 150 if i.startswith("CMD_") and getattr(self, i)==self.cmd: 151 str=i 152 break 153 return "Message: cmd=%s data=%s" % (str, d)
154
155 - class ConnectionThread(threading.Thread):
156
157 - def __init__(self, server, listen, queue, name):
158 """Constructor 159 160 @param server: reference to server object 161 @param listen: socket object that is in listening state 162 @param queue: the queue object to send messages to 163 @param name: name of this thread""" 164 threading.Thread.__init__(self) 165 self.setDaemon(True) 166 self.setName(name) 167 self.responsequeue=Queue.Queue() 168 self.server=server 169 self.requestqueue=queue 170 self.listen=listen
171
172 - def log(self, str):
173 now=time.time() 174 t=time.localtime(now) 175 timestr="&%d:%02d:%02d.%03d" % ( t[3], t[4], t[5], int((now-int(now))*1000)) 176 msg=Server.Message(Server.Message.CMD_LOG, data="%s: %s: %s" % (timestr, self.getName(), str)) 177 self.requestqueue.put(msg)
178
179 - def logexception(self, str, excinfo):
180 if __debug__ and TRACE: print "exception %s\n%s" % (str, guihelper.formatexception(excinfo)) 181 self.log(str) 182 msg=Server.Message(Server.Message.CMD_LOG_EXCEPTION, data=excinfo) 183 self.requestqueue.put(msg)
184
185 - def run(self):
186 event=threading.Event() 187 while not self.server.wantshutdown: 188 if __debug__ and TRACE: print self.getName()+": About to call accept" 189 try: 190 transport=None 191 event.clear() 192 # blocking wait for new connection to come in 193 sock, peeraddr = self.listen.accept() 194 # ask if we allow this connection 195 msg=Server.Message(Server.Message.CMD_NEW_ACCEPT_REQUEST, self.responsequeue, peeraddr) 196 self.requestqueue.put(msg) 197 resp=self.responsequeue.get() 198 assert resp.cmd==resp.CMD_NEW_ACCEPT_RESPONSE 199 ok=resp.data 200 if not ok: 201 self.log("Connection from "+`peeraddr`+" not accepted") 202 sock.close() 203 continue 204 # startup ssh stuff 205 self.log("Connection from "+`peeraddr`+" accepted") 206 transport=paramiko.Transport(sock) 207 transport.add_server_key(self.server.ssh_server_key) 208 transport.setDaemon(True) 209 srvr=myServer(peeraddr, self) 210 transport.start_server(event,srvr) 211 212 except: 213 if __debug__ and TRACE: print self.getName()+": Exception in accept block\n"+guihelper.formatexception() 214 self.logexception("Exception in accept", sys.exc_info()) 215 if transport is not None: del transport 216 sock.close() 217 continue 218 219 # wait for it to become an SSH connection 220 event.wait() 221 if not event.isSet() or not transport.is_active(): 222 self.log("Connection from "+`peeraddr`+" didn't do SSH negotiation") 223 transport.close() 224 sock.close() 225 continue 226 if __debug__ and TRACE: print self.getName()+": SSH connection from "+`peeraddr` 227 228 self.log("SSH negotiated from "+`peeraddr`) 229 230 chan=None 231 try: 232 chan=transport.accept() 233 serverchan=ServerChannel(chan,peeraddr,srvr.bf_auth_username) 234 serverchan.XMLRPCLoop(self) 235 except: 236 self.logexception("Exception in XMLRPCLoop", sys.exc_info()) 237 238 if chan is not None: 239 chan.close() 240 del chan 241 242 if transport is not None: 243 transport.close() 244 del transport 245 246 msg=Server.Message(Server.Message.CMD_CONNECTION_CLOSE, None, peeraddr) 247 self.requestqueue.put(msg) 248 self.log("Connection from "+`peeraddr`+" closed")
249
250 - def processxmlrpcrequest(self, data, client_addr, username):
251 msg=Server.Message(Server.Message.CMD_XMLRPC_REQUEST, self.responsequeue, client_addr, data=(data, username)) 252 if __debug__ and TRACE: 253 self.log("%s: req %s" % (username, `data`)) 254 self.requestqueue.put(msg) 255 resp=self.responsequeue.get() 256 assert resp.cmd==resp.CMD_XMLRPC_RESPONSE 257 if hasattr(resp, "exception"): 258 raise resp.exception 259 return resp.data
260 261
262 - def __init__(self, host, port, servercert, connectionthreadcount=5, timecheck=60, connectionidlebreak=240):
263 """Creates the listening thread and infrastructure. Don't forget to call start() if you 264 want anything to be processed! You probably also want to call setDaemon(). Remember to 265 load a certificate into the sslcontext. 266 267 @param connectionthreadcount: How many threads are being used. If new connections 268 arrive while the existing threads are busy in connections, then they will be ignored 269 @param timecheck: How often shutdown requests are checked for in the main thread (only valid on Python 2.3+) 270 @param connectionidlebreak: If an SSH connection is idle for this amount of time then it is closed 271 """ 272 threading.Thread.__init__(self) 273 self.setName("Threading SSH server controller for %s:%d" % (host, port)) 274 # setup logging 275 l=logging.getLogger("paramiko") 276 l.setLevel(logging.INFO) 277 lh=FunctionLogHandler(self.OnLog) 278 lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] %(name)s: %(message)s', '%Y%m%d:%H%M%S')) 279 l.addHandler(lh) 280 self.ssh_server_key=servercert 281 connection=socket.socket(socket.AF_INET, socket.SOCK_STREAM) 282 connection.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 283 if not host: 284 host="0.0.0.0" 285 if __debug__ and TRACE: print "Binding to host %s port %d" % (host, port) 286 connection.bind( (host, port) ) 287 connection.listen(connectionthreadcount+5) 288 self.timecheck=timecheck 289 self.connectionidlebreak=connectionidlebreak 290 self.wantshutdown=False 291 self.workqueue=Queue.Queue() 292 self.threadlist=[] 293 for count in range(connectionthreadcount): 294 conthread=self.ConnectionThread(self, connection, self.workqueue, "SSH worker thread %d/%d" % (count+1, connectionthreadcount)) 295 conthread.start() 296 self.threadlist.append(conthread)
297
298 - def shutdown(self):
299 """Requests a shutdown of all threads""" 300 self.wantshutdown=True
301
302 - def run23(self):
303 while not self.wantshutdown: 304 try: 305 msg=self.workqueue.get(True, self.timecheck) 306 except Queue.Empty: 307 continue 308 try: 309 self.processmessage(msg) 310 except: 311 sys.excepthook(*sys.exc_info())
312
313 - def run22(self):
314 while not self.wantshutdown: 315 try: 316 msg=self.workqueue.get(True) 317 except Queue.Empty: 318 continue 319 try: 320 self.processmessage(msg) 321 except: 322 sys.excepthook(*sys.exc_info())
323 324 325 if sys.version_info>=(2,3): 326 run=run23 327 else: 328 run=run22 329
330 - def processmessage(self, msg):
331 if not isinstance(msg, Server.Message): 332 self.OnUserMessage(msg) 333 return 334 if __debug__ and TRACE: 335 if not msg.cmd in (msg.CMD_LOG, msg.CMD_LOG_EXCEPTION): 336 print "Processing message "+`msg` 337 resp=None 338 if msg.cmd==msg.CMD_LOG: 339 self.OnLog(msg.data) 340 return 341 elif msg.cmd==msg.CMD_LOG_EXCEPTION: 342 self.OnLogException(msg.data) 343 return 344 elif msg.cmd==msg.CMD_NEW_ACCEPT_REQUEST: 345 ok=self.OnNewAccept(msg.clientaddr) 346 resp=Server.Message(Server.Message.CMD_NEW_ACCEPT_RESPONSE, data=ok) 347 elif msg.cmd==msg.CMD_NEW_USER_REQUEST: 348 ok=self.OnNewUser(msg.clientaddr, msg.data[0], msg.data[1]) 349 resp=Server.Message(Server.Message.CMD_NEW_USER_RESPONSE, data=ok) 350 elif msg.cmd==msg.CMD_XMLRPC_REQUEST: 351 data=self.OnXmlRpcRequest(* (msg.data+(msg.clientaddr,))) 352 resp=Server.Message(Server.Message.CMD_XMLRPC_RESPONSE, data=data) 353 elif msg.cmd==msg.CMD_CONNECTION_CLOSE: 354 self.OnConnectionClose(msg.clientaddr) 355 else: 356 assert False, "Unknown message command "+`msg.cmd` 357 raise Exception("Internal processing error") 358 if resp is not None: 359 msg.respondqueue.put(resp)
360
361 - def OnLog(self, str):
362 """Process a log message""" 363 print str
364
365 - def OnLogException(self, exc):
366 """Process an exception message""" 367 print exc[:2]
368 369
370 - def OnNewAccept(self, clientaddr):
371 """Decide if we accept a new new connection""" 372 return True
373
374 - def OnNewUser(self, clientaddr, username, password):
375 """Decide if a user is allowed to authenticate""" 376 return True
377
378 - def OnConnectionClose(self, clientaddr):
379 """Called when a connection closes""" 380 if __debug__ and TRACE: print "Closed connection from "+`clientaddr`
381
382 - def OnUserMessage(self, msg):
383 """Called when a message arrives in the workqueue"""
384
385 - def OnXmlRpcRequest(self, xmldata, username, clientaddr):
386 """Called when an XML-RPC request arrives, but before the XML is parsed""" 387 params, method = xmlrpclib.loads(xmldata) 388 # call method 389 try: 390 response=self.OnMethodDispatch(method, params, username, clientaddr) 391 # wrap response in a singleton tuple 392 response = (response,) 393 response = xmlrpclib.dumps(response, methodresponse=1) 394 except xmlrpclib.Fault, fault: 395 response = xmlrpclib.dumps(fault) 396 except: 397 self.OnLog("Exception processing method "+`method`) 398 self.OnLogException(sys.exc_info()) 399 # report exception back to server, with class name first 400 # and then `object`. The client end may be able to 401 # re-raise it 402 obj=sys.exc_info()[1] 403 try: 404 klass="%s.%s" % (obj.__module__, obj.__name__) 405 except: 406 klass="%s.%s" % (obj.__class__.__module__, obj.__class__.__name__) 407 response = xmlrpclib.dumps(xmlrpclib.Fault(17, "%s:%s" % (klass, obj))) 408 409 return response
410
411 - def OnMethodDispatch(self, method, params, username, clientaddr):
412 """Called once the XML-RPC request is parsed""" 413 if __debug__ and TRACE: print "%s %s (user=%s, client=%s)" % (method, `tuple(params)`, username, `clientaddr`) 414 if method=='add' and len(params)==2: 415 return params[0]+params[1] 416 raise xmlrpclib.Fault(10, "Unknown method "+method)
417 418 # Copied from xmlrpclib. This version is slightly modified to be derived from 419 # object. The reason is that if you print a _Method object, then the __str__ 420 # method is called, which tries to do it over XML-RPC! Deriving from object 421 # causes that and many other methods to be already present so it isn't a 422 # problem. 423
424 -class _Method(object):
425 # some magic to bind an XML-RPC method to an RPC server. 426 # supports "nested" methods (e.g. examples.getStateName)
427 - def __init__(self, send, name):
428 self.__send = send 429 self.__name = name
430 - def __getattr__(self, name):
431 return _Method(self.__send, "%s.%s" % (self.__name, name))
432 - def __call__(self, *args):
433 return self.__send(self.__name, args)
434
435 -class CertificateNotAcceptedException(Exception):
436 pass
437
438 -class ServerProxy:
439 logsetup=False
440 - def __init__(self, username, password, host, port, certverifier=None):
441 if not self.logsetup: 442 # paramiko.util.log_to_file('serverproxy.log') 443 self.logsetup=True 444 self.__username=username 445 self.__password=password 446 self.__host=host 447 self.__port=port 448 self.__channel=None 449 self.__certverifier=certverifier
450
451 - def __str__(self):
452 return "<XML-RPC over SSH proxy for %s @ %s:%d>" % (self.__username, self.__host, self.__port)
453
454 - def __repr__(self):
455 return "<XML-RPC over SSH proxy for %s @ %s:%d>" % (self.__username, self.__host, self.__port)
456
457 - def __makeconnection(self):
458 self.__channel=None 459 sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM) 460 sock.connect( (self.__host, self.__port) ) 461 t=paramiko.Transport(sock) 462 t.setDaemon(True) 463 event=threading.Event() 464 t.start_client(event) 465 event.wait(15) 466 if not t.is_active(): 467 raise Exception("No SSH on the other end: %s/%d" % (self.__host, self.__port) ) 468 key=t.get_remote_server_key() 469 470 if self.__certverifier is not None: 471 res=self.__certverifier( (self.__host, self.__port), key) 472 if not res: 473 raise CertificateNotAcceptedException("Certificate not accepted for %s @ %s:%d" % (self.__username, self.__host, self.__port)) 474 if not t.is_active(): 475 raise Exception("Session has failed while waiting for certificate to be verified") 476 477 event=threading.Event() 478 t.auth_password(self.__username, self.__password, event) 479 event.wait() 480 if not t.is_active(): 481 raise Exception("Authentication to %s failed: Username %s, password %s" % (self.__host, `self.__username`, `self.__password`)) 482 self.__channel=t.open_channel("bitfling")
483
484 - def __ensure_channel(self):
485 if self.__channel is None: 486 self.__makeconnection() 487 if self.__channel is None: 488 raise Exception("Unable to properly connect")
489
490 - def __getattr__(self, name):
491 if name.startswith("__"): 492 raise Exception("Bad method "+`name`) 493 return _Method(self.__send, name)
494
495 - def __recvall(self, channel, amount):
496 result="" 497 while amount: 498 l=channel.recv(amount) 499 if len(l)==0: 500 return result # eof 501 result+=l 502 amount-=len(l) 503 return result
504
505 - def __send(self, methodname, args):
506 self.__ensure_channel() 507 request=xmlrpclib.dumps(args, methodname, encoding=None) # allow_none=False (allow_none is py2.3+) 508 self.__channel.sendall( ("%08d" % (len(request),))+request) 509 resplen=self.__recvall(self.__channel, 8) 510 resplen=int(resplen) 511 response=self.__recvall(self.__channel, resplen) 512 p, u = xmlrpclib.getparser() 513 p.feed(response) 514 p.close() 515 # if the response was a Fault, then it is raised by u.close() 516 try: 517 response=u.close() 518 except xmlrpclib.Fault,e: 519 if e.faultCode!=17: 520 raise e 521 klass,str=e.faultString.split(':', 1) 522 raise common.getfullname(klass)(str) 523 524 if len(response)==1: 525 response=response[0] 526 return response
527
528 -class FunctionLogHandler(logging.Handler):
529 "Log handler that calls a specified function"
530 - def __init__(self, function, level=logging.NOTSET):
531 logging.Handler.__init__(self, level) 532 self.function=function
533
534 - def emit(self, record):
535 self.function(record.getMessage())
536 537 538 if __name__=='__main__': 539 if len(sys.argv)<2: 540 print "You must supply arguments - one of" 541 print " server" 542 print " client" 543 sys.exit(1) 544 545 if sys.argv[1]=="server": 546 #cert=paramiko.DSSKey() 547 #cert.read_private_key_file(os.path.expanduser("~/.bitfling.key")) 548 cert=paramiko.DSSKey.from_private_key_file("~/.bitfling.key") 549 server=Server('', 12652, cert) 550 server.setDaemon(True) 551 server.start() 552 553 time.sleep(1120) 554 555 if sys.argv[1]=="client": 556 server=ServerProxy('username', 'password', 'localhost', 12652) 557 558 print server.add(3,4) 559 print server.add("one", "two") 560