44 lines
1.6 KiB
Python
44 lines
1.6 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
from quantum_stack_interview import settings
|
|
from redis import asyncio as aioredis
|
|
|
|
# 这里因为只有一个地方使用到了 redis, 所以直接在这里创建连接池
|
|
redis_pool = aioredis.ConnectionPool.from_url(
|
|
f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}", decode_responses=True
|
|
)
|
|
|
|
|
|
class KeywordCache:
|
|
_write_lock = asyncio.Lock()
|
|
|
|
def __init__(self):
|
|
self.redis = aioredis.Redis(connection_pool=redis_pool)
|
|
self.key_prefix = "product_keyword"
|
|
|
|
async def get_product_id(self, key: str):
|
|
return await self.redis.smembers(f"{self.key_prefix}:{key}")
|
|
|
|
async def set_product_id(self, key: str, data: list[int]):
|
|
async with self._write_lock:
|
|
await self.redis.sadd(f"{self.key_prefix}:{key}", *data)
|
|
await self.redis.expire(f"{self.key_prefix}:{key}", 3600) # 设置过期时间为1小时
|
|
|
|
async def add_product_one(self, key: str, product_id: int):
|
|
async with self._write_lock:
|
|
if self.hash_product(key) is None:
|
|
await self.redis.sadd(f"{self.key_prefix}:{key}", product_id)
|
|
# await self.redis.expire(f"{self.key_prefix}:{key}", 3600)
|
|
|
|
async def invalidate_product(self, key):
|
|
async with self._write_lock:
|
|
await self.redis.delete(f"{self.key_prefix}:{key}")
|
|
|
|
async def hash_product(self, key: str):
|
|
return await self.redis.hgetall(f"{self.key_prefix}:{key}")
|
|
|
|
async def scan_product_keyword(self):
|
|
return await self.redis.scan_iter(f"{self.key_prefix}:*")
|