import asyncio from loguru import logger import signal import sys import time import json from typing import Dict import utils.helpers as helpers import redis from utils.redis_client import RedisClient from config.settings import SYNC_CONFIG from .position_sync import PositionSyncBatch from .order_sync import OrderSyncBatch # 使用批量版本 from .account_sync import AccountSyncBatch from utils.redis_batch_helper import RedisBatchHelper from config.settings import COMPUTER_NAMES, COMPUTER_NAME_PATTERN from typing import List, Dict, Any, Set, Optional from config.settings import REDIS_CONFIG class SyncManager: """同步管理器(完整批量版本)""" def __init__(self): self.is_running = True self.redis_client = RedisClient() self.sync_interval = SYNC_CONFIG['interval'] # 初始化批量同步工具 self.redis_helper = None # 初始化同步器 self.syncers = [] if SYNC_CONFIG['enable_position_sync']: position_sync = PositionSyncBatch() self.syncers.append(position_sync) logger.info("启用持仓批量同步") if SYNC_CONFIG['enable_order_sync']: order_sync = OrderSyncBatch() self.syncers.append(order_sync) logger.info("启用订单批量同步") if SYNC_CONFIG['enable_account_sync']: account_sync = AccountSyncBatch() self.syncers.append(account_sync) logger.info("启用账户信息批量同步") # 性能统计 self.stats = { 'total_syncs': 0, 'last_sync_time': 0, 'avg_sync_time': 0, 'position': {'accounts': 0, 'positions': 0, 'time': 0}, 'order': {'accounts': 0, 'orders': 0, 'time': 0}, 'account': {'accounts': 0, 'records': 0, 'time': 0} } # 注册信号处理器 signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) async def start(self): """启动同步服务""" logger.info(f"同步服务启动,间隔 {self.sync_interval} 秒") # await self.cp() # return while self.is_running: try: # 获取所有账号(只获取一次) accounts = self.redis_client.get_accounts_from_redis() if not accounts: logger.warning("未获取到任何账号,等待下次同步") await asyncio.sleep(self.sync_interval) continue # return self.stats['total_syncs'] += 1 sync_start = time.time() logger.info(f"第{self.stats['total_syncs']}次同步开始,共 {len(accounts)} 个账号") # 执行所有同步器 tasks = [syncer.sync_batch(accounts) for syncer in self.syncers] await asyncio.gather(*tasks, return_exceptions=True) # 更新统计 sync_time = time.time() - sync_start self._update_stats(sync_time) logger.info(f"同步完成,总耗时 {sync_time:.2f} 秒,等待 {self.sync_interval} 秒") await asyncio.sleep(self.sync_interval) except asyncio.CancelledError: logger.info("同步任务被取消") break except Exception as e: logger.error(f"同步任务异常: {e}") # 获取完整的错误信息 import traceback error_details = { 'error_type': type(e).__name__, 'error_message': str(e), 'traceback': traceback.format_exc() } logger.error("完整堆栈跟踪:\n{traceback}", traceback=error_details['traceback']) await asyncio.sleep(30) def _update_stats(self, sync_time: float): """更新统计信息""" self.stats['last_sync_time'] = sync_time self.stats['avg_sync_time'] = (self.stats['avg_sync_time'] * 0.9 + sync_time * 0.1) # 打印详细统计 stats_lines = [ f"=== 第{self.stats['total_syncs']}次同步统计 ===", f"总耗时: {sync_time:.2f}秒 | 平均耗时: {self.stats['avg_sync_time']:.2f}秒" ] if self.stats['position']['accounts'] > 0: stats_lines.append( f"持仓: {self.stats['position']['accounts']}账号/{self.stats['position']['positions']}条" f"/{self.stats['position']['time']:.2f}秒" ) if self.stats['order']['accounts'] > 0: stats_lines.append( f"订单: {self.stats['order']['accounts']}账号/{self.stats['order']['orders']}条" f"/{self.stats['order']['time']:.2f}秒" ) if self.stats['account']['accounts'] > 0: stats_lines.append( f"账户: {self.stats['account']['accounts']}账号/{self.stats['account']['records']}条" f"/{self.stats['account']['time']:.2f}秒" ) logger.info("\n".join(stats_lines)) def signal_handler(self, signum, frame): """信号处理器""" logger.info(f"接收到信号 {signum},正在关闭...") self.is_running = False async def stop(self): """停止同步服务""" self.is_running = False # 关闭所有数据库连接 for syncer in self.syncers: if hasattr(syncer, 'db_manager'): syncer.db_manager.close() logger.info("同步服务停止") async def cp(self): """把Redis 0库的资产数据复制到Redis 1库""" # 创建到Redis 0库的连接 redis_0 = redis.Redis( host=REDIS_CONFIG['host'], # 你的Redis主机 port=REDIS_CONFIG['port'], # 你的Redis端口 db=0, # 数据库0 password=None, # 如果有密码 decode_responses=True # 自动解码为字符串 ) # 创建到Redis 1库的连接 redis_1 = redis.Redis( host=REDIS_CONFIG['host'], port=REDIS_CONFIG['port'], db=1, # 数据库1 password=None, decode_responses=True ) accounts = self.redis_client.get_accounts_from_redis() print(f"共 {len(accounts)} 个账号") all_kid = accounts.keys() keys1 = redis_0.keys("Bybit_fin_*") # print(f"找到 {len(keys1)} 个匹配的键") keys2 = redis_0.keys("Metatrader_fin_*") # print(f"找到 {len(keys2)} 个匹配的键") all_keys = keys1 + keys2 print(f"共 {len(all_keys)} 个键") for key in all_keys: key_split = key.split("_") k_id = key_split[-1] if k_id not in all_kid: continue exchange_id = None if key_split[0] == "Bybit": exchange_id = "bybit" elif key_split[0] == "Metatrader": exchange_id = "mt5" # data = redis_0.hget(key) print(f"开始处理键 {key} 的数据") data = redis_0.hgetall(key) for k, v in data.items(): # print(f"{k}") data_dict = json.loads(v) data_dict['lz_amount'] = float(data_dict['lz_money']) # del data_dict['lz_money'] if data_dict['lz_type'] == 'lz_balance': row_key = k else: row_key = f"fund_{k}" # print(row_key) redis_0.hset(f"{exchange_id}:balance:{data_dict['k_id']}",row_key,json.dumps(data_dict)) # print(f"键 {key} 的数据为 {data}") redis_0.close() redis_0.close() print("复制完成")