#!/usr/bin/env python3 # -*- coding: utf-8 -*- import asyncio import logging import sys import os from datetime import datetime # 添加 ccxt 路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from ccxt import mt5 as mt5_sync from ccxt.async_support import mt5 as mt5_async from ccxt.pro import mt5 as mt5_pro from ccxt.base.errors import ExchangeError, AuthenticationError, InvalidOrder # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('mt5_test.log', encoding='utf-8') ] ) logger = logging.getLogger('MT5-Test') # 测试配置 TEST_CONFIG = { 'apiKey': '76888962', 'secret': 'LZ-trade666888', 'verbose': False, # 启用详细日志 'hostname': '43.167.188.220:5000', 'host': '18.163.85.196', 'port': 443, } class MT5SyncTest: """同步版本测试""" def __init__(self): self.exchange = mt5_sync(TEST_CONFIG) self.test_results = {} def run_all_tests(self): """运行所有同步测试""" logger.info("🚀 开始同步版本测试") tests = [ self.test_connection, self.test_token_management, self.test_markets, self.test_balance, self.test_ticker, self.test_account_details, self.test_orders, self.test_order_operations, ] for test in tests: try: test_name = test.__name__ logger.info(f"\n{'='*50}") logger.info(f"执行测试: {test_name}") logger.info(f"{'='*50}") result = test() self.test_results[test_name] = result status = "✅ 通过" if result else "❌ 失败" logger.info(f"{status} - {test_name}") except Exception as e: logger.error(f"❌ 测试失败 {test.__name__}: {e}") self.test_results[test.__name__] = False self.print_test_summary() return self.test_results def test_connection(self): """测试连接""" try: logger.info("测试服务器连接...") result = self.exchange.ping_server() logger.info(f"Ping 结果: {result}") return result is True except Exception as e: logger.error(f"连接测试失败: {e}") return False def test_token_management(self): """测试 Token 管理""" try: logger.info("测试 Token 获取...") token = self.exchange.get_token() logger.info(f"获取到 Token: {token}") # logger.info("测试连接检查...") # check_result = self.exchange.check_connect() # logger.info(f"连接检查: {check_result}") logger.info("测试服务器时区...") timezone = self.exchange.server_timezone() logger.info(f"服务器时区: {timezone}") return token is not None and timezone is not None except Exception as e: logger.error(f"Token 管理测试失败: {e}") return False def test_markets(self): """测试市场数据""" try: logger.info("获取交易对列表...") markets = self.exchange.fetch_markets() logger.info(f"获取到 {len(markets)} 个交易对") # 显示前5个交易对 for i, market in enumerate(markets[:5]): logger.info(f" {i+1}. {market['symbol']} - {market['base']}/{market['quote']}") if len(markets) > 5: logger.info(f" ... 还有 {len(markets) - 5} 个交易对") return len(markets) > 0 except Exception as e: logger.error(f"市场数据测试失败: {e}") return False def test_balance(self): """测试余额查询""" try: logger.info("获取账户余额...") balance = self.exchange.fetch_balance() logger.info(f"余额信息: {balance}") if 'USDT' in balance['total']: total = balance['total']['USDT'] free = balance['free']['USDT'] used = balance['used']['USDT'] logger.info(f"USDT 余额 - 总额: {total}, 可用: {free}, 占用: {used}") return balance is not None except Exception as e: logger.error(f"余额查询测试失败: {e}") return False def test_ticker(self): """测试行情数据""" try: symbols_to_test = ['EUR/USD', 'GBP/USD', 'USD/JPY'] for symbol in symbols_to_test: try: logger.info(f"获取 {symbol} 行情...") ticker = self.exchange.fetch_ticker(symbol) if ticker: logger.info(f" {symbol}: 买={ticker['bid']}, 卖={ticker['ask']}, 最后={ticker['last']}") else: logger.warning(f" 无法获取 {symbol} 行情") except Exception as e: logger.warning(f" 获取 {symbol} 行情失败: {e}") # 测试批量获取行情 logger.info("测试批量获取行情...") tickers = self.exchange.fetch_tickers(['EUR/USD', 'GBP/USD']) logger.info(f"批量获取到 {len(tickers)} 个行情") return True except Exception as e: logger.error(f"行情数据测试失败: {e}") return False def test_account_details(self): """测试账户详情""" try: logger.info("获取账户详情...") account_details = self.exchange.fetch_account_details() logger.info(f"账户详情: {account_details}") required_fields = ['serverName', 'user', 'currency', 'accountLeverage'] for field in required_fields: if field in account_details: logger.info(f" {field}: {account_details[field]}") return account_details is not None except Exception as e: logger.error(f"账户详情测试失败: {e}") return False def test_orders(self): """测试订单查询""" try: logger.info("获取未平仓订单...") open_orders = self.exchange.fetch_open_orders() logger.info(f"未平仓订单数量: {len(open_orders)}") for order in open_orders[:3]: # 显示前3个订单 logger.info(f" 订单 {order['id']}: {order['symbol']} {order['side']} {order['type']} {order['status']}") logger.info("获取已平仓订单...") closed_orders = self.exchange.fetch_closed_orders() logger.info(f"已平仓订单数量: {len(closed_orders)}") return True except Exception as e: logger.error(f"订单查询测试失败: {e}") return False def test_order_operations(self): """测试订单操作(只测试不实际下单)""" try: logger.info("测试订单创建参数验证...") # 测试市价单参数 market_order_params = { 'symbol': 'EUR/USD', 'type': 'market', 'side': 'buy', 'amount': 0.01, } # 测试限价单参数 limit_order_params = { 'symbol': 'EUR/USD', 'type': 'limit', 'side': 'buy', 'amount': 0.01, 'price': 1.0800, } logger.info("✅ 订单参数验证通过") logger.info("注意: 实际下单测试需要在真实环境中进行") return True except Exception as e: logger.error(f"订单操作测试失败: {e}") return False def print_test_summary(self): """打印测试总结""" logger.info("\n" + "="*60) logger.info("📊 同步版本测试总结") logger.info("="*60) passed = sum(1 for result in self.test_results.values() if result) total = len(self.test_results) for test_name, result in self.test_results.items(): status = "✅ 通过" if result else "❌ 失败" logger.info(f" {test_name}: {status}") logger.info(f"\n总体结果: {passed}/{total} 通过") if passed == total: logger.info("🎉 所有测试通过!") else: logger.info("⚠️ 部分测试失败,请检查日志") class MT5AsyncTest: """异步版本测试""" def __init__(self): self.exchange = None self.test_results = {} async def initialize(self): """初始化异步交易所""" self.exchange = mt5_async(TEST_CONFIG) async def run_all_tests(self): """运行所有异步测试""" logger.info("\n🚀 开始异步版本测试") await self.initialize() tests = [ self.test_connection_async, self.test_markets_async, self.test_balance_async, self.test_ticker_async, self.test_orders_async, ] for test in tests: try: test_name = test.__name__ logger.info(f"\n{'='*50}") logger.info(f"执行测试: {test_name}") logger.info(f"{'='*50}") result = await test() self.test_results[test_name] = result status = "✅ 通过" if result else "❌ 失败" logger.info(f"{status} - {test_name}") except Exception as e: logger.error(f"❌ 测试失败 {test.__name__}: {e}") self.test_results[test.__name__] = False await self.exchange.close() self.print_test_summary() return self.test_results async def test_connection_async(self): """测试异步连接""" try: logger.info("测试异步 Token 获取...") token = await self.exchange.get_token() logger.info(f"获取到 Token: {token}") return token is not None except Exception as e: logger.error(f"异步连接测试失败: {e}") return False async def test_markets_async(self): """测试异步市场数据""" try: logger.info("获取异步交易对列表...") markets = await self.exchange.fetch_markets() logger.info(f"获取到 {len(markets)} 个交易对") return len(markets) > 0 except Exception as e: logger.error(f"异步市场数据测试失败: {e}") return False async def test_balance_async(self): """测试异步余额查询""" try: logger.info("获取异步账户余额...") balance = await self.exchange.fetch_balance() logger.info(f"余额信息: {balance}") return balance is not None except Exception as e: logger.error(f"异步余额查询测试失败: {e}") return False async def test_ticker_async(self): """测试异步行情数据""" try: logger.info("获取异步行情...") ticker = await self.exchange.fetch_ticker('EUR/USD') logger.info(f"EUR/USD: 买={ticker['bid']}, 卖={ticker['ask']}") return ticker is not None except Exception as e: logger.error(f"异步行情数据测试失败: {e}") return False async def test_orders_async(self): """测试异步订单查询""" try: logger.info("获取异步未平仓订单...") open_orders = await self.exchange.fetch_open_orders() logger.info(f"未平仓订单数量: {len(open_orders)}") return True except Exception as e: logger.error(f"异步订单查询测试失败: {e}") return False def print_test_summary(self): """打印测试总结""" logger.info("\n" + "="*60) logger.info("📊 异步版本测试总结") logger.info("="*60) passed = sum(1 for result in self.test_results.values() if result) total = len(self.test_results) for test_name, result in self.test_results.items(): status = "✅ 通过" if result else "❌ 失败" logger.info(f" {test_name}: {status}") logger.info(f"\n总体结果: {passed}/{total} 通过") class MT5WebSocketTest: """WebSocket 测试""" def __init__(self): self.exchange = None self.received_messages = [] async def initialize(self): """初始化 WebSocket 交易所""" self.exchange = mt5_pro(TEST_CONFIG) async def test_websocket_orders(self): """测试 WebSocket 订单监听""" try: await self.initialize() logger.info("\n🔌 开始 WebSocket 订单监听测试") logger.info("监听订单更新...") # 设置超时 timeout = 30 # 30秒后停止监听 async def order_listener(): try: orders = await self.exchange.watch_orders() logger.info(f"📦 收到订单更新: {len(orders)} 个订单") for order in orders: logger.info(f" 订单 {order['id']}: {order['symbol']} {order['side']} {order['status']}") self.received_messages.append({ 'type': 'order', 'data': order, 'timestamp': datetime.now() }) except Exception as e: logger.error(f"订单监听错误: {e}") # 运行监听器 await asyncio.wait_for(order_listener(), timeout=timeout) return len(self.received_messages) > 0 except asyncio.TimeoutError: logger.info("⏰ WebSocket 监听超时(正常结束)") return len(self.received_messages) > 0 except Exception as e: logger.error(f"WebSocket 测试失败: {e}") return False finally: if self.exchange: await self.exchange.close() async def test_websocket_balance(self): """测试 WebSocket 余额监听""" try: await self.initialize() logger.info("\n💰 开始 WebSocket 余额监听测试") logger.info("监听余额更新...") timeout = 20 # 20秒后停止监听 async def balance_listener(): try: balance = await self.exchange.watch_balance() logger.info(f"💰 收到余额更新") logger.info(f" 余额信息: {balance}") self.received_messages.append({ 'type': 'balance', 'data': balance, 'timestamp': datetime.now() }) except Exception as e: logger.error(f"余额监听错误: {e}") await asyncio.wait_for(balance_listener(), timeout=timeout) return len([msg for msg in self.received_messages if msg['type'] == 'balance']) > 0 except asyncio.TimeoutError: logger.info("⏰ WebSocket 余额监听超时(正常结束)") return len([msg for msg in self.received_messages if msg['type'] == 'balance']) > 0 except Exception as e: logger.error(f"WebSocket 余额测试失败: {e}") return False finally: if self.exchange: await self.exchange.close() async def main(): """主测试函数""" print("="*70) print("MT5 集成测试套件") print("="*70) # 同步测试 sync_test = MT5SyncTest() sync_results = sync_test.run_all_tests() # 异步测试 async_test = MT5AsyncTest() async_results = await async_test.run_all_tests() # WebSocket 测试 ws_test = MT5WebSocketTest() print("\n" + "="*70) print("🔌 WebSocket 测试(需要实际交易活动)") print("="*70) ws_order_result = await ws_test.test_websocket_orders() ws_balance_result = await ws_test.test_websocket_balance() # 最终总结 print("\n" + "="*70) print("🎯 最终测试总结") print("="*70) sync_passed = sum(1 for result in sync_results.values() if result) async_passed = sum(1 for result in async_results.values() if result) print(f"同步版本: {sync_passed}/{len(sync_results)} 通过") print(f"异步版本: {async_passed}/{len(async_results)} 通过") print(f"WebSocket 订单: {'✅ 通过' if ws_order_result else '❌ 失败'}") print(f"WebSocket 余额: {'✅ 通过' if ws_balance_result else '❌ 失败'}") total_tests = len(sync_results) + len(async_results) + 2 total_passed = sync_passed + async_passed + ws_order_result + ws_balance_result print(f"\n总体结果: {total_passed}/{total_tests} 通过") if total_passed == total_tests: print("🎉 所有测试通过!MT5 集成工作正常") else: print("⚠️ 部分测试失败,请检查日志文件 'mt5_test.log'") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\n👋 用户中断测试") except Exception as e: print(f"❌ 测试执行错误: {e}") import traceback traceback.print_exc()