How to Send a Signal from a Python Program

How to send signal using callback called from external script?

We need to add some additional state to the callback, namely a reference to the instance of the class whose member function we want to invoke.

To do this, we can use a class. To make the functionality equivalent to just using a simple static callback function, let's define operator() (i.e. make it a functor), and also expose this operator to Python.

Let's suppose we have the following application class:

class app
{
public:
    explicit app(std::string name) : name_(std::move(name)) {}

    int run();

    void callback(double t_prog, double t_final);

private:
    std::string name_;
};

In run() we execute our Python script, and we want it to call the member function callback of the current instance.

Let's define the following callback handler class:

class callback_handler
{
public:
    explicit callback_handler(app& a) : app_(a) {}

    void operator()(double t_prog, double t_final)
    {
        app_.callback(t_prog, t_final);
    }

private:
    app& app_;
};

We need to expose this class to Python, but don't want to be able to create new instances from Python, nor do we really want it to be copied (although it doesn't really matter here, since our state only consists of references).

BOOST_PYTHON_MODULE(cbtest)
{
    bp::class_<callback_handler, boost::noncopyable>("callback_handler", bp::no_init)
        .def("__call__", &callback_handler::operator())
    ;
}

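At the start of our application, we need to initialize our module before we use it: call initcbtest(); right after initializing the Python interpreter. (That is the Python 2 name. With Python 3 the generated init function is PyInit_cbtest, and it is registered with PyImport_AppendInittab("cbtest", &PyInit_cbtest) before Py_Initialize() is called.)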

Now we can use our callback handler in the following manner (The Python code stays the same, since the object is callable):

callback_handler cbh(*this);
bp::object result = MyFunc(1, 10, 2, boost::ref(cbh));
std::cout << "result = " << bp::extract<double>(result) << "\n";


Sample Code

#include <boost/noncopyable.hpp>
#include <boost/python.hpp>

#include <iostream>
#include <string>
#include <utility>
// ============================================================================
namespace bp = boost::python;
// ============================================================================
class app
{
public:
    explicit app(std::string name) : name_(std::move(name)) {}

    int run();

    void callback(double t_prog, double t_final);

private:
    std::string name_;
};
// ============================================================================
class callback_handler
{
public:
    explicit callback_handler(app& a) : app_(a) {}

    void operator()(double t_prog, double t_final)
    {
        app_.callback(t_prog, t_final);
    }

private:
    app& app_;
};
// ----------------------------------------------------------------------------
BOOST_PYTHON_MODULE(cbtest)
{
    bp::class_<callback_handler, boost::noncopyable>("callback_handler", bp::no_init)
        .def("__call__", &callback_handler::operator())
    ;
}
// ============================================================================
void app::callback(double t_prog, double t_final)
{
    std::cout << "CB(" << name_ << ") " << t_prog << " " << t_final << "\n";
}
// ----------------------------------------------------------------------------
int app::run()
{
    Py_Initialize();
    initcbtest();

    try {
        bp::object module = bp::import("__main__");
        bp::object name_space = module.attr("__dict__");
        bp::exec_file("MyModule.py", name_space, name_space);

        bp::object MyFunc = name_space["MyFunc"];

        callback_handler cbh(*this);
        bp::object result = MyFunc(1, 10, 2, boost::ref(cbh));
        std::cout << "result = " << bp::extract<double>(result) << "\n";
    } catch (bp::error_already_set&) {
        PyErr_Print();
    }

    Py_Finalize();

    return 0;
}
// ============================================================================
int main()
{
    app a("TestApp");

    return a.run();
}
// ============================================================================


Python Script

File MyModule.py:

def MyFunc(a, b, c, callback):
    result = 0
    for i in range(a, b, c):
        result += i
        callback(i, b)
    return result


Console Output

CB(TestApp) 1 10
CB(TestApp) 3 10
CB(TestApp) 5 10
CB(TestApp) 7 10
CB(TestApp) 9 10
result = 25

How to send SIGINT to python running in a subprocess

Maybe try:

#!/usr/local/cpython-3.8/bin/python3

import signal
import subprocess
import time

proc = subprocess.Popen(['python3', 'B.py'], shell=False)
time.sleep(1)
proc.send_signal(signal.SIGINT)
proc.wait()

...as your A.py?

I suspect the shell is ignoring SIGINT rather than passing it down to B.py.

How to make a Python script stoppable from another script?

In general, there are two main ways of doing this (as far as I can see). The first is to make your script periodically check some condition that can be modified from outside, such as the existence or the content of a file or socket. As @Green Cloak Guy stated, pipes, which are another form of interprocess communication, work as well.
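
The first approach, polling an externally modifiable condition, might look like the following minimal sketch. The function name, the stop-file idea and the max_iterations safety limit are assumptions made for illustration, not something taken from the question.

```python
import os
import time


def run_until_stopped(stop_file, do_work, poll_interval=0.1, max_iterations=None):
    """Call do_work() repeatedly until stop_file exists.

    Returns the number of times do_work() was called. max_iterations is an
    optional safety limit so the loop cannot run forever in a demo.
    """
    iterations = 0
    while not os.path.exists(stop_file):
        if max_iterations is not None and iterations >= max_iterations:
            break
        do_work()
        iterations += 1
        time.sleep(poll_interval)
    return iterations
```

The stopping script then only has to create the agreed file, for example with open(stop_file, "w").close(), and the worker leaves its loop at the next check.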

The second one would be to use signals, the interprocess communication mechanism built into the operating system (fully supported on Unix-like systems; on Windows Python can only handle a small set of signals). When the user presses Ctrl+C, the terminal sends a specific signal (SIGINT) to the process in the foreground. But you can send the same (or another) signal programmatically, i.e. from another script.

Reading the answers to your other question I would say that what is missing to address this one is a way to send the appropriate signal to your already running process. Essentially this can be done by using the os.kill() function. Note that although the function is called 'kill' it can send any signal (not only SIGKILL).

In order for this to work you need to have the process id of the running process. A commonly used way of knowing this is making your script save its process id when it launches into a file stored in a common location. To get the current process id you can use the os.getpid() function.

So summarizing I'd say that the steps to achieve what you want would be:

  1. Modify your current script to store its process id (obtainable by using os.getpid()) into a file in a common location, for example /tmp/myscript.pid. Note that if you want your script to be portable, you will need to address this in a way that also works on non-Unix-like OSs such as Windows.
  2. Choose one signal (typically SIGINT or SIGTERM; SIGKILL and SIGSTOP cannot be caught, so no handler can be registered for them) and modify your script to register a custom handler using signal.signal() that takes care of the graceful termination of your script.
  3. Create another script (note that it could be the same script with some command line parameter) that reads the process id from the known file (e.g. /tmp/myscript.pid) and sends the chosen signal to that process using os.kill().

Note that an advantage of using signals to achieve this instead of an external way (files, pipes, etc.) is that the user can still press Ctrl+C (if you chose SIGINT) and that will produce the same behavior as the 'stop script' would.
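
The three steps above can be sketched roughly as follows, assuming a POSIX system and SIGTERM as the chosen signal. The function names and the flag-based handler are illustrative choices; only os.getpid(), signal.signal() and os.kill() come from the description itself.

```python
import os
import signal

PID_FILE = "/tmp/myscript.pid"  # example path from the text; adjust as needed

_stop_requested = False


def write_pid_file(path=PID_FILE):
    """Step 1: record this process's id so another script can find it."""
    with open(path, "w") as f:
        f.write(str(os.getpid()))


def _handle_stop(signum, frame):
    """Step 2: the handler only sets a flag; the main loop does the cleanup."""
    global _stop_requested
    _stop_requested = True


def install_handler(sig=signal.SIGTERM):
    """Step 2: register the handler for the chosen signal."""
    signal.signal(sig, _handle_stop)


def stop_was_requested():
    """Polled by the worker's main loop to decide when to shut down."""
    return _stop_requested


def send_stop(path=PID_FILE, sig=signal.SIGTERM):
    """Step 3: read the stored pid and send the chosen signal to it."""
    with open(path) as f:
        pid = int(f.read().strip())
    os.kill(pid, sig)
```

The long-running script would call write_pid_file() and install_handler() at startup and check stop_was_requested() in its main loop, while the stopping script only needs send_stop().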

Sending signal from Arduino serial to a Python program

First, I think it is not necessary to send strings over serial; you can just send single bytes.
The problem is that Serial.println("Something") actually sends Something\r\n, so on the other device you have to compare against Something\r\n (or strip the line ending first). As was said in the comments, you can also put in a debug print to make sure your data is arriving and the connection is OK.
You can also have your Python code send an acknowledgement in response to each command, so the sender knows the data has arrived and does not send another one.
Arduino Code:

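int pushButton = 2;
int buttonState = 0;

void setup()
{
    Serial.begin(9600);
    pinMode(pushButton, INPUT);
}

void loop()
{
    // Update the global state instead of declaring a second, shadowing variable.
    buttonState = digitalRead(pushButton);
    if (buttonState == HIGH)
    {
        // Send a single byte; Serial.print adds no line ending.
        Serial.print('1');
        delay(100);
    }
    if (buttonState == LOW)
    {
        // do nothing
    }
}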

Python code:

from subprocess import Popen
import serial

movie1 = "/home/pi/Videos/test.mp4"

ser = serial.Serial('/dev/ttyUSB0', 9600)

while True:
    # read() blocks until one byte arrives and returns a bytes object
    command = ser.read()
    if command:
        # flush serial for unprocessed data
        ser.flushInput()
        print("new command:", command)
        # compare against bytes: str(b'1') would be "b'1'", never '1'
        if command == b'1':
            print("Playing movie")
            # pass the command as a list, since no shell is involved
            Popen(['killall', 'omxplayer.bin'])
            Popen(['omxplayer', '-b', movie1])
        else:
            print("Not a valid command")
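
If the Arduino side used Serial.println instead, the receiver would see the command followed by \r\n. A minimal sketch of stripping the line ending and choosing an acknowledgement is shown below. The helper names and the OK/ERR reply bytes are hypothetical and not part of any existing protocol; with pyserial the raw line would typically come from ser.readline() and the reply would go out through ser.write().

```python
def parse_command(raw: bytes) -> str:
    """Decode bytes from the serial port and drop the trailing line ending."""
    return raw.decode("ascii", errors="replace").strip()


def handle_line(raw: bytes, known_commands=("1",)):
    """Return (command, ack), where ack is the hypothetical reply to send back."""
    command = parse_command(raw)
    ack = b"OK\n" if command in known_commands else b"ERR\n"
    return command, ack
```

Because the comparison happens after stripping, the same receiver works whether the sender uses Serial.print or Serial.println.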

How do I capture SIGINT in Python?

Register your handler with signal.signal like this:

#!/usr/bin/env python
import signal
import sys

def signal_handler(sig, frame):
    print('You pressed Ctrl+C!')
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
print('Press Ctrl+C')
signal.pause()

Code adapted from here.

More documentation on signal can be found here.
 

Directly send signals to USB using Python

Controlling components like LEDs from devices like Arduino and Raspberry Pi is done using GPIO pins. In contrast, USB is designed for data transfer and not for holding a line at a constant high or low level the way GPIO does. It also operates at different voltage and current levels than GPIO pins and could potentially damage components meant for GPIO.

However, you can get USB to GPIO adapters for this purpose (see here for an example discussion on these options).

In terms of Python, you can use packages such as PyUSB or the libusb Python wrapper to control/transfer data/communicate with USB devices (such as the USB to GPIO adapters). The companies providing the adapters might also have designed their own easy-to-use Python package to wrap around a lower-level driver (probably written in C). See this USB controlled LED device as an example with its own Python driver. The drivers are just software programs that take in relatively simple commands from the user for what they want a device to do. They then encapsulate the lower-level complexities required for following a protocol to communicate the user's intention to the USB device and controlling the USB port at the lowest possible software level.
