quantum_stack_interview/shop/cache.py

44 lines
1.6 KiB
Python

from __future__ import annotations
import asyncio
from quantum_stack_interview import settings
from redis import asyncio as aioredis
# Redis is used in only one place, so the connection pool is created right here.
redis_pool = aioredis.ConnectionPool.from_url(
    f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}",
    decode_responses=True,
)
class KeywordCache:
    """Redis-backed cache that maps a keyword to a set of product ids.

    Every keyword is stored as a Redis set under ``product_keyword:<keyword>``.
    All mutating methods take the same class-level ``asyncio.Lock``, so writes
    issued through this class (from any instance) are serialized.
    """

    # One lock shared by every instance; all mutating methods acquire it.
    _write_lock = asyncio.Lock()

    def __init__(self):
        """Create a client on the module-level connection pool."""
        self.redis = aioredis.Redis(connection_pool=redis_pool)
        self.key_prefix = "product_keyword"

    def _redis_key(self, key: str) -> str:
        """Return the namespaced Redis key for *key*."""
        return f"{self.key_prefix}:{key}"

    async def get_product_id(self, key: str):
        """Return the members of the set stored for *key* (``SMEMBERS``).

        NOTE(review): the pool is created with ``decode_responses=True``, so
        members presumably come back as ``str`` rather than ``int`` - confirm
        that callers convert them.
        """
        return await self.redis.smembers(self._redis_key(key))

    async def set_product_id(self, key: str, data: list[int]):
        """Add *data* to the set stored for *key* and set its TTL to one hour.

        Existing members are kept (``SADD`` merges). An empty *data* is a
        no-op, because ``SADD`` requires at least one member.
        """
        if not data:
            return
        name = self._redis_key(key)
        async with self._write_lock:
            await self.redis.sadd(name, *data)
            await self.redis.expire(name, 3600)  # expire after one hour

    async def add_product_one(self, key: str, product_id: int):
        """Add *product_id* under *key* only when nothing is stored there yet.

        NOTE(review): no TTL is applied on this path, so an entry created here
        lives until ``invalidate_product`` removes it or ``set_product_id``
        gives it a TTL. Confirm that both this and the "only when absent"
        condition are intended.
        """
        name = self._redis_key(key)
        async with self._write_lock:
            # EXISTS is awaited and type-agnostic: it reports 0 for a missing
            # key and never fails on the set values this class writes.
            if not await self.redis.exists(name):
                await self.redis.sadd(name, product_id)

    async def invalidate_product(self, key: str):
        """Delete whatever is stored for *key*."""
        async with self._write_lock:
            await self.redis.delete(self._redis_key(key))

    async def hash_product(self, key: str):
        """Return the result of ``HGETALL`` on the namespaced key.

        NOTE(review): the other methods of this class store *sets* at this key
        (``SADD``), and Redis answers ``HGETALL`` on a non-hash value with a
        WRONGTYPE error. Confirm which callers rely on this method.
        """
        return await self.redis.hgetall(self._redis_key(key))

    async def scan_product_keyword(self) -> list:
        """Return every Redis key matching ``product_keyword:*`` as a list.

        The returned names are full Redis keys, i.e. they include the prefix.
        """
        pattern = self._redis_key("*")
        # scan_iter() is an async generator: it must be iterated, not awaited.
        return [name async for name in self.redis.scan_iter(match=pattern)]