import json
import time
from datetime import datetime, timedelta
from typing import List, Dict

from loguru import logger

from .base_sync import BaseSync

# NOTE(review): `select`, `and_` and `StrategyKX` are used by
# `AccountSync._sync_account_info_to_db` but are not imported in the visible
# part of this module. They presumably come from SQLAlchemy and the project's
# models module -- confirm and add the imports, otherwise every row fails with
# a NameError.


class AccountSync(BaseSync):
    """Synchronise per-account daily balance records from Redis into the database."""

    # Values of the ``lz_type`` field that contribute to the daily statistics.
    _LZ_TYPES = ('lz_balance', 'deposit', 'withdrawal')

    async def sync(self):
        """Synchronise account information for every account known to Redis.

        Iterates over the mapping returned by ``self.get_accounts_from_redis()``
        (account id string -> account info dict), reads the account's daily
        records from Redis and writes them to the database. A failure for one
        account is logged and does not stop the remaining accounts; no
        exception escapes this method.
        """
        try:
            accounts = self.get_accounts_from_redis()

            for k_id_str, account_info in accounts.items():
                try:
                    k_id = int(k_id_str)
                    st_id = account_info.get('st_id', 0)
                    exchange_id = account_info['exchange_id']

                    # Skip accounts without a valid account id / strategy id.
                    if k_id <= 0 or st_id <= 0:
                        continue

                    account_data = await self._get_account_info_from_redis(k_id, st_id, exchange_id)

                    if account_data and self._sync_account_info_to_db(account_data):
                        logger.debug(f"账户信息同步成功: k_id={k_id}")

                except Exception as e:
                    logger.error(f"同步账号 {k_id_str} 账户信息失败: {e}")
                    continue

            logger.info("账户信息同步完成")

        except Exception as e:
            logger.error(f"账户信息同步失败: {e}")

    async def _get_account_info_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]:
        """Read the account's ledger entries from Redis and build daily rows.

        The entries are read from the Redis hash ``"{exchange_id}:balance:{k_id}"``.
        Each hash value is expected to be a JSON object with ``lz_time``
        (``YYYY-MM-DD``), ``lz_type`` and ``lz_amount`` fields. Entries that
        cannot be parsed are skipped individually.

        Args:
            k_id: Account id, used in the Redis key and copied into each row.
            st_id: Strategy id copied into each row.
            exchange_id: Exchange identifier used as the Redis key prefix.

        Returns:
            One dict per day, ordered by date, with the keys ``st_id``,
            ``k_id``, ``balance``, ``withdrawal``, ``deposit``, ``other``,
            ``profit`` and ``time``. An empty list when the hash is empty or
            when an unexpected error occurs (the error is logged).
        """
        try:
            redis_key = f"{exchange_id}:balance:{k_id}"
            redis_funds = self.redis_client.client.hgetall(redis_key)

            if not redis_funds:
                return []

            # Imported lazily, as in the original code.
            from config.settings import SYNC_CONFIG
            recent_days = SYNC_CONFIG['recent_days']

            date_stats = self._aggregate_daily_stats(redis_funds, recent_days, datetime.now())
            return self._build_account_rows(date_stats, k_id, st_id)

        except Exception as e:
            logger.error(f"获取Redis账户信息失败: k_id={k_id}, error={e}")
            return []

    @staticmethod
    def _aggregate_daily_stats(redis_funds: Dict, recent_days: int, today: datetime) -> Dict:
        """Group the raw Redis ledger entries into per-day statistics.

        Returns a dict keyed by the parsed date (a ``datetime`` at midnight) so
        that differently formatted strings for the same day are merged and the
        days can be ordered chronologically.
        """
        date_stats = {}

        for fund_key, fund_json in redis_funds.items():
            try:
                fund_data = json.loads(fund_json)
                date_str = fund_data.get('lz_time', '')
                lz_type = fund_data.get('lz_type', '')

                if not date_str or lz_type not in AccountSync._LZ_TYPES:
                    continue

                # Only keep entries from the most recent ``recent_days`` days.
                date_obj = datetime.strptime(date_str, '%Y-%m-%d')
                if (today - date_obj).days > recent_days:
                    continue

                lz_amount = float(fund_data.get('lz_amount', 0))
            except (json.JSONDecodeError, ValueError, TypeError, AttributeError) as e:
                # TypeError / AttributeError cover null amounts, non-string
                # dates and JSON values that are not objects; one malformed
                # entry must not discard the whole account.
                logger.debug(f"解析Redis数据失败: {fund_key}, error={e}")
                continue

            stats = date_stats.setdefault(date_obj, {
                'balance': 0.0,
                'deposit': 0.0,
                'withdrawal': 0.0,
                'has_balance': False,
            })

            if lz_type == 'lz_balance':
                # A balance entry replaces any earlier one for the same day.
                stats['balance'] = lz_amount
                stats['has_balance'] = True
            elif lz_type == 'deposit':
                stats['deposit'] += lz_amount
            elif lz_type == 'withdrawal':
                stats['withdrawal'] += lz_amount

        return date_stats

    @staticmethod
    def _build_account_rows(date_stats: Dict, k_id: int, st_id: int) -> List[Dict]:
        """Turn per-day statistics into database row dicts, ordered by date."""
        account_data_list = []
        prev_balance = 0.0

        for date_obj in sorted(date_stats):
            stats = date_stats[date_obj]

            # Days with deposits/withdrawals but no balance entry are still
            # emitted; only completely empty days are dropped.
            if not stats['has_balance'] and stats['deposit'] == 0 and stats['withdrawal'] == 0:
                continue

            balance = stats['balance']
            deposit = stats['deposit']
            withdrawal = stats['withdrawal']

            # NOTE(review): on the first day in the window ``prev_balance`` is
            # 0.0, and on days without a balance entry ``balance`` is 0.0, so
            # the profit for those days is probably not meaningful. The sign
            # convention of ``withdrawal`` is also not visible here -- confirm.
            profit = balance - deposit - withdrawal - prev_balance

            account_data_list.append({
                'st_id': st_id,
                'k_id': k_id,
                'balance': balance,
                'withdrawal': withdrawal,
                'deposit': deposit,
                'other': 0.0,  # always 0 for now
                'profit': profit,
                # Midnight of the day as a Unix timestamp; the datetime is
                # naive, so the process's local timezone is used.
                'time': int(date_obj.timestamp()),
            })

            # Only a real balance entry becomes the reference for the next day.
            if stats['has_balance']:
                prev_balance = balance

        return account_data_list

    def _sync_account_info_to_db(self, account_data_list: List[Dict]) -> bool:
        """Insert or update the given daily rows in the database.

        For each row, an existing record with the same ``k_id``, ``st_id`` and
        ``time`` is updated; otherwise a new ``StrategyKX`` record is added.
        All rows are written inside a single ``session.begin()`` block and the
        session is always closed.

        Args:
            account_data_list: Row dicts as produced by
                ``_get_account_info_from_redis``.

        Returns:
            True only when every row was processed and the transaction block
            completed without error; False when any row failed or the
            transaction raised. Errors are logged, not raised.
        """
        session = self.db_manager.get_session()
        failed = 0
        try:
            with session.begin():
                for account_data in account_data_list:
                    try:
                        existing = session.execute(
                            select(StrategyKX).where(
                                and_(
                                    StrategyKX.k_id == account_data['k_id'],
                                    StrategyKX.st_id == account_data['st_id'],
                                    StrategyKX.time == account_data['time']
                                )
                            )
                        ).scalar_one_or_none()

                        if existing is not None:
                            existing.balance = account_data['balance']
                            existing.withdrawal = account_data['withdrawal']
                            existing.deposit = account_data['deposit']
                            existing.other = account_data['other']
                            existing.profit = account_data['profit']
                        else:
                            session.add(StrategyKX(**account_data))

                    except Exception as e:
                        # Keep processing the remaining rows, but remember the
                        # failure so the caller is not told the sync succeeded.
                        failed += 1
                        logger.error(f"处理账户数据失败: {account_data}, error={e}")
                        continue

            return failed == 0

        except Exception as e:
            logger.error(f"同步账户信息到数据库失败: error={e}")
            return False
        finally:
            session.close()