Package bitfling ::
Module xmlrpcstuff
|
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 TRACE=False
32
33
34
35 import threading
36 import Queue
37 import time
38 import sys
39 import xmlrpclib
40 import base64
41 import string
42 import logging
43 import os
44 import socket
45
46
47 import paramiko
48
49
50 if TRACE: import guihelper
51 import common
52
54
55 - def __init__(self, chanid, peeraddr, username):
56 self.chan=chanid
57 self.peeraddr=peeraddr
58 self.username=username
59 paramiko.Channel.__init__(self, chanid)
60
62 result=""
63 while amount:
64 l=self.chan.recv(amount)
65 if len(l)==0:
66 return result
67 result+=l
68 amount-=len(l)
69 return result
70
72 """Main loop that deals with the XML-RPC data
73
74 @param conn: The connectionthread object we are working for
75 """
76 while True:
77 try:
78 length=int(self.readall(8))
79 except ValueError:
80 return
81 xml=self.readall(length)
82 response=conn.processxmlrpcrequest(xml, self.peeraddr,
83 self.username)
84 self.chan.sendall( ("%08d" % (len(response),))+response)
85
86 -class myServer(paramiko.ServerInterface):
87
88 - def __init__(self, peeraddr, onbehalfof):
89 self.event = threading.Event()
90 self.peeraddr=peeraddr
91 self.onbehalfof=onbehalfof
92
95
97
98 self.bf_auth_username="<invalid user>"
99 conn=self.onbehalfof
100 conn.log("Checking authentication for user "+`username`)
101 msg=Server.Message(Server.Message.CMD_NEW_USER_REQUEST, conn.responsequeue,
102 self.peeraddr, data=(username,password))
103 conn.requestqueue.put(msg)
104 resp=conn.responsequeue.get()
105 assert resp.cmd==resp.CMD_NEW_USER_RESPONSE
106 if hasattr(resp, 'exception'):
107 conn.logexception("Exception while checking authentication",resp.exception)
108 return paramiko.AUTH_FAILED
109 if resp.data:
110 conn.log("Credentials ok for user "+`username`)
111 self.bf_auth_username=username
112 return paramiko.AUTH_SUCCESSFUL
113 conn.log("Credentials not accepted for user "+`username`)
114 return paramiko.AUTH_FAILED
115
117 if kind == 'bitfling':
118 return paramiko.OPEN_SUCCEEDED
119 return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
120
121
122 -class Server(threading.Thread):
123
154
156
157 - def __init__(self, server, listen, queue, name):
158 """Constructor
159
160 @param server: reference to server object
161 @param listen: socket object that is in listening state
162 @param queue: the queue object to send messages to
163 @param name: name of this thread"""
164 threading.Thread.__init__(self)
165 self.setDaemon(True)
166 self.setName(name)
167 self.responsequeue=Queue.Queue()
168 self.server=server
169 self.requestqueue=queue
170 self.listen=listen
171
172 - def log(self, str):
178
184
186 event=threading.Event()
187 while not self.server.wantshutdown:
188 if __debug__ and TRACE: print self.getName()+": About to call accept"
189 try:
190 transport=None
191 event.clear()
192
193 sock, peeraddr = self.listen.accept()
194
195 msg=Server.Message(Server.Message.CMD_NEW_ACCEPT_REQUEST, self.responsequeue, peeraddr)
196 self.requestqueue.put(msg)
197 resp=self.responsequeue.get()
198 assert resp.cmd==resp.CMD_NEW_ACCEPT_RESPONSE
199 ok=resp.data
200 if not ok:
201 self.log("Connection from "+`peeraddr`+" not accepted")
202 sock.close()
203 continue
204
205 self.log("Connection from "+`peeraddr`+" accepted")
206 transport=paramiko.Transport(sock)
207 transport.add_server_key(self.server.ssh_server_key)
208 transport.setDaemon(True)
209 srvr=myServer(peeraddr, self)
210 transport.start_server(event,srvr)
211
212 except:
213 if __debug__ and TRACE: print self.getName()+": Exception in accept block\n"+guihelper.formatexception()
214 self.logexception("Exception in accept", sys.exc_info())
215 if transport is not None: del transport
216 sock.close()
217 continue
218
219
220 event.wait()
221 if not event.isSet() or not transport.is_active():
222 self.log("Connection from "+`peeraddr`+" didn't do SSH negotiation")
223 transport.close()
224 sock.close()
225 continue
226 if __debug__ and TRACE: print self.getName()+": SSH connection from "+`peeraddr`
227
228 self.log("SSH negotiated from "+`peeraddr`)
229
230 chan=None
231 try:
232 chan=transport.accept()
233 serverchan=ServerChannel(chan,peeraddr,srvr.bf_auth_username)
234 serverchan.XMLRPCLoop(self)
235 except:
236 self.logexception("Exception in XMLRPCLoop", sys.exc_info())
237
238 if chan is not None:
239 chan.close()
240 del chan
241
242 if transport is not None:
243 transport.close()
244 del transport
245
246 msg=Server.Message(Server.Message.CMD_CONNECTION_CLOSE, None, peeraddr)
247 self.requestqueue.put(msg)
248 self.log("Connection from "+`peeraddr`+" closed")
249
260
261
262 - def __init__(self, host, port, servercert, connectionthreadcount=5, timecheck=60, connectionidlebreak=240):
263 """Creates the listening thread and infrastructure. Don't forget to call start() if you
264 want anything to be processed! You probably also want to call setDaemon(). Remember to
265 load a certificate into the sslcontext.
266
267 @param connectionthreadcount: How many threads are being used. If new connections
268 arrive while the existing threads are busy in connections, then they will be ignored
269 @param timecheck: How often shutdown requests are checked for in the main thread (only valid on Python 2.3+)
270 @param connectionidlebreak: If an SSH connection is idle for this amount of time then it is closed
271 """
272 threading.Thread.__init__(self)
273 self.setName("Threading SSH server controller for %s:%d" % (host, port))
274
275 l=logging.getLogger("paramiko")
276 l.setLevel(logging.INFO)
277 lh=FunctionLogHandler(self.OnLog)
278 lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] %(name)s: %(message)s', '%Y%m%d:%H%M%S'))
279 l.addHandler(lh)
280 self.ssh_server_key=servercert
281 connection=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
282 connection.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
283 if not host:
284 host="0.0.0.0"
285 if __debug__ and TRACE: print "Binding to host %s port %d" % (host, port)
286 connection.bind( (host, port) )
287 connection.listen(connectionthreadcount+5)
288 self.timecheck=timecheck
289 self.connectionidlebreak=connectionidlebreak
290 self.wantshutdown=False
291 self.workqueue=Queue.Queue()
292 self.threadlist=[]
293 for count in range(connectionthreadcount):
294 conthread=self.ConnectionThread(self, connection, self.workqueue, "SSH worker thread %d/%d" % (count+1, connectionthreadcount))
295 conthread.start()
296 self.threadlist.append(conthread)
297
299 """Requests a shutdown of all threads"""
300 self.wantshutdown=True
301
303 while not self.wantshutdown:
304 try:
305 msg=self.workqueue.get(True, self.timecheck)
306 except Queue.Empty:
307 continue
308 try:
309 self.processmessage(msg)
310 except:
311 sys.excepthook(*sys.exc_info())
312
314 while not self.wantshutdown:
315 try:
316 msg=self.workqueue.get(True)
317 except Queue.Empty:
318 continue
319 try:
320 self.processmessage(msg)
321 except:
322 sys.excepthook(*sys.exc_info())
323
324
325 if sys.version_info>=(2,3):
326 run=run23
327 else:
328 run=run22
329
360
362 """Process a log message"""
363 print str
364
366 """Process an exception message"""
367 print exc[:2]
368
369
371 """Decide if we accept a new new connection"""
372 return True
373
374 - def OnNewUser(self, clientaddr, username, password):
375 """Decide if a user is allowed to authenticate"""
376 return True
377
379 """Called when a connection closes"""
380 if __debug__ and TRACE: print "Closed connection from "+`clientaddr`
381
383 """Called when a message arrives in the workqueue"""
384
386 """Called when an XML-RPC request arrives, but before the XML is parsed"""
387 params, method = xmlrpclib.loads(xmldata)
388
389 try:
390 response=self.OnMethodDispatch(method, params, username, clientaddr)
391
392 response = (response,)
393 response = xmlrpclib.dumps(response, methodresponse=1)
394 except xmlrpclib.Fault, fault:
395 response = xmlrpclib.dumps(fault)
396 except:
397 self.OnLog("Exception processing method "+`method`)
398 self.OnLogException(sys.exc_info())
399
400
401
402 obj=sys.exc_info()[1]
403 try:
404 klass="%s.%s" % (obj.__module__, obj.__name__)
405 except:
406 klass="%s.%s" % (obj.__class__.__module__, obj.__class__.__name__)
407 response = xmlrpclib.dumps(xmlrpclib.Fault(17, "%s:%s" % (klass, obj)))
408
409 return response
410
412 """Called once the XML-RPC request is parsed"""
413 if __debug__ and TRACE: print "%s %s (user=%s, client=%s)" % (method, `tuple(params)`, username, `clientaddr`)
414 if method=='add' and len(params)==2:
415 return params[0]+params[1]
416 raise xmlrpclib.Fault(10, "Unknown method "+method)
417
418
419
420
421
422
423
425
426
433 return self.__send(self.__name, args)
434
437
439 logsetup=False
440 - def __init__(self, username, password, host, port, certverifier=None):
441 if not self.logsetup:
442
443 self.logsetup=True
444 self.__username=username
445 self.__password=password
446 self.__host=host
447 self.__port=port
448 self.__channel=None
449 self.__certverifier=certverifier
450
452 return "<XML-RPC over SSH proxy for %s @ %s:%d>" % (self.__username, self.__host, self.__port)
453
455 return "<XML-RPC over SSH proxy for %s @ %s:%d>" % (self.__username, self.__host, self.__port)
456
458 self.__channel=None
459 sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
460 sock.connect( (self.__host, self.__port) )
461 t=paramiko.Transport(sock)
462 t.setDaemon(True)
463 event=threading.Event()
464 t.start_client(event)
465 event.wait(15)
466 if not t.is_active():
467 raise Exception("No SSH on the other end: %s/%d" % (self.__host, self.__port) )
468 key=t.get_remote_server_key()
469
470 if self.__certverifier is not None:
471 res=self.__certverifier( (self.__host, self.__port), key)
472 if not res:
473 raise CertificateNotAcceptedException("Certificate not accepted for %s @ %s:%d" % (self.__username, self.__host, self.__port))
474 if not t.is_active():
475 raise Exception("Session has failed while waiting for certificate to be verified")
476
477 event=threading.Event()
478 t.auth_password(self.__username, self.__password, event)
479 event.wait()
480 if not t.is_active():
481 raise Exception("Authentication to %s failed: Username %s, password %s" % (self.__host, `self.__username`, `self.__password`))
482 self.__channel=t.open_channel("bitfling")
483
485 if self.__channel is None:
486 self.__makeconnection()
487 if self.__channel is None:
488 raise Exception("Unable to properly connect")
489
494
496 result=""
497 while amount:
498 l=channel.recv(amount)
499 if len(l)==0:
500 return result
501 result+=l
502 amount-=len(l)
503 return result
504
505 - def __send(self, methodname, args):
527
529 "Log handler that calls a specified function"
530 - def __init__(self, function, level=logging.NOTSET):
531 logging.Handler.__init__(self, level)
532 self.function=function
533
534 - def emit(self, record):
535 self.function(record.getMessage())
536
537
538 if __name__=='__main__':
539 if len(sys.argv)<2:
540 print "You must supply arguments - one of"
541 print " server"
542 print " client"
543 sys.exit(1)
544
545 if sys.argv[1]=="server":
546
547
548 cert=paramiko.DSSKey.from_private_key_file("~/.bitfling.key")
549 server=Server('', 12652, cert)
550 server.setDaemon(True)
551 server.start()
552
553 time.sleep(1120)
554
555 if sys.argv[1]=="client":
556 server=ServerProxy('username', 'password', 'localhost', 12652)
557
558 print server.add(3,4)
559 print server.add("one", "two")
560