How to Do an Upsert With SQLAlchemy

How do you do a proper upsert using SQLAlchemy on PostgreSQL?

this does the trick for me:

import warnings

from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.inspection import inspect

def upsert(engine, schema, table_name, records):

    metadata = MetaData(schema=schema)

    # reflect the table from the database (autoload_with replaces the
    # deprecated autoload=True / metadata.bind combination)
    table = Table(table_name, metadata, schema=schema, autoload_with=engine)

    # get list of fields making up the primary key
    primary_keys = [key.name for key in inspect(table).primary_key]

    # assemble base statement
    stmt = postgresql.insert(table).values(records)

    # define dict of non-primary-key columns for updating
    update_dict = {
        c.name: c
        for c in stmt.excluded
        if not c.primary_key
    }

    # cover the case when all columns in the table make up the primary key,
    # in which case an upsert is identical to 'on conflict do nothing'
    if update_dict == {}:
        warnings.warn('no updateable columns found for table')
        # we still want to insert without errors; insert_ignore() is a
        # separate helper that issues 'on conflict do nothing'
        insert_ignore(table_name, records)
        return None

    # assemble new statement with 'on conflict do update' clause
    update_stmt = stmt.on_conflict_do_update(
        index_elements=primary_keys,
        set_=update_dict,
    )

    # execute
    with engine.connect() as conn:
        result = conn.execute(update_stmt)
        conn.commit()  # needed on SQLAlchemy 1.4+/2.0; connections no longer autocommit
        return result
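The core of that helper can be checked without a live database by compiling the statement against the PostgreSQL dialect. This is a minimal sketch with a throwaway users table, not the reflected table from the answer:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql

metadata = MetaData()
users = Table(
    "users", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
)

# same pattern as the helper above: insert, then on conflict update every
# non-primary-key column from the EXCLUDED pseudo-table
stmt = postgresql.insert(users).values([{"id": 1, "name": "alice"}])
update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
stmt = stmt.on_conflict_do_update(index_elements=["id"], set_=update_dict)

sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

Printing the compiled statement is a quick way to confirm the ON CONFLICT clause targets the right columns before pointing the helper at real data.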

SQLAlchemy - performing a bulk upsert (if exists, update, else insert) in postgresql

There is an upsert-esque operation in SQLAlchemy:

db.session.merge()

After I found this command, I was able to perform upserts, but it is worth mentioning that this operation is slow for a bulk "upsert".

The alternative is to get a list of the primary keys you would like to upsert, and query the database for any matching ids:

# Imagine that post1, post5, and post1000 are posts objects with ids 1, 5 and 1000 respectively
# The goal is to "upsert" these posts.
# we initialize a dict which maps id to the post object

my_new_posts = {1: post1, 5: post5, 1000: post1000}

for each in posts.query.filter(posts.id.in_(my_new_posts.keys())).all():
    # Only merge those posts which already exist in the database
    db.session.merge(my_new_posts.pop(each.id))

# Only add those posts which did not exist in the database
db.session.add_all(my_new_posts.values())

# Now we commit our modifications (merges) and inserts (adds) to the database!
db.session.commit()

Bulk Upsert with SQLAlchemy Postgres

update_stmt = insert_stmt.on_conflict_do_update(
    index_elements=[MyTable.id],
    set_=dict(data=values)
)

index_elements should be either a list of strings or a list of column objects. So either [MyTable.id] or ['id'] (this part is correct).

set_ should be a dictionary with column names as keys and valid SQL update expressions as values. You can reference values from the insert block using the excluded attribute. So to get the result you are hoping for here, you would want set_={'test_value': insert_stmt.excluded.test_value}. (The error you made is that data= in the example isn't a magic argument; it was the name of the column on their example table.)

So, the whole thing would be

update_stmt = insert_stmt.on_conflict_do_update(
    index_elements=[MyTable.id],
    set_={'test_value': insert_stmt.excluded.test_value}
)

Of course, in a real-world example I usually want to change more than one column. In that case I would do something like...

update_columns = {
    col.name: col
    for col in insert_stmt.excluded
    if col.name not in ('id', 'datetime_created')
}
update_statement = insert_stmt.on_conflict_do_update(
    index_elements=['id'],
    set_=update_columns
)

(This example would overwrite every column except for the id and datetime_created columns)

Bulk upsert with SQLAlchemy

from https://stackoverflow.com/a/26018934/465974


Sqlalchemy, and Python - upsert statement that returns the IDs back

Eventually I've used .returning(<column>) inside my upsert statement, like the following:

def _generate_upsert_stmnt(self, items):
    model_class = models.MyModel
    table = model_class.__table__
    insert_statement = sa.dialects.postgresql.insert(table, items)
    upsert_statement = insert_statement.on_conflict_do_update(
        index_elements=[table.c['id']],
        # use 'not in', not '!=', when excluding several columns
        set_={c.name: c for c in insert_statement.excluded
              if c.name not in ("id", "my_model", "my_model_id")}
    ).returning(table.c['id'])
    return upsert_statement

and extracted the data like so:

def _extract_ids_from_resultproxy(self, result):
    table = models.MyModel.__table__
    return [row[table.c.id] for row in result]
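The RETURNING clause can be verified by compiling such a statement against the PostgreSQL dialect. This sketch uses a made-up items table rather than the models.MyModel from the answer:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql

metadata = MetaData()
items = Table(
    "items", metadata,
    Column("id", Integer, primary_key=True),
    Column("label", String),
)

insert_stmt = postgresql.insert(items).values([{"id": 7, "label": "x"}])
upsert_stmt = insert_stmt.on_conflict_do_update(
    index_elements=[items.c.id],
    set_={"label": insert_stmt.excluded.label},
).returning(items.c.id)  # ask Postgres to hand back the affected ids

sql = str(upsert_stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

Against a live database, iterating the result of executing this statement yields one row per inserted or updated record, each carrying the id.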

Upsert statement with Flask-SQLAlchemy

What I think I'm finding is that none of this would work because I wasn't matching on a primary key, but on a unique key. What I've done is change the unique key area_id to a primary key. Then, I can use the upsert statement from above.
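For what it's worth, PostgreSQL's ON CONFLICT also accepts the columns of a plain unique constraint as index_elements, so promoting the unique key to a primary key isn't strictly required. A compile-only sketch, with a made-up areas table standing in for the real one:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql

metadata = MetaData()
areas = Table(
    "areas", metadata,
    Column("id", Integer, primary_key=True),
    Column("area_id", String, unique=True),  # unique, not primary, key
    Column("name", String),
)

stmt = postgresql.insert(areas).values(area_id="A1", name="North")
# target the unique column directly; Postgres resolves it to the unique index
stmt = stmt.on_conflict_do_update(
    index_elements=["area_id"],
    set_={"name": stmt.excluded.name},
)

sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```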

@compiles(Insert)
def compile_upsert(insert_stmt, compiler, **kwargs):
    """
    converts every SQL insert to an upsert, i.e.
    INSERT INTO test (foo, bar) VALUES (1, 'a')
    becomes:
    INSERT INTO test (foo, bar) VALUES (1, 'a') ON CONFLICT (foo) DO UPDATE SET bar = EXCLUDED.bar
    (assuming foo is a primary key)
    :param insert_stmt: Original insert statement
    :param compiler: SQL Compiler
    :param kwargs: optional arguments
    :return: upsert statement
    """
    pk = insert_stmt.table.primary_key
    insert = compiler.visit_insert(insert_stmt, **kwargs)
    ondup = f'ON CONFLICT ({",".join(c.name for c in pk)}) DO UPDATE SET'
    updates = ', '.join(f"{c.name}=EXCLUDED.{c.name}" for c in insert_stmt.table.columns)
    upsert = ' '.join((insert, ondup, updates))
    return upsert

I had been trying to change the pk = insert_stmt.table.primary_key line to check for the unique key with no success, but it works just like this if I change that field.
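A variant of this hook can be exercised without a database: scope it to one dialect via the second argument to @compiles, leave the primary-key columns out of the SET list, and just compile a statement. Table and column names here are made up:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, insert
from sqlalchemy.dialects import sqlite
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert, "sqlite")  # only rewrite inserts compiled for sqlite
def _sqlite_upsert(insert_stmt, compiler, **kwargs):
    pk = insert_stmt.table.primary_key
    base = compiler.visit_insert(insert_stmt, **kwargs)
    ondup = f'ON CONFLICT ({",".join(c.name for c in pk)}) DO UPDATE SET'
    # unlike the snippet above, skip the primary-key columns in SET
    updates = ", ".join(
        f"{c.name}=EXCLUDED.{c.name}"
        for c in insert_stmt.table.columns
        if not c.primary_key
    )
    return " ".join((base, ondup, updates))

metadata = MetaData()
test = Table(
    "test", metadata,
    Column("foo", Integer, primary_key=True),
    Column("bar", String),
)

sql = str(insert(test).values(foo=1, bar="a").compile(dialect=sqlite.dialect()))
print(sql)
```

SQLite (3.24+) happens to share Postgres's ON CONFLICT syntax, which makes it a convenient target for checking the string this hook produces.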

Changing the primary key also fixed the other solution I was trying:

group = []
for row in rows:
    parsed = area.parser(row, i)

    area = Area()
    area.from_dict(parsed, new=True)

    group.append(area)

insert(db.session, Area, group)


from sqlalchemy import inspect
from sqlalchemy.dialects import postgresql

def insert(session, model, rows):
    table = model.__table__
    # use the dialect's insert(), not this function (which would recurse)
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(
        index_elements=primary_keys,
        set_=update_dict
    )

    session.execute(stmt, rows)
    session.commit()
So both solutions were (relatively) workable, but only with a primary key instead of a unique key and that just hadn't been clear to me.

SQLAlchemy Core - Efficient UPSERT of a python list of dictionaries with Mysql

I went with this:

insert_stmt = insert(table).values(data2)
primKeyColNames = [pk_column.name for pk_column in table.primary_key.columns.values()]
updatedColNames = [column.name for column in table.columns if column.name not in primKeyColNames]
onDuplicate = {colName: getattr(insert_stmt.inserted, colName) for colName in updatedColNames}
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(onDuplicate)
engine.execute(on_duplicate_key_stmt)

Get the primary key (it can be multiple columns), remove those columns from the list of columns, use the remaining list to create the dict for on_duplicate_key_update, and pass it to execute.
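That recipe can also be checked by compiling against the MySQL dialect. Here on_duplicate_key_update reads the staged values from insert_stmt.inserted, MySQL's counterpart of Postgres's excluded; the gadgets table is made up for the sketch:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import mysql

metadata = MetaData()
gadgets = Table(
    "gadgets", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)

insert_stmt = mysql.insert(gadgets).values([{"id": 1, "name": "widget"}])
pk_names = [c.name for c in gadgets.primary_key.columns]
# update every non-primary-key column with the value staged for insertion
on_dup = {
    c.name: getattr(insert_stmt.inserted, c.name)
    for c in gadgets.columns
    if c.name not in pk_names
}
stmt = insert_stmt.on_duplicate_key_update(on_dup)

sql = str(stmt.compile(dialect=mysql.dialect()))
print(sql)
```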


