"""Redis-backed cache of product ids, stored as sets under ``product_keyword:<key>``."""

from __future__ import annotations

import asyncio

from redis import asyncio as aioredis

from quantum_stack_interview import settings

# Redis is only used in this one place, so the connection pool is created
# right here instead of in a shared module.
redis_pool = aioredis.ConnectionPool.from_url(
    f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}",
    decode_responses=True,
)


class KeywordCache:
    """Async accessors for the product-id sets kept in Redis.

    Every entry lives at ``"<key_prefix>:<key>"``. All mutating methods
    serialize on ``_write_lock``; reads do not take the lock.
    """

    # Class attribute: one lock shared by every KeywordCache instance.
    _write_lock = asyncio.Lock()

    # Lifetime, in seconds, given to entries written by set_product_id (1 hour).
    _TTL_SECONDS = 3600

    def __init__(self):
        self.redis = aioredis.Redis(connection_pool=redis_pool)
        self.key_prefix = "product_keyword"

    def _redis_key(self, key: str) -> str:
        """Return the full Redis key for *key*."""
        return f"{self.key_prefix}:{key}"

    async def get_product_id(self, key: str):
        """Return the members of the set stored for *key* (SMEMBERS)."""
        return await self.redis.smembers(self._redis_key(key))

    async def set_product_id(self, key: str, data: list[int]):
        """Add every id in *data* to the set for *key* and reset its TTL.

        Does nothing when *data* is empty: SADD requires at least one
        member, so issuing it with none would be rejected by the server.
        """
        if not data:
            return
        redis_key = self._redis_key(key)
        async with self._write_lock:
            await self.redis.sadd(redis_key, *data)
            await self.redis.expire(redis_key, self._TTL_SECONDS)

    async def add_product_one(self, key: str, product_id: int):
        """Add *product_id* to the set for *key* if no entry exists yet.

        The previous implementation compared an un-awaited coroutine with
        ``None``, so the condition was never true and nothing was written.
        The existence check is now awaited and uses EXISTS, which works for
        the set-typed keys this class writes.
        """
        redis_key = self._redis_key(key)
        async with self._write_lock:
            # NOTE(review): polarity kept from the original
            # ``hash_product(key) is None`` check, read as "nothing cached
            # yet" - confirm this is the intended condition.
            if not await self.redis.exists(redis_key):
                await self.redis.sadd(redis_key, product_id)
                # NOTE(review): no TTL is set here (the expire call was
                # commented out in the original), so an entry created by
                # this method does not expire - confirm that is intended.

    async def invalidate_product(self, key):
        """Delete the entry stored for *key*."""
        async with self._write_lock:
            await self.redis.delete(self._redis_key(key))

    async def hash_product(self, key: str):
        """Return the result of HGETALL for *key*.

        NOTE(review): the writers in this class store sets (SADD); Redis
        rejects HGETALL on a non-hash key with a WRONGTYPE error. Confirm
        whether something else stores hashes under this prefix.
        """
        return await self.redis.hgetall(self._redis_key(key))

    async def scan_product_keyword(self):
        """Return a list of every Redis key matching ``"<key_prefix>:*"``.

        ``scan_iter`` yields keys through an async iterator and cannot be
        awaited directly, so the keys are collected with ``async for``.
        """
        pattern = f"{self.key_prefix}:*"
        return [name async for name in self.redis.scan_iter(match=pattern)]