[python] SQLAlchemy를 사용한 캐시 처리 방법

애플리케이션에서 캐시를 사용하면 데이터베이스의 부하를 줄이고 응답 시간을 개선할 수 있습니다. 이번 포스트에서는 Python의 SQLAlchemy를 사용하여 캐시를 처리하는 방법에 대해 알아보겠습니다.

SQLAlchemy 캐시 처리의 기본 원리

SQLAlchemy는 ORM(Object Relational Mapper) 라이브러리로써, 데이터베이스의 데이터를 객체로 다룰 수 있도록 해줍니다. 캐시 처리를 위해서는 SQLAlchemy의 Query 객체를 사용하여 데이터를 조회하는 과정에서 캐싱을 적용해야 합니다.

기본적으로 SQLAlchemy는 자체적으로 캐싱을 지원하지 않지만, 우리는 직접 캐싱을 구현하여 적용할 수 있습니다.

SQLAlchemy 캐시 구현 방법

캐시를 구현하기 위해 우리는 Redis라는 인메모리 데이터 저장소를 사용할 것입니다. Redis는 빠른 읽기와 쓰기 성능을 제공하며, 서버 장애 시에도 데이터를 안전하게 보존할 수 있습니다.

먼저 Redis를 설치하고 Python에서 Redis를 사용하기 위한 라이브러리를 설치합니다.

pip install redis

그리고 다음과 같이 캐시 클래스를 작성합니다.

import redis
from sqlalchemy.orm import sessionmaker

cache = redis.Redis(host='localhost', port=6379, db=0)
Session = sessionmaker(bind=engine)

class CachedQuery:
    def __init__(self, query):
        self.query = query

    def fetch(self, limit=None):
        cache_key = str(self.query.statement.compile(compile_kwargs={"literal_binds": True}))
        result = cache.get(cache_key)

        if result:
            return result

        with Session() as session:
            result = [row for row in session.execute(self.query)]
        session.close()

        cache.setex(cache_key, result, 60)  # 캐시 데이터를 60초 동안 유지

        return result

위 코드에서는 Redis 연결을 설정하고, SQLAlchemy의 세션을 생성합니다. CachedQuery 클래스는 SQLAlchemy Query 객체를 포장하여 캐시를 적용합니다. fetch 메서드를 통해 데이터를 조회하며, 캐시에 데이터가 존재하는 경우 바로 결과를 반환합니다. 그렇지 않은 경우 쿼리를 수행한 후에 결과를 캐시에 저장하고 반환합니다.

사용 예제

캐시를 사용하려는 쿼리를 다음과 같이 작성합니다.

from sqlalchemy import create_engine

engine = create_engine('mysql+pymysql://username:password@localhost/db_name')

query = CachedQuery(
    select([User]).where(User.id == 1)
)

result = query.fetch()
print(result)

위 코드에서는 User 테이블에서 id가 1인 데이터를 조회하는 쿼리를 CachedQuery 객체로 감싸서 실행합니다. 이렇게 하면 첫 번째 실행 시에는 데이터베이스에 직접 조회를 하지만, 두 번째 실행부터는 캐시된 데이터를 바로 반환합니다.

마무리

이번에는 SQLAlchemy를 이용한 캐시 처리 방법에 대해서 알아보았습니다. SQLAlchemy를 사용하면 복잡한 캐시 처리를 간편하게 구현할 수 있습니다. 캐시를 이용하여 데이터베이스의 부하를 줄이고 응답 시간을 개선하는 것은 매우 중요한 과제이므로, 적절히 활용해야 합니다.

참고 자료: