Files
exchange_monitor_sync/utils/redis_batch_helper.py
lz_db 803d40b88e 1
2025-12-03 14:40:14 +08:00

129 lines
5.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import redis
from loguru import logger
from typing import List, Dict, Tuple
import json
import time
from datetime import datetime, timedelta
class RedisBatchHelper:
"""Redis批量数据获取助手"""
def __init__(self, redis_client):
self.redis_client = redis_client
def get_recent_orders_batch(self, exchange_id: str, account_list: List[Tuple[int, int]],
recent_days: int = 3) -> List[Dict]:
"""批量获取多个账号的最近订单数据(优化内存使用)"""
all_orders = []
try:
# 分批处理账号,避免内存过大
batch_size = 20 # 每批处理20个账号
for i in range(0, len(account_list), batch_size):
batch_accounts = account_list[i:i + batch_size]
# 并发获取这批账号的数据
batch_orders = self._get_batch_accounts_orders(exchange_id, batch_accounts, recent_days)
all_orders.extend(batch_orders)
# 批次间休息避免Redis压力过大
if i + batch_size < len(account_list):
time.sleep(0.05)
logger.info(f"批量获取订单完成: {len(account_list)}个账号,{len(all_orders)}条订单")
except Exception as e:
logger.error(f"批量获取订单失败: {e}")
return all_orders
def _get_batch_accounts_orders(self, exchange_id: str, account_list: List[Tuple[int, int]],
recent_days: int) -> List[Dict]:
"""获取一批账号的订单数据"""
batch_orders = []
try:
# 计算最近日期
today = datetime.now()
recent_dates = []
for i in range(recent_days):
date = today - timedelta(days=i)
recent_dates.append(date.strftime('%Y-%m-%d'))
# 为每个账号构建key列表
all_keys = []
key_to_account = {}
for k_id, st_id in account_list:
redis_key = f"{exchange_id}:orders:{k_id}"
# 获取该账号的所有key
try:
account_keys = self.redis_client.hkeys(redis_key)
for key in account_keys:
key_str = key.decode('utf-8') if isinstance(key, bytes) else key
if key_str == 'positions':
continue
# 检查是否是最近日期
for date_format in recent_dates:
if key_str.startswith(date_format + '_'):
all_keys.append((redis_key, key_str))
key_to_account[(redis_key, key_str)] = (k_id, st_id)
break
except Exception as e:
logger.error(f"获取账号 {k_id} 的key失败: {e}")
continue
if not all_keys:
return batch_orders
# 分批获取订单数据
chunk_size = 500
for i in range(0, len(all_keys), chunk_size):
chunk = all_keys[i:i + chunk_size]
# 按redis_key分组使用hmget批量获取
keys_by_redis_key = {}
for redis_key, key_str in chunk:
if redis_key not in keys_by_redis_key:
keys_by_redis_key[redis_key] = []
keys_by_redis_key[redis_key].append(key_str)
# 为每个redis_key批量获取
for redis_key, key_list in keys_by_redis_key.items():
try:
values = self.redis_client.hmget(redis_key, key_list)
for key_str, order_json in zip(key_list, values):
if not order_json:
continue
try:
order = json.loads(order_json)
# 验证时间
order_time = order.get('time', 0)
if order_time >= int(time.time()) - recent_days * 24 * 3600:
# 添加账号信息
k_id, st_id = key_to_account.get((redis_key, key_str), (0, 0))
order['k_id'] = k_id
order['st_id'] = st_id
order['exchange_id'] = exchange_id
batch_orders.append(order)
except json.JSONDecodeError as e:
logger.debug(f"解析订单JSON失败: key={key_str}, error={e}")
continue
except Exception as e:
logger.error(f"批量获取Redis数据失败: {redis_key}, error={e}")
continue
except Exception as e:
logger.error(f"获取批量账号订单失败: {e}")
return batch_orders