Detecting USB Device Insertion on Windows 10 using python
I tested with Python 3.8.2 x64.
- Install pywin32 (
pip install pywin32
) - Install the current/last version (1.5) of the WMI module from https://github.com/tjguk/wmi (
pip install -e git+https://github.com/tjguk/wmi.git#egg=wmi
) - run a script (
test.py
in my case), like:
import wmi
raw_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA \'Win32_USBHub\'"
c = wmi.WMI ()
watcher = c.watch_for(raw_wql=raw_wql)
while 1:
usb = watcher ()
print(usb)
- plug in a USB device. Output looks like:
(wmi-py) C:\Users\USER\Source\wmi-py>py test.py
instance of Win32_USBHub
{
Caption = "USB Composite Device";
ConfigManagerErrorCode = 0;
ConfigManagerUserConfig = FALSE;
CreationClassName = "Win32_USBHub";
Description = "USB Composite Device";
...
Many thanks to the WMI module author, and the Windows nerds' discussion here Detecting USB drive insertion and removal using windows service and c#
Detecting insertion/removal of USB input devices on Windows 10
There are multiple ways to detect what is happening with device changes
Detecting USB Insertion / Removal Events in Windows using C++
One suggested way is to use WM_DEVICECHANGE message. https://stackoverflow.com/a/4078996/2830850
One such example an be found at below
#Modified from: http://wiki.wxpython.org/HookingTheWndProc
##########################################################################
##
## This is a modification of the original WndProcHookMixin by Kevin Moore,
## modified to use ctypes only instead of pywin32, so it can be used
## with no additional dependencies in Python 2.5
##
##########################################################################
import sys
import ctypes
#import GUID
from ctypes import c_long, c_int, wintypes
import wx
GWL_WNDPROC = -4
WM_DESTROY = 2
DBT_DEVTYP_DEVICEINTERFACE = 0x00000005 # device interface class
DBT_DEVICEREMOVECOMPLETE = 0x8004 # device is gone
DBT_DEVICEARRIVAL = 0x8000 # system detected a new device
WM_DEVICECHANGE = 0x0219
class GUID(ctypes.Structure):
_pack_ = 1
_fields_ = [("Data1", ctypes.c_ulong),
("Data2", ctypes.c_ushort),
("Data3", ctypes.c_ushort),
("Data4", ctypes.c_ubyte * 8)]
## It's probably not neccesary to make this distinction, but it never hurts to be safe
if 'unicode' in wx.PlatformInfo:
SetWindowLong = ctypes.windll.user32.SetWindowLongW
CallWindowProc = ctypes.windll.user32.CallWindowProcW
else:
SetWindowLong = ctypes.windll.user32.SetWindowLongA
CallWindowProc = ctypes.windll.user32.CallWindowProcA
## Create a type that will be used to cast a python callable to a c callback function
## first arg is return type, the rest are the arguments
#WndProcType = ctypes.WINFUNCTYPE(c_int, wintypes.HWND, wintypes.UINT, wintypes.WPARAM, wintypes.LPARAM)
WndProcType = ctypes.WINFUNCTYPE(ctypes.c_long, ctypes.c_int, ctypes.c_uint, ctypes.c_int, ctypes.c_int)
if 'unicode' in wx.PlatformInfo:
RegisterDeviceNotification = ctypes.windll.user32.RegisterDeviceNotificationW
else:
RegisterDeviceNotification = ctypes.windll.user32.RegisterDeviceNotificationA
RegisterDeviceNotification.restype = wintypes.HANDLE
RegisterDeviceNotification.argtypes = [wintypes.HANDLE, wintypes.c_void_p, wintypes.DWORD]
UnregisterDeviceNotification = ctypes.windll.user32.UnregisterDeviceNotification
UnregisterDeviceNotification.restype = wintypes.BOOL
UnregisterDeviceNotification.argtypes = [wintypes.HANDLE]
class DEV_BROADCAST_DEVICEINTERFACE(ctypes.Structure):
_fields_ = [("dbcc_size", ctypes.c_ulong),
("dbcc_devicetype", ctypes.c_ulong),
("dbcc_reserved", ctypes.c_ulong),
("dbcc_classguid", GUID),
("dbcc_name", ctypes.c_wchar * 256)]
class DEV_BROADCAST_HDR(ctypes.Structure):
_fields_ = [("dbch_size", wintypes.DWORD),
("dbch_devicetype", wintypes.DWORD),
("dbch_reserved", wintypes.DWORD)]
GUID_DEVCLASS_PORTS = GUID(0x4D36E978, 0xE325, 0x11CE,
(ctypes.c_ubyte*8)(0xBF, 0xC1, 0x08, 0x00, 0x2B, 0xE1, 0x03, 0x18))
GUID_DEVINTERFACE_USB_DEVICE = GUID(0xA5DCBF10L, 0x6530,0x11D2,
(ctypes.c_ubyte*8)(0x90, 0x1F, 0x00,0xC0, 0x4F, 0xB9, 0x51, 0xED))
class WndProcHookMixin:
"""
This class can be mixed in with any wxWindows window class in order to hook it's WndProc function.
You supply a set of message handler functions with the function addMsgHandler. When the window receives that
message, the specified handler function is invoked. If the handler explicitly returns False then the standard
WindowProc will not be invoked with the message. You can really screw things up this way, so be careful.
This is not the correct way to deal with standard windows messages in wxPython (i.e. button click, paint, etc)
use the standard wxWindows method of binding events for that. This is really for capturing custom windows messages
or windows messages that are outside of the wxWindows world.
"""
def __init__(self):
self.__msgDict = {}
## We need to maintain a reference to the WndProcType wrapper
## because ctypes doesn't
self.__localWndProcWrapped = None
self.rtnHandles = []
def hookWndProc(self):
self.__localWndProcWrapped = WndProcType(self.localWndProc)
self.__oldWndProc = SetWindowLong(self.GetHandle(), GWL_WNDPROC, self.__localWndProcWrapped)
def unhookWndProc(self):
SetWindowLong(self.GetHandle(), GWL_WNDPROC, self.__oldWndProc)
## Allow the ctypes wrapper to be garbage collected
self.__localWndProcWrapped = None
def addMsgHandler(self,messageNumber,handler):
self.__msgDict[messageNumber] = handler
def localWndProc(self, hWnd, msg, wParam, lParam):
# call the handler if one exists
# performance note: "in" is the fastest way to check for a key
# when the key is unlikely to be found
# (which is the case here, since most messages will not have handlers).
# This is called via a ctypes shim for every single windows message
# so dispatch speed is important
if msg in self.__msgDict:
# if the handler returns false, we terminate the message here
# Note that we don't pass the hwnd or the message along
# Handlers should be really, really careful about returning false here
if self.__msgDict[msg](wParam,lParam) == False:
return
# Restore the old WndProc on Destroy.
if msg == WM_DESTROY: self.unhookWndProc()
return CallWindowProc(self.__oldWndProc, hWnd, msg, wParam, lParam)
def registerDeviceNotification(self, guid, devicetype=DBT_DEVTYP_DEVICEINTERFACE):
devIF = DEV_BROADCAST_DEVICEINTERFACE()
devIF.dbcc_size = ctypes.sizeof(DEV_BROADCAST_DEVICEINTERFACE)
devIF.dbcc_devicetype = DBT_DEVTYP_DEVICEINTERFACE
if guid:
devIF.dbcc_classguid = GUID.GUID(guid)
return RegisterDeviceNotification(self.GetHandle(), ctypes.byref(devIF), 0)
def unregisterDeviceNotification(self, handle):
if UnregisterDeviceNotification(handle) == 0:
raise Exception("Unable to unregister device notification messages")
# a simple example
if __name__ == "__main__":
class MyFrame(wx.Frame,WndProcHookMixin):
def __init__(self,parent):
WndProcHookMixin.__init__(self)
wx.Frame.__init__(self,parent,-1,"Insert and Remove USE Device and Watch STDOUT",size=(640,480))
self.Bind(wx.EVT_CLOSE, self.onClose)
#Change the following guid to the GUID of the device you want notifications for
#self.devNotifyHandle = self.registerDeviceNotification(guid="{3c5e1462-5695-4e18-876b-f3f3d08aaf18}")
dbh = DEV_BROADCAST_DEVICEINTERFACE()
dbh.dbcc_size = ctypes.sizeof(DEV_BROADCAST_DEVICEINTERFACE)
dbh.dbcc_devicetype = DBT_DEVTYP_DEVICEINTERFACE
dbh.dbcc_classguid = GUID_DEVINTERFACE_USB_DEVICE
self.devNotifyHandle = RegisterDeviceNotification(self.GetHandle(), ctypes.byref(dbh), 0)
self.addMsgHandler(WM_DEVICECHANGE, self.onDeviceChange)
self.hookWndProc()
def onDeviceChange(self,wParam,lParam):
print "WM_DEVICECHANGE [WPARAM:%i][LPARAM:%i]"%(wParam,lParam)
if wParam == DBT_DEVICEARRIVAL:
print "Device Arrival"
elif wParam == DBT_DEVICEREMOVECOMPLETE:
print "Device Remvoed"
if lParam:
dbh = DEV_BROADCAST_HDR.from_address(lParam)
if dbh.dbch_devicetype == DBT_DEVTYP_DEVICEINTERFACE:
dbd = DEV_BROADCAST_DEVICEINTERFACE.from_address(lParam)
#Verify that the USB VID and PID match our assigned VID and PID
if 'Vid_10c4&Pid_8382' in dbd.dbcc_name:
print "Was Our USB Device"
return True
def onClose(self, event):
self.unregisterDeviceNotification(self.devNotifyHandle)
event.Skip()
app = wx.App(False)
frame = MyFrame(None)
frame.Show()
app.MainLoop()
Above is taken from https://github.com/weiwei22844/UsefullPython/blob/fa8603b92cb0b3f6ce00c876f24138211f47e906/HookUsbMsg.py
Now coming back to your code
import wmi
device_connected_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA \'Win32_Keyboard\'"
device_disconnected_wql = "SELECT * FROM __InstanceDeletionEvent WITHIN 2 WHERE TargetInstance ISA \'Win32_Keyboard\'"
c = wmi.WMI()
connected_watcher = c.watch_for(raw_wql=device_connected_wql)
disconnected_watcher = c.watch_for(raw_wql=device_disconnected_wql)
while 1:
connected = connected_watcher()
disconnected = disconnected_watcher()
if connected:
print("Keyboard connected")
if disconnected:
print("Keyboard disconnected")
The way you call is connected_watcher
and disconnected_watcher
in series, both are blocking call. So when you call it first connected
becomes true and then the disconnected_watcher
gets called which blocked until a device is disconnected. That is why when you disconnected, you see both the messages together.
The way you can fix it is by making sure you time out these queries
while 1:
try:
connected = connected_watcher(timeout_ms=10)
except wmi.x_wmi_timed_out:
pass
else:
if connected:
print("Keyboard connected")
try:
disconnected = disconnected_watcher(timeout_ms=10)
except wmi.x_wmi_timed_out:
pass
else:
if disconnected:
print("Keyboard disconnected")
Another approach is to use threads. But to make your code thread compatible you need to make it like below
class VolumeRemovalWatcher:
def __init__(self, callback=None):
self.stop_wanted=False
self.callback=callback
def stop(self):
self.stop_wanted = True
def watch_for_events(self):
if not threading.current_thread() is threading.main_thread():
pythoncom.CoInitialize()
try:
w = WMI()
watcher = w.Win32_VolumeChangeEvent.watch_for(EventType=3)
while not self.stop_wanted:
try:
event = watcher(timeout_ms=1000)
except x_wmi_timed_out:
pass
else:
print(event.DriveName)
if self.callback is not None:
self.callback(event.DriveName)
except Exception as e:
print(e)
return None
finally:
if not threading.current_thread() is threading.main_thread():
pythoncom.CoUninitialize()
Credits for above code to https://github.com/utytlanyjoe/pyWmiHandler/blob/89b934301990a4a955ec13db21caaf81d9a94f63/wmi_wrapper.py
Detect inserted USB on Windows
Yes, you need to use the RegisterDeviceNotification
Windows API call. As far as I know, there is no Python module that wraps this functionality, so you have to use ctypes
to call this function.
Fortunately, you are not the first person who has wanted to do this, so there are some code samples floating around the web. WxPython provides a code sample, but as you are writing a daemon this may not interest you. You might want to try the following code sample, which relies on both ctypes
and pywin32
, lifted shameless from Tim Golden:
import win32serviceutil
import win32service
import win32event
import servicemanager
import win32gui
import win32gui_struct
struct = win32gui_struct.struct
pywintypes = win32gui_struct.pywintypes
import win32con
GUID_DEVINTERFACE_USB_DEVICE = "{A5DCBF10-6530-11D2-901F-00C04FB951ED}"
DBT_DEVICEARRIVAL = 0x8000
DBT_DEVICEREMOVECOMPLETE = 0x8004
import ctypes
#
# Cut-down clone of UnpackDEV_BROADCAST from win32gui_struct, to be
# used for monkey-patching said module with correct handling
# of the "name" param of DBT_DEVTYPE_DEVICEINTERFACE
#
def _UnpackDEV_BROADCAST (lparam):
if lparam == 0: return None
hdr_format = "iii"
hdr_size = struct.calcsize (hdr_format)
hdr_buf = win32gui.PyGetMemory (lparam, hdr_size)
size, devtype, reserved = struct.unpack ("iii", hdr_buf)
# Due to x64 alignment issues, we need to use the full format string over
# the entire buffer. ie, on x64:
# calcsize('iiiP') != calcsize('iii')+calcsize('P')
buf = win32gui.PyGetMemory (lparam, size)
extra = {}
if devtype == win32con.DBT_DEVTYP_DEVICEINTERFACE:
fmt = hdr_format + "16s"
_, _, _, guid_bytes = struct.unpack (fmt, buf[:struct.calcsize(fmt)])
extra['classguid'] = pywintypes.IID (guid_bytes, True)
extra['name'] = ctypes.wstring_at (lparam + struct.calcsize(fmt))
else:
raise NotImplementedError("unknown device type %d" % (devtype,))
return win32gui_struct.DEV_BROADCAST_INFO(devtype, **extra)
win32gui_struct.UnpackDEV_BROADCAST = _UnpackDEV_BROADCAST
class DeviceEventService (win32serviceutil.ServiceFramework):
_svc_name_ = "DevEventHandler"
_svc_display_name_ = "Device Event Handler"
_svc_description_ = "Handle device notification events"
def __init__(self, args):
win32serviceutil.ServiceFramework.__init__ (self, args)
self.hWaitStop = win32event.CreateEvent (None, 0, 0, None)
#
# Specify that we're interested in device interface
# events for USB devices
#
filter = win32gui_struct.PackDEV_BROADCAST_DEVICEINTERFACE (
GUID_DEVINTERFACE_USB_DEVICE
)
self.hDevNotify = win32gui.RegisterDeviceNotification (
self.ssh, # copy of the service status handle
filter,
win32con.DEVICE_NOTIFY_SERVICE_HANDLE
)
#
# Add to the list of controls already handled by the underlying
# ServiceFramework class. We're only interested in device events
#
def GetAcceptedControls(self):
rc = win32serviceutil.ServiceFramework.GetAcceptedControls (self)
rc |= win32service.SERVICE_CONTROL_DEVICEEVENT
return rc
#
# Handle non-standard service events (including our device broadcasts)
# by logging to the Application event log
#
def SvcOtherEx(self, control, event_type, data):
if control == win32service.SERVICE_CONTROL_DEVICEEVENT:
info = win32gui_struct.UnpackDEV_BROADCAST(data)
#
# This is the key bit here where you'll presumably
# do something other than log the event. Perhaps pulse
# a named event or write to a secure pipe etc. etc.
#
if event_type == DBT_DEVICEARRIVAL:
servicemanager.LogMsg (
servicemanager.EVENTLOG_INFORMATION_TYPE,
0xF000,
("Device %s arrived" % info.name, '')
)
elif event_type == DBT_DEVICEREMOVECOMPLETE:
servicemanager.LogMsg (
servicemanager.EVENTLOG_INFORMATION_TYPE,
0xF000,
("Device %s removed" % info.name, '')
)
#
# Standard stuff for stopping and running service; nothing
# specific to device notifications
#
def SvcStop(self):
self.ReportServiceStatus (win32service.SERVICE_STOP_PENDING)
win32event.SetEvent (self.hWaitStop)
def SvcDoRun(self):
win32event.WaitForSingleObject (self.hWaitStop, win32event.INFINITE)
servicemanager.LogMsg (
servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STOPPED,
(self._svc_name_, '')
)
if __name__=='__main__':
win32serviceutil.HandleCommandLine (DeviceEventService)
Related Topics
Auto.Arima() Equivalent for Python
Converting Python Objects for Rpy2
When Using Os.Execlp, Why 'Python' Needs 'Python' as Argv[0]
Ipython Notebook on Linux Vm Running Matplotlib Interactive with Nbagg
Get All Modules/Packages Used by a Python Project
How to Install Python on Alpine Linux
How to Close a Socket Left Open by a Killed Program
Signal Handling in Multi-Threaded Python
How to "Watch" a File for Modification/Change
Python: Get Default Gateway for a Local Interface/Ip Address in Linux
Environment Variables When Script Run by Cron
How to Move .Conda from One Folder to Another at the Moment of Creating the Environment
How to Grab the Color of a Pixel on My Desktop? (Linux)
Changing the Process Name of a Python Script