import asyncio
import json
import re
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set, Tuple

import redis
from loguru import logger

import utils.helpers as helpers
from config.settings import REDIS_CONFIG, COMPUTER_NAMES, COMPUTER_NAME_PATTERN, SYNC_CONFIG


class RedisClient:
    """Process-wide Redis client manager.

    ``__new__`` always hands back the same instance, and the connection pool
    is only created the first time :attr:`client` is read.  The instance
    exposes helpers that read account, position, balance and order data
    from Redis hashes.

    NOTE: the ``async`` helpers call the synchronous ``redis.Redis`` client
    directly, so each Redis round trip blocks the event loop while it runs.
    """

    _instance = None

    # Suffix of the hash keys that hold the per-computer strategy API config.
    _STRATEGY_API_SUFFIX = "_strategy_api"

    # Raw exchange name (lower-cased) -> normalized exchange id.
    _EXCHANGE_MAPPING = {
        'metatrader': 'mt5',
        'binance_spot_test': 'binance',
        'binance_spot': 'binance',
        'binance': 'binance',
        'gate_spot': 'gate',
        'okex': 'okx',
        'okx': 'okx',
        'bybit': 'bybit',
        'bybit_spot': 'bybit',
        'bybit_test': 'bybit',
        'huobi': 'huobi',
        'huobi_spot': 'huobi',
        'gate': 'gate',
        'gateio': 'gate',
        'kucoin': 'kucoin',
        'kucoin_spot': 'kucoin',
        'mexc': 'mexc',
        'mexc_spot': 'mexc',
        'bitget': 'bitget',
        'bitget_spot': 'bitget',
    }
    _KNOWN_EXCHANGE_IDS = frozenset(_EXCHANGE_MAPPING.values())

    # Fund record types that contribute to the daily account statistics.
    _FUND_TYPES = ('lz_balance', 'deposit', 'withdrawal')

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        self._pool = None
        self._client = None
        self.computer_names = self._get_computer_names()
        self.computer_name_pattern = re.compile(COMPUTER_NAME_PATTERN)
        # Set last: if anything above raises, the next RedisClient() call
        # retries the setup instead of reusing a half-built singleton.
        self._initialized = True

    @property
    def client(self):
        """Return the Redis client, creating the connection pool on first use."""
        if self._client is None:
            self._init_connection_pool()
        return self._client

    def _init_connection_pool(self):
        """Create the connection pool and client, and verify them with a PING.

        The pool and client are stored on the instance only after the PING
        succeeds, so a failed attempt leaves ``_client`` as ``None`` and the
        next :attr:`client` access retries.

        Raises:
            Exception: whatever pool creation or the PING raised (re-raised
                after being logged).
        """
        pool = None
        try:
            pool = redis.ConnectionPool(
                host=REDIS_CONFIG['host'],
                port=REDIS_CONFIG['port'],
                db=REDIS_CONFIG['db'],
                decode_responses=REDIS_CONFIG['decode_responses'],
                max_connections=REDIS_CONFIG['max_connections'],
                socket_keepalive=True
            )
            client = redis.Redis(connection_pool=pool)
            # Connectivity check.
            client.ping()
        except Exception as e:
            logger.error(f"Redis连接失败: {e}")
            if pool is not None:
                # Release any socket opened by the failed attempt.
                pool.disconnect()
            raise
        self._pool = pool
        self._client = client
        logger.info("Redis连接池初始化成功")

    def get_accounts_from_redis(self) -> Dict[str, Dict]:
        """Return the account configuration of every configured computer name.

        Falls back to key discovery (:meth:`_discover_all_accounts`) when the
        configured names yield no account.

        Returns:
            Mapping of account id -> parsed account dict; ``{}`` on any error.
        """
        try:
            accounts_dict = {}

            # Strategy 1: use the configured computer names.
            for computer_name in self.computer_names:
                accounts_dict.update(self._get_accounts_by_computer_name(computer_name))

            # Strategy 2 (fallback): scan Redis for matching keys.
            if not accounts_dict:
                logger.warning("配置的计算机名未找到数据,尝试自动发现...")
                accounts_dict = self._discover_all_accounts()

            logger.info(f"从 {len(self.computer_names)} 个计算机名获取到 {len(accounts_dict)} 个账号")
            return accounts_dict
        except Exception as e:
            logger.error(f"获取账户信息失败: {e}")
            return {}

    def _get_computer_names(self) -> List[str]:
        """Parse ``COMPUTER_NAMES`` (one name or a comma-separated list).

        Blank entries (e.g. produced by a trailing comma) are dropped so that
        they never turn into the bogus Redis key ``_strategy_api``.
        """
        if ',' in COMPUTER_NAMES:
            names = [part.strip() for part in COMPUTER_NAMES.split(',') if part.strip()]
            logger.info(f"使用配置的计算机名列表: {names}")
            return names
        single_name = COMPUTER_NAMES.strip()
        return [single_name] if single_name else []

    def _get_accounts_by_computer_name(self, computer_name: str) -> Dict[str, Dict]:
        """Load the accounts stored under ``<computer_name>_strategy_api``.

        The hash maps an exchange name to a JSON object of
        ``account_id -> account JSON string``.  Exchanges or accounts that
        fail to parse are logged and skipped.

        Returns:
            Mapping of account id -> parsed account dict (each tagged with
            ``computer_name``); empty when the key is missing or on error.
        """
        accounts_dict = {}

        try:
            redis_key = f"{computer_name}{self._STRATEGY_API_SUFFIX}"

            result = self.client.hgetall(redis_key)
            if not result:
                logger.debug(f"未找到 {redis_key} 的策略API配置")
                return {}

            for exchange_name, accounts_json in result.items():
                try:
                    accounts = json.loads(accounts_json)
                    if not accounts:
                        continue

                    exchange_id = self.format_exchange_id(exchange_name)

                    for account_id, account_info in accounts.items():
                        parsed_account = self.parse_account(exchange_id, account_id, account_info)
                        if parsed_account:
                            # Remember which computer the account came from.
                            parsed_account['computer_name'] = computer_name
                            accounts_dict[account_id] = parsed_account

                except json.JSONDecodeError as e:
                    logger.error(f"解析交易所 {exchange_name} 的JSON数据失败: {e}")
                    continue
                except Exception as e:
                    logger.error(f"处理交易所 {exchange_name} 数据异常: {e}")
                    continue

            logger.info(f"从 {redis_key} 解析到 {len(accounts_dict)} 个账号")

        except Exception as e:
            logger.error(f"获取计算机名 {computer_name} 的账号失败: {e}")

        return accounts_dict

    def _discover_all_accounts(self) -> Dict[str, Dict]:
        """Scan Redis for ``*_strategy_api`` keys and load their accounts.

        Only keys whose computer-name part matches ``COMPUTER_NAME_PATTERN``
        are loaded; the others are logged and skipped.
        """
        accounts_dict = {}
        discovered_keys = []
        suffix = self._STRATEGY_API_SUFFIX

        try:
            pattern = f"*{suffix}"
            cursor = 0

            while True:
                cursor, keys = self.client.scan(cursor, match=pattern, count=100)
                for key in keys:
                    key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                    discovered_keys.append(key_str)

                # SCAN signals the end of the iteration with cursor 0.
                if cursor == 0:
                    break

            logger.info(f"自动发现 {len(discovered_keys)} 个策略API key")

            for key_str in discovered_keys:
                # Strip only the trailing suffix: str.replace would also remove
                # occurrences inside the computer name itself.
                if key_str.endswith(suffix):
                    computer_name = key_str[:-len(suffix)]
                else:
                    computer_name = key_str

                if self.computer_name_pattern.match(computer_name):
                    accounts_dict.update(self._get_accounts_by_computer_name(computer_name))
                else:
                    logger.warning(f"跳过不符合格式的计算机名: {computer_name}")

            logger.info(f"自动发现共获取到 {len(accounts_dict)} 个账号")

        except Exception as e:
            logger.error(f"自动发现账号失败: {e}")

        return accounts_dict

    def format_exchange_id(self, key: str) -> str:
        """Normalize an exchange name to its canonical exchange id.

        The name is lower-cased and stripped, then looked up in the alias
        table.  Unknown names are returned (normalized) as-is and reported at
        debug level.
        """
        key = key.lower().strip()
        normalized_key = self._EXCHANGE_MAPPING.get(key, key)

        # Report names that are neither an alias nor a canonical id.
        if normalized_key == key and key not in self._KNOWN_EXCHANGE_IDS:
            logger.debug(f"未映射的交易所名称: {key}")

        return normalized_key

    def parse_account(self, exchange_id: str, account_id: str, account_info: str) -> Optional[Dict]:
        """Parse one account's JSON string into the internal account dict.

        Args:
            exchange_id: normalized exchange id the account belongs to.
            account_id: account identifier, stored as ``k_id``.
            account_info: JSON text of the account settings.

        Returns:
            Dict with ``exchange_id``, ``k_id``, ``st_id``, ``add_time`` and
            ``api_key``; ``None`` when the JSON is invalid or parsing fails.
        """
        try:
            source_account_info = json.loads(account_info)
            account_data = {
                'exchange_id': exchange_id,
                'k_id': account_id,
                'st_id': helpers.safe_int(source_account_info.get('st_id'), 0),
                'add_time': helpers.safe_int(source_account_info.get('add_time'), 0),
                'api_key': source_account_info.get('api_key', ''),
            }
            return account_data

        except json.JSONDecodeError as e:
            logger.error(f"解析账号 {account_id} JSON数据失败: {e}, 原始数据: {account_info[:100]}...")
            return None
        except Exception as e:
            logger.error(f"处理账号 {account_id} 数据异常: {e}")
            return None

    def _group_accounts_by_exchange(self, accounts: Dict[str, Dict]) -> Dict[str, List[Dict]]:
        """Group account dicts by their ``exchange_id``.

        Accounts without a truthy ``exchange_id`` are left out.
        """
        groups = {}
        for account_info in accounts.values():
            exchange_id = account_info.get('exchange_id')
            if exchange_id:
                groups.setdefault(exchange_id, []).append(account_info)
        return groups

    @staticmethod
    def _merge_gather_results(results: List[Any], context: str) -> List[Dict]:
        """Flatten ``asyncio.gather(..., return_exceptions=True)`` results.

        List results are concatenated; exceptions returned in place of a
        result are logged (prefixed with ``context``) instead of being
        dropped silently.
        """
        merged = []
        for result in results:
            if isinstance(result, list):
                merged.extend(result)
            elif isinstance(result, BaseException):
                logger.error(f"{context}子任务异常: {result!r}")
        return merged

    @staticmethod
    def _extract_amount(fund_data: Dict) -> float:
        """Return the amount of a fund record as a float.

        ``lz_amount`` is used when it is truthy, otherwise ``lz_money``
        (default 0).

        Raises:
            ValueError, TypeError: when the value cannot be converted.
        """
        raw_amount = fund_data.get('lz_amount')
        if not raw_amount:
            raw_amount = fund_data.get('lz_money', 0)
        return float(raw_amount)

    async def _collect_all_positions(self, accounts: Dict[str, Dict]) -> List[Dict]:
        """Collect the position records of every account, grouped per exchange."""
        all_positions = []

        try:
            account_groups = self._group_accounts_by_exchange(accounts)

            # One task per exchange.
            tasks = [
                self._collect_exchange_positions(exchange_id, account_list)
                for exchange_id, account_list in account_groups.items()
            ]

            results = await asyncio.gather(*tasks, return_exceptions=True)
            all_positions.extend(self._merge_gather_results(results, "收集持仓数据"))

        except Exception as e:
            logger.error(f"收集持仓数据失败: {e}")

        return all_positions

    async def _collect_exchange_positions(self, exchange_id: str, account_list: List[Dict]) -> List[Dict]:
        """Collect the position records of all accounts of one exchange."""
        positions_list = []

        try:
            tasks = []
            for account_info in account_list:
                k_id = int(account_info['k_id'])
                st_id = account_info.get('st_id', 0)
                tasks.append(self._get_positions_from_redis(k_id, st_id, exchange_id))

            results = await asyncio.gather(*tasks, return_exceptions=True)
            positions_list.extend(
                self._merge_gather_results(results, f"收集交易所 {exchange_id} 持仓数据")
            )

        except Exception as e:
            logger.error(f"收集交易所 {exchange_id} 持仓数据失败: {e}")

        return positions_list

    async def _get_positions_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]:
        """Read the positions of one account from ``<exchange_id>:positions:<k_id>``.

        The ``positions`` hash field holds a JSON array; every entry is tagged
        with ``k_id``, ``st_id`` and ``exchange_id``.

        Returns:
            The tagged position dicts, or ``[]`` when the field is missing or
            any error occurs.
        """
        try:
            redis_key = f"{exchange_id}:positions:{k_id}"
            redis_data = self.client.hget(redis_key, 'positions')

            if not redis_data:
                return []

            positions = json.loads(redis_data)

            # Attach the owning account to every position.
            for position in positions:
                position['k_id'] = k_id
                position['st_id'] = st_id
                position['exchange_id'] = exchange_id

            return positions

        except Exception as e:
            logger.error(f"获取Redis持仓数据失败: k_id={k_id}, error={e}")
            return []

    async def _collect_all_account_data(self, accounts: Dict[str, Dict]) -> List[Dict]:
        """Collect the daily account-info records of every account."""
        all_account_data = []

        try:
            account_groups = self._group_accounts_by_exchange(accounts)

            # One task per exchange.
            tasks = [
                self._collect_exchange_account_data(exchange_id, account_list)
                for exchange_id, account_list in account_groups.items()
            ]

            results = await asyncio.gather(*tasks, return_exceptions=True)
            all_account_data.extend(self._merge_gather_results(results, "收集账户信息数据"))

            logger.info(f"收集到 {len(all_account_data)} 条账户信息记录")

        except Exception as e:
            logger.error(f"收集账户信息数据失败: {e}")

        return all_account_data

    async def _collect_exchange_account_data(self, exchange_id: str, account_list: List[Dict]) -> List[Dict]:
        """Collect the daily account-info records of one exchange, account by account."""
        account_data_list = []

        try:
            for account_info in account_list:
                k_id = int(account_info['k_id'])
                st_id = account_info.get('st_id', 0)
                add_time = account_info.get('add_time', 0)

                account_data = await self._get_account_info_from_redis(k_id, st_id, exchange_id, add_time)
                account_data_list.extend(account_data)

            logger.debug(f"交易所 {exchange_id}: 收集到 {len(account_data_list)} 条账户信息")

        except Exception as e:
            logger.error(f"收集交易所 {exchange_id} 账户信息失败: {e}")

        return account_data_list

    async def _get_account_info_from_redis(self, k_id: int, st_id: int, exchange_id: str, add_time: int) -> List[Dict]:
        """Build per-day account records from ``<exchange_id>:balance:<k_id>``.

        Every hash value is a JSON fund record with ``lz_time`` (a
        ``YYYY-MM-DD`` date), ``lz_type`` and an amount.  Records dated before
        the account's ``add_time``, or older than
        ``SYNC_CONFIG['recent_days_account']`` days (0 disables that filter),
        are ignored.  Malformed records are skipped individually.

        Returns:
            One dict per date, in ascending date order, with ``st_id``,
            ``k_id``, ``balance``, ``withdrawal``, ``deposit``, ``other``,
            ``profit`` and ``time`` (timestamp of the date at local midnight);
            ``[]`` when there is no data or on error.
        """
        try:
            redis_key = f"{exchange_id}:balance:{k_id}"
            redis_funds = self.client.hgetall(redis_key)

            if not redis_funds:
                return []

            recent_days = SYNC_CONFIG['recent_days_account']
            today = datetime.now()
            # Loop invariant: earliest date string that is still accepted.
            add_time_str = helpers.convert_timestamp(add_time, format_str='%Y-%m-%d')

            date_stats = {}

            # Aggregate the fund records per date.
            for fund_key, fund_json in redis_funds.items():
                try:
                    fund_data = json.loads(fund_json)
                    date_str = fund_data.get('lz_time', '')
                    lz_type = fund_data.get('lz_type', '')

                    if not date_str or lz_type not in self._FUND_TYPES:
                        continue
                    # Zero-padded ISO dates compare correctly as strings.
                    if date_str < add_time_str:
                        continue

                    # Keep only the most recent N days (0 means no limit).
                    date_obj = datetime.strptime(date_str, '%Y-%m-%d')
                    if recent_days != 0 and (today - date_obj).days > recent_days:
                        continue

                    stats = date_stats.setdefault(date_str, {
                        'balance': 0.0,
                        'deposit': 0.0,
                        'withdrawal': 0.0,
                        'has_balance': False
                    })

                    lz_amount = self._extract_amount(fund_data)

                    if lz_type == 'lz_balance':
                        stats['balance'] = lz_amount
                        stats['has_balance'] = True
                    elif lz_type == 'deposit':
                        stats['deposit'] += lz_amount
                    elif lz_type == 'withdrawal':
                        stats['withdrawal'] += lz_amount

                except (ValueError, TypeError, AttributeError) as e:
                    # ValueError also covers json.JSONDecodeError.  A single
                    # malformed record must not discard the whole account.
                    logger.debug(f"解析Redis数据失败: {fund_key}, error={e}")
                    continue

            account_data_list = []
            sorted_dates = sorted(date_stats.keys())

            # Balance of the preceding date, needed for the profit formula.
            prev_balance_map = self._get_previous_balances(redis_funds, sorted_dates)

            for date_str in sorted_dates:
                stats = date_stats[date_str]

                # Skip dates that carry neither a balance nor any deposit or
                # withdrawal amount.
                if not stats['has_balance'] and stats['deposit'] == 0 and stats['withdrawal'] == 0:
                    continue

                balance = stats['balance']
                deposit = stats['deposit']
                withdrawal = stats['withdrawal']

                # NOTE(review): the first date of the window has a previous
                # balance of 0.0, so its profit is balance - deposit -
                # withdrawal; confirm that this is intended.
                prev_balance = prev_balance_map.get(date_str, 0.0)
                profit = balance - deposit - withdrawal - prev_balance

                date_obj = datetime.strptime(date_str, '%Y-%m-%d')
                time_timestamp = int(date_obj.timestamp())

                account_data_list.append({
                    'st_id': st_id,
                    'k_id': k_id,
                    'balance': balance,
                    'withdrawal': withdrawal,
                    'deposit': deposit,
                    'other': 0.0,  # placeholder, always 0 for now
                    'profit': profit,
                    'time': time_timestamp
                })

            return account_data_list

        except Exception as e:
            logger.error(f"获取Redis账户信息失败: k_id={k_id}, error={e}")
            return []

    def _get_previous_balances(self, redis_funds: Dict, sorted_dates: List[str]) -> Dict[str, float]:
        """Map each date to the balance recorded on the preceding listed date.

        "Preceding" means the previous entry of ``sorted_dates``, not the
        previous calendar day.  The first date maps to ``0.0``.  A date whose
        predecessor has no usable ``lz_balance`` record gets no entry, so
        callers should read the map with a default.

        Args:
            redis_funds: raw hash content (field -> JSON fund record).
            sorted_dates: ``YYYY-MM-DD`` date strings in ascending order.
        """
        # Parse every record once.  When a date has several balance records,
        # the first one in iteration order wins.
        balance_by_date = {}
        for fund_json in redis_funds.values():
            try:
                fund_data = json.loads(fund_json)
                if fund_data.get('lz_type') != 'lz_balance':
                    continue
                record_date = fund_data.get('lz_time')
                if record_date not in balance_by_date:
                    balance_by_date[record_date] = self._extract_amount(fund_data)
            except (ValueError, TypeError, AttributeError):
                # Malformed record: ignore it and keep looking.
                continue

        prev_balance_map = {}
        prev_date = None

        for date_str in sorted_dates:
            if not prev_date:
                prev_balance_map[date_str] = 0.0
            elif prev_date in balance_by_date:
                prev_balance_map[date_str] = balance_by_date[prev_date]
            prev_date = date_str

        return prev_balance_map

    async def _collect_all_orders(self, accounts: Dict[str, Dict]) -> List[Dict]:
        """Collect the recent order records of every account, grouped per exchange."""
        all_orders = []

        try:
            account_groups = self._group_accounts_by_exchange(accounts)

            # One task per exchange.
            tasks = [
                self._collect_exchange_orders(exchange_id, account_list)
                for exchange_id, account_list in account_groups.items()
            ]

            results = await asyncio.gather(*tasks, return_exceptions=True)
            all_orders.extend(self._merge_gather_results(results, "收集订单数据"))

        except Exception as e:
            logger.error(f"收集订单数据失败: {e}")

        return all_orders

    async def _collect_exchange_orders(self, exchange_id: str, account_list: List[Dict]) -> List[Dict]:
        """Collect the recent order records of all accounts of one exchange."""
        orders_list = []

        try:
            tasks = []
            for account_info in account_list:
                k_id = int(account_info['k_id'])
                st_id = account_info.get('st_id', 0)
                tasks.append(self._get_recent_orders_from_redis(k_id, st_id, exchange_id))

            results = await asyncio.gather(*tasks, return_exceptions=True)
            orders_list.extend(
                self._merge_gather_results(results, f"收集交易所 {exchange_id} 订单数据")
            )

            logger.debug(f"交易所 {exchange_id}: 收集到 {len(orders_list)} 条订单")

        except Exception as e:
            logger.error(f"收集交易所 {exchange_id} 订单数据失败: {e}")

        return orders_list

    async def _get_recent_orders_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]:
        """Read the orders of the last N days from ``<exchange_id>:orders:<k_id>``.

        N is ``SYNC_CONFIG['recent_days_order']``.  Hash fields are selected
        by their ``YYYY-MM-DD_`` prefix, fetched in chunks, and kept only when
        the order's ``time`` is not older than N days.  Malformed orders are
        skipped individually.

        Returns:
            The matching order dicts tagged with ``k_id``, ``st_id`` and
            ``exchange_id``; ``[]`` when nothing matches or on error.
        """
        try:
            redis_key = f"{exchange_id}:orders:{k_id}"
            recent_days = SYNC_CONFIG['recent_days_order']

            # Field-name prefixes of the last N days, today included.
            today = datetime.now()
            recent_prefixes = tuple(
                (today - timedelta(days=offset)).strftime('%Y-%m-%d') + '_'
                for offset in range(recent_days)
            )

            # Walk the hash incrementally and keep the matching field names.
            cursor = 0
            recent_keys = []

            while True:
                cursor, fields = self.client.hscan(redis_key, cursor, count=1000)

                for field in fields:
                    field_str = field.decode('utf-8') if isinstance(field, bytes) else field

                    if field_str == 'positions':
                        continue

                    # str.startswith accepts a tuple: true if any prefix matches.
                    if field_str.startswith(recent_prefixes):
                        recent_keys.append(field_str)

                # HSCAN signals the end of the iteration with cursor 0.
                if cursor == 0:
                    break

            if not recent_keys:
                return []

            orders_list = []
            # Orders older than this Unix timestamp are dropped.
            oldest_allowed = int(time.time()) - recent_days * 24 * 3600

            # Fetch in chunks to bound the size of each HMGET reply.
            chunk_size = 500
            for start in range(0, len(recent_keys), chunk_size):
                chunk_keys = recent_keys[start:start + chunk_size]

                chunk_values = self.client.hmget(redis_key, chunk_keys)

                for key, order_json in zip(chunk_keys, chunk_values):
                    if not order_json:
                        continue

                    try:
                        order = json.loads(order_json)

                        order_time = order.get('time', 0)
                        if order_time >= oldest_allowed:
                            # Attach the owning account to the order.
                            order['k_id'] = k_id
                            order['st_id'] = st_id
                            order['exchange_id'] = exchange_id
                            orders_list.append(order)

                    except (ValueError, TypeError, AttributeError) as e:
                        # ValueError also covers json.JSONDecodeError.  A single
                        # malformed order must not discard the whole account.
                        logger.debug(f"解析订单JSON失败: key={key}, error={e}")
                        continue

            return orders_list

        except Exception as e:
            logger.error(f"获取Redis订单数据失败: k_id={k_id}, error={e}")
            return []

    def close(self):
        """Disconnect the connection pool, if one was created."""
        if self._pool:
            self._pool.disconnect()
            logger.info("Redis连接池已关闭")