Package native :: Package usb :: Module usb
[hide private]
[frames] | [no frames]

Source Code for Module native.usb.usb

  1  ### 
  2  ### A wrapper for the libusb wrapper of the libusb library :-) 
  3  ### 
  4  ### This code is in the Public Domain.  (libusb and the wrapper 
  5  ### are LGPL) 
  6  ### 
  7   
  8  from __future__ import generators 
  9   
 10  import sys 
 11   
 12  if sys.platform=='win32': 
 13      try: 
 14          import win32api 
 15          handle=win32api.LoadLibrary("libusb0.dll") 
 16          win32api.FreeLibrary(handle) 
 17      except: 
 18          raise ImportError("libusb needs to be installed for this module to work") 
 19       
 20  import libusb as usb 
 21   
 22  # grab some constants and put them in our namespace 
 23   
 24  for sym in dir(usb): 
 25      if sym.startswith("USB_CLASS_") or sym.startswith("USB_DT"): 
 26          exec "%s=usb.%s" %(sym, sym) 
 27  del sym 
 28   
 29  TRACE=False 
 30   
class USBException(Exception):
    """Raised by the wrappers when a libusb call reports failure.

    The exception message is whatever usb.usb_strerror() returns at the
    moment the exception object is created.
    """

    def __init__(self):
        message = usb.usb_strerror()
        Exception.__init__(self, message)
34
def UpdateLists():
    """Refreshes the lists of busses and devices

    @return: A tuple of (change in number of busses, change in number of devices)
    """
    # busses are refreshed first, then devices
    bus_change = usb.usb_find_busses()
    device_change = usb.usb_find_devices()
    return bus_change, device_change
41
class USBBus:
    "Wraps a bus"

    def __init__(self, usb_bus):
        """
        @param usb_bus: the bus object being wrapped; its dirname and
                        devices attributes are read by the methods below
        """
        self.bus = usb_bus

    def name(self):
        "Returns the dirname attribute of the wrapped bus"
        return self.bus.dirname

    def devices(self):
        """Generates a USBDevice for each device on this bus

        The devices form a chain linked through each entry's 'next'
        attribute and ending at None.  Entries whose config is None are
        skipped.  Constructing a USBDevice opens the device, so that may
        raise USBException part way through the iteration.
        """
        dev = self.bus.devices
        while dev is not None:
            if dev.config is not None:
                yield USBDevice(dev)
            dev = dev.next
        # The generator finishes by falling off the end.  An explicit
        # raise StopIteration() here is turned into RuntimeError by
        # Python 3.7+ (PEP 479).
58
class USBDevice:
    "Wraps a device"

    def __init__(self, usb_device):
        """Opens usb_device with usb.usb_open

        @param usb_device: the device object to wrap and open
        @raise USBException: if usb_open returns None
        """
        self.usb = usb  # save it so that it can't be GC before us
        self.dev = usb_device
        self.handle = usb.usb_open(self.dev)
        if TRACE:
            print("usb_open(%s)=%s" % (self.dev, self.handle))
        if self.handle is None:
            raise USBException()

    def __del__(self):
        self.close()

    def close(self):
        """Closes the device handle if it is open

        Safe to call more than once, and on an instance whose __init__
        did not get as far as setting the handle (this is reached from
        __del__ in that case).
        """
        handle = getattr(self, "handle", None)
        if handle is not None:
            if TRACE:
                print("usb_close(%s)" % (handle,))
            self.usb.usb_close(handle)
        self.handle = None
        self.usb = None

    def number(self):
        # NOTE(review): this reads bInterfaceNumber from the device object,
        # while USBInterface.number() reads it from an interface descriptor
        # - confirm the device object really has this attribute
        return self.dev.bInterfaceNumber

    def name(self):
        "Returns the filename attribute of the wrapped device"
        return self.dev.filename

    def vendor(self):
        "Returns idVendor from the device descriptor"
        return self.dev.descriptor.idVendor

    def vendorstring(self):
        "Returns the string selected by the descriptor's iManufacturer, or None"
        return self._getstring("iManufacturer")

    def productstring(self):
        "Returns the string selected by the descriptor's iProduct, or None"
        return self._getstring("iProduct")

    def serialnumber(self):
        "Returns the string selected by the descriptor's iSerialNumber, or None"
        return self._getstring("iSerialNumber")

    def _getstring(self, fieldname):
        """Fetches the string whose index is stored in a descriptor field

        @param fieldname: name of the attribute of the device descriptor
                          holding the string index
        @return: the string, or None if the index is zero
        @raise USBException: if usb_get_string_simple returns a negative result
        """
        n = getattr(self.dev.descriptor, fieldname)
        if not n:
            return None
        res, string = usb.usb_get_string_simple(self.handle, n, 1024)
        if TRACE:
            print("usb_get_string_simple(%s, %d, %d)=%d,%s" % (self.handle, n, 1024, res, string))
        if res < 0:
            raise USBException()
        return string

    def product(self):
        "Returns idProduct from the device descriptor"
        return self.dev.descriptor.idProduct

    def interfaces(self):
        "Generates a USBInterface for each of the bNumInterfaces interfaces of the device's config"
        for i in range(self.dev.config.bNumInterfaces):
            yield USBInterface(self, usb.usb_interface_index(self.dev.config.interface, i))
        # falling off the end finishes the generator; an explicit
        # raise StopIteration() is a RuntimeError on Python 3.7+ (PEP 479)

    def classdetails(self):
        "returns a tuple of device class, devicesubclass, deviceprotocol (all ints)"
        descriptor = self.dev.descriptor
        return (descriptor.bDeviceClass,
                descriptor.bDeviceSubClass,
                descriptor.bDeviceProtocol)
