Pyudev - Calls Function Twice

Set keyboard layout using pyudev

You can filter keyboards by checking for the ID_INPUT_KEYBOARD property:

# Use .get() so that devices without the ID_INPUT_KEYBOARD property are simply
# treated as "not a keyboard" instead of raising KeyError inside the callback.
if action == 'add' and device.get('ID_INPUT_KEYBOARD') == '1':
    print('a keyboard was added')

Concerning the difference between calling setxkbmap directly and using the script, I'd guess that the X server needs time to initialize the keyboard, too. udev invokes the callback as soon as the keyboard has finished udev processing, but this can easily happen before the X11 server has configured and initialized the new keyboard.

Check the X.org logs, and any error message that setxkbmap might print.

Retrieve USB information using pyudev with device name

PyUSB would indeed be a good choice for detailed USB information. But I already use pyudev to monitor the insertion of removable USB devices, so I tried to do everything I need with one library.
Here is code that works but is ugly (as you can see, I can extract all the information I want using pyudev except the USB size/capacity):

import glib

from pyudev import Context, Monitor
import pyudev
import subprocess

def get_block_infos(dev_name):
    """Print selected udev properties and the capacity of a block device.

    dev_name is the path of the device node (the callers in this script pass
    the DEVNAME udev property). The device is looked up through the
    module-level ``context``. All values except the capacity come from udev
    properties; the capacity is taken from the SIZE column of ``lsblk``.
    A property that is not set is printed as ``None``; so is the capacity
    when ``lsblk`` cannot be started.
    """
    dev = pyudev.Device.from_device_file(context, dev_name)
    dev_node = dev.get('DEVNAME')

    # Ask lsblk for the SIZE column only, without a header line, and pass the
    # arguments as a list so that no shell parses the device name.
    capacity = None
    try:
        proc = subprocess.Popen(
            ['lsblk', '--nodeps', '--noheadings', '--output', 'SIZE', dev_node],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    except OSError as e:
        # lsblk could not be executed: report it and carry on without a size
        print(e)
    else:
        # communicate() returns a (stdout, stderr) pair
        std_out, _std_err = proc.communicate()
        capacity = std_out.strip()

    print('<BLOCK INFORMATION>')
    print('Device name: %s' % dev_node)
    print('Device type: %s' % dev.get('DEVTYPE'))
    print('Bus system: %s' % dev.get('ID_BUS'))
    print('Partition label: %s' % dev.get('ID_FS_LABEL'))
    print('FS: %s' % dev.get('ID_FS_SYSTEM_ID'))
    print('FS type: %s' % dev.get('ID_FS_TYPE'))
    print('Device usage: %s' % dev.get('ID_FS_USAGE'))
    print('Device model: %s' % dev.get('ID_MODEL'))
    print('Partition type: %s' % dev.get('ID_PART_TABLE_TYPE'))
    print('USB driver: %s' % dev.get('ID_USB_DRIVER'))
    print('Path id: %s' % dev.get('ID_PATH'))
    print('Capacity: %s' % capacity)
    print('</BLOCK INFORMATION>')

def get_usb_infos(dev_path):
    """Print the vendor of the USB device that a sysfs device path belongs to.

    dev_path is a DEVPATH udev property (the callers in this script pass the
    DEVPATH of a block device). Every device of the ``usb`` subsystem whose
    own DEVPATH is a leading part of dev_path is a candidate; the one with
    the longest, i.e. most specific, path is reported. Prints
    ``Vendor: None`` when no USB device matches or the property is not set.
    """
    print('<USB INFORMATION>')

    # The original author found no documented direct lookup, so all USB
    # devices are enumerated through the module-level ``context`` and
    # compared by path. This typically matches the USB controller, the hub
    # and the stick itself.
    candidates = []
    for device in context.list_devices(subsystem='usb'):
        usb_dev_path = device.get('DEVPATH')
        if not dev_path or not usb_dev_path:
            continue
        # Only accept matches that end on a path-component boundary, so that
        # ".../1-1" is not taken for an ancestor of ".../1-10/...".
        if dev_path == usb_dev_path or dev_path.startswith(usb_dev_path + '/'):
            candidates.append(device)

    # The longest DEVPATH is the most precise match; max() keeps the first
    # one on ties, like the strict ">" comparison it replaces.
    usb_removable_device = None
    if candidates:
        usb_removable_device = max(
            candidates, key=lambda candidate: len(candidate.get('DEVPATH'))
        )

    # get more information with usb_removable_device.items()
    vendor = None
    if usb_removable_device is not None:
        vendor = usb_removable_device.get('ID_VENDOR_FROM_DATABASE')
    print('Vendor: %s' % vendor)
    print('</USB INFORMATION>')

def _report_device(device):
    """Print the block and USB details of the device an event refers to."""
    get_block_infos(device.get('DEVNAME'))
    get_usb_infos(device.get('DEVPATH'))


# Prefer MonitorObserver and fall back to GUDevMonitorObserver when
# pyudev.glib does not provide it. Only ImportError triggers the fallback, so
# unrelated failures are no longer hidden by a bare "except:".
try:
    from pyudev.glib import MonitorObserver
except ImportError:
    from pyudev.glib import GUDevMonitorObserver as MonitorObserver

    def device_event(observer, action, device):
        """Handle 'device-event' in its (observer, action, device) form."""
        _report_device(device)
else:
    def device_event(observer, device):
        """Handle 'device-event' in its (observer, device) form."""
        _report_device(device)

# Shared udev context; get_block_infos() and get_usb_infos() read this global.
context = pyudev.Context()

# Netlink monitor restricted to events of the 'block' subsystem.
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='block')

