A PHP/Pthreads Thread Class Can't Use Array

PHP Pthreads not able to pass values to child objects within the threads

I finally found the answer, but I am not sure whether this is the correct behaviour. Your suggestions are welcome...

I was creating a class variable $this->e, assigning the child process object to it ($this->e = new $object), and then running its init() method. The init() method stores a few queries in a class member variable, so $this->e->queries should hold those queries. The fix I found is to keep the object in a separate local variable within the run() method of the Thread object rather than in a class variable. Once I have the queries, I can assign them directly to a class variable and read them after $thread->join() as $thread->queries, and it works fine.

In short: class variables of a class that extends Thread do not support objects within the run() method.

For more details, the sample code is below. I have created two files: Objects.php, which contains all the objects, and Processor.php.

Objects.php contains the following code:

<?php

class Controller {

public $queries = [];

public function init() {

}

public function set($key, $value) {
$this->{$key} = $value;
return $this;
}

function __destruct() {
global $scriptStartTime;
pr("Time taken for " . get_called_class() . " to execute = " . executionTime($scriptStartTime, true));
}
}

class SampleObject extends Controller {

public function init() {
parent::init();
sleep(rand(0, 15));
return $this->queries[] = 'INSERT INTO my_table (something) VALUES ("' . get_called_class() . '")';
}
}

function pr($array) {
echo PHP_EOL;
print_r($array);
echo PHP_EOL;
}

function executionTime($startTime, $text = false) {
$time = @number_format(microtime(true) - $startTime, 3);

$txt = 'seconds';

if ($time > 60) {
$time = @number_format($time / 60, 3);
$txt = 'minutes';
}

if ($text) {
$time = "{$time} {$txt}";
}

return $time;
}

Processor.php contains the following code:

<?php

ini_set('display_errors', 1);

require __DIR__ . DIRECTORY_SEPARATOR . 'Objects.php';

################ Processor #################

class Processor extends \Thread {

public function __construct($process) {
$this->process = $process;
}

public function run() {
//\Core\Autoloader::reload(); // reloading all autoloaders
require __DIR__ . DIRECTORY_SEPARATOR . 'Objects.php';
$scriptStartTime = microtime(true);

# Dynamically creating objects for testing purpose.
if (!class_exists($this->process['className'])) {
eval("class " . $this->process['className'] . " extends SampleObject {}");
}

$object = (new $this->process['className']);

# Set the default values that are common across all elements
$object
# Identity of thread
->set('name', $this->process['className'])
# Options to carry the assigned values
->set('options', $this->process['options'])
# The project details
->set('project', $this->project)
;

$object->init();

$this->queries = ($object->queries);
}
}

$scriptStartTime = microtime(true);

for ($i = 0; $i < 150; $i++) {
$jobs[] = [
'className' => 'Object_' . $i,
'options' => []
];
}

$totalJobsToExecute = count($jobs);

$i = 0;

# Initializing threads

$threads = [];

$project = [
'id' => 12345,
'title' => 'Some cool stuff'
];

foreach ($jobs AS $process) {
$i++;

$proc = $process['className'];
$threads[$proc] = new Processor($process);
// In this sample code it works without PTHREADS_INHERIT_NONE, but with my code it doesn't
if ($threads[$proc]->start(PTHREADS_INHERIT_NONE)) {
pr('Thread "' . $process['className'] . '" started');
}
}

pr("Threads | Starting time = " . executionTime($scriptStartTime, true));

$queries = [];

foreach ($threads AS $thread) {
if ($thread->join()) {
$queries[] = $thread->queries;
}
}

pr($queries);

pr('Count of threads === ' . count($threads));
pr("Threads time = " . executionTime($scriptStartTime, true));
pr("Threads | Total Threads executed = ({$i}) out of (" . $totalJobsToExecute . ")");

How to use pthreads in order to perform simultaneous operations on an array with constraint?

You have made “private” mutexes and condition variables for each thread, so they are not synchronizing in any (meaningful) way. Rather than this:

pthread_mutex_t mutex = globalMutex;
pthread_cond_t condition = globalCond;

Assigning them to local variables copies them, so each thread ends up locking its own copy. Just use globalMutex and globalCond directly; that is what you actually want.

[I moved this in here, because I think we are supposed to. I can't intuit SO-iquette.]

By the way, just to make sure I understand this, the mutex is per
cell, so that multiple threads can work on multiple cells
simultaneously, right? Just not two threads on the same cell.

So, what you probably want is something more like:

typedef struct myStruct {
    int cellId;
    pthread_mutex_t lock;
    pthread_cond_t wait;
} myStruct;

and in InitMyStruct():

myStructs[i]->cellId = i % THREADS_NUM;
pthread_mutex_init(&myStructs[i]->lock, NULL);
pthread_cond_init(&myStructs[i]->wait, NULL);

and in Halvers:

pthread_mutex_lock(&myStr->lock);
cells[id] /= 2;
pthread_cond_broadcast(&myStr->wait);
pthread_mutex_unlock(&myStr->lock);

and Doubler:
...

pthread_mutex_lock(&myStr->lock);
while((cells[id] * 2) > MAX) {
    printf("Waiting... id = %d\n", id);
    pthread_cond_wait(&myStr->wait, &myStr->lock);
}
cells[id] *= 2;
printf("new val = %d, id = %d\n", cells[id], id);
pthread_mutex_unlock(&myStr->lock);

So currently, only one thread can make changes to the array at a time?
But then the program exits after about a second, if threads couldn't
be making changes to the array simultaneously then wouldn't the
program take 10 seconds to finish, because each HalverThread sleeps
for 1 second. – Yos

The Halvers sleep before grabbing the mutex, thus all sleep near simultaneously, wake up, fight for mutex and continue.

Php - Pthreads/Speedup and data save

I found the answer to how to access the array data in the post below, under the PHP 7 section:
A PHP/pthreads Thread class can't use array?

In short, in PHP 7 arrays have immutability by default, so you can either extend Volatile or cast the array object to a normal array. I chose to simply cast all array objects. The updated code is below:

class WorkerThreads extends Thread
{
    private $workerId;
    public $lines_ophold;

    public function __construct($id)
    {
        $this->workerId = $id;
    }

    public function run()
    {
        for ($t = 0; $t < 15; $t++) {
            $lines_ophold = "";

            for ($x = 0; $x < 30; $x++) {
                $lines_ophold .= file_get_contents("/folder/" . $this->workerId . "/pos3.txt") . "\r\n";
            }

            $lines_op = array_keys(array_flip(explode("\r\n", $lines_ophold)));
            $this->result = (array) $lines_op; // CAST THE ARRAY object

            $linesmaster1[$t] = $this->result;

            unset($lines_op);
        }

        print_r($linesmaster1);
    }
}

// Worker pool
$workers = [];

// Initialize and start the threads
foreach (range(0, 8) as $i) {
    $workers[$i] = new WorkerThreads($i);
    $workers[$i]->start();
}

// Let the threads come back
foreach (range(0, 8) as $i) {
    $workers[$i]->join();
}

PHP - Multithread a function with pthreads

The basic problem is that arrays are not thread safe. pthreads provides array-like interfaces on all Threaded objects, which means you can use Threaded objects in place of arrays in a multi-threaded context.

<?php

function demanding(...$params) {
    /* you have parameters here */
    return array(rand(), rand());
}

class Task extends Collectable {
    public function __construct(Threaded $result, $params) {
        $this->result = $result;
        $this->params = $params;
    }

    public function run() {
        $this->result[] =
            demanding(...$this->params);
    }

    protected $result;
    protected $params;
}

$pool = new Pool(16);

$result = new Threaded();

while (@$i++<16) {
    $pool->submit(
        new Task($result, $argv));
}

$pool->shutdown();

var_dump($result);
?>

There isn't a built-in way to do a multi-threaded sort, so the simplest thing to do is sort your result when the threads are finished.

Distributed Computing with PThread Not Working

This is prefaced by my top comments.

I had to refactor your code a bit.

I also had to modify some of the provided functions to allow debug printf.

Based on your desired result of 55, instead of strtok, you can/should use strstr. That's the only way I got the count to be correct.

I added a mutex so updates to total would not be trashed by thread collisions.

The key is the rewrite of allowedOnThread using a struct as I mentioned. Calculate a starting and ending offset for each segment for each thread, adjusting for before and after whitespace so that words are not chopped in the middle.


Here is the refactored code. It is annotated. It allows each thread to calculate its range individually.

The code comes up with the correct answer, and stopping at the end of each segment seems to be correct, but I might double check that.

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdarg.h>
#include <stdatomic.h>

#define MAX 10240
#define NUM_THREADS 10

int n1, n2;
char *s1, *s2;
FILE *fp;
int countArray[NUM_THREADS] = { 0 };

pthread_mutex_t mutex;

int total = 0;

const char *delims = ",. \t\n";

struct range {
size_t off; // starting offset
size_t end; // ending offset (one past last char)
};

__thread int curtid;
__thread FILE *logxf;
#ifdef DEBUG
#define dbgprt(_fmt...) \
_dbgprt(_fmt)
#else
#define dbgprt(_fmt...) \
do { \
} while (0)
#endif

#define prt(_lvl) \
__attribute__((__format__(__printf__,_lvl,_lvl + 1)))

void prt(1)
_dbgprt(const char *fmt,...)
{
va_list ap;

// open this thread's log file on first use
if (logxf == NULL) {
char logf[100];
sprintf(logf,"log%2.2d",curtid);
logxf = fopen(logf,"w");
if (logxf == NULL)
return;
setlinebuf(logxf);
}

fprintf(logxf,"[%d] ",curtid);

va_start(ap,fmt);
vfprintf(logxf,fmt,ap);
va_end(ap);
}

//read input file and generate string s1/s2 and length n1/n2
int
readf(FILE * fp)
{
if ((fp = fopen("strings.txt", "r")) == NULL) {
printf("ERROR: can't open strings.txt!\n");
return -1;
}
s1 = (char *) malloc(sizeof(char) * MAX);
if (s1 == NULL) {
printf("ERROR: Out of memory!\n");
return -1;
}
s2 = (char *) malloc(sizeof(char) * MAX);
if (s2 == NULL) {
printf("ERROR: Out of memory\n");
return -1;
}
// read s1 s2 from the file

s1 = fgets(s1, MAX, fp);
s2 = fgets(s2, MAX, fp);

// when error exit (check before strlen touches a NULL pointer)
if (s1 == NULL || s2 == NULL)
return -1;

// length of s1
n1 = strlen(s1);
// length of s2 (without the trailing newline)
n2 = strlen(s2) - 1;

if (n1 < n2)
return -1;
return 0;
}

size_t
skip_to_delim(size_t off,const char *tag)
{
char *str;

dbgprt("skip_to_delim: ENTER off=%zu tag=%s\n",off,tag);

str = &s1[off];
off += strcspn(str,delims);

dbgprt("skip_to_delim: EXIT off=%zu\n",off);

return off;
}

int
allowedOnThread(int thread, struct range *seg)
{
int threadMultiplier = n1 / NUM_THREADS;

dbgprt("allowedOnThread: ENTER thread=%d\n",thread);

// get starting offset
do {
seg->off = threadMultiplier * thread;

// first thread always starts at offset 0
if (thread == 0)
break;

// skip past a word and stop on a delimiter
seg->off = skip_to_delim(seg->off,"off");
} while (0);

// get ending offset/length
do {
if (thread == (NUM_THREADS - 1)) {
seg->end = n1;
break;
}

// scan at least the amount we're allocated
seg->end = seg->off + threadMultiplier;

// skip past a word and stop on a delimiter
seg->end = skip_to_delim(seg->end,"end");
} while (0);

dbgprt("allowedOnThread: EXIT thread=%d off=%zu end=%zu\n",
thread,seg->off,seg->end);

return 0;
}

int
num_substring(int t)
{
//add your logic here
//1, how to distribute different parts of string s1 into different threads
//2, how to sum up the total number of substring from all threads

dbgprt("num_substring: ENTER\n");

struct range seg;
allowedOnThread(t,&seg);

char *str = &s1[seg.off];
char *end = &s1[seg.end];

char *token = str;
size_t count = 0;

// NOTE/FIXME -- this should be double checked to ensure that we're not
// double counting by going beyond our range
while (1) {
// look for a substring match of s2 in s1
token = strstr(token,s2);
if (token == NULL)
break;

// don't intrude on next thread's segment
if (token >= end)
break;

// advance the count
count += 1;

// point to start of next possible match point for s2
token += n2;

// stop when we go beyond the end of our thread's area
if (token >= end)
break;
}

// add to global count (under thread lock)
pthread_mutex_lock(&mutex);
total += count;
pthread_mutex_unlock(&mutex);

dbgprt("num_substring: EXIT count=%zu\n",count);

return count;
}

void *
calSubStringThread(void *threadid)
{
long tid = (long) threadid;

curtid = tid + 1;

dbgprt("calSubstringThread: ENTER\n");

int num = num_substring(tid);

dbgprt("calSubstringThread: EXIT num=%d\n",num);

pthread_exit(NULL);
}

// docheck -- check with non-threaded algorithm
void
docheck(void)
{
size_t count = 0;

char *token = s1;
while (1) {
token = strstr(token,s2);
if (token == NULL)
break;

count += 1;

token += n2;
}

printf("docheck: count=%zu\n",count);
}

int
main(int argc, char *argv[])
{
pthread_t threads[NUM_THREADS];
int t, rc;

pthread_mutex_init(&mutex,NULL);

// bail out if the input could not be read
if (readf(fp) != 0)
exit(-1);

// get rid of newline
s2[n2] = 0;
dbgprt("main: s2='%s'\n",s2);

docheck();

for (t = 0; t < NUM_THREADS; t++) {
rc = pthread_create(&threads[t], NULL, calSubStringThread,
(void *) (size_t) t);
if (rc) {
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}

for (t = 0; t < NUM_THREADS; t++) {
pthread_join(threads[t], NULL);
}

printf("The number of substrings is: %d\n", total);

return 0;
}

Here is the debug log output. (Note that I manually indented the logs based on the ENTER/EXIT messages).

==> log00 <==
[0] main: s2='is'

==> log01 <==
[1] calSubstringThread: ENTER
[1] num_substring: ENTER
[1] allowedOnThread: ENTER thread=0
[1] skip_to_delim: ENTER off=479 tag=end
[1] skip_to_delim: EXIT off=479
[1] allowedOnThread: EXIT thread=0 off=0 end=479
[1] num_substring: EXIT count=1
[1] calSubstringThread: EXIT num=1

==> log02 <==
[2] calSubstringThread: ENTER
[2] num_substring: ENTER
[2] allowedOnThread: ENTER thread=1
[2] skip_to_delim: ENTER off=479 tag=off
[2] skip_to_delim: EXIT off=479
[2] skip_to_delim: ENTER off=958 tag=end
[2] skip_to_delim: EXIT off=960
[2] allowedOnThread: EXIT thread=1 off=479 end=960
[2] num_substring: EXIT count=2
[2] calSubstringThread: EXIT num=2

==> log03 <==
[3] calSubstringThread: ENTER
[3] num_substring: ENTER
[3] allowedOnThread: ENTER thread=2
[3] skip_to_delim: ENTER off=958 tag=off
[3] skip_to_delim: EXIT off=960
[3] skip_to_delim: ENTER off=1439 tag=end
[3] skip_to_delim: EXIT off=1440
[3] allowedOnThread: EXIT thread=2 off=960 end=1440
[3] num_substring: EXIT count=3
[3] calSubstringThread: EXIT num=3

==> log04 <==
[4] calSubstringThread: ENTER
[4] num_substring: ENTER
[4] allowedOnThread: ENTER thread=3
[4] skip_to_delim: ENTER off=1437 tag=off
[4] skip_to_delim: EXIT off=1440
[4] skip_to_delim: ENTER off=1919 tag=end
[4] skip_to_delim: EXIT off=1920
[4] allowedOnThread: EXIT thread=3 off=1440 end=1920
[4] num_substring: EXIT count=4
[4] calSubstringThread: EXIT num=4

==> log05 <==
[5] calSubstringThread: ENTER
[5] num_substring: ENTER
[5] allowedOnThread: ENTER thread=4
[5] skip_to_delim: ENTER off=1916 tag=off
[5] skip_to_delim: EXIT off=1920
[5] skip_to_delim: ENTER off=2399 tag=end
[5] skip_to_delim: EXIT off=2402
[5] allowedOnThread: EXIT thread=4 off=1920 end=2402
[5] num_substring: EXIT count=5
[5] calSubstringThread: EXIT num=5

==> log06 <==
[6] calSubstringThread: ENTER
[6] num_substring: ENTER
[6] allowedOnThread: ENTER thread=5
[6] skip_to_delim: ENTER off=2395 tag=off
[6] skip_to_delim: EXIT off=2396
[6] skip_to_delim: ENTER off=2875 tag=end
[6] skip_to_delim: EXIT off=2876
[6] allowedOnThread: EXIT thread=5 off=2396 end=2876
[6] num_substring: EXIT count=6
[6] calSubstringThread: EXIT num=6

==> log07 <==
[7] calSubstringThread: ENTER
[7] num_substring: ENTER
[7] allowedOnThread: ENTER thread=6
[7] skip_to_delim: ENTER off=2874 tag=off
[7] skip_to_delim: EXIT off=2876
[7] skip_to_delim: ENTER off=3355 tag=end
[7] skip_to_delim: EXIT off=3356
[7] allowedOnThread: EXIT thread=6 off=2876 end=3356
[7] num_substring: EXIT count=7
[7] calSubstringThread: EXIT num=7

==> log08 <==
[8] calSubstringThread: ENTER
[8] num_substring: ENTER
[8] allowedOnThread: ENTER thread=7
[8] skip_to_delim: ENTER off=3353 tag=off
[8] skip_to_delim: EXIT off=3356
[8] skip_to_delim: ENTER off=3835 tag=end
[8] skip_to_delim: EXIT off=3835
[8] allowedOnThread: EXIT thread=7 off=3356 end=3835
[8] num_substring: EXIT count=8
[8] calSubstringThread: EXIT num=8

==> log09 <==
[9] calSubstringThread: ENTER
[9] num_substring: ENTER
[9] allowedOnThread: ENTER thread=8
[9] skip_to_delim: ENTER off=3832 tag=off
[9] skip_to_delim: EXIT off=3832
[9] skip_to_delim: ENTER off=4311 tag=end
[9] skip_to_delim: EXIT off=4311
[9] allowedOnThread: EXIT thread=8 off=3832 end=4311
[9] num_substring: EXIT count=9
[9] calSubstringThread: EXIT num=9

==> log10 <==
[10] calSubstringThread: ENTER
[10] num_substring: ENTER
[10] allowedOnThread: ENTER thread=9
[10] skip_to_delim: ENTER off=4311 tag=off
[10] skip_to_delim: EXIT off=4311
[10] allowedOnThread: EXIT thread=9 off=4311 end=4799
[10] num_substring: EXIT count=10
[10] calSubstringThread: EXIT num=10


Thank you very much for explaining and helping me with this. I was wondering if you could explain to me how mutex works and why it's good practice to use it? – Bass Approved

This, from the C syntax, seems like an atomic operation:

total += count;

But, it's not. It's actually three operations:

temp = total;
temp += count;
total = temp;

Different threads will execute these in sequence. Normally (e.g. 99.44% of the time), these three operations will be executed by one thread without interference from another thread. If we have two threads (e.g. A and B), the "good" sequence is that the thread operations are "nicely" ordered:

thread A / cpu 0        thread B / cpu 1
--------------------    ------------------------
tempA = total;
tempA += countA;
total = tempA;
                        tempB = total;
                        tempB += countB;
                        total = tempB;

The final value for total would be: total + countA + countB, which is what we want.

But, if two threads are running simultaneously on different CPUs, they may intersperse these operations. We could have a sequence such as:

thread A / cpu 0        thread B / cpu 1
--------------------    ------------------------
tempA = total;
                        tempB = total;
tempA += countA;
                        tempB += countB;
total = tempA;
                        total = tempB;

In this case, at the end of the sequence, the final value of total would be: total + countB [which is not what we want]. That is, the increment of total by countA [executed by thread A] would be lost/trashed!

In this case the threads are racing and thread B "won" the race.

Using a mutex [or other locking mechanism or using atomic operations] will prevent this.

A pthread_mutex_lock [loosely] is two operations: "request" and "grant". If the mutex is not held, these operations happen at the same time. If the mutex is held [by another thread], the "grant" is deferred in time. It is granted after the other thread has done a "release" (e.g. pthread_mutex_unlock). Here is the timeline:

thread A / cpu 0        thread B / cpu 1
--------------------    ------------------------
mutex requested
mutex granted
                        mutex requested
tempA = total;
tempA += countA;
total = tempA;
mutex released
                        mutex granted
                        tempB = total;
                        tempB += countB;
                        total = tempB;
                        mutex released

For a more detailed explanation, see my answer: Threading Differences in Linux Subsystem For Windows

Another way to guarantee atomic update is the use of stdatomic.h primitives. See my answer: multithreading with mutexes in c and running one thread at a time

Another solution is a "ticket lock". See my answer: C Pthreads - issues with thread-safe queue implementation