121
class USBInterface:
    "Wraps one interface of a device, using one of its alternate setting descriptors"

    # currently we only deal with first configuration
    def __init__(self, device, iface, alt=None):
        """
        @param device: the USBDevice this interface belongs to
        @param iface: the interface object being wrapped
        @param alt: descriptor to use; when omitted iface.altsetting is used
        """
        self.iface = iface
        self.device = device
        self.desc = alt or iface.altsetting

    def number(self):
        "Returns bInterfaceNumber from the descriptor"
        return self.desc.bInterfaceNumber

    def altnumber(self):
        "Returns bAlternateSetting from the descriptor"
        return self.desc.bAlternateSetting

    def classdetails(self):
        "returns a tuple of interface class, subclass, protocol"
        return (self.desc.bInterfaceClass,
                self.desc.bInterfaceSubClass,
                self.desc.bInterfaceProtocol)

    def openbulk(self, epinno=None, epoutno=None):
        """Returns a filelike object you can use to read and write

        @param epinno: address of the bulk IN endpoint to use, or None for
                       the first one found (compared on the low 7 bits)
        @param epoutno: same, for the bulk OUT endpoint
        @return: a USBFile over the chosen endpoints
        @raise AssertionError: if no matching IN or OUT bulk endpoint exists
        @raise USBException: if the interface cannot be claimed
        """
        # find the endpoints
        def match(ep1, ep2):
            return (ep1 is None) or (ep2 is None) or ((ep1 & 0x7f) == (ep2 & 0x7f))

        epin = None
        epout = None
        for ep in self.endpoints():
            if not ep.isbulk():
                continue
            if ep.direction() == ep.IN:
                if epin is None and match(epinno, ep.address()):
                    epin = ep
            else:
                if epout is None and match(epoutno, ep.address()):
                    epout = ep
        assert epin is not None
        assert epout is not None

        handle = self.device.handle

        # set the configuration
        if TRACE:
            print("getting configvalue")
        v = self.device.dev.config.bConfigurationValue
        if TRACE:
            print("value is %s now about to set config" % (v,))
        # the result is only traced, it is not checked
        res = usb.usb_set_configuration(handle, v)
        if TRACE:
            print("usb_set_configurationds(%s, %d)=%d" % (handle, v, res))
            print("config set")

        # grab the interface
        if TRACE:
            # this message used to be printed unconditionally, unlike every
            # other diagnostic in this module
            print("claiming %s" % (self.number(),))
        res = usb.usb_claim_interface(handle, self.number())
        if TRACE:
            print("usb_claim_interface(%s, %d)=%d" % (handle, self.number(), res))
        if res < 0:
            raise USBException()

        # set the alt setting
        res = usb.usb_set_altinterface(handle, self.altnumber())
        if TRACE:
            print("usb_set_alt_interface(%s, %d)=%d" % (handle, self.altnumber(), res))
        if res < 0:
            # Setting the alternate interface causes problems with some phones (VX-10000)
            # reset the device and reclaim the interface if there was problem setting the
            # alternate interface.
            usb.usb_reset(handle)
            usb.usb_release_interface(handle, self.number())

            res = usb.usb_claim_interface(handle, self.number())
            if res < 0:
                raise USBException()

        # we now have the file
        return USBFile(self, epin, epout)

    def alternates(self):
        "Generates a USBInterface for each of the num_altsetting alternate settings"
        for i in range(self.iface.num_altsetting):
            yield USBInterface(self.device, self.iface,
                               usb.usb_interface_descriptor_index(self.iface.altsetting, i))
        # a generator raises its StopIteration() by itself

    def endpoints(self):
        "Generates a USBEndpoint for each of the descriptor's bNumEndpoints endpoints"
        for i in range(self.desc.bNumEndpoints):
            yield USBEndpoint(usb.usb_endpoint_descriptor_index(self.desc.endpoint, i))
        # as in alternates(), the generator ends by itself; an explicit
        # raise StopIteration() is a RuntimeError on Python 3.7+ (PEP 479)
