from __future__ import annotations import asyncio from asgiref.sync import sync_to_async from django.db import transaction from django.db.models import Q from shop.cache import KeywordCache from shop.models import Order, OrderItem, OrderStatus, Product # 定义最大页面大小, 这个应该放在配置文件中 MAX_PAGE_SIZE = 100 class ProductService: @staticmethod async def fuzzy_query(keyword: str, start: int, end: int): qs = Product.objects.filter( Q(name__icontains=keyword) | Q(keywords__icontains=keyword) | Q(description__icontains=keyword) | Q(id__icontains=keyword) )[start:end] return await sync_to_async(list)(qs) @staticmethod async def update_keyword_cache(keyword: str, products: list[Product]): cache = KeywordCache() await cache.set_product_id(keyword, [product.id for product in products]) @staticmethod async def search_products(keyword: str, page: int = 1, page_size: int = 50) -> list[Product]: page_size = min(page_size, MAX_PAGE_SIZE) # 只做首页缓存 if page == 1: cache = KeywordCache() try: cached: list[int] = await cache.get_product_id(keyword) if cached: if len(cached) < page_size: products = await ProductService.fuzzy_query(keyword, 0, page_size) # 可能会丢失停机时的临界缓存, 不过在这是可接受的 asyncio.create_task(ProductService.update_keyword_cache(keyword, products)) # noqa: RUF006 return products # 只取前 page_size 个 id, 且保持顺序 ids = cached[:page_size] qs = Product.objects.filter(id__in=ids) products = await sync_to_async(list)(qs) # 保证顺序 id2product = {p.id: p for p in products} return [id2product[i] for i in ids if i in id2product] except (ConnectionError, TimeoutError): # 如果缓存查询失败, 继续执行数据库查询 pass assert page > 0 and page_size > 0 start = (page - 1) * page_size end = start + page_size products = await ProductService.fuzzy_query(keyword, start, end) if page == 1 and len(products) > 0: asyncio.create_task(ProductService.update_keyword_cache(keyword, products)) # noqa: RUF006 return products @staticmethod async def create_product(**kwargs): product = await Product.objects.acreate(**kwargs) # TODO:这里的关键词更新是不准确的 keywords = kwargs.get("keywords") if keywords: cache = KeywordCache() asyncio.create_task(cache.invalidate_product(keywords)) # noqa: RUF006 # 返回新商品主要信息 return { "id": product.id, "name": product.name, "description": product.description, "price": str(product.price), "stock": product.stock, "keywords": product.keywords, } class OrderService: @staticmethod async def batch_create_order(order_items): def _batch_create_order_sync(order_items): results = [] with transaction.atomic(): order = Order.objects.create() for item in order_items: try: product = Product.objects.select_for_update().get(id=item["product_id"]) if product.stock >= item["quantity"]: product.stock -= item["quantity"] product.save() OrderItem.objects.create( order=order, product=product, quantity=item["quantity"], price=product.price, status=OrderStatus.SUCCESS.value, ) results.append( { "product_id": product.id, "status": OrderStatus.SUCCESS.value, }, ) else: OrderItem.objects.create( order=order, product=product, quantity=item["quantity"], price=product.price, status=OrderStatus.FAILED.value, fail_reason="库存不足", ) results.append( { "product_id": product.id, "status": OrderStatus.FAILED.value, "reason": "库存不足", }, ) except Product.DoesNotExist: results.append( { "product_id": item["product_id"], "status": OrderStatus.FAILED.value, "reason": "商品不存在", } ) except Exception as e: results.append( { "product_id": item["product_id"], "status": OrderStatus.FAILED.value, "reason": str(e), }, ) return order.id, results return await sync_to_async(_batch_create_order_sync)(order_items)