quantum_stack_interview/shop/services.py

145 lines
5.9 KiB
Python

from __future__ import annotations
import asyncio
from asgiref.sync import sync_to_async
from django.db import transaction
from django.db.models import Q
from shop.cache import KeywordCache
from shop.models import Order, OrderItem, OrderStatus, Product
# 定义最大页面大小, 这个应该放在配置文件中
MAX_PAGE_SIZE = 100
class ProductService:
@staticmethod
async def fuzzy_query(keyword: str, start: int, end: int):
qs = Product.objects.filter(
Q(name__icontains=keyword)
| Q(keywords__icontains=keyword)
| Q(description__icontains=keyword)
| Q(id__icontains=keyword)
)[start:end]
return await sync_to_async(list)(qs)
@staticmethod
async def update_keyword_cache(keyword: str, products: list[Product]):
cache = KeywordCache()
await cache.set_product_id(keyword, [product.id for product in products])
@staticmethod
async def search_products(keyword: str, page: int = 1, page_size: int = 50) -> list[Product]:
page_size = min(page_size, MAX_PAGE_SIZE)
# 只做首页缓存
if page == 1:
cache = KeywordCache()
try:
cached: list[int] = await cache.get_product_id(keyword)
if cached:
if len(cached) < page_size:
products = await ProductService.fuzzy_query(keyword, 0, page_size)
# 可能会丢失停机时的临界缓存, 不过在这是可接受的
asyncio.create_task(ProductService.update_keyword_cache(keyword, products)) # noqa: RUF006
return products
# 只取前 page_size 个 id, 且保持顺序
ids = cached[:page_size]
qs = Product.objects.filter(id__in=ids)
products = await sync_to_async(list)(qs)
# 保证顺序
id2product = {p.id: p for p in products}
return [id2product[i] for i in ids if i in id2product]
except (ConnectionError, TimeoutError):
# 如果缓存查询失败, 继续执行数据库查询
pass
assert page > 0 and page_size > 0
start = (page - 1) * page_size
end = start + page_size
products = await ProductService.fuzzy_query(keyword, start, end)
if page == 1 and len(products) > 0:
asyncio.create_task(ProductService.update_keyword_cache(keyword, products)) # noqa: RUF006
return products
@staticmethod
async def create_product(**kwargs):
product = await Product.objects.acreate(**kwargs)
# TODO:这里的关键词更新是不准确的
keywords = kwargs.get("keywords")
if keywords:
cache = KeywordCache()
asyncio.create_task(cache.invalidate_product(keywords)) # noqa: RUF006
# 返回新商品主要信息
return {
"id": product.id,
"name": product.name,
"description": product.description,
"price": str(product.price),
"stock": product.stock,
"keywords": product.keywords,
}
class OrderService:
@staticmethod
async def batch_create_order(order_items):
def _batch_create_order_sync(order_items):
results = []
with transaction.atomic():
order = Order.objects.create()
for item in order_items:
try:
product = Product.objects.select_for_update().get(id=item["product_id"])
if product.stock >= item["quantity"]:
product.stock -= item["quantity"]
product.save()
OrderItem.objects.create(
order=order,
product=product,
quantity=item["quantity"],
price=product.price,
status=OrderStatus.SUCCESS.value,
)
results.append(
{
"product_id": product.id,
"status": OrderStatus.SUCCESS.value,
},
)
else:
OrderItem.objects.create(
order=order,
product=product,
quantity=item["quantity"],
price=product.price,
status=OrderStatus.FAILED.value,
fail_reason="库存不足",
)
results.append(
{
"product_id": product.id,
"status": OrderStatus.FAILED.value,
"reason": "库存不足",
},
)
except Product.DoesNotExist:
results.append(
{
"product_id": item["product_id"],
"status": OrderStatus.FAILED.value,
"reason": "商品不存在",
}
)
except Exception as e:
results.append(
{
"product_id": item["product_id"],
"status": OrderStatus.FAILED.value,
"reason": str(e),
},
)
return order.id, results
return await sync_to_async(_batch_create_order_sync)(order_items)