From 7fa249a767d53d78cc6b27a39d767e12bbf0a426 Mon Sep 17 00:00:00 2001 From: 7LZL <641971963@qq.com> Date: Tue, 2 Dec 2025 22:05:54 +0800 Subject: [PATCH] add --- .env | 35 ++++++++ config/__init__.py | 0 config/database.py | 25 ++++++ config/settings.py | 35 ++++++++ main.py | 52 +++++++++++ models/__init__.py | 0 models/orm_models.py | 76 ++++++++++++++++ requirements.txt | 14 +++ sync/__init__.py | 0 sync/account_sync.py | 183 ++++++++++++++++++++++++++++++++++++++ sync/base_sync.py | 90 +++++++++++++++++++ sync/manager.py | 66 ++++++++++++++ sync/order_sync.py | 166 ++++++++++++++++++++++++++++++++++ sync/position_sync.py | 174 ++++++++++++++++++++++++++++++++++++ utils/__init__.py | 0 utils/database_manager.py | 75 ++++++++++++++++ utils/helpers.py | 0 utils/redis_client.py | 54 +++++++++++ 18 files changed, 1045 insertions(+) create mode 100644 .env create mode 100644 config/__init__.py create mode 100644 config/database.py create mode 100644 config/settings.py create mode 100644 main.py create mode 100644 models/__init__.py create mode 100644 models/orm_models.py create mode 100644 requirements.txt create mode 100644 sync/__init__.py create mode 100644 sync/account_sync.py create mode 100644 sync/base_sync.py create mode 100644 sync/manager.py create mode 100644 sync/order_sync.py create mode 100644 sync/position_sync.py create mode 100644 utils/__init__.py create mode 100644 utils/database_manager.py create mode 100644 utils/helpers.py create mode 100644 utils/redis_client.py diff --git a/.env b/.env new file mode 100644 index 0000000..7c79a7a --- /dev/null +++ b/.env @@ -0,0 +1,35 @@ +# Redis配置 +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_PASSWORD= +REDIS_DB=0 + +# MySQL配置 +DB_HOST=localhost +DB_PORT=3306 +DB_USER=root +DB_PASSWORD=your_password +DB_DATABASE=exchange_monitor +DB_POOL_SIZE=10 +DB_MAX_OVERFLOW=20 +DB_POOL_RECYCLE=3600 + +# SQLAlchemy配置 +SQLALCHEMY_ECHO=false +SQLALCHEMY_ECHO_POOL=false + +# 同步配置 +SYNC_INTERVAL=60 +RECENT_DAYS=3 +CHUNK_SIZE=1000 +ENABLE_POSITION_SYNC=true +ENABLE_ORDER_SYNC=true +ENABLE_ACCOUNT_SYNC=true + +# 日志配置 +LOG_LEVEL=INFO +LOG_ROTATION=10 MB +LOG_RETENTION=7 days + +# 计算机名(用于过滤账号) +COMPUTER_NAME=lz_c01 \ No newline at end of file diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/config/database.py b/config/database.py new file mode 100644 index 0000000..d17b0e4 --- /dev/null +++ b/config/database.py @@ -0,0 +1,25 @@ +import os +from dotenv import load_dotenv + +load_dotenv() + +# MySQL数据库配置 +DATABASE_CONFIG = { + 'host': os.getenv('DB_HOST', 'localhost'), + 'port': int(os.getenv('DB_PORT', 3306)), + 'user': os.getenv('DB_USER', 'root'), + 'password': os.getenv('DB_PASSWORD', ''), + 'database': os.getenv('DB_DATABASE', 'exchange_monitor'), + 'charset': 'utf8mb4', + 'pool_size': int(os.getenv('DB_POOL_SIZE', 10)), + 'max_overflow': int(os.getenv('DB_MAX_OVERFLOW', 20)), + 'pool_recycle': int(os.getenv('DB_POOL_RECYCLE', 3600)) +} + +# SQLAlchemy配置 +SQLALCHEMY_CONFIG = { + 'echo': os.getenv('SQLALCHEMY_ECHO', 'False').lower() == 'true', + 'echo_pool': os.getenv('SQLALCHEMY_ECHO_POOL', 'False').lower() == 'true', + 'pool_pre_ping': True, + 'pool_recycle': 3600 +} \ No newline at end of file diff --git a/config/settings.py b/config/settings.py new file mode 100644 index 0000000..fd3ca5b --- /dev/null +++ b/config/settings.py @@ -0,0 +1,35 @@ +import os +from dotenv import load_dotenv + +load_dotenv() + +# Redis配置 +REDIS_CONFIG = { + 'host': os.getenv('REDIS_HOST', 'localhost'), + 'port': int(os.getenv('REDIS_PORT', 6379)), + 'password': os.getenv('REDIS_PASSWORD', ''), + 'db': int(os.getenv('REDIS_DB', 0)), + 'decode_responses': True, + 'max_connections': 30 +} + +# 同步配置 +SYNC_CONFIG = { + 'interval': int(os.getenv('SYNC_INTERVAL', 60)), # 同步间隔(秒) + 'recent_days': int(os.getenv('RECENT_DAYS', 3)), # 同步最近几天数据 + 'chunk_size': int(os.getenv('CHUNK_SIZE', 1000)), # 批量处理大小 + 'enable_position_sync': os.getenv('ENABLE_POSITION_SYNC', 'true').lower() == 'true', + 'enable_order_sync': os.getenv('ENABLE_ORDER_SYNC', 'true').lower() == 'true', + 'enable_account_sync': os.getenv('ENABLE_ACCOUNT_SYNC', 'true').lower() == 'true' +} + +# 日志配置 +LOG_CONFIG = { + 'level': os.getenv('LOG_LEVEL', 'INFO'), + 'rotation': os.getenv('LOG_ROTATION', '10 MB'), + 'retention': os.getenv('LOG_RETENTION', '7 days'), + 'format': '{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}' +} + +# 计算机名配置(用于过滤账号) +COMPUTER_NAME = os.getenv('COMPUTER_NAME', 'lz_c01') \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..0c7dded --- /dev/null +++ b/main.py @@ -0,0 +1,52 @@ +import asyncio +import sys +from loguru import logger +import os + +# 配置日志 +from config.settings import LOG_CONFIG + +logger.remove() # 移除默认处理器 +logger.add( + sys.stdout, + level=LOG_CONFIG['level'], + format=LOG_CONFIG['format'] +) +logger.add( + "logs/sync_{time:YYYY-MM-DD}.log", + rotation=LOG_CONFIG['rotation'], + retention=LOG_CONFIG['retention'], + level=LOG_CONFIG['level'], + format=LOG_CONFIG['format'] +) + +async def main(): + """主程序""" + from sync.manager import SyncManager + + logger.info("=== 交易所数据同步服务启动 ===") + logger.info(f"工作目录: {os.getcwd()}") + + try: + manager = SyncManager() + await manager.start() + + except KeyboardInterrupt: + logger.info("用户中断") + except Exception as e: + logger.error(f"服务运行异常: {e}") + import traceback + logger.error(traceback.format_exc()) + finally: + logger.info("=== 交易所数据同步服务停止 ===") + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + logger.info("程序被用户中断") + except Exception as e: + logger.error(f"程序异常: {e}") + import traceback + logger.error(traceback.format_exc()) + sys.exit(1) \ No newline at end of file diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/models/orm_models.py b/models/orm_models.py new file mode 100644 index 0000000..b06f67b --- /dev/null +++ b/models/orm_models.py @@ -0,0 +1,76 @@ +from sqlalchemy import create_engine, MetaData +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session +from sqlalchemy import Integer, String, Float, DateTime, DECIMAL, func +from datetime import datetime +from typing import Optional +import json + +class Base(DeclarativeBase): + """SQLAlchemy基础类""" + pass + +class StrategyPosition(Base): + """持仓表模型""" + __tablename__ = 'deh_strategy_position_new' + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, comment='自增id') + st_id: Mapped[int] = mapped_column(Integer, nullable=False, comment='策略id') + k_id: Mapped[int] = mapped_column(Integer, default=0, comment='对应strategy_key 的ID') + asset: Mapped[str] = mapped_column(String(32), default='USDT', comment='使用资产名称') + symbol: Mapped[str] = mapped_column(String(50), nullable=False, comment='交易对') + price: Mapped[Optional[float]] = mapped_column(Float, comment='持仓均价') + side: Mapped[str] = mapped_column(String(10), nullable=False, comment='方向') + + # 注意:这里属性名是 qty,但对应数据库字段是 sum + qty: Mapped[float] = mapped_column("sum", Float, nullable=False, comment='仓位(张数)') + + asset_num: Mapped[float] = mapped_column(DECIMAL(20, 8), default=0.00000000, comment='资产数量') + asset_profit: Mapped[Optional[float]] = mapped_column(DECIMAL(20, 8), comment='利润数量') + leverage: Mapped[int] = mapped_column(Integer, default=0, comment='杠杆倍数') + uptime: Mapped[int] = mapped_column(Integer, nullable=False, comment='更新时间') + profit_price: Mapped[float] = mapped_column(DECIMAL(20, 8), default=0.00000000, comment='止盈价格') + stop_price: Mapped[float] = mapped_column(DECIMAL(20, 8), default=0.00000000, comment='止损价格') + liquidation_price: Mapped[float] = mapped_column(DECIMAL(20, 8), default=0.00000000, comment='强平价格') + + def __repr__(self) -> str: + return f"StrategyPosition(id={self.id!r}, k_id={self.k_id!r}, symbol={self.symbol!r})" + +class StrategyOrder(Base): + """订单表模型""" + __tablename__ = 'deh_strategy_order_new' + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + st_id: Mapped[Optional[int]] = mapped_column(Integer, comment='策略id') + k_id: Mapped[int] = mapped_column(Integer, default=0, comment='对应strategy_key 的ID') + asset: Mapped[Optional[str]] = mapped_column(String(32), default='USDT', comment='资产名称') + order_id: Mapped[Optional[str]] = mapped_column(String(765), comment='订单id') + symbol: Mapped[Optional[str]] = mapped_column(String(120), comment='交易对') + side: Mapped[Optional[str]] = mapped_column(String(120), comment='订单方向') + price: Mapped[Optional[float]] = mapped_column(Float, comment='订单价格') + time: Mapped[Optional[int]] = mapped_column(Integer, comment='订单时间') + order_qty: Mapped[Optional[float]] = mapped_column(Float, comment='订单挂单数量') + last_qty: Mapped[Optional[float]] = mapped_column(Float, comment='订单成交数量') + avg_price: Mapped[Optional[float]] = mapped_column(Float, comment='订单成交均价') + exchange_id: Mapped[Optional[int]] = mapped_column(Integer, comment='交易所id') + + def __repr__(self) -> str: + return f"StrategyOrder(id={self.id!r}, order_id={self.order_id!r})" + +class StrategyKX(Base): + """账户信息表模型""" + __tablename__ = 'deh_strategy_kx_new' + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, comment='自增id') + st_id: Mapped[Optional[int]] = mapped_column(Integer, comment='策略id') + k_id: Mapped[int] = mapped_column(Integer, default=0, comment='对应strategy_key 的ID') + asset: Mapped[Optional[str]] = mapped_column(String(32), default='USDT', comment='资产名') + balance: Mapped[float] = mapped_column(DECIMAL(20, 8), default=0.00000000, comment='当日账户金额') + withdrawal: Mapped[float] = mapped_column(DECIMAL(20, 8), default=0.00000000, comment='当日提现') + deposit: Mapped[float] = mapped_column(DECIMAL(20, 8), default=0.00000000, comment='当日充值') + other: Mapped[float] = mapped_column(DECIMAL(20, 8), default=0.00000000, comment='当日其他') + profit: Mapped[float] = mapped_column(DECIMAL(20, 8), default=0.00000000, comment='当日利润') + time: Mapped[Optional[int]] = mapped_column(Integer, comment='时间') + up_time: Mapped[Optional[datetime]] = mapped_column(DateTime, default=func.now(), onupdate=func.now(), comment='最后更新时间') + + def __repr__(self) -> str: + return f"StrategyKX(id={self.id!r}, k_id={self.k_id!r}, time={self.time!r})" \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e09cf05 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,14 @@ +# 基础依赖 +python-dotenv>=1.0.0 +loguru>=0.7.0 + +# 数据库 +SQLAlchemy>=2.0.0 +pymysql>=1.1.0 +aiomysql>=0.2.0 + +# Redis +redis>=5.0.0 + +# 异步 +asyncio>=3.4.3 \ No newline at end of file diff --git a/sync/__init__.py b/sync/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sync/account_sync.py b/sync/account_sync.py new file mode 100644 index 0000000..75b83ae --- /dev/null +++ b/sync/account_sync.py @@ -0,0 +1,183 @@ +from .base_sync import BaseSync +from loguru import logger +from typing import List, Dict +import json +import time +from datetime import datetime, timedelta + +class AccountSync(BaseSync): + """账户信息同步器""" + + async def sync(self): + """同步账户信息数据""" + try: + # 获取所有账号 + accounts = self.get_accounts_from_redis() + + for k_id_str, account_info in accounts.items(): + try: + k_id = int(k_id_str) + st_id = account_info.get('st_id', 0) + exchange_id = account_info['exchange_id'] + + if k_id <= 0 or st_id <= 0: + continue + + # 从Redis获取账户信息数据 + account_data = await self._get_account_info_from_redis(k_id, st_id, exchange_id) + + # 同步到数据库 + if account_data: + success = self._sync_account_info_to_db(account_data) + if success: + logger.debug(f"账户信息同步成功: k_id={k_id}") + + except Exception as e: + logger.error(f"同步账号 {k_id_str} 账户信息失败: {e}") + continue + + logger.info("账户信息同步完成") + + except Exception as e: + logger.error(f"账户信息同步失败: {e}") + + async def _get_account_info_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]: + """从Redis获取账户信息数据""" + try: + redis_key = f"{exchange_id}:balance:{k_id}" + redis_funds = self.redis_client.client.hgetall(redis_key) + + if not redis_funds: + return [] + + # 按天统计数据 + from config.settings import SYNC_CONFIG + recent_days = SYNC_CONFIG['recent_days'] + + today = datetime.now() + date_stats = {} + + # 收集所有日期的数据 + for fund_key, fund_json in redis_funds.items(): + try: + fund_data = json.loads(fund_json) + date_str = fund_data.get('lz_time', '') + lz_type = fund_data.get('lz_type', '') + + if not date_str or lz_type not in ['lz_balance', 'deposit', 'withdrawal']: + continue + + # 只处理最近N天的数据 + date_obj = datetime.strptime(date_str, '%Y-%m-%d') + if (today - date_obj).days > recent_days: + continue + + if date_str not in date_stats: + date_stats[date_str] = { + 'balance': 0.0, + 'deposit': 0.0, + 'withdrawal': 0.0, + 'has_balance': False + } + + lz_amount = float(fund_data.get('lz_amount', 0)) + + if lz_type == 'lz_balance': + date_stats[date_str]['balance'] = lz_amount + date_stats[date_str]['has_balance'] = True + elif lz_type == 'deposit': + date_stats[date_str]['deposit'] += lz_amount + elif lz_type == 'withdrawal': + date_stats[date_str]['withdrawal'] += lz_amount + + except (json.JSONDecodeError, ValueError) as e: + logger.debug(f"解析Redis数据失败: {fund_key}, error={e}") + continue + + # 转换为账户信息数据 + account_data_list = [] + sorted_dates = sorted(date_stats.keys()) + prev_balance = 0.0 + + for date_str in sorted_dates: + stats = date_stats[date_str] + + # 如果没有余额数据但有充提数据,仍然处理 + if not stats['has_balance'] and stats['deposit'] == 0 and stats['withdrawal'] == 0: + continue + + balance = stats['balance'] + deposit = stats['deposit'] + withdrawal = stats['withdrawal'] + + # 计算利润 + profit = balance - deposit - withdrawal - prev_balance + + # 转换时间戳 + date_obj = datetime.strptime(date_str, '%Y-%m-%d') + time_timestamp = int(date_obj.timestamp()) + + account_data = { + 'st_id': st_id, + 'k_id': k_id, + 'balance': balance, + 'withdrawal': withdrawal, + 'deposit': deposit, + 'other': 0.0, # 暂时为0 + 'profit': profit, + 'time': time_timestamp + } + + account_data_list.append(account_data) + + # 更新前一天的余额 + if stats['has_balance']: + prev_balance = balance + + return account_data_list + + except Exception as e: + logger.error(f"获取Redis账户信息失败: k_id={k_id}, error={e}") + return [] + + def _sync_account_info_to_db(self, account_data_list: List[Dict]) -> bool: + """同步账户信息到数据库""" + session = self.db_manager.get_session() + try: + with session.begin(): + for account_data in account_data_list: + try: + # 查询是否已存在 + existing = session.execute( + select(StrategyKX).where( + and_( + StrategyKX.k_id == account_data['k_id'], + StrategyKX.st_id == account_data['st_id'], + StrategyKX.time == account_data['time'] + ) + ) + ).scalar_one_or_none() + + if existing: + # 更新 + existing.balance = account_data['balance'] + existing.withdrawal = account_data['withdrawal'] + existing.deposit = account_data['deposit'] + existing.other = account_data['other'] + existing.profit = account_data['profit'] + else: + # 插入 + new_account = StrategyKX(**account_data) + session.add(new_account) + + except Exception as e: + logger.error(f"处理账户数据失败: {account_data}, error={e}") + continue + + return True + + except Exception as e: + logger.error(f"同步账户信息到数据库失败: error={e}") + return False + finally: + session.close() \ No newline at end of file diff --git a/sync/base_sync.py b/sync/base_sync.py new file mode 100644 index 0000000..7d6756f --- /dev/null +++ b/sync/base_sync.py @@ -0,0 +1,90 @@ +from abc import ABC, abstractmethod +from loguru import logger +from typing import List, Dict, Any +import json + +from utils.redis_client import RedisClient +from utils.database_manager import DatabaseManager + +class BaseSync(ABC): + """同步基类""" + + def __init__(self): + self.redis_client = RedisClient() + self.db_manager = DatabaseManager() + self.computer_name = None # 从配置读取 + + @abstractmethod + async def sync(self): + """执行同步""" + pass + + def get_accounts_from_redis(self) -> Dict[str, Dict]: + """从Redis获取账号配置""" + try: + if self.computer_name is None: + from config.settings import COMPUTER_NAME + self.computer_name = COMPUTER_NAME + + # 从Redis获取数据 + result = self.redis_client.client.hgetall(f"{self.computer_name}_strategy_api") + if not result: + logger.warning(f"未找到 {self.computer_name} 的策略API配置") + return {} + + accounts_dict = {} + for exchange_name, accounts_json in result.items(): + try: + accounts = json.loads(accounts_json) + if not accounts: + continue + + # 格式化交易所ID + exchange_id = self.format_exchange_id(exchange_name) + + for account_id, account_info in accounts.items(): + parsed_account = self.parse_account(exchange_id, account_id, account_info) + if parsed_account: + accounts_dict[account_id] = parsed_account + + except json.JSONDecodeError as e: + logger.error(f"解析交易所 {exchange_name} 的JSON数据失败: {e}") + continue + + return accounts_dict + + except Exception as e: + logger.error(f"获取账户信息失败: {e}") + return {} + + def format_exchange_id(self, key: str) -> str: + """格式化交易所ID""" + key = key.lower().strip() + + # 交易所名称映射 + exchange_mapping = { + 'metatrader': 'mt5', + 'binance_spot_test': 'binance', + 'binance_spot': 'binance', + 'binance': 'binance', + 'gate_spot': 'gate', + 'okex': 'okx' + } + + return exchange_mapping.get(key, key) + + def parse_account(self, exchange_id: str, account_id: str, account_info: str) -> Dict: + """解析账号信息""" + try: + source_account_info = json.loads(account_info) + account_data = { + 'exchange_id': exchange_id, + 'k_id': account_id, + 'st_id': int(source_account_info.get('st_id', 0)), + 'add_time': int(source_account_info.get('add_time', 0)) + } + return {**source_account_info, **account_data} + + except json.JSONDecodeError as e: + logger.error(f"解析账号 {account_id} 数据失败: {e}") + return {} \ No newline at end of file diff --git a/sync/manager.py b/sync/manager.py new file mode 100644 index 0000000..eb1f05c --- /dev/null +++ b/sync/manager.py @@ -0,0 +1,66 @@ +import asyncio +from loguru import logger +from typing import List, Dict +import signal +import sys + +from config.settings import SYNC_CONFIG +from .position_sync import PositionSync +from .order_sync import OrderSync +from .account_sync import AccountSync + +class SyncManager: + """同步管理器""" + + def __init__(self): + self.is_running = True + self.sync_interval = SYNC_CONFIG['interval'] + + # 初始化同步器 + self.syncers = [] + + if SYNC_CONFIG['enable_position_sync']: + self.syncers.append(PositionSync()) + logger.info("启用持仓同步") + + if SYNC_CONFIG['enable_order_sync']: + self.syncers.append(OrderSync()) + logger.info("启用订单同步") + + if SYNC_CONFIG['enable_account_sync']: + self.syncers.append(AccountSync()) + logger.info("启用账户信息同步") + + # 注册信号处理器 + signal.signal(signal.SIGINT, self.signal_handler) + signal.signal(signal.SIGTERM, self.signal_handler) + + def signal_handler(self, signum, frame): + """信号处理器""" + logger.info(f"接收到信号 {signum},正在关闭...") + self.is_running = False + + async def start(self): + """启动同步服务""" + logger.info(f"同步服务启动,间隔 {self.sync_interval} 秒") + + while self.is_running: + try: + # 执行所有同步器 + tasks = [syncer.sync() for syncer in self.syncers] + await asyncio.gather(*tasks, return_exceptions=True) + + logger.debug(f"同步完成,等待 {self.sync_interval} 秒") + await asyncio.sleep(self.sync_interval) + + except asyncio.CancelledError: + logger.info("同步任务被取消") + break + except Exception as e: + logger.error(f"同步任务异常: {e}") + await asyncio.sleep(30) # 出错后等待30秒 + + async def stop(self): + """停止同步服务""" + self.is_running = False + logger.info("同步服务停止") \ No newline at end of file diff --git a/sync/order_sync.py b/sync/order_sync.py new file mode 100644 index 0000000..d21b87a --- /dev/null +++ b/sync/order_sync.py @@ -0,0 +1,166 @@ +from .base_sync import BaseSync +from loguru import logger +from typing import List, Dict +import json +import time +from datetime import datetime, timedelta + +class OrderSync(BaseSync): + """订单数据同步器""" + + async def sync(self): + """同步订单数据""" + try: + # 获取所有账号 + accounts = self.get_accounts_from_redis() + + for k_id_str, account_info in accounts.items(): + try: + k_id = int(k_id_str) + st_id = account_info.get('st_id', 0) + exchange_id = account_info['exchange_id'] + + if k_id <= 0 or st_id <= 0: + continue + + # 从Redis获取最近N天的订单数据 + orders = await self._get_recent_orders_from_redis(k_id, exchange_id) + + # 同步到数据库 + if orders: + success = self._sync_orders_to_db(k_id, st_id, orders) + if success: + logger.debug(f"订单同步成功: k_id={k_id}, 订单数={len(orders)}") + + except Exception as e: + logger.error(f"同步账号 {k_id_str} 订单失败: {e}") + continue + + logger.info("订单数据同步完成") + + except Exception as e: + logger.error(f"订单同步失败: {e}") + + async def _get_recent_orders_from_redis(self, k_id: int, exchange_id: str) -> List[Dict]: + """从Redis获取最近N天的订单数据""" + try: + redis_key = f"{exchange_id}:orders:{k_id}" + + # 计算最近N天的日期 + from config.settings import SYNC_CONFIG + recent_days = SYNC_CONFIG['recent_days'] + + today = datetime.now() + recent_dates = [] + for i in range(recent_days): + date = today - timedelta(days=i) + date_format = date.strftime('%Y-%m-%d') + recent_dates.append(date_format) + + # 获取所有key + all_keys = self.redis_client.client.hkeys(redis_key) + + orders_list = [] + for key in all_keys: + key_str = key.decode('utf-8') if isinstance(key, bytes) else key + + if key_str == 'positions': + continue + + # 检查是否以最近N天的日期开头 + for date_format in recent_dates: + if key_str.startswith(date_format + '_'): + try: + order_json = self.redis_client.client.hget(redis_key, key_str) + if order_json: + order = json.loads(order_json) + + # 验证时间 + order_time = order.get('time', 0) + if order_time >= int(time.time()) - recent_days * 24 * 3600: + orders_list.append(order) + + break + except: + break + + return orders_list + + except Exception as e: + logger.error(f"获取Redis订单数据失败: k_id={k_id}, error={e}") + return [] + + def _sync_orders_to_db(self, k_id: int, st_id: int, orders_data: List[Dict]) -> bool: + """同步订单数据到数据库""" + session = self.db_manager.get_session() + try: + # 准备批量数据 + insert_data = [] + for order_data in orders_data: + try: + order_dict = self._convert_order_data(order_data) + + # 检查完整性 + required_fields = ['order_id', 'symbol', 'side', 'time'] + if not all(order_dict.get(field) for field in required_fields): + continue + + insert_data.append(order_dict) + + except Exception as e: + logger.error(f"转换订单数据失败: {order_data}, error={e}") + continue + + if not insert_data: + return True + + with session.begin(): + # 使用参数化批量插入 + sql = """ + INSERT INTO deh_strategy_order_new + (st_id, k_id, asset, order_id, symbol, side, price, time, + order_qty, last_qty, avg_price, exchange_id) + VALUES + (:st_id, :k_id, :asset, :order_id, :symbol, :side, :price, :time, + :order_qty, :last_qty, :avg_price, :exchange_id) + ON DUPLICATE KEY UPDATE + side = VALUES(side), + price = VALUES(price), + time = VALUES(time), + order_qty = VALUES(order_qty), + last_qty = VALUES(last_qty), + avg_price = VALUES(avg_price) + """ + + # 分块执行 + from config.settings import SYNC_CONFIG + chunk_size = SYNC_CONFIG['chunk_size'] + + for i in range(0, len(insert_data), chunk_size): + chunk = insert_data[i:i + chunk_size] + session.execute(text(sql), chunk) + + return True + + except Exception as e: + logger.error(f"同步订单到数据库失败: k_id={k_id}, error={e}") + return False + finally: + session.close() + + def _convert_order_data(self, data: Dict) -> Dict: + """转换订单数据格式""" + return { + 'st_id': int(data.get('st_id', 0)), + 'k_id': int(data.get('k_id', 0)), + 'asset': 'USDT', + 'order_id': str(data.get('order_id', '')), + 'symbol': data.get('symbol', ''), + 'side': data.get('side', ''), + 'price': float(data.get('price', 0)) if data.get('price') is not None else None, + 'time': int(data.get('time', 0)) if data.get('time') is not None else None, + 'order_qty': float(data.get('order_qty', 0)) if data.get('order_qty') is not None else None, + 'last_qty': float(data.get('last_qty', 0)) if data.get('last_qty') is not None else None, + 'avg_price': float(data.get('avg_price', 0)) if data.get('avg_price') is not None else None, + 'exchange_id': None # 忽略该字段 + } \ No newline at end of file diff --git a/sync/position_sync.py b/sync/position_sync.py new file mode 100644 index 0000000..b2a3a5c --- /dev/null +++ b/sync/position_sync.py @@ -0,0 +1,174 @@ +from .base_sync import BaseSync +from loguru import logger +from typing import List, Dict +import json +from datetime import datetime, timedelta + +class PositionSync(BaseSync): + """持仓数据同步器""" + + async def sync(self): + """同步持仓数据""" + try: + # 获取所有账号 + accounts = self.get_accounts_from_redis() + + for k_id_str, account_info in accounts.items(): + try: + k_id = int(k_id_str) + st_id = account_info.get('st_id', 0) + exchange_id = account_info['exchange_id'] + + if k_id <= 0 or st_id <= 0: + continue + + # 从Redis获取持仓数据 + positions = await self._get_positions_from_redis(k_id, exchange_id) + + # 同步到数据库 + if positions: + success = self._sync_positions_to_db(k_id, st_id, positions) + if success: + logger.debug(f"持仓同步成功: k_id={k_id}, 持仓数={len(positions)}") + + except Exception as e: + logger.error(f"同步账号 {k_id_str} 持仓失败: {e}") + continue + + logger.info("持仓数据同步完成") + + except Exception as e: + logger.error(f"持仓同步失败: {e}") + + async def _get_positions_from_redis(self, k_id: int, exchange_id: str) -> List[Dict]: + """从Redis获取持仓数据""" + try: + redis_key = f"{exchange_id}:positions:{k_id}" + redis_data = self.redis_client.client.hget(redis_key, 'positions') + + if not redis_data: + return [] + + positions = json.loads(redis_data) + + # 添加账号信息 + for position in positions: + position['k_id'] = k_id + + return positions + + except Exception as e: + logger.error(f"获取Redis持仓数据失败: k_id={k_id}, error={e}") + return [] + + def _sync_positions_to_db(self, k_id: int, st_id: int, positions_data: List[Dict]) -> bool: + """同步持仓数据到数据库""" + session = self.db_manager.get_session() + try: + # 使用批量优化方案 + from sqlalchemy.dialects.mysql import insert + + # 准备数据 + insert_data = [] + keep_keys = set() # 需要保留的(symbol, side)组合 + + for pos_data in positions_data: + try: + # 转换数据(这里需要实现转换逻辑) + pos_dict = self._convert_position_data(pos_data) + if not all([pos_dict.get('symbol'), pos_dict.get('side')]): + continue + + # 重命名qty为sum + if 'qty' in pos_dict: + pos_dict['sum'] = pos_dict.pop('qty') + + insert_data.append(pos_dict) + keep_keys.add((pos_dict['symbol'], pos_dict['side'])) + + except Exception as e: + logger.error(f"转换持仓数据失败: {pos_data}, error={e}") + continue + + with session.begin(): + if not insert_data: + # 清空该账号持仓 + session.execute( + delete(StrategyPosition).where( + and_( + StrategyPosition.k_id == k_id, + StrategyPosition.st_id == st_id + ) + ) + ) + return True + + # 批量插入/更新 + stmt = insert(StrategyPosition.__table__).values(insert_data) + + update_dict = { + 'price': stmt.inserted.price, + 'sum': stmt.inserted.sum, + 'asset_num': stmt.inserted.asset_num, + 'asset_profit': stmt.inserted.asset_profit, + 'leverage': stmt.inserted.leverage, + 'uptime': stmt.inserted.uptime, + 'profit_price': stmt.inserted.profit_price, + 'stop_price': stmt.inserted.stop_price, + 'liquidation_price': stmt.inserted.liquidation_price + } + + stmt = stmt.on_duplicate_key_update(**update_dict) + session.execute(stmt) + + # 删除多余持仓 + if keep_keys: + existing_positions = session.execute( + select(StrategyPosition).where( + and_( + StrategyPosition.k_id == k_id, + StrategyPosition.st_id == st_id + ) + ) + ).scalars().all() + + to_delete_ids = [] + for existing in existing_positions: + key = (existing.symbol, existing.side) + if key not in keep_keys: + to_delete_ids.append(existing.id) + + if to_delete_ids: + session.execute( + delete(StrategyPosition).where( + StrategyPosition.id.in_(to_delete_ids) + ) + ) + + return True + + except Exception as e: + logger.error(f"同步持仓到数据库失败: k_id={k_id}, error={e}") + return False + finally: + session.close() + + def _convert_position_data(self, data: Dict) -> Dict: + """转换持仓数据格式""" + # 这里实现具体的转换逻辑 + return { + 'st_id': int(data.get('st_id', 0)), + 'k_id': int(data.get('k_id', 0)), + 'asset': 'USDT', + 'symbol': data.get('symbol', ''), + 'side': data.get('side', ''), + 'price': float(data.get('price', 0)) if data.get('price') is not None else None, + 'qty': float(data.get('qty', 0)) if data.get('qty') is not None else None, + 'asset_num': float(data.get('asset_num', 0)) if data.get('asset_num') is not None else None, + 'asset_profit': float(data.get('asset_profit', 0)) if data.get('asset_profit') is not None else None, + 'leverage': int(data.get('leverage', 0)) if data.get('leverage') is not None else None, + 'uptime': int(data.get('uptime', 0)) if data.get('uptime') is not None else None, + 'profit_price': float(data.get('profit_price', 0)) if data.get('profit_price') is not None else None, + 'stop_price': float(data.get('stop_price', 0)) if data.get('stop_price') is not None else None, + 'liquidation_price': float(data.get('liquidation_price', 0)) if data.get('liquidation_price') is not None else None + } \ No newline at end of file diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/database_manager.py b/utils/database_manager.py new file mode 100644 index 0000000..029728e --- /dev/null +++ b/utils/database_manager.py @@ -0,0 +1,75 @@ +from sqlalchemy import create_engine, select, update, insert, delete, and_, or_, text +from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.exc import SQLAlchemyError +from loguru import logger +from typing import List, Dict, Optional, Any +import json + +from config.database import DATABASE_CONFIG, SQLALCHEMY_CONFIG +from models.orm_models import Base, StrategyPosition, StrategyOrder, StrategyKX + +class DatabaseManager: + """数据库管理器""" + + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if not self._initialized: + self._engine = None + self._session_factory = None + self._initialized = True + self._init_engine() + + def _init_engine(self): + """初始化数据库引擎""" + try: + # 构建数据库URL + db_url = ( + f"mysql+pymysql://{DATABASE_CONFIG['user']}:{DATABASE_CONFIG['password']}" + f"@{DATABASE_CONFIG['host']}:{DATABASE_CONFIG['port']}" + f"/{DATABASE_CONFIG['database']}?charset={DATABASE_CONFIG['charset']}" + ) + + # 创建引擎 + self._engine = create_engine( + db_url, + echo=SQLALCHEMY_CONFIG['echo'], + echo_pool=SQLALCHEMY_CONFIG['echo_pool'], + pool_size=DATABASE_CONFIG['pool_size'], + max_overflow=DATABASE_CONFIG['max_overflow'], + pool_recycle=DATABASE_CONFIG['pool_recycle'], + pool_pre_ping=SQLALCHEMY_CONFIG['pool_pre_ping'] + ) + + # 创建会话工厂 + self._session_factory = sessionmaker( + bind=self._engine, + expire_on_commit=False + ) + + # 创建表(如果不存在) + Base.metadata.create_all(self._engine) + + logger.info("SQLAlchemy数据库引擎初始化成功") + + except Exception as e: + logger.error(f"数据库引擎初始化失败: {e}") + raise + + def get_session(self) -> Session: + """获取数据库会话""" + if self._session_factory is None: + self._init_engine() + return self._session_factory() + + def close(self): + """关闭数据库连接""" + if self._engine: + self._engine.dispose() + logger.info("数据库连接已关闭") \ No newline at end of file diff --git a/utils/helpers.py b/utils/helpers.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/redis_client.py b/utils/redis_client.py new file mode 100644 index 0000000..0f25831 --- /dev/null +++ b/utils/redis_client.py @@ -0,0 +1,54 @@ +import redis +from loguru import logger +from config.settings import REDIS_CONFIG + +class RedisClient: + """Redis客户端管理器""" + + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if not self._initialized: + self._pool = None + self._client = None + self._initialized = True + + @property + def client(self): + """获取Redis客户端(懒加载)""" + if self._client is None: + self._init_connection_pool() + return self._client + + def _init_connection_pool(self): + """初始化连接池""" + try: + self._pool = redis.ConnectionPool( + host=REDIS_CONFIG['host'], + port=REDIS_CONFIG['port'], + db=REDIS_CONFIG['db'], + decode_responses=REDIS_CONFIG['decode_responses'], + max_connections=REDIS_CONFIG['max_connections'], + socket_keepalive=True + ) + self._client = redis.Redis(connection_pool=self._pool) + + # 测试连接 + self._client.ping() + logger.info("Redis连接池初始化成功") + + except Exception as e: + logger.error(f"Redis连接失败: {e}") + raise + + def close(self): + """关闭连接池""" + if self._pool: + self._pool.disconnect() + logger.info("Redis连接池已关闭") \ No newline at end of file