Taskset - Python

taskset - python

Compared to the command line you posted, you're missing a couple of spaces, e.g.:

cmd = "taskset -c " +  str(mapping[r]) + " python <path>/run-apps.py " + thr[r] + " &"

In your code, when the command line is parsed, taskset sees -c fused with whatever follows it (for example -c2python). Many command-line parsing libraries treat that the same as -c 2python, and 2python is not a valid CPU list, which would explain the error that you're seeing.
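To see the effect, here is a minimal sketch that builds the command both ways. The values in mapping and thr are made up for illustration, <path> is kept as the placeholder from the question, and the "broken" form assumes the original code omitted the spaces around each piece.

```python
# Hypothetical values standing in for the question's data.
mapping = {0: 2}
thr = {0: "4"}
r = 0

# Without separating spaces, the pieces run together into single tokens.
broken = "taskset -c" + str(mapping[r]) + "python <path>/run-apps.py" + thr[r] + "&"

# With the spaces restored, each token stands on its own.
fixed = "taskset -c " + str(mapping[r]) + " python <path>/run-apps.py " + thr[r] + " &"

print(broken)  # taskset -c2python <path>/run-apps.py4&
print(fixed)   # taskset -c 2 python <path>/run-apps.py 4 &
```

Printing cmd before handing it to os.system is a cheap way to catch this kind of mistake.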

Sometimes these things are easier to read if you use string interpolation instead:

cmd = "taskset -c %s python <path>/run-apps.py %s &" % (mapping[r],thr[r])

Or new style .format:

cmd = "taskset -c {0} python <path>/run-apps.py {1} &".format(mapping[r],thr[r])

And finally, no solution using os.system should go by without at least a mention of the Python subprocess module:

import subprocess

process = subprocess.Popen(['taskset',
                            '-c',
                            str(mapping[r]),
                            'python',
                            '<path>/run-apps.py',
                            str(thr[r])])

It'll avoid the shell altogether which is slightly more efficient and makes you safer from shell injection types of attacks.
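As a rough sketch of how this could scale to several pinned processes, the helpers below build one argument list per job. The names taskset_argv and launch_pinned, the job tuples, and the script name are all hypothetical. Popen returns immediately without waiting for the child, so no trailing & is needed.

```python
import subprocess


def taskset_argv(cpu, script, *script_args):
    """Build an argument list that runs `script` under taskset on one CPU."""
    return ["taskset", "-c", str(cpu), "python", script, *map(str, script_args)]


def launch_pinned(jobs):
    """Start one process per (cpu, script, arg) tuple without blocking."""
    return [subprocess.Popen(taskset_argv(cpu, script, arg))
            for cpu, script, arg in jobs]


# Hypothetical jobs; the script path is a stand-in for a real one.
jobs = [(1, "run-apps.py", 4), (2, "run-apps.py", 8)]
for cpu, script, arg in jobs:
    # Show what would be executed; nothing is launched here.
    print(taskset_argv(cpu, script, arg))
```

Calling launch_pinned(jobs) would start the processes; call wait() on each returned Popen object if you need to block until they finish.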

How to run a Python program on a specific core?

This is a Linux question rather than a Python question.

Try taskset:

taskset -c 1 python file1.py --in 10
taskset -c 2 python file1.py --in 20
taskset -c 3 python file1.py --in 130
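If you would rather set the affinity from inside the script, the standard library exposes the same mechanism on Linux through os.sched_setaffinity. The sketch below is an alternative to taskset, not part of the answer above; the helper name pin_current_process is hypothetical, and the functions are only available on some Unix platforms.

```python
import os


def pin_current_process(core):
    """Restrict the current process to a single CPU core (Linux only)."""
    if not hasattr(os, "sched_setaffinity"):
        raise NotImplementedError("os.sched_setaffinity is not available here")
    os.sched_setaffinity(0, {core})  # pid 0 means the calling process
    return os.sched_getaffinity(0)


if hasattr(os, "sched_getaffinity"):
    original = os.sched_getaffinity(0)
    core = min(original)               # pick a core we are already allowed to use
    print(pin_current_process(core))   # a one-element set containing that core
    os.sched_setaffinity(0, original)  # restore the original affinity
```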

Can you set a task ratio for TaskSet(s) in Locust Python

TaskSets can be weighted with the same @task decorator as individual tasks, as explained in the TaskSet docs. Alternatively, the docs also explain that the tasks attribute can be defined as a dictionary instead of a list, with a weight for each entry, even for TaskSets. Applied to your sample code, it would look like this:

class User(HttpUser):
    wait_time = between(n,m)
    host = 'https://example.com'
    tasks = {UserBehavior1: 2, UserBehavior2: 5}

This should give your TaskSets a weighting of 2:5 each time a user is spawned. The individual tasks in the TaskSets then have their own weighting.
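To make the 2:5 ratio concrete, here is a small plain-Python sketch of the share of picks each entry should get. It only illustrates the arithmetic and is not Locust's own code; the helper name selection_shares is hypothetical.

```python
from fractions import Fraction


def selection_shares(weights):
    """Return each entry's share of picks implied by integer weights."""
    total = sum(weights.values())
    return {name: Fraction(weight, total) for name, weight in weights.items()}


shares = selection_shares({"UserBehavior1": 2, "UserBehavior2": 5})
for name, share in shares.items():
    print(f"{name}: {share} (~{float(share):.0%})")
# UserBehavior1: 2/7 (~29%)
# UserBehavior2: 5/7 (~71%)
```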

How to get the number of available cores in python

According to the docs, this may be of limited availability, but it seems the os module has what you want:

Interface to the scheduler

These functions control how a process is allocated CPU time by the operating system. They are only available on some Unix platforms. For more detailed information, consult your Unix manpages.

New in version 3.3.

...

os.sched_getaffinity(pid)
Return the set of CPUs the process with PID pid (or the current process if zero) is restricted to.

Tested using Ubuntu 20.04.2 (WSL):

aaron@DESKTOP:/mnt/c$ python3 -c "import os; print(os.sched_getaffinity(0))"
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}
aaron@DESKTOP:/mnt/c$ taskset 7 python3 -c "import os; print(os.sched_getaffinity(0))"
{0, 1, 2}

Edit: on Windows you'll probably have to look at psutil.
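A minimal sketch of a helper that wraps this up is shown below. The name available_cores is hypothetical, and the fallback to os.cpu_count() is my own assumption for platforms without sched_getaffinity; it reports all logical CPUs rather than the ones the process is restricted to.

```python
import os


def available_cores():
    """Number of CPUs this process may run on, with a portable fallback."""
    if hasattr(os, "sched_getaffinity"):
        # Respects affinity restrictions such as those set by taskset.
        return len(os.sched_getaffinity(0))
    # os.cpu_count() counts all logical CPUs and may return None.
    return os.cpu_count() or 1


print(available_cores())
```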

Assign Locust task sets programmatically

Create a single task and put the logic in that task for what you want to have it run.

class LoadTestUser(HttpUser):

    def something1(self):
        print("Hello!")

    def something2(self):
        print("World!")

    @task
    def task_logic(self):
        if self.host == "hello":
            self.something1()
        else:
            self.something2()

However, you can just address the error you're getting directly. You need to have a task defined in the class even if you intend to overwrite or change the tasks with your TaskSets. There's an example in the documentation but just add a task with pass so it doesn't do anything, then your overrides should work.

class LoadTestUser(HttpUser):

    def on_start(self):
        host_config = self.host
        if host_config == "hello":
            self.tasks = {RandomTask1:1}
        else:
            self.tasks = {RandomTask2:1}

    @task
    def nothing(self):
        pass

EDIT:
This should work, but it looks like there could be a bug in the current version of Locust where it only accepts a dictionary for tasks when Locust first starts and then only accepts a list afterward. Until it's fixed, the example in the other answer works.

Locust - How do I define multiple task sets for the same user?

I think I got it. To solve the problem I had to add a method at the end of each taskset to stop the execution of the task set:

def stop(self):
    self.interrupt()

In addition to that, I had to change the inherited class to SequentialTaskSet so all tasks get executed in order.

This is the full code:

class Task1(SequentialTaskSet):
    @task
    def task1_method(self):
        pass
    @task
    def stop(self):
        self.interrupt()

class Task2(SequentialTaskSet):
    @task
    def task2_method(self):
        pass
    @task
    def stop(self):
        self.interrupt()

class UserBehaviour(SequentialTaskSet):
    tasks = [Task1, Task2]

class LoggedInUser(HttpUser):
    host = "http://localhost"
    wait_time = between(1, 5)
    tasks = [UserBehaviour]

Everything seems to be working fine now.