200
class USBEndpoint:
    "Wraps an endpoint descriptor"

    # type of endpoint
    TYPE_CONTROL = usb.USB_ENDPOINT_TYPE_CONTROL
    TYPE_ISOCHRONOUS = usb.USB_ENDPOINT_TYPE_ISOCHRONOUS
    TYPE_BULK = usb.USB_ENDPOINT_TYPE_BULK
    TYPE_INTERRUPT = usb.USB_ENDPOINT_TYPE_INTERRUPT
    # direction for bulk
    IN = usb.USB_ENDPOINT_IN
    OUT = usb.USB_ENDPOINT_OUT

    def __init__(self, ep):
        self.ep = ep

    def type(self):
        "Returns bmAttributes masked with USB_ENDPOINT_TYPE_MASK (compare with the TYPE_ values)"
        attributes = self.ep.bmAttributes
        return attributes & usb.USB_ENDPOINT_TYPE_MASK

    def address(self):
        "Returns bEndpointAddress from the descriptor"
        return self.ep.bEndpointAddress

    def maxpacketsize(self):
        "Returns wMaxPacketSize from the descriptor"
        return self.ep.wMaxPacketSize

    def isbulk(self):
        "True if type() is TYPE_BULK"
        kind = self.type()
        return kind == self.TYPE_BULK

    def direction(self):
        "Returns bEndpointAddress masked with USB_ENDPOINT_DIR_MASK (compare with IN and OUT); bulk endpoints only"
        assert self.isbulk()
        address = self.ep.bEndpointAddress
        return address & usb.USB_ENDPOINT_DIR_MASK
228
class USBFile:
    "File-like reader/writer over a bulk IN and a bulk OUT endpoint of a claimed interface"

    def __init__(self, iface, epin, epout):
        """
        @param iface: the USBInterface the endpoints belong to
        @param epin: endpoint used by read()
        @param epout: endpoint used by write()
        """
        self.usb = usb  # save this so that our destructor can run
        self.claimed = True
        self.iface = iface
        self.epin = epin
        self.epout = epout
        self.addrin = epin.address()
        self.insize = epin.maxpacketsize()
        self.addrout = epout.address()
        self.outsize = epout.maxpacketsize()

    def __del__(self):
        self.close()

    def resetep(self, resetin=True, resetout=True):
        "Calls usb_clear_halt then usb_resetep on the selected endpoints; the results are only traced"
        addresses = []
        if resetin:
            addresses.append(self.addrin)
        if resetout:
            addresses.append(self.addrout)
        for address in addresses:
            res = usb.usb_clear_halt(self.iface.device.handle, address)
            if TRACE:
                print("usb_clear_halt(%s,%d)=%d" % (self.iface.device.handle, address, res))
            res = usb.usb_resetep(self.iface.device.handle, address)
            if TRACE:
                print("usb_resetep(%s,%d)=%d" % (self.iface.device.handle, address, res))

    def read(self, howmuch=1024, timeout=1000):
        """Reads from the IN endpoint, one insize-sized request at a time

        Reading stops once at least howmuch bytes have arrived, or on a
        zero result, or after a short packet.

        @return: the data read so far
        @raise USBException: if a read fails before any data has arrived
        """
        data = ""
        while howmuch > 0:
            handle = self.iface.device.handle
            res, chunk = usb.usb_bulk_read_wrapped(handle, self.addrin, self.insize, int(timeout))
            if TRACE:
                print("usb_bulk_read(%s,%d,%d,%d)=%d,%s" % (self.iface.device.handle, self.addrin, self.insize, timeout, res, repr(chunk)))
            if res < 0:
                # a failure after some data has arrived just ends the read
                if data:
                    return data
                raise USBException()
            if res == 0:
                return data
            data += chunk
            howmuch -= len(chunk)
            if howmuch and len(chunk) != self.insize:
                # short read, no more data
                return data

        return data

    def write(self, data, timeout=1000):
        """Writes data to the OUT endpoint in pieces of at most outsize bytes

        One write is always issued, even when data is empty.

        @raise USBException: if a write returns a negative result
        """
        while True:
            count = min(len(data), self.outsize)
            res = usb.usb_bulk_write(self.iface.device.handle, self.addrout, data[:count], timeout)
            if TRACE:
                print("usb_bulk_write(%s, %d, %d bytes, %d)=%d" % (self.iface.device.handle, self.addrout, min(len(data), self.outsize), timeout, res))
            if res < 0:
                raise USBException()
            # carry on with whatever was not accepted by this write
            data = data[res:]
            if not data:
                break

    def close(self):
        "Resets both endpoints and releases the interface; does nothing once closed"
        if not self.claimed:
            return
        self.resetep()
        self.usb.usb_release_interface(self.iface.device.handle, self.iface.number())
        self.usb = None
        self.claimed = False
