How to do a proper upsert using sqlalchemy on postgresql?
this does the trick for me:
import warnings

from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.inspection import inspect

def upsert(engine, schema, table_name, records=None):
    records = records or []
    metadata = MetaData(schema=schema)
    # reflect the table definition from the database
    table = Table(table_name, metadata, schema=schema, autoload_with=engine)
    # get list of fields making up primary key
    primary_keys = [key.name for key in inspect(table).primary_key]
    # assemble base statement
    stmt = postgresql.insert(table).values(records)
    # define dict of non-primary keys for updating
    update_dict = {
        c.name: c
        for c in stmt.excluded
        if not c.primary_key
    }
    # cover case when all columns in table comprise a primary key,
    # in which case upsert is identical to 'on conflict do nothing'
    if not update_dict:
        warnings.warn('no updateable columns found for table')
        # we still want to insert without errors
        # (insert_ignore is a companion helper, not shown here)
        insert_ignore(table_name, records)
        return None
    # assemble new statement with 'on conflict do update' clause
    update_stmt = stmt.on_conflict_do_update(
        index_elements=primary_keys,
        set_=update_dict,
    )
    # execute
    with engine.begin() as conn:
        return conn.execute(update_stmt)
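To sanity-check what this helper builds, you can compile an ON CONFLICT DO UPDATE statement against the PostgreSQL dialect without connecting to a database. A minimal sketch (the `users` table here is a hypothetical example):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql

metadata = MetaData()
# hypothetical table, for illustration only
users = Table(
    "users", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
)

stmt = postgresql.insert(users).values(id=1, name="alice")
# same excluded-based dict as in the helper above
update_cols = {c.name: c for c in stmt.excluded if not c.primary_key}
stmt = stmt.on_conflict_do_update(index_elements=["id"], set_=update_cols)

# compile to a SQL string without any database connection
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

This prints the full INSERT ... ON CONFLICT (id) DO UPDATE SET ... statement, which is a quick way to confirm the update dict excludes the primary key columns.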
SQLAlchemy - performing a bulk upsert (if exists, update, else insert) in postgresql
There is an upsert-esque operation in SQLAlchemy:
db.session.merge()
After I found this command, I was able to perform upserts, but it is worth mentioning that this operation is slow for a bulk "upsert".
The alternative is to get a list of the primary keys you would like to upsert, and query the database for any matching ids:
# Imagine that post1, post5, and post1000 are posts objects with ids 1, 5 and 1000 respectively
# The goal is to "upsert" these posts.
# we initialize a dict which maps id to the post object
my_new_posts = {1: post1, 5: post5, 1000: post1000}
for each in posts.query.filter(posts.id.in_(my_new_posts.keys())).all():
    # Only merge those posts which already exist in the database
    db.session.merge(my_new_posts.pop(each.id))
# Only add those posts which did not exist in the database
db.session.add_all(my_new_posts.values())
# Now we commit our modifications (merges) and inserts (adds) to the database!
db.session.commit()
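Run end to end, the query-then-merge pattern above looks like this; a minimal sketch using plain SQLAlchemy ORM and an in-memory SQLite database (the `Post` model is illustrative, standing in for the `posts` model above):

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Post(Base):
    """Hypothetical model standing in for the posts model above."""
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    title = Column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # seed one existing row
    session.add(Post(id=1, title="old"))
    session.commit()

    # new versions of posts 1 and 2; only id=1 already exists
    my_new_posts = {1: Post(id=1, title="new"), 2: Post(id=2, title="fresh")}

    for each in session.query(Post).filter(Post.id.in_(my_new_posts.keys())).all():
        # merge only the posts that already exist in the database
        session.merge(my_new_posts.pop(each.id))
    # add the posts that did not exist
    session.add_all(my_new_posts.values())
    session.commit()

    titles = {p.id: p.title for p in session.query(Post).all()}
    print(titles)
```

After the commit, post 1 has its updated title and post 2 has been inserted, with only one SELECT issued up front instead of one merge round-trip per row.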
Bulk Upsert with SQLAlchemy Postgres
update_stmt = insert_stmt.on_conflict_do_update(
    index_elements=[MyTable.id],
    set_=dict(data=values)
)
index_elements should be either a list of strings or a list of column objects, so either [MyTable.id] or ['id']. (This part is correct.)
set_ should be a dictionary with column names as keys and valid SQL update expressions as values. You can reference values from the insert block using the excluded attribute. So to get the result you are hoping for here, you would want set_={'test_value': insert_stmt.excluded.test_value}. (The error you made is that data= in the example isn't a magic argument; it was the name of the column in their example table.)
So, the whole thing would be
update_stmt = insert_stmt.on_conflict_do_update(
    index_elements=[MyTable.id],
    set_={'test_value': insert_stmt.excluded.test_value}
)
Of course, in a real-world example I usually want to change more than one column. In that case I would do something like...
update_columns = {col.name: col for col in insert_stmt.excluded if col.name not in ('id', 'datetime_created')}
update_statement = insert_stmt.on_conflict_do_update(index_elements=['id'], set_=update_columns)
(This example would overwrite every column except for the id and datetime_created columns)
Bulk upsert with SQLAlchemy
from https://stackoverflow.com/a/26018934/465974
After I found this command, I was able to perform upserts, but it is worth mentioning that this operation is slow for a bulk "upsert". The alternative is to get a list of the primary keys you would like to upsert, and query the database for any matching ids.
Sqlalchemy, and Python - upsert statement that returns the IDs back
Eventually I've used .returning(<column>) inside my upsert statement, like the following:
def _generate_upsert_stmnt(self, items):
    model_class = models.MyModel
    table = model_class.__table__
    insert_statement = sa.dialects.postgresql.insert(table, items)
    upsert_statement = insert_statement.on_conflict_do_update(
        index_elements=[table.c['id']],
        set_={c.name: c for c in insert_statement.excluded
              if c.name not in ("id", "my_model", "my_model_id")},
    ).returning(table.c['id'])
    return upsert_statement
and extracted the data like so:
def _extract_ids_from_resultproxy(self, result):
    table = models.MyModel.__table__
    return [row[table.c.id] for row in result]
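The RETURNING clause can be verified the same way by compiling against the PostgreSQL dialect, no connection needed; a sketch with illustrative table and column names:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql

metadata = MetaData()
# hypothetical table, for illustration only
items = Table(
    "items", metadata,
    Column("id", Integer, primary_key=True),
    Column("data", String),
)

insert_stmt = postgresql.insert(items).values([{"id": 1, "data": "a"}])
upsert_stmt = insert_stmt.on_conflict_do_update(
    index_elements=[items.c.id],
    set_={"data": insert_stmt.excluded.data},
).returning(items.c.id)  # ask PostgreSQL to hand back the affected ids

sql = str(upsert_stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

The compiled statement ends in RETURNING items.id, so iterating the result of conn.execute(upsert_stmt) yields one row per upserted record.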
Upsert statement with Flask-SQLAlchemy
What I think I'm finding is that none of this would work because I wasn't matching on a primary key, but on a unique key. What I've done is change the unique key area_id
to a primary key. Then, I can use the upsert statement from above.
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def compile_upsert(insert_stmt, compiler, **kwargs):
    """
    converts every SQL insert to an upsert, i.e.:
    INSERT INTO test (foo, bar) VALUES (1, 'a')
    becomes:
    INSERT INTO test (foo, bar) VALUES (1, 'a') ON CONFLICT (foo) DO UPDATE SET bar = EXCLUDED.bar
    (assuming foo is a primary key)
    :param insert_stmt: Original insert statement
    :param compiler: SQL Compiler
    :param kwargs: optional arguments
    :return: upsert statement
    """
    pk = insert_stmt.table.primary_key
    insert = compiler.visit_insert(insert_stmt, **kwargs)
    ondup = f'ON CONFLICT ({",".join(c.name for c in pk)}) DO UPDATE SET'
    updates = ', '.join(f"{c.name}=EXCLUDED.{c.name}" for c in insert_stmt.table.columns)
    upsert = ' '.join((insert, ondup, updates))
    return upsert
I had been trying to change the pk = insert_stmt.table.primary_key
line to check for the unique key with no success, but it works just like this if I change that field.
Changing the primary key also fixed the other solution I was trying:
group = []
for row in rows:
    parsed = area.parser(row, i)
    area = Area()
    area.from_dict(parsed, new=True)
    group.append(area)
insert(db.session, Area, group)

def insert(session, model, rows):
    table = model.__table__
    # use the dialect's insert() so this function's name doesn't shadow it
    stmt = postgresql.insert(table).values(rows)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")
    stmt = stmt.on_conflict_do_update(
        index_elements=primary_keys,
        set_=update_dict
    )
    session.execute(stmt)
So both solutions were (relatively) workable, but only with a primary key instead of a unique key and that just hadn't been clear to me.
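For reference, the PostgreSQL dialect can also target a named unique constraint directly through the constraint= parameter of on_conflict_do_update, which may spare the schema change; a compile-only sketch (table and constraint names are illustrative):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, UniqueConstraint
from sqlalchemy.dialects import postgresql

metadata = MetaData()
# hypothetical table: surrogate primary key plus a separate unique key
areas = Table(
    "areas", metadata,
    Column("pk", Integer, primary_key=True),
    Column("area_id", Integer),
    Column("name", String),
    UniqueConstraint("area_id", name="uq_areas_area_id"),
)

stmt = postgresql.insert(areas).values(pk=1, area_id=10, name="a")
stmt = stmt.on_conflict_do_update(
    constraint="uq_areas_area_id",      # target the unique constraint by name
    set_={"name": stmt.excluded.name},
)
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

This renders ON CONFLICT ON CONSTRAINT uq_areas_area_id DO UPDATE, so the conflict target is the unique key rather than the primary key.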
SQLAlchemy Core - Efficient UPSERT of a python list of dictionaries with Mysql
I went with this:
insert_stmt = insert(table).values(data2)
primKeyColNames = [pk_column.name for pk_column in table.primary_key.columns.values()]
updatedColNames = [column.name for column in table.columns if column.name not in primKeyColNames]
onDuplicate = {colName:getattr(insert_stmt.inserted, colName) for colName in updatedColNames}
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(onDuplicate)
engine.execute(on_duplicate_key_stmt)
Get the primary key (can be multiple columns), remove those columns from the list of columns, use that list to create the dict for on_duplicate_key_update, and pass it to execute.
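The MySQL statement can likewise be checked without a live server by compiling against the MySQL dialect (table and column names are illustrative):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import mysql

metadata = MetaData()
# hypothetical table, for illustration only
t = Table(
    "t", metadata,
    Column("id", Integer, primary_key=True),
    Column("val", String(50)),
)

stmt = mysql.insert(t).values([{"id": 1, "val": "x"}])
# non-primary-key columns, referencing the would-be-inserted values
on_dup = {c.name: getattr(stmt.inserted, c.name)
          for c in t.columns if not c.primary_key}
stmt = stmt.on_duplicate_key_update(on_dup)

sql = str(stmt.compile(dialect=mysql.dialect()))
print(sql)
```

Note that the MySQL dialect exposes the would-be-inserted row as stmt.inserted, playing the role that excluded plays in the PostgreSQL dialect.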