# Route every monitored event to device_event() through the glib observer.
observer = MonitorObserver(monitor)
observer.connect('device-event', device_event)

monitor.start()

# Run the glib main loop so that the observer can deliver events.
main_loop = glib.MainLoop()
main_loop.run()

An example output of this script would be:

<BLOCK INFORMATION>
Device name: /dev/sdb
Device type: disk
Bus system: usb
Partition label: CentOS-6.5-x86_64-LiveCD
FS: LINUX
FS type: iso9660
Device usage: filesystem
Device model: Patriot_Memory
Partition type: dos
USB driver: usb-storage
Path id: pci-0000:00:1d.0-usb-0:1.2:1.0-scsi-0:0:0:0
Capacity: 14.5G
</BLOCK INFORMATION>
<USB INFORMATION>
Vendor: Kingston Technology Company Inc.
</USB INFORMATION>
<BLOCK INFORMATION>
Device name: /dev/sdb1
Device type: partition
Bus system: usb
Partition label: CentOS-6.5-x86_64-LiveCD
FS: LINUX
FS type: ext4
Device usage: filesystem
Device model: Patriot_Memory
Partition type: dos
USB driver: usb-storage
Path id: pci-0000:00:1d.0-usb-0:1.2:1.0-scsi-0:0:0:0
Capacity: 14.4G
</BLOCK INFORMATION>
<USB INFORMATION>
Vendor: Kingston Technology Company Inc.
</USB INFORMATION>

Finding only disk drives using pyudev

Since udev only runs on the Linux kernel (at least as of now), you could filter by MAJOR number 8, which represents all devices based on the SCSI/SATA disk driver.

# List whole disks whose block major number is 8.
# print() with a single argument behaves the same on Python 2 and Python 3,
# whereas the print statement is a syntax error on Python 3.
for device in context.list_devices(MAJOR='8'):
    if device.device_type == 'disk':
        print("{}, ({})".format(device.device_node, device.device_type))

On my system, your code outputs the following:

/dev/sda, (disk)
/dev/sdf, (disk)
/dev/sdb, (disk)
/dev/sdc, (disk)
/dev/sdd, (disk)
/dev/sde, (disk)
/dev/sr0, (disk)
/dev/loop0, (disk)
/dev/loop1, (disk)
/dev/loop2, (disk)
/dev/loop3, (disk)
/dev/loop4, (disk)
/dev/loop5, (disk)
/dev/loop6, (disk)
/dev/loop7, (disk)

After filtering by major number 8, I see the following output:

/dev/sda, (disk)
/dev/sdf, (disk)
/dev/sdb, (disk)
/dev/sdc, (disk)
/dev/sdd, (disk)
/dev/sde, (disk)

Note that you would also get USB hard drives and USB sticks in the list, since they also tend to use the same SCSI disk driver.

I'm not really sure whether IDE hard drives are mapped as sdX or hdX with the latest 2.6 or 3.x kernels. I do not have an IDE hard drive to verify this, and it has been a long time since I had one. :D

UPDATE: The same device number page lists /dev/hdX as the name used by IDE hard drives (and maybe IDE CD-ROMs too?). If you want to include these as well, I believe you could do something like this:

