Python Postgres Psycopg2 Threadedconnectionpool Exhausted

Python Postgres psycopg2 ThreadedConnectionPool exhausted

You need to use a queue on top of your pool.

Something like the following should work:

import gevent, sys, random, psycopg2, logging
from contextlib import contextmanager
from gevent.queue import Queue
from gevent.socket import wait_read, wait_write
from psycopg2.pool import ThreadedConnectionPool
from psycopg2 import extensions, OperationalError
import sys
logger = logging.getLogger(__name__)

poolsize = 100 #number of max connections
pdsn = '' # put your dsn here

if sys.version_info[0] >= 3:
integer_types = (int,)
else:
import __builtin__
integer_types = (int, __builtin__.long)


class ConnectorError(Exception):
""" This is a base class for all CONNECTOR related exceptions """
pass

#simplified calls etc. db.fetchall(SQL, arg1, arg2...)
def cursor(): return Pcursor()
def fetchone(PSQL, *args): return Pcursor().fetchone(PSQL, *args)
def fetchall(PSQL, *args): return Pcursor().fetchall(PSQL, *args)
def execute(PSQL, *args): return Pcursor().execute(PSQL, *args)

#singleton connection pool, gets reset if a connection is bad or drops
_pgpool = None
def pgpool():
global _pgpool
if not _pgpool:
try:
_pgpool = PostgresConnectionPool(maxsize=poolsize)
except psycopg2.OperationalError as exc:
_pgpool = None
return _pgpool

class Pcursor(object):

def __init__(self, **kwargs):
#in case of a lost connection lets sit and wait till it's online
global _pgpool
if not _pgpool:
while not _pgpool:
try:
pgpool()
except:
logger.debug('Attempting Connection To Postgres...')
gevent.sleep(1)

def fetchone(self, PSQL, *args):
with _pgpool.cursor() as cursor:
try:
cursor.execute(PSQL, args)
except TypeError:
cursor.execute(PSQL, args[0])
except Exception as exc:
print(sys._getframe().f_back.f_code)
print(sys._getframe().f_back.f_code.co_name)
logger.warning(str(exc))
logger.debug(cursor.query)
return cursor.fetchone()

def fetchall(self, PSQL, *args):
with _pgpool.cursor() as cursor:
try:
cursor.execute(PSQL, args)
except TypeError:
cursor.execute(PSQL, args[0])
except Exception as exc:
print(sys._getframe().f_back.f_code)
print(sys._getframe().f_back.f_code.co_name)
logger.warning(str(exc))
logger.debug(cursor.query)
return cursor.fetchall()

def execute(self, PSQL, *args):
with _pgpool.cursor() as cursor:
try:
cursor.execute(PSQL, args)
except TypeError:
cursor.execute(PSQL, args[0])
except Exception as exc:
print(sys._getframe().f_back.f_code)
print(sys._getframe().f_back.f_code.co_name)
logger.warning(str(exc))
finally:
logger.debug(cursor.query)
return cursor.query

def fetchmany(self, PSQL, *args):
with _pgpool.cursor() as cursor:
try:
cursor.execute(PSQL, args)
except TypeError:
cursor.execute(PSQL, args[0])
while 1:
items = cursor.fetchmany()
if not items:
break
for item in items:
yield item

class AbstractDatabaseConnectionPool(object):

def __init__(self, maxsize=poolsize):
if not isinstance(maxsize, integer_types):
raise TypeError('Expected integer, got %r' % (maxsize, ))
self.maxsize = maxsize
self.pool = Queue()
self.size = 0

def create_connection(self):
#overridden by PostgresConnectionPool
raise NotImplementedError()

def get(self):
pool = self.pool
if self.size >= self.maxsize or pool.qsize():
return pool.get()

self.size += 1
try:
new_item = self.create_connection()
except:
self.size -= 1
raise
return new_item

def put(self, item):
self.pool.put(item)

def closeall(self):
while not self.pool.empty():
conn = self.pool.get_nowait()
try:
conn.close()
except Exception:
pass

@contextmanager
def connection(self, isolation_level=None):
conn = self.get()
try:
if isolation_level is not None:
if conn.isolation_level == isolation_level:
isolation_level = None
else:
conn.set_isolation_level(isolation_level)
yield conn
except:
if conn.closed:
conn = None
self.closeall()
raise
else:
if conn.closed:
raise OperationalError("Cannot commit because connection was closed: %r" % (conn, ))
finally:
if conn is not None and not conn.closed:
if isolation_level is not None:
conn.set_isolation_level(isolation_level)
self.put(conn)

@contextmanager
def cursor(self, *args, **kwargs):
isolation_level = kwargs.pop('isolation_level', None)
with self.connection(isolation_level) as conn:
try:
yield conn.cursor(*args, **kwargs)
except:
global _pgpool
_pgpool = None
del(self)

class PostgresConnectionPool(AbstractDatabaseConnectionPool):
def __init__(self,**kwargs):
try:
self.pconnect = ThreadedConnectionPool(1, poolsize, dsn=pdsn)
except:
global _pgpool
_pgpool = None
raise ConnectorError('Database Connection Failed')
maxsize = kwargs.pop('maxsize', None)
self.kwargs = kwargs
AbstractDatabaseConnectionPool.__init__(self, maxsize)