293
def OpenDevice(vendorid, productid, interfaceid):
    """Opens the bulk endpoints of the first matching interface

    Walks every device on every bus looking for the given vendor and
    product ids, then for the interface with the given number.

    @return: the result of openbulk() on that interface
    @raise ValueError: if nothing matches
    """
    for bus in AllBusses():
        for device in bus.devices():
            if device.vendor() != vendorid or device.product() != productid:
                continue
            for iface in device.interfaces():
                if iface.number() == interfaceid:
                    return iface.openbulk()
    message = "vendor 0x%x product 0x%x interface %d not found" % (vendorid, productid, interfaceid)
    raise ValueError(message)
302 303
def classtostring(klass):
    "Returns the name of the first USB_CLASS_ constant in usb equal to klass, else repr(klass)"
    for symbol in dir(usb):
        if not symbol.startswith("USB_CLASS_"):
            continue
        if klass == getattr(usb, symbol):
            return symbol
    return repr(klass)
310
def eptypestring(type):
    "Returns the name of the first TYPE_ attribute of USBEndpoint equal to type, else repr(type)"
    for symbol in dir(USBEndpoint):
        if not symbol.startswith("TYPE_"):
            continue
        if type == getattr(USBEndpoint, symbol):
            return symbol
    return repr(type)
316
def AllBusses():
    """Generates a USBBus for every bus returned by usb.usb_get_busses()

    The busses form a chain linked through each entry's 'next' attribute
    and ending at None.
    """
    bus = usb.usb_get_busses()
    while bus is not None:
        yield USBBus(bus)
        bus = bus.next
    # The generator finishes by falling off the end.  An explicit
    # raise StopIteration() here is turned into RuntimeError by
    # Python 3.7+ (PEP 479).
# initialise
usb.usb_init() # sadly no way to tell if this has failed

if __name__=='__main__':
    # Demo: list every bus, device, interface and endpoint, then open one
    # hard-coded device and exchange a packet with it.

    bus, dev = UpdateLists()
    print("%d busses, %d devices" % (bus, dev))

    for bus in AllBusses():
        print(bus.name())
        for device in bus.devices():
            print(" %x/%x %s" % (device.vendor(), device.product(), device.name()))
            klass, subclass, proto = device.classdetails()
            print(" class %s subclass %d protocol %d" % (classtostring(klass), subclass, proto))
            for getter in device.vendorstring, device.productstring, device.serialnumber:
                try:
                    print(" " + getter())
                except Exception:
                    # best effort: a string that is missing (None) or cannot
                    # be fetched is simply not shown.  Exception rather than
                    # a bare except so that Ctrl-C still stops the listing.
                    pass
            for iface in device.interfaces():
                print(" interface number %d" % (iface.number(),))
                klass, subclass, proto = iface.classdetails()
                print(" class %s subclass %d protocol %d" % (classtostring(klass), subclass, proto))
                for ep in iface.endpoints():
                    print(" endpointaddress 0x%x" % (ep.address(),))
                    # type and, for bulk endpoints, direction on one line
                    line = " " + eptypestring(ep.type())
                    if ep.isbulk():
                        if ep.direction() == ep.IN:
                            line += " IN"
                        else:
                            line += " OUT"
                    print(line)

    print("")
    print("")
    print("")

    print("opening device")
    cell = OpenDevice(0x1004, 0x6000, 2)
    try:
        print("device opened, about to write")
        cell.write("\x59\x0c\xc4\xc1\x7e")
        print("wrote, about to read")
        res = cell.read(12)
        print("read %d bytes" % (len(res),))
        print(repr(res))
    finally:
        # release the interface even if the write or read failed
        cell.close()