# List disks whose block major number is 8 or 3.
# .get() skips a device without a MAJOR property instead of raising KeyError,
# and print() with a single argument works on both Python 2 and Python 3.
for device in context.list_devices(DEVTYPE='disk'):
    if device.get('MAJOR') in ('8', '3'):
        print("{}, ({})".format(device.device_node, device.device_type))

Notify QML for 'usb device inserted' events using PyQt5 and pyudev

The best thing to do is to modify the GUI in QML, for which the Monitor and Device objects must be accessible from QML. Only QObjects receive notifications, so I will create two classes that wrap both of them in a thin layer using Qt properties and slots.

pyqtudev.py

from PyQt5 import QtCore
from pyudev import Context, Monitor, Device
from pyudev.pyqt5 import MonitorObserver

class QtUdevDevice(QtCore.QObject):
    """QObject wrapper that exposes a pyudev device to QML.

    The wrapped device is attached with initialize(); until then the wrapper
    is "invalid" and every accessor returns an empty value ("", [], -1,
    False or None) instead of raising.
    """

    def __init__(self, parent=None):
        super(QtUdevDevice, self).__init__(parent)
        self.m_dev = None
        # Lazily created wrapper returned by parentDevice.
        self._parent_device = None

    def initialize(self, dev):
        """Attach the pyudev device that this object wraps."""
        self.m_dev = dev
        # A wrapper cached for a previously attached device no longer applies.
        self._parent_device = None

    def _text(self, attr_name):
        """Return attribute attr_name of the wrapped device, or "" if unset."""
        if not self.isValid():
            return ""
        value = getattr(self.m_dev, attr_name)
        return "" if value is None else value

    @QtCore.pyqtSlot(result=bool)
    def isValid(self):
        """Return True once a device has been attached."""
        return self.m_dev is not None

    @QtCore.pyqtProperty(str, constant=True)
    def devType(self):
        """The device's device_type, or "" if unset."""
        return self._text("device_type")

    @QtCore.pyqtProperty(str, constant=True)
    def subsystem(self):
        """The device's subsystem, or "" if unset."""
        return self._text("subsystem")

    @QtCore.pyqtProperty(str, constant=True)
    def name(self):
        """The device's sys_name, or "" if unset."""
        return self._text("sys_name")

    @QtCore.pyqtProperty(str, constant=True)
    def driver(self):
        """The device's driver, or "" if unset."""
        return self._text("driver")

    @QtCore.pyqtProperty(str, constant=True)
    def deviceNode(self):
        """The device's device_node, or "" if unset."""
        return self._text("device_node")

    @QtCore.pyqtProperty(list, constant=True)
    def alternateDeviceSymlinks(self):
        """The device's device_links as a list; [] when invalid."""
        if not self.isValid():
            return []
        return list(self.m_dev.device_links)

    @QtCore.pyqtProperty(str, constant=True)
    def sysfsPath(self):
        """The device's sys_path, or "" if unset."""
        return self._text("sys_path")

    @QtCore.pyqtProperty(int, constant=True)
    def sysfsNumber(self):
        """The device's sys_number as an int, or -1 if unset."""
        if not self.isValid():
            return -1
        number = self.m_dev.sys_number
        return -1 if number is None else int(number)

    @QtCore.pyqtSlot(str, result=str)
    def property(self, name):
        """Return the udev property called name, or "" if it is not set."""
        if not self.isValid():
            return ""
        value = self.m_dev.properties.get(name)
        return value if value is not None else ""

    @QtCore.pyqtSlot(str, result=bool)
    def hasProperty(self, name):
        """Return True if the udev property called name is set."""
        if not self.isValid():
            return False
        return self.m_dev.properties.get(name) is not None

    @QtCore.pyqtProperty(list, constant=True)
    def deviceProperties(self):
        """The names of the device's udev properties; [] when invalid."""
        if not self.isValid():
            return []
        return list(self.m_dev.properties)

    @QtCore.pyqtProperty(list, constant=True)
    def sysfsProperties(self):
        """The device's available sysfs attribute names; [] when invalid."""
        if not self.isValid():
            return []
        return list(self.m_dev.attributes.available_attributes)

    @QtCore.pyqtProperty(QtCore.QObject, constant=True)
    def parentDevice(self):
        """A QtUdevDevice wrapping the parent device, or None if there is none."""
        # isValid is a method: without the call the guard could never fire.
        if not self.isValid():
            return None
        if self._parent_device is None:
            udev_parent = self.m_dev.parent
            if udev_parent is None:
                return None
            # Parent the wrapper to self and cache it so that it stays alive
            # after this getter returns and repeated reads reuse one object.
            # NOTE(review): assumes an unparented, unreferenced wrapper could
            # be collected before QML uses it - confirm against PyQt5 docs.
            wrapper = QtUdevDevice(self)
            wrapper.initialize(udev_parent)
            self._parent_device = wrapper
        return self._parent_device

    @QtCore.pyqtProperty(str, constant=True)
    def action(self):
        """The device's action, or "" if unset."""
        return self._text("action")

    def __repr__(self):
        if self.isValid():
            # Read sys_path directly: sysfsPath is a property, not a method,
            # so calling it as self.sysfsPath() would raise TypeError.
            return "UdevDevice({})".format(self.m_dev.sys_path)
        return "Invalid UdevDevice"

class QtMonitorObserver(QtCore.QObject):
    """QObject facade over a pyudev netlink monitor for use from QML.

    Each event received from the underlying observer is re-emitted with the
    device wrapped in a QtUdevDevice: first on the signal matching the
    device's action (if any), then on the generic deviceEvent signal.
    """

    deviceEvent = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceAdded = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceRemoved = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceChanged = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceOnlined = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceOfflined = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])

    def __init__(self, parent=None):
        super(QtMonitorObserver, self).__init__(parent)
        self._monitor = Monitor.from_netlink(Context())
        self._observer = MonitorObserver(self._monitor, self)
        self._observer.deviceEvent.connect(self.setup_new_signals)

    @QtCore.pyqtSlot()
    def start(self):
        """Start the underlying monitor."""
        self._monitor.start()

    @QtCore.pyqtSlot(str)
    def filter_by(self, filter):
        """Restrict the monitor to the subsystem named by filter."""
        self._monitor.filter_by(subsystem=filter)

    @QtCore.pyqtSlot(str)
    def filter_by_tag(self, tag):
        """Restrict the monitor to devices carrying the given tag."""
        self._monitor.filter_by_tag(tag)

    @QtCore.pyqtSlot()
    def remove_filter(self):
        """Remove the filters installed on the monitor."""
        self._monitor.remove_filter()

    @QtCore.pyqtSlot(Device)
    def setup_new_signals(self, device):
        """Wrap device and re-emit it on the action-specific and generic signals."""
        wrapped = QtUdevDevice()
        wrapped.initialize(device)

        # Look up the signal for this action; unknown actions get none.
        action_signal = {
            'add': self.deviceAdded,
            'remove': self.deviceRemoved,
            'change': self.deviceChanged,
            'online': self.deviceOnlined,
            'offline': self.deviceOfflined,
        }.get(device.action)

        if action_signal:
            action_signal.emit(wrapped)
        self.deviceEvent.emit(wrapped)

main.py

import os
import sys
from PyQt5 import QtCore, QtGui, QtQml

from pyqtudev import QtMonitorObserver

def run():
    """Start the QML application and return its exit code.

    Returns -1 when main.qml (expected next to this file) yields no root
    objects; otherwise returns the result of the Qt event loop.
    """
    app = QtGui.QGuiApplication(sys.argv)
    engine = QtQml.QQmlApplicationEngine()

    # Expose the udev observer to QML under the name "observer" before the
    # QML file is loaded.
    observer = QtMonitorObserver()
    engine.rootContext().setContextProperty("observer", observer)

    here = os.path.dirname(os.path.abspath(__file__))
    qml_url = QtCore.QUrl.fromLocalFile(os.path.join(here, 'main.qml'))
    engine.load(qml_url)

    if engine.rootObjects():
        return app.exec_()
    return -1


if __name__ == "__main__":
    sys.exit(run())

main.qml

import QtQuick 2.11
import QtQuick.Window 2.2
import QtQuick.Controls 2.2

ApplicationWindow {
    // Window sized to half of the screen in each direction.
    width: Screen.width / 2
    height: Screen.height / 2
    visible: true

    // Log every device event reported by the "observer" context property.
    Connections {
        target: observer
        onDeviceEvent: {
            console.log(device, device.name, device.action, device.parentDevice)
            if (device.hasProperty("ID_VENDOR_ID")) {
                console.log(device.property("ID_VENDOR_ID"))
            }
        }
    }

    // Start monitoring once the window is built, limited to the "usb" subsystem.
    Component.onCompleted: {
        observer.start()
        observer.filter_by("usb")
    }
}


Related Topics



Leave a reply



Submit