from .base_sync import BaseSync
from loguru import logger
from typing import List, Dict, Any, Set
import json
import time
from datetime import datetime, timedelta
from sqlalchemy import text, and_
from models.orm_models import StrategyKX


class AccountSyncBatch(BaseSync):
    """Batch synchronizer for account information (Redis -> MySQL).

    Collects per-account snapshots from Redis and bulk-upserts them into
    the ``deh_strategy_kx_new`` table, preferring a single raw-SQL
    executemany statement and falling back to ORM-level upserts.
    """

    async def sync_batch(self, accounts: Dict[str, Dict]) -> None:
        """Collect data for every account and bulk-sync it to the database.

        Args:
            accounts: mapping of account identifiers to account descriptors,
                passed straight through to the Redis collector.

        Errors are logged, never raised, so a failed sync cycle does not
        kill the caller's loop.
        """
        try:
            logger.info(f"开始批量同步账户信息,共 {len(accounts)} 个账号")

            # Gather every account's snapshot from Redis in one pass.
            all_account_data = await self.redis_client._collect_all_account_data(accounts)
            if not all_account_data:
                logger.info("无账户信息数据需要同步")
                return

            success = await self._sync_account_info_batch_to_db(all_account_data)
            if success:
                logger.info(f"账户信息批量同步完成: 处理 {len(all_account_data)} 条记录")
            else:
                logger.error("账户信息批量同步失败")
        except Exception as e:
            logger.error(f"账户信息批量同步失败: {e}")

    async def _sync_account_info_batch_to_db(self, account_data_list: List[Dict]) -> bool:
        """Upsert *account_data_list* inside a single transaction.

        Tries the raw-SQL bulk upsert first (fastest); if that reports
        failure, retries with the ORM-based upsert.  Returns True on
        success, False on any error.  The session is always closed.
        """
        session = self.db_manager.get_session()
        try:
            if not account_data_list:
                return True
            with session.begin():
                # Preferred path: one multi-row statement.
                success = self._batch_upsert_account_info(session, account_data_list)
                if not success:
                    # Fallback path: per-row ORM upsert.
                    success = self._batch_orm_upsert_account_info(session, account_data_list)
                return success
        except Exception as e:
            logger.error(f"批量同步账户信息到数据库失败: {e}")
            return False
        finally:
            session.close()

    def _batch_upsert_account_info(self, session, account_data_list: List[Dict]) -> bool:
        """Bulk INSERT ... ON DUPLICATE KEY UPDATE using bound parameters.

        The previous version interpolated values into the SQL text with
        f-strings, which is SQL-injection-prone and breaks on non-numeric
        values; passing a list of parameter dicts lets SQLAlchemy run the
        statement in executemany mode with the same single-round-trip
        performance.  Returns True on success.
        """
        try:
            if not account_data_list:
                return True

            sql = text("""
                INSERT INTO deh_strategy_kx_new
                    (st_id, k_id, asset, balance, withdrawal, deposit, other, profit, time)
                VALUES
                    (:st_id, :k_id, 'USDT', :balance, :withdrawal, :deposit, :other, :profit, :time)
                ON DUPLICATE KEY UPDATE
                    balance = VALUES(balance),
                    withdrawal = VALUES(withdrawal),
                    deposit = VALUES(deposit),
                    other = VALUES(other),
                    profit = VALUES(profit),
                    up_time = NOW()
            """)
            # One dict per row -> executemany.
            params = [
                {
                    "st_id": d["st_id"],
                    "k_id": d["k_id"],
                    "balance": d["balance"],
                    "withdrawal": d["withdrawal"],
                    "deposit": d["deposit"],
                    "other": d["other"],
                    "profit": d["profit"],
                    "time": d["time"],
                }
                for d in account_data_list
            ]
            session.execute(sql, params)
            logger.info(f"原生SQL批量更新账户信息: {len(account_data_list)} 条记录")
            return True
        except Exception as e:
            logger.error(f"原生SQL批量更新账户信息失败: {e}")
            return False

    def _batch_orm_upsert_account_info(self, session, account_data_list: List[Dict]) -> bool:
        """ORM fallback: update existing rows in place, add new ones.

        Rows are deduplicated by (k_id, st_id, time) — the last entry for a
        key wins, matching ON DUPLICATE KEY semantics.  Returns True on
        success.
        """
        try:
            # Deduplicate by natural key so each row is touched once.
            account_data_by_key: Dict[tuple, Dict] = {}
            for data in account_data_list:
                key = (data['k_id'], data['st_id'], data['time'])
                account_data_by_key[key] = data

            # One query for every row that already exists.
            existing_records = self._batch_query_existing_records(
                session, list(account_data_by_key.keys())
            )

            to_insert = []
            for key, data in account_data_by_key.items():
                if key in existing_records:
                    # Row exists: mutate the mapped object in place.
                    record = existing_records[key]
                    record.balance = data['balance']
                    record.withdrawal = data['withdrawal']
                    record.deposit = data['deposit']
                    record.other = data['other']
                    record.profit = data['profit']
                else:
                    # New row.  The raw-SQL path hard-codes asset='USDT';
                    # default it here too for consistency (an explicit
                    # 'asset' in *data* still wins).
                    to_insert.append(StrategyKX(**{"asset": "USDT", **data}))

            if to_insert:
                session.add_all(to_insert)

            logger.info(f"ORM批量更新账户信息: 更新 {len(existing_records)} 条,插入 {len(to_insert)} 条")
            return True
        except Exception as e:
            logger.error(f"ORM批量更新账户信息失败: {e}")
            return False

    def _batch_query_existing_records(self, session, keys: List[tuple]) -> Dict[tuple, StrategyKX]:
        """Fetch existing rows matching the given (k_id, st_id, time) keys.

        Uses named bound parameters instead of interpolating key values
        into the SQL text (the previous version was injection-prone).
        Returns a dict keyed by (k_id, st_id, time); empty on error.
        """
        existing_records: Dict[tuple, StrategyKX] = {}
        try:
            if not keys:
                return existing_records

            # Build "(k_id = :kN AND st_id = :sN AND time = :tN)" per key,
            # with the values passed as bound parameters.
            conditions = []
            params: Dict[str, Any] = {}
            for i, (k_id, st_id, time_val) in enumerate(keys):
                conditions.append(f"(k_id = :k{i} AND st_id = :s{i} AND time = :t{i})")
                params[f"k{i}"] = k_id
                params[f"s{i}"] = st_id
                params[f"t{i}"] = time_val

            sql = text(
                "SELECT * FROM deh_strategy_kx_new WHERE " + " OR ".join(conditions)
            )
            for row in session.execute(sql, params).fetchall():
                existing_records[(row.k_id, row.st_id, row.time)] = StrategyKX(
                    id=row.id,
                    st_id=row.st_id,
                    k_id=row.k_id,
                    asset=row.asset,
                    balance=row.balance,
                    withdrawal=row.withdrawal,
                    deposit=row.deposit,
                    other=row.other,
                    profit=row.profit,
                    time=row.time,
                )
        except Exception as e:
            logger.error(f"批量查询现有记录失败: {e}")
        return existing_records

    async def sync(self):
        """Legacy entry point kept for backward compatibility."""
        accounts = self.get_accounts_from_redis()
        await self.sync_batch(accounts)