Sharing Psycopg2/Libpq Connections Across Processes

Sharing psycopg2 / libpq connections across processes

Your surmise is basically correct: there is no issue with a connection being opened before a fork as long as you don't attempt to use it in more than one process.

That being said, I think you misunderstood the "multiprocessing approach" link you provided. It actually demonstrates a separate connection being opened in each child. (There is a connection opened by the parent before forking, but it's not being used in any child.)

The improvement given by the answer there (versus the code in the question) was to refactor so that -- rather than opening a new connection for each task in the queue -- each child process opened a single connection and then shared it across multiple tasks executed within the same child (i.e. the connection is passed as an argument to the Task processor).


As a general practice, one should prefer creating a connection within the process that is using it. In the answer cited, a connection was being created in the parent before forking, then used in the child. This does work fine, but leaves each "child connection" open in the parent as well, which is at best a waste of resources and also a potential cause of bugs.

Create DB connection and maintain on multiple processes (multiprocessing)

Try to isolate the creation of your connection in the Consumer constructor, then give it to the executed Task :

import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

def __init__(self, task_queue, result_queue):
self.task_queue = task_queue
self.result_queue = result_queue
self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")

def run(self):
proc_name =
while True:
next_task = self.task_queue.get()
if next_task is None:
print 'Tasks Complete'
answer = next_task(connection=self.pyConn)

class Task(object):
def __init__(self, a):
self.a = a

def __call__(self, connection=None):
pyConn = connection
pyCursor1 = pyConn.cursor()

procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

print 'What is self?'
print self.a

return self.a

def __str__(self):
return 'ARC'
def run(self):
print 'IN'

Create DB connection and maintain on multiple processes (multiprocessing)

Try to isolate the creation of your connection in the Consumer constructor, then give it to the executed Task :

import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

def __init__(self, task_queue, result_queue):
self.task_queue = task_queue
self.result_queue = result_queue
self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")

def run(self):
proc_name =
while True:
next_task = self.task_queue.get()
if next_task is None:
print 'Tasks Complete'
answer = next_task(connection=self.pyConn)

class Task(object):
def __init__(self, a):
self.a = a

def __call__(self, connection=None):
pyConn = connection
pyCursor1 = pyConn.cursor()

procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

print 'What is self?'
print self.a

return self.a

def __str__(self):
return 'ARC'
def run(self):
print 'IN'

Psycopg2 - Connect to postgreSQL database using a connection string

You could make use of urlparse, creating a dictionary that matches psycopg's connection arguments:

import psycopg2
from urllib.parse import urlparse

conStr = "localhost://username:password@data_quality:5432"
p = urlparse(conStr)

pg_connection_dict = {
'dbname': p.hostname,
'user': p.username,
'password': p.password,
'port': p.port,
'host': p.scheme

con = psycopg2.connect(**pg_connection_dict)


{'dbname': 'data_quality', 'user': 'username', 'password': 'password', 'port': 5432, 'host': 'localhost'}
<connection object at 0x105f58190; dsn: 'user=xxx password=xxx dbname=xxx host=xxx port=xxx', closed: 0>

multiprocessing / psycopg2 TypeError: can't pickle _thread.RLock objects

To put it simply, postgres connection and sqlalchemy connection pool is thread safe, however they are not fork-safe.

If you want to use multiprocessing, you should initialize the engine in each child processes after the fork.

You should use multithreading instead if you want to share engines.

Refer to Thread and process safety in psycopg2 documentation:

libpq connections
shouldn’t be used by a forked processes, so when using a module such
as multiprocessing or a forking web deploy method such as FastCGI make
sure to create the connections after the fork.

If you are using multiprocessing.Pool, there is a keyword argument initializer which can be used to run code once on each child process. Try this:

class PostgresConnector(object):
def __init__(self, db_url):
self.db_url = db_url
self.pool = self.init_pool()

def init_pool(self):
CPUS = multiprocessing.cpu_count()
return multiprocessing.Pool(CPUS, initializer=self.init_connection(self.db_url))

def init_connection(cls, db_url):
def _init_connection():'Creating Postgres engine')
cls.engine = create_engine(db_url)
return _init_connection

def run_parallel_queries(self, queries):
results = []
for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
except Exception as exception:
LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
#self.pool.join()'Parallel query ran producing %s sets of results of type: %s', len(results), type(results))

return list(chain.from_iterable(results))

def execute_parallel_query(self, query):
with self.engine.connect() as conn:
with conn.begin():
result = conn.execute(query)
return result.fetchall()

def __getstate__(self):
# this is a hack, if you want to remove this method, you should
# remove self.pool and just pass pool explicitly
self_dict = self.__dict__.copy()
del self_dict['pool']
return self_dict

Now, to address the XY problem.

Initially it was a single query with the where id in (...) contained
all 6k predicate IDs but I ran into issues with the query using up >
4GB of RAM on the machine it ran on, so I decided to split it out into
6k individual queries which when synchronously keeps a steady memory

What you may want to do instead is one of these options:

  1. write a subquery that generates all 6000 IDs and use the subquery in your original bulk query.
  2. as above, but write the subquery as a CTE
  3. if your ID list comes from an external source (i.e. not from the database), then you can create a temporary table containing the 6000 IDs and then run your original bulk query against the temporary table

However, if you insist on running 6000 IDs through python, then the fastest query is likely neither to do all 6000 IDs in one go (which will run out of memory) nor to run 6000 individual queries. Instead, you may want to try to chunk the queries. Send 500 IDs at once for example. You will have to experiment with the chunk size to determine the largest number of IDs you can send at one time while still comfortably within your memory budget.

Using TCP for memory sharing across processes

1) You should not have any problems with TCP packet size. Node will buffer/queue your data if it's too big and send them when the OS gives it a writable socket's file descriptor. You may hit performance issues only if you are writing more then your network bandwidth per second. At this point Node will also use more RAM to queue all this messages.

2) Most games use TCP or UDP for real time communication. It can be a bottleneck, as anything else (RAM, CPU, bandwidth, storage) can. At some point of stress, one or more resources will end/fail/perform badly. It's generally a good practice to use an architecture that can grow horizontally (adding more machines) when all optimizations are done for your bottleneck and you still need to add more simultaneous users to your game server.

You'll probably use TCP to connect to a Redis server (but you can also use a unix socket).

If you only need inter-process communication (and not inter-machine), you should take a look at the "cluster" Node.js core module. It has built-in IPC.

Related Topics

Leave a reply