def create_connection(self):
self.conn = self.pconnect.getconn()
self.conn.autocommit = True
return self.conn

def gevent_wait_callback(conn, timeout=None):
"""A wait callback useful to allow gevent to work with Psycopg."""
while 1:
state = conn.poll()
if state == extensions.POLL_OK:
break
elif state == extensions.POLL_READ:
wait_read(conn.fileno(), timeout=timeout)
elif state == extensions.POLL_WRITE:
wait_write(conn.fileno(), timeout=timeout)
else:
raise ConnectorError("Bad result from poll: %r" % state)

extensions.set_wait_callback(gevent_wait_callback)

Then you can call your connection via this:

import db
db.Pcursor().execute(PSQL, arg1, arg2, arg3)

Basically I borrowed the gevent example of async postgres and modified it to support threadpooling via pyscopg2.

https://github.com/gevent/gevent/blob/master/examples/psycopg2_pool.py

I added what psycogreen does inside the module, so all you need to do is import and call the class. Each call to the class stacks a new query on the queue, but only uses the pool at a certain size. This way you don't run out of connections. This is essentially similar to what PGBouncer does, which I think would also eliminate your problem.

https://pgbouncer.github.io/

connection pool exhausted psycopg2

it would appear you need to return the connection back to the pool at some point, see:

http://initd.org/psycopg/docs/pool.html#psycopg2.pool.AbstractConnectionPool.putconn

i.e. something like:

sql = "select * from role"  
try:
conn = db.getconn()
try:
data = sqlio.read_sql_query(sql, conn)
finally:
pool.putconn(conn)
return data.to_json(orient='records')
except Exception as e:
print "error in executing with exception: ", e
return pd.DataFrame({'empty' : []})

multiprocessing / psycopg2 TypeError: can't pickle _thread.RLock objects

To put it simply, postgres connection and sqlalchemy connection pool is thread safe, however they are not fork-safe.

If you want to use multiprocessing, you should initialize the engine in each child processes after the fork.

You should use multithreading instead if you want to share engines.

Refer to Thread and process safety in psycopg2 documentation:

libpq connections
shouldn’t be used by a forked processes, so when using a module such
as multiprocessing or a forking web deploy method such as FastCGI make
sure to create the connections after the fork.

If you are using multiprocessing.Pool, there is a keyword argument initializer which can be used to run code once on each child process. Try this:

class PostgresConnector(object):
def __init__(self, db_url):
self.db_url = db_url
self.pool = self.init_pool()

def init_pool(self):
CPUS = multiprocessing.cpu_count()
return multiprocessing.Pool(CPUS, initializer=self.init_connection(self.db_url))

@classmethod
def init_connection(cls, db_url):
def _init_connection():
LOGGER.info('Creating Postgres engine')
cls.engine = create_engine(db_url)
return _init_connection

def run_parallel_queries(self, queries):
results = []
try:
for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
results.append(i)
except Exception as exception:
LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
raise
finally:
pass
#self.pool.close()
#self.pool.join()

LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))

return list(chain.from_iterable(results))

def execute_parallel_query(self, query):
with self.engine.connect() as conn:
with conn.begin():
result = conn.execute(query)
return result.fetchall()

def __getstate__(self):
# this is a hack, if you want to remove this method, you should
# remove self.pool and just pass pool explicitly
self_dict = self.__dict__.copy()
del self_dict['pool']
return self_dict

Now, to address the XY problem.

Initially it was a single query with the where id in (...) contained
all 6k predicate IDs but I ran into issues with the query using up >
4GB of RAM on the machine it ran on, so I decided to split it out into
6k individual queries which when synchronously keeps a steady memory
usage.

What you may want to do instead is one of these options:

  1. write a subquery that generates all 6000 IDs and use the subquery in your original bulk query.
  2. as above, but write the subquery as a CTE
  3. if your ID list comes from an external source (i.e. not from the database), then you can create a temporary table containing the 6000 IDs and then run your original bulk query against the temporary table

However, if you insist on running 6000 IDs through python, then the fastest query is likely neither to do all 6000 IDs in one go (which will run out of memory) nor to run 6000 individual queries. Instead, you may want to try to chunk the queries. Send 500 IDs at once for example. You will have to experiment with the chunk size to determine the largest number of IDs you can send at one time while still comfortably within your memory budget.

How can I pool connections using psycopg and gevent?

I assume you know gevent-psycopg2 module, which makes psycopg greenlet-friendly.

Looking for connection pooling solution I've tried 2 solutions:

  • SQLALchemy - it seems to work properly with monkey-patched threads and gevent-psycopg2. The QueuePool class uses threading module internally for locking, monkey patching is thus necessary, even though gevent-psycopg2 makes psycopg2 green.

  • there's a psycopg2 connection pooling example in gevent examples

I've tried both solutions, but not at production load - so I can't say about their robustness yet.



Related Topics



Leave a reply



Submit