반응형
문제 상황
관계형 데이터베이스에 테이블에는 중복 방지와 무결성을 위해 primary key를 설정한다.
이 때 primary key를 설정하면 primary key 값이 중복된 데이터들은 업로드할 수 없다.
보통 SQL문에서는 중복된 데이터인 경우 INSERT INTO ... ON CONFLICT ... 구문을 통해 데이터를 덮어쓸 수 있다.
하지만 pd.to_sql 함수의 경우 데이터 업로드 시 해당 기능이 구현돼 있지 않아 다음 코드를 작성했다.
해결 방법
함수의 동작방식을 다음과 같이 하여 upsert를 간접 구현하였다. 간단히 말하면 pk 중복 데이터를 제거하고 다시 insert하는 방식으로 완전한 upsert는 아닐 수 있다.
1. 시스템 테이블에서 해당 테이블의 primary_key 이름 찾기
2. primary_key 이름을 이용해서 primary_key에 해당하는 칼럼명 찾기
3. WHERE ... IN ... 구문을 이용해서 업로드할 데이터와 중복된 db 내 데이터 제거
4. 데이터 모두 업로드
import pandas as pd
from datetime import date, datetime
from sqlalchemy import create_engine
engine = create_engine(conn_str, client_encoding='utf8')
def upsert_data(data, schema_name, table_name, engine):
# 해당 테이블의 pkey constraint 이름 찾기
pkey_constraint = pd.read_sql(sql = f"""SELECT constraint_name
FROM information_schema.table_constraints
WHERE table_schema = '{schema_name}' AND table_name = '{table_name}' AND constraint_type = 'PRIMARY KEY'""",
con = engine).iloc[0, 0]
# 해당 테이블의 primary key 리스트
pkey_cols = pd.read_sql(f"""SELECT column_name
FROM information_schema.key_column_usage
WHERE table_schema = '{schema_name}' AND table_name = '{table_name}' AND constraint_name = '{pkey_constraint}';""",
con = engine).to_numpy().ravel()
# db 내에 pkey 중복 데이터 제거를 위한 where문 생성
in_list = list()
for _, row in data[pkey_cols].iterrows():
row_processing = [f"'{val}'" if isinstance(val, (str, date, datetime)) else str(val) for val in row.values]
in_list.append(f"({', '.join(row_processing)})")
where_clause = f"WHERE ({', '.join(pkey_cols)}) IN ({', '.join(in_list)})" if len(in_list) > 0 else ""
# delete문을 통해 중복 데이터 제거
if len(in_list) > 0:
with engine.begin() as conn:
conn.exec_driver_sql(f"""DELETE FROM {schema_name}.{table_name}
{where_clause}""")
# 데이터 업로드
data.to_sql(table_name, schema = schema_name, con = engine, index = False, if_exists = "append")
- 다음 코드는 postgreSQL을 기반으로 작성돼 내부 SQL문은 변경이 필요할 수 있다.
- 데이터 upsert 양이 많아지면 쿼리 크기 때문에 에러가 날 수 있다(max_step_depth_error). 이 경우 delete할 chunk_size를 따로 설정해줘서 해결할 수 있다(코드의 복잡성으로 이 글에서는 생략).
- 시간이 없어 여러 케이스에 대해 테스트해보지 못했다. 에러가 발생하면 부디 알려주세요...
반응형
'Programming > Python' 카테고리의 다른 글
[sqlalchemy] immutabledict is not a sequence 에러 해결 (0) | 2024.03.09 |
---|---|
[Pandas] 정규식을 이용해서 특정 문자만 제거하기 (0) | 2024.02.27 |
[Python] json 출력 포맷 설정하기 (0) | 2023.12.19 |
[Python] pip freeze 시, 버전명이 '@ file:///' 로 뜨는 문제 (0) | 2023.06.16 |
[Python] 하버사인 (haversine)으로 위경도가 주어진 두 지점 거리 구하기 (0) | 2022.10.12 |