Jost Do It.

그냥 IT해.

Programming/Python

[Python] upsert문 간접 구현하기

그냥하Jo. 2024. 2. 5. 20:51
반응형

문제 상황

관계형 데이터베이스에 테이블에는 중복 방지와 무결성을 위해 primary key를 설정한다.

이 때 primary key를 설정하면 primary key 값이 중복된 데이터들은 업로드할 수 없다.

보통 SQL문에서는 중복된 데이터인 경우 INSERT INTO ... ON CONFLICT ...  구문을 통해 데이터를 덮어쓸 수 있다.

하지만 pd.to_sql 함수의 경우 데이터 업로드 시 해당 기능이 구현돼 있지 않아 다음 코드를 작성했다.

 

해결 방법

함수의 동작방식을 다음과 같이 하여   upsert를 간접 구현하였다. 간단히 말하면 pk 중복 데이터를 제거하고 다시 insert하는 방식으로 완전한 upsert는 아닐 수 있다.

 

1. 시스템 테이블에서 해당 테이블의 primary_key 이름 찾기

2. primary_key 이름을 이용해서 primary_key에 해당하는 칼럼명 찾기

3. WHERE ... IN ... 구문을 이용해서 업로드할 데이터와 중복된 db 내 데이터 제거

4. 데이터 모두 업로드

import pandas as pd
from datetime import date, datetime
from sqlalchemy import create_engine

engine = create_engine(conn_str, client_encoding='utf8')

def upsert_data(data, schema_name, table_name, engine):

    # 해당 테이블의 pkey constraint 이름 찾기
    pkey_constraint = pd.read_sql(sql = f"""SELECT constraint_name
    FROM information_schema.table_constraints
    WHERE table_schema = '{schema_name}' AND table_name = '{table_name}' AND constraint_type = 'PRIMARY KEY'""",
        con = engine).iloc[0, 0]

    # 해당 테이블의 primary key 리스트
    pkey_cols = pd.read_sql(f"""SELECT column_name
    FROM information_schema.key_column_usage
    WHERE table_schema = '{schema_name}' AND table_name = '{table_name}' AND constraint_name = '{pkey_constraint}';""",
               con = engine).to_numpy().ravel()

    # db 내에 pkey 중복 데이터 제거를 위한 where문 생성
    in_list = list()
    for _, row in data[pkey_cols].iterrows():
        row_processing = [f"'{val}'" if isinstance(val, (str, date, datetime)) else str(val) for val in row.values]
        in_list.append(f"({', '.join(row_processing)})")
    where_clause = f"WHERE ({', '.join(pkey_cols)}) IN ({', '.join(in_list)})" if len(in_list) > 0 else ""

    # delete문을 통해 중복 데이터 제거
    if len(in_list) > 0:
	    with engine.begin() as conn:
    	    conn.exec_driver_sql(f"""DELETE FROM {schema_name}.{table_name}
        	                            {where_clause}""")

    # 데이터 업로드
    data.to_sql(table_name, schema = schema_name, con = engine, index = False, if_exists = "append")
  • 다음 코드는 postgreSQL을 기반으로 작성돼 내부 SQL문은 변경이 필요할 수 있다.
  • 데이터 upsert 양이 많아지면 쿼리 크기 때문에 에러가 날 수 있다(max_step_depth_error). 이 경우 delete할 chunk_size를 따로 설정해줘서 해결할 수 있다(코드의 복잡성으로 이 글에서는 생략).
  • 시간이 없어 여러 케이스에 대해 테스트해보지 못했다. 에러가 발생하면 부디 알려주세요...
반응형