PyXR

c:\projects\bitpim\src \ native \ usb \ usb.py



0001 ###
0002 ### A wrapper for the libusb wrapper of the libusb library :-)
0003 ###
0004 ### This code is in the Public Domain.  (libusb and the wrapper
0005 ### are LGPL)
0006 ###
0007 
0008 from __future__ import generators
0009 
0010 import sys
0011 
0012 if sys.platform=='win32':
0013     try:
0014         import win32api
0015         handle=win32api.LoadLibrary("libusb0.dll")
0016         win32api.FreeLibrary(handle)
0017     except:
0018         raise ImportError("libusb needs to be installed for this module to work")
0019     
0020 import libusb as usb
0021 
0022 # grab some constants and put them in our namespace
0023 
0024 for sym in dir(usb):
0025     if sym.startswith("USB_CLASS_") or sym.startswith("USB_DT"):
0026         exec "%s=usb.%s" %(sym, sym)
0027 del sym
0028 
0029 TRACE=False
0030 
0031 class USBException(Exception):
0032     def __init__(self):
0033         Exception.__init__(self, usb.usb_strerror())
0034 
0035 def UpdateLists():
0036     """Updates the lists of busses and devices
0037 
0038     @return: A tuple of (change in number of busses, change in number of devices)
0039     """
0040     return usb.usb_find_busses(), usb.usb_find_devices()
0041 
0042 class USBBus:
0043     "Wraps a bus"
0044     
0045     def __init__(self, usb_bus):
0046         self.bus=usb_bus
0047 
0048     def name(self):
0049         return self.bus.dirname
0050 
0051     def devices(self):
0052         dev=self.bus.devices
0053         while dev is not None:
0054             if dev.config is not None:
0055                yield USBDevice(dev)
0056             dev=dev.next
0057         raise StopIteration()
0058 
0059 class USBDevice:
0060     "Wraps a device"
0061 
0062     def __init__(self, usb_device):
0063         self.usb=usb # save it so that it can't be GC before us
0064         self.dev=usb_device
0065         self.handle=usb.usb_open(self.dev)
0066         if TRACE: print "usb_open(%s)=%s" % (self.dev,self.handle)
0067         if self.handle is None:
0068             raise USBException()
0069 
0070     def __del__(self):
0071         self.close()
0072 
0073     def close(self):
0074         if self.handle is not None:
0075             if TRACE: print "usb_close(%s)" % (self.handle,)
0076             self.usb.usb_close(self.handle)
0077             self.handle=None
0078         self.usb=None
0079 
0080     def number(self):
0081         return self.dev.bInterfaceNumber
0082 
0083     def name(self):
0084         return self.dev.filename
0085 
0086     def vendor(self):
0087         return self.dev.descriptor.idVendor
0088 
0089     def vendorstring(self):
0090         return self._getstring("iManufacturer")
0091 
0092     def productstring(self):
0093         return self._getstring("iProduct")
0094 
0095     def serialnumber(self):
0096         return self._getstring("iSerialNumber")
0097 
0098     def _getstring(self, fieldname):
0099         n=getattr(self.dev.descriptor, fieldname)
0100         if n:
0101             res,string=usb.usb_get_string_simple(self.handle, n, 1024)
0102             if TRACE: print "usb_get_string_simple(%s, %d, %d)=%d,%s" % (self.handle, n, 1024, res, string)
0103             if res<0:
0104                 raise USBException()
0105             return string
0106         return None
0107 
0108     def product(self):
0109         return self.dev.descriptor.idProduct
0110 
0111     def interfaces(self):
0112         for i in range(self.dev.config.bNumInterfaces):
0113             yield USBInterface(self, usb.usb_interface_index(self.dev.config.interface, i))
0114         raise StopIteration()
0115 
0116     def classdetails(self):
0117         "returns a tuple of device class, devicesubclass, deviceprotocol (all ints)"
0118         return self.dev.descriptor.bDeviceClass, \
0119                self.dev.descriptor.bDeviceSubClass, \
0120                self.dev.descriptor.bDeviceProtocol
0121 
0122 class USBInterface:
0123 
0124     # currently we only deal with first configuration
0125     def __init__(self, device, iface, alt=None):
0126         self.iface=iface
0127         self.device=device
0128         self.desc=alt or iface.altsetting
0129 
0130     def number(self):
0131         return self.desc.bInterfaceNumber
0132 
0133     def altnumber(self):
0134         return self.desc.bAlternateSetting
0135 
0136     def classdetails(self):
0137         return self.desc.bInterfaceClass, \
0138                self.desc.bInterfaceSubClass, \
0139                self.desc.bInterfaceProtocol
0140 
0141     def openbulk(self,epinno=None,epoutno=None):
0142         "Returns a filelike object you can use to read and write"
0143         # find the endpoints
0144         match=lambda ep1, ep2: (ep1 is None) or (ep2 is None) or ((ep1 & 0x7f) == (ep2 & 0x7f))
0145 
0146         epin=None
0147         epout=None
0148         for ep in self.endpoints():
0149             if ep.isbulk():
0150                 if ep.direction()==ep.IN:
0151                     if (not epin) and match(epinno,ep.address()): epin=ep
0152                 else:
0153                     if (not epout) and match(epoutno,ep.address()): epout=ep
0154         assert epin is not None
0155         assert epout is not None
0156 
0157         # set the configuration
0158         if TRACE:
0159             print "getting configvalue"
0160         v=self.device.dev.config.bConfigurationValue
0161         if TRACE:
0162             print "value is",v,"now about to set config"
0163         res=usb.usb_set_configuration(self.device.handle, v)
0164         if TRACE:
0165             print "usb_set_configurationds(%s, %d)=%d" % (self.device.handle,v,res)
0166             print "config set"
0167             # grab the interface
0168             print "claiming",self.number()
0169         res=usb.usb_claim_interface(self.device.handle, self.number())
0170         if TRACE: print "usb_claim_interface(%s, %d)=%d" % (self.device.handle, self.number(), res)
0171         if res<0:
0172             raise USBException()
0173 
0174         # set the alt setting
0175         print "alt setting", self.altnumber()
0176         res=usb.usb_set_altinterface(self.device.handle, self.altnumber())
0177         if TRACE: print "usb_set_alt_interface(%s, %d)=%d" % (self.device.handle, self.altnumber(), res)
0178         if res<0:
0179             raise USBException()
0180 
0181         # we now have the file
0182         return USBFile(self, epin, epout)
0183 
0184     def alternates(self):
0185        for i in range(self.iface.num_altsetting):
0186            yield USBInterface(self.device,self.iface,usb.usb_interface_descriptor_index(self.iface.altsetting,i))
0187     # a generator raises its StopIteration() by itself
0188 
0189     def endpoints(self):
0190        for i in range(self.desc.bNumEndpoints):
0191            yield USBEndpoint(usb.usb_endpoint_descriptor_index(self.desc.endpoint, i))
0192        raise StopIteration()
0193 
0194 class USBEndpoint:
0195     # type of endpoint
0196     TYPE_CONTROL=usb.USB_ENDPOINT_TYPE_CONTROL
0197     TYPE_ISOCHRONOUS=usb.USB_ENDPOINT_TYPE_ISOCHRONOUS
0198     TYPE_BULK=usb.USB_ENDPOINT_TYPE_BULK
0199     TYPE_INTERRUPT=usb.USB_ENDPOINT_TYPE_INTERRUPT
0200     # direction for bulk
0201     IN=usb.USB_ENDPOINT_IN
0202     OUT=usb.USB_ENDPOINT_OUT
0203     def __init__(self, ep):
0204         self.ep=ep
0205 
0206     def type(self):
0207         return self.ep.bmAttributes&usb.USB_ENDPOINT_TYPE_MASK
0208 
0209     def address(self):
0210         return self.ep.bEndpointAddress
0211 
0212     def maxpacketsize(self):
0213         return self.ep.wMaxPacketSize
0214 
0215     def isbulk(self):
0216         return self.type()==self.TYPE_BULK
0217 
0218     def direction(self):
0219         assert self.isbulk()
0220         return self.ep.bEndpointAddress&usb.USB_ENDPOINT_DIR_MASK
0221         
0222 class USBFile:
0223 
0224     def __init__(self, iface, epin, epout):
0225         self.usb=usb  # save this so that our destructor can run
0226         self.claimed=True
0227         self.iface=iface
0228         self.epin=epin
0229         self.epout=epout
0230         self.addrin=epin.address()
0231         self.addrout=epout.address()
0232         self.insize=epin.maxpacketsize()
0233         self.outsize=epout.maxpacketsize()
0234 
0235     def __del__(self):
0236         self.close()
0237 
0238     def resetep(self, resetin=True, resetout=True):
0239         if resetin:
0240             res=usb.usb_clear_halt(self.iface.device.handle, self.addrin)
0241             if TRACE: print "usb_clear_halt(%s,%d)=%d" % (self.iface.device.handle, self.addrin, res)
0242             res=usb.usb_resetep(self.iface.device.handle, self.addrin)
0243             if TRACE: print "usb_resetep(%s,%d)=%d" % (self.iface.device.handle, self.addrin, res)
0244         if resetout:
0245             res=usb.usb_clear_halt(self.iface.device.handle, self.addrout)
0246             if TRACE: print "usb_clear_halt(%s,%d)=%d" % (self.iface.device.handle, self.addrout, res)
0247             res=usb.usb_resetep(self.iface.device.handle, self.addrout)
0248             if TRACE: print "usb_resetep(%s,%d)=%d" % (self.iface.device.handle, self.addrout, res)
0249 
0250     def read(self,howmuch=1024, timeout=1000):
0251         data=""
0252         while howmuch>0:
0253             res,str=usb.usb_bulk_read_wrapped(self.iface.device.handle, self.addrin, self.insize, int(timeout))
0254             if TRACE: print "usb_bulk_read(%s,%d,%d,%d)=%d,%s" % (self.iface.device.handle, self.addrin, self.insize, timeout, res, `str`)
0255             if res<0:
0256                 if len(data)>0:
0257                     return data
0258                 e=USBException()
0259                 raise e
0260             if res==0:
0261                 return data
0262             data+=str
0263             howmuch-=len(str)
0264             if howmuch and len(str)!=self.insize:
0265                 # short read, no more data
0266                 break
0267 
0268         return data
0269 
0270     def write(self, data, timeout=1000):
0271         first=True
0272         while first or len(data):
0273             first=False
0274             res=usb.usb_bulk_write(self.iface.device.handle, self.addrout, data[:min(len(data), self.outsize)], timeout)
0275             if TRACE: print "usb_bulk_write(%s, %d, %d bytes, %d)=%d" % (self.iface.device.handle, self.addrout, min(len(data), self.outsize), timeout, res)
0276             if res<0:
0277                 raise USBException()
0278             data=data[res:]
0279 
0280     def close(self):
0281         if self.claimed:
0282             self.resetep()
0283             self.usb.usb_release_interface(self.iface.device.handle, self.iface.number())
0284         self.usb=None
0285         self.claimed=False
0286 
0287 def OpenDevice(vendorid, productid, interfaceid):
0288     for bus in AllBusses():
0289         for device in bus.devices():
0290             if device.vendor()==vendorid and device.product()==productid:
0291                 for iface in device.interfaces():
0292                     if iface.number()==interfaceid:
0293                         return iface.openbulk()
0294     raise ValueError( "vendor 0x%x product 0x%x interface %d not found" % (vendorid, productid, interfaceid))
0295         
0296     
0297 def classtostring(klass):
0298     "Returns the class as a string"
0299     for sym in dir(usb):
0300         if sym.startswith("USB_CLASS_") and klass==getattr(usb, sym):
0301             return sym
0302     return `klass`
0303 
0304 def eptypestring(type):
0305     for sym in dir(USBEndpoint):
0306         if sym.startswith("TYPE_") and type==getattr(USBEndpoint, sym):
0307             return sym
0308     return `type`
0309     
0310 def AllBusses():
0311     bus=usb.usb_get_busses()
0312     while bus is not None:
0313         yield USBBus(bus)
0314         bus=bus.next
0315     raise StopIteration()
0316 
0317 # initialise
0318 usb.usb_init() # sadly no way to tell if this has failed
0319 
0320 if __name__=='__main__':
0321 
0322     bus,dev=UpdateLists()
0323     print "%d busses, %d devices" % (bus,dev)
0324 
0325     for bus in AllBusses():
0326         print bus.name()
0327         for device in bus.devices():
0328             print "  %x/%x %s" % (device.vendor(), device.product(), device.name())
0329             klass,subclass,proto=device.classdetails()
0330             print "  class %s subclass %d protocol %d" % (classtostring(klass), subclass, proto)
0331             for i in device.vendorstring, device.productstring, device.serialnumber:
0332                 try:
0333                     print "  "+i()
0334                 except:
0335                     pass
0336             for iface in device.interfaces():
0337                 print "      interface number %d" % (iface.number(),)
0338                 klass,subclass,proto=iface.classdetails()
0339                 print "      class %s subclass %d protocol %d" % (classtostring(klass), subclass, proto)
0340                 for ep in iface.endpoints():
0341                     print "          endpointaddress 0x%x" % (ep.address(),)
0342                     print "          "+eptypestring(ep.type()),
0343                     if ep.isbulk():
0344                         if ep.direction()==ep.IN:
0345                             print "IN"
0346                         else:
0347                             print "OUT"
0348                     else:
0349                         print
0350 
0351                 print ""
0352             print ""
0353         print ""
0354 
0355     print "opening device"
0356     cell=OpenDevice(0x1004, 0x6000, 2)
0357     print "device opened, about to write"
0358     cell.write("\x59\x0c\xc4\xc1\x7e")
0359     print "wrote, about to read"
0360     res=cell.read(12)
0361     print "read %d bytes" % (len(res),)
0362     print `res`
0363     cell.close()
0364 

Generated by PyXR 0.9.4