0001 ### 0002 ### A wrapper for the libusb wrapper of the libusb library :-) 0003 ### 0004 ### This code is in the Public Domain. (libusb and the wrapper 0005 ### are LGPL) 0006 ### 0007 0008 from __future__ import generators 0009 0010 import sys 0011 0012 if sys.platform=='win32': 0013 try: 0014 import win32api 0015 handle=win32api.LoadLibrary("libusb0.dll") 0016 win32api.FreeLibrary(handle) 0017 except: 0018 raise ImportError("libusb needs to be installed for this module to work") 0019 0020 import libusb as usb 0021 0022 # grab some constants and put them in our namespace 0023 0024 for sym in dir(usb): 0025 if sym.startswith("USB_CLASS_") or sym.startswith("USB_DT"): 0026 exec "%s=usb.%s" %(sym, sym) 0027 del sym 0028 0029 TRACE=False 0030 0031 class USBException(Exception): 0032 def __init__(self): 0033 Exception.__init__(self, usb.usb_strerror()) 0034 0035 def UpdateLists(): 0036 """Updates the lists of busses and devices 0037 0038 @return: A tuple of (change in number of busses, change in number of devices) 0039 """ 0040 return usb.usb_find_busses(), usb.usb_find_devices() 0041 0042 class USBBus: 0043 "Wraps a bus" 0044 0045 def __init__(self, usb_bus): 0046 self.bus=usb_bus 0047 0048 def name(self): 0049 return self.bus.dirname 0050 0051 def devices(self): 0052 dev=self.bus.devices 0053 while dev is not None: 0054 if dev.config is not None: 0055 yield USBDevice(dev) 0056 dev=dev.next 0057 raise StopIteration() 0058 0059 class USBDevice: 0060 "Wraps a device" 0061 0062 def __init__(self, usb_device): 0063 self.usb=usb # save it so that it can't be GC before us 0064 self.dev=usb_device 0065 self.handle=usb.usb_open(self.dev) 0066 if TRACE: print "usb_open(%s)=%s" % (self.dev,self.handle) 0067 if self.handle is None: 0068 raise USBException() 0069 0070 def __del__(self): 0071 self.close() 0072 0073 def close(self): 0074 if self.handle is not None: 0075 if TRACE: print "usb_close(%s)" % (self.handle,) 0076 self.usb.usb_close(self.handle) 0077 self.handle=None 0078 self.usb=None 0079 0080 def number(self): 0081 return self.dev.bInterfaceNumber 0082 0083 def name(self): 0084 return self.dev.filename 0085 0086 def vendor(self): 0087 return self.dev.descriptor.idVendor 0088 0089 def vendorstring(self): 0090 return self._getstring("iManufacturer") 0091 0092 def productstring(self): 0093 return self._getstring("iProduct") 0094 0095 def serialnumber(self): 0096 return self._getstring("iSerialNumber") 0097 0098 def _getstring(self, fieldname): 0099 n=getattr(self.dev.descriptor, fieldname) 0100 if n: 0101 res,string=usb.usb_get_string_simple(self.handle, n, 1024) 0102 if TRACE: print "usb_get_string_simple(%s, %d, %d)=%d,%s" % (self.handle, n, 1024, res, string) 0103 if res<0: 0104 raise USBException() 0105 return string 0106 return None 0107 0108 def product(self): 0109 return self.dev.descriptor.idProduct 0110 0111 def interfaces(self): 0112 for i in range(self.dev.config.bNumInterfaces): 0113 yield USBInterface(self, usb.usb_interface_index(self.dev.config.interface, i)) 0114 raise StopIteration() 0115 0116 def classdetails(self): 0117 "returns a tuple of device class, devicesubclass, deviceprotocol (all ints)" 0118 return self.dev.descriptor.bDeviceClass, \ 0119 self.dev.descriptor.bDeviceSubClass, \ 0120 self.dev.descriptor.bDeviceProtocol 0121 0122 class USBInterface: 0123 0124 # currently we only deal with first configuration 0125 def __init__(self, device, iface, alt=None): 0126 self.iface=iface 0127 self.device=device 0128 self.desc=alt or iface.altsetting 0129 0130 def number(self): 0131 return self.desc.bInterfaceNumber 0132 0133 def altnumber(self): 0134 return self.desc.bAlternateSetting 0135 0136 def classdetails(self): 0137 return self.desc.bInterfaceClass, \ 0138 self.desc.bInterfaceSubClass, \ 0139 self.desc.bInterfaceProtocol 0140 0141 def openbulk(self,epinno=None,epoutno=None): 0142 "Returns a filelike object you can use to read and write" 0143 # find the endpoints 0144 match=lambda ep1, ep2: (ep1 is None) or (ep2 is None) or ((ep1 & 0x7f) == (ep2 & 0x7f)) 0145 0146 epin=None 0147 epout=None 0148 for ep in self.endpoints(): 0149 if ep.isbulk(): 0150 if ep.direction()==ep.IN: 0151 if (not epin) and match(epinno,ep.address()): epin=ep 0152 else: 0153 if (not epout) and match(epoutno,ep.address()): epout=ep 0154 assert epin is not None 0155 assert epout is not None 0156 0157 # set the configuration 0158 if TRACE: 0159 print "getting configvalue" 0160 v=self.device.dev.config.bConfigurationValue 0161 if TRACE: 0162 print "value is",v,"now about to set config" 0163 res=usb.usb_set_configuration(self.device.handle, v) 0164 if TRACE: 0165 print "usb_set_configurationds(%s, %d)=%d" % (self.device.handle,v,res) 0166 print "config set" 0167 # grab the interface 0168 print "claiming",self.number() 0169 res=usb.usb_claim_interface(self.device.handle, self.number()) 0170 if TRACE: print "usb_claim_interface(%s, %d)=%d" % (self.device.handle, self.number(), res) 0171 if res<0: 0172 raise USBException() 0173 0174 # set the alt setting 0175 print "alt setting", self.altnumber() 0176 res=usb.usb_set_altinterface(self.device.handle, self.altnumber()) 0177 if TRACE: print "usb_set_alt_interface(%s, %d)=%d" % (self.device.handle, self.altnumber(), res) 0178 if res<0: 0179 raise USBException() 0180 0181 # we now have the file 0182 return USBFile(self, epin, epout) 0183 0184 def alternates(self): 0185 for i in range(self.iface.num_altsetting): 0186 yield USBInterface(self.device,self.iface,usb.usb_interface_descriptor_index(self.iface.altsetting,i)) 0187 # a generator raises its StopIteration() by itself 0188 0189 def endpoints(self): 0190 for i in range(self.desc.bNumEndpoints): 0191 yield USBEndpoint(usb.usb_endpoint_descriptor_index(self.desc.endpoint, i)) 0192 raise StopIteration() 0193 0194 class USBEndpoint: 0195 # type of endpoint 0196 TYPE_CONTROL=usb.USB_ENDPOINT_TYPE_CONTROL 0197 TYPE_ISOCHRONOUS=usb.USB_ENDPOINT_TYPE_ISOCHRONOUS 0198 TYPE_BULK=usb.USB_ENDPOINT_TYPE_BULK 0199 TYPE_INTERRUPT=usb.USB_ENDPOINT_TYPE_INTERRUPT 0200 # direction for bulk 0201 IN=usb.USB_ENDPOINT_IN 0202 OUT=usb.USB_ENDPOINT_OUT 0203 def __init__(self, ep): 0204 self.ep=ep 0205 0206 def type(self): 0207 return self.ep.bmAttributes&usb.USB_ENDPOINT_TYPE_MASK 0208 0209 def address(self): 0210 return self.ep.bEndpointAddress 0211 0212 def maxpacketsize(self): 0213 return self.ep.wMaxPacketSize 0214 0215 def isbulk(self): 0216 return self.type()==self.TYPE_BULK 0217 0218 def direction(self): 0219 assert self.isbulk() 0220 return self.ep.bEndpointAddress&usb.USB_ENDPOINT_DIR_MASK 0221 0222 class USBFile: 0223 0224 def __init__(self, iface, epin, epout): 0225 self.usb=usb # save this so that our destructor can run 0226 self.claimed=True 0227 self.iface=iface 0228 self.epin=epin 0229 self.epout=epout 0230 self.addrin=epin.address() 0231 self.addrout=epout.address() 0232 self.insize=epin.maxpacketsize() 0233 self.outsize=epout.maxpacketsize() 0234 0235 def __del__(self): 0236 self.close() 0237 0238 def resetep(self, resetin=True, resetout=True): 0239 if resetin: 0240 res=usb.usb_clear_halt(self.iface.device.handle, self.addrin) 0241 if TRACE: print "usb_clear_halt(%s,%d)=%d" % (self.iface.device.handle, self.addrin, res) 0242 res=usb.usb_resetep(self.iface.device.handle, self.addrin) 0243 if TRACE: print "usb_resetep(%s,%d)=%d" % (self.iface.device.handle, self.addrin, res) 0244 if resetout: 0245 res=usb.usb_clear_halt(self.iface.device.handle, self.addrout) 0246 if TRACE: print "usb_clear_halt(%s,%d)=%d" % (self.iface.device.handle, self.addrout, res) 0247 res=usb.usb_resetep(self.iface.device.handle, self.addrout) 0248 if TRACE: print "usb_resetep(%s,%d)=%d" % (self.iface.device.handle, self.addrout, res) 0249 0250 def read(self,howmuch=1024, timeout=1000): 0251 data="" 0252 while howmuch>0: 0253 res,str=usb.usb_bulk_read_wrapped(self.iface.device.handle, self.addrin, self.insize, int(timeout)) 0254 if TRACE: print "usb_bulk_read(%s,%d,%d,%d)=%d,%s" % (self.iface.device.handle, self.addrin, self.insize, timeout, res, `str`) 0255 if res<0: 0256 if len(data)>0: 0257 return data 0258 e=USBException() 0259 raise e 0260 if res==0: 0261 return data 0262 data+=str 0263 howmuch-=len(str) 0264 if howmuch and len(str)!=self.insize: 0265 # short read, no more data 0266 break 0267 0268 return data 0269 0270 def write(self, data, timeout=1000): 0271 first=True 0272 while first or len(data): 0273 first=False 0274 res=usb.usb_bulk_write(self.iface.device.handle, self.addrout, data[:min(len(data), self.outsize)], timeout) 0275 if TRACE: print "usb_bulk_write(%s, %d, %d bytes, %d)=%d" % (self.iface.device.handle, self.addrout, min(len(data), self.outsize), timeout, res) 0276 if res<0: 0277 raise USBException() 0278 data=data[res:] 0279 0280 def close(self): 0281 if self.claimed: 0282 self.resetep() 0283 self.usb.usb_release_interface(self.iface.device.handle, self.iface.number()) 0284 self.usb=None 0285 self.claimed=False 0286 0287 def OpenDevice(vendorid, productid, interfaceid): 0288 for bus in AllBusses(): 0289 for device in bus.devices(): 0290 if device.vendor()==vendorid and device.product()==productid: 0291 for iface in device.interfaces(): 0292 if iface.number()==interfaceid: 0293 return iface.openbulk() 0294 raise ValueError( "vendor 0x%x product 0x%x interface %d not found" % (vendorid, productid, interfaceid)) 0295 0296 0297 def classtostring(klass): 0298 "Returns the class as a string" 0299 for sym in dir(usb): 0300 if sym.startswith("USB_CLASS_") and klass==getattr(usb, sym): 0301 return sym 0302 return `klass` 0303 0304 def eptypestring(type): 0305 for sym in dir(USBEndpoint): 0306 if sym.startswith("TYPE_") and type==getattr(USBEndpoint, sym): 0307 return sym 0308 return `type` 0309 0310 def AllBusses(): 0311 bus=usb.usb_get_busses() 0312 while bus is not None: 0313 yield USBBus(bus) 0314 bus=bus.next 0315 raise StopIteration() 0316 0317 # initialise 0318 usb.usb_init() # sadly no way to tell if this has failed 0319 0320 if __name__=='__main__': 0321 0322 bus,dev=UpdateLists() 0323 print "%d busses, %d devices" % (bus,dev) 0324 0325 for bus in AllBusses(): 0326 print bus.name() 0327 for device in bus.devices(): 0328 print " %x/%x %s" % (device.vendor(), device.product(), device.name()) 0329 klass,subclass,proto=device.classdetails() 0330 print " class %s subclass %d protocol %d" % (classtostring(klass), subclass, proto) 0331 for i in device.vendorstring, device.productstring, device.serialnumber: 0332 try: 0333 print " "+i() 0334 except: 0335 pass 0336 for iface in device.interfaces(): 0337 print " interface number %d" % (iface.number(),) 0338 klass,subclass,proto=iface.classdetails() 0339 print " class %s subclass %d protocol %d" % (classtostring(klass), subclass, proto) 0340 for ep in iface.endpoints(): 0341 print " endpointaddress 0x%x" % (ep.address(),) 0342 print " "+eptypestring(ep.type()), 0343 if ep.isbulk(): 0344 if ep.direction()==ep.IN: 0345 print "IN" 0346 else: 0347 print "OUT" 0348 else: 0349 print 0350 0351 print "" 0352 print "" 0353 print "" 0354 0355 print "opening device" 0356 cell=OpenDevice(0x1004, 0x6000, 2) 0357 print "device opened, about to write" 0358 cell.write("\x59\x0c\xc4\xc1\x7e") 0359 print "wrote, about to read" 0360 res=cell.read(12) 0361 print "read %d bytes" % (len(res),) 0362 print `res` 0363 cell.close() 0364
Generated by PyXR 0.9.4