Files
ccxt_with_mt5/test/test_mt5_websocket_orders.py
lz_db 8646036ca5 1
2025-11-22 16:08:27 +08:00

476 lines
16 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import json
import logging
from datetime import datetime
from ccxt.pro import mt5
from ccxt.base.errors import ExchangeError, AuthenticationError
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MT5-WebSocket-Test')
class MT5WebSocketOrderTest:
def __init__(self, config):
self.exchange = mt5({
'apiKey': config.get('apiKey', ''),
'secret': config.get('secret', ''),
'password': config.get('password', ''),
'user': config.get('user', 62333850),
'host': config.get('host', '78.140.180.198'),
'port': config.get('port', 443),
'sandbox': True,
'verbose': True,
})
# 存储数据
self.balance = None
self.orders = []
self.open_orders = []
self.closed_orders = []
# 统计信息
self.stats = {
'order_updates': 0,
'balance_updates': 0,
'trade_updates': 0,
'errors': 0
}
logger.info("MT5 WebSocket 订单测试初始化完成")
async def watch_orders(self):
"""监听订单更新"""
logger.info("开始监听订单更新...")
while True:
try:
orders = await self.exchange.watch_orders()
for order in orders:
await self.handle_order_update(order)
except Exception as e:
logger.error(f"监听订单时发生错误: {e}")
self.stats['errors'] += 1
await asyncio.sleep(5) # 等待后重试
async def watch_balance(self):
"""监听余额更新"""
logger.info("开始监听余额更新...")
while True:
try:
balance = await self.exchange.watch_balance()
await self.handle_balance_update(balance)
except Exception as e:
logger.error(f"监听余额时发生错误: {e}")
self.stats['errors'] += 1
await asyncio.sleep(5)
async def watch_my_trades(self):
"""监听交易更新"""
logger.info("开始监听交易更新...")
while True:
try:
trades = await self.exchange.watch_my_trades()
for trade in trades:
await self.handle_trade_update(trade)
except Exception as e:
logger.error(f"监听交易时发生错误: {e}")
self.stats['errors'] += 1
await asyncio.sleep(5)
async def handle_order_update(self, order):
"""处理订单更新"""
self.stats['order_updates'] += 1
order_id = order['id']
symbol = order['symbol']
status = order['status']
side = order['side']
order_type = order['type']
price = order.get('price')
amount = order.get('amount')
filled = order.get('filled')
# 检查是否是新的订单
existing_order = next((o for o in self.orders if o['id'] == order_id), None)
if existing_order:
# 更新现有订单
old_status = existing_order['status']
if old_status != status:
logger.info(f"🔁 订单状态更新: {order_id} {symbol} {side} {old_status} -> {status}")
# 更新订单信息
existing_order.update(order)
else:
# 新订单
logger.info(f"🆕 新订单: {order_id} {symbol} {side} {order_type} {status}")
self.orders.append(order.copy())
# 更新开单/平单列表
self.update_order_lists(order)
# 打印订单详情
self.print_order_details(order)
async def handle_balance_update(self, balance):
"""处理余额更新"""
self.stats['balance_updates'] += 1
self.balance = balance
# 提取主要余额信息
total_balance = 0
free_balance = 0
used_balance = 0
for currency, info in balance['total'].items():
if info is not None:
total_balance += info
for currency, info in balance['free'].items():
if info is not None:
free_balance += info
for currency, info in balance['used'].items():
if info is not None:
used_balance += info
logger.info(f"💰 余额更新 - 总余额: {total_balance:.2f}, 可用: {free_balance:.2f}, 占用: {used_balance:.2f}")
# 打印详细余额信息
self.print_balance_details(balance)
async def handle_trade_update(self, trade):
"""处理交易更新"""
self.stats['trade_updates'] += 1
trade_id = trade['id']
order_id = trade['order']
symbol = trade['symbol']
side = trade['side']
price = trade['price']
amount = trade['amount']
cost = trade.get('cost')
fee = trade.get('fee', {})
logger.info(f"💸 交易执行: {trade_id} | 订单: {order_id} | {symbol} {side} {amount} @ {price}")
if cost:
logger.info(f" 交易成本: {cost:.2f}")
if fee and fee.get('cost'):
logger.info(f" 手续费: {fee['cost']} {fee.get('currency', '')}")
def update_order_lists(self, order):
"""更新开单和平单列表"""
order_id = order['id']
status = order['status']
# 从开单列表中移除已关闭的订单
self.open_orders = [o for o in self.open_orders if o['id'] != order_id]
# 如果是开单状态,添加到开单列表
if status in ['open', 'pending']:
self.open_orders.append(order.copy())
# 如果是关闭状态,添加到平单列表
elif status in ['closed', 'canceled', 'expired', 'rejected']:
# 检查是否已经在平单列表中
if not any(o['id'] == order_id for o in self.closed_orders):
self.closed_orders.append(order.copy())
def print_order_details(self, order):
"""打印订单详情"""
order_id = order['id']
symbol = order['symbol']
status = order['status']
side = order['side']
order_type = order['type']
price = order.get('price', 'N/A')
amount = order.get('amount', 'N/A')
filled = order.get('filled', 'N/A')
remaining = order.get('remaining', 'N/A')
cost = order.get('cost', 'N/A')
details = [
f"订单ID: {order_id}",
f"交易对: {symbol}",
f"方向: {side}",
f"类型: {order_type}",
f"状态: {status}",
f"价格: {price}",
f"数量: {amount}",
f"已成交: {filled}",
f"未成交: {remaining}",
f"成本: {cost}"
]
logger.debug("📋 订单详情: " + " | ".join(details))
def print_balance_details(self, balance):
"""打印余额详情"""
if not balance:
return
logger.debug("💳 详细余额信息:")
for currency in ['USD', 'EUR', 'GBP', 'JPY']:
if currency in balance['total'] and balance['total'][currency] is not None:
total = balance['total'][currency]
free = balance['free'].get(currency, 0)
used = balance['used'].get(currency, 0)
logger.debug(f" {currency}: 总额={total:.2f}, 可用={free:.2f}, 占用={used:.2f}")
async def place_test_orders(self):
"""放置测试订单"""
logger.info("开始放置测试订单...")
test_orders = [
{
'symbol': 'EUR/USD',
'type': 'limit',
'side': 'buy',
'amount': 0.01,
'price': 1.0800
},
{
'symbol': 'EUR/USD',
'type': 'limit',
'side': 'sell',
'amount': 0.01,
'price': 1.0900
},
{
'symbol': 'GBP/USD',
'type': 'market',
'side': 'buy',
'amount': 0.01
}
]
created_orders = []
for order_params in test_orders:
try:
logger.info(f"尝试下单: {order_params}")
order = await self.exchange.create_order(
order_params['symbol'],
order_params['type'],
order_params['side'],
order_params['amount'],
order_params.get('price')
)
created_orders.append(order)
logger.info(f"✅ 下单成功: {order['id']}")
await asyncio.sleep(2) # 间隔2秒
except Exception as e:
logger.error(f"❌ 下单失败: {e}")
return created_orders
async def cancel_test_orders(self, orders):
"""取消测试订单"""
logger.info("开始取消测试订单...")
for order in orders:
try:
if order['status'] in ['open', 'pending']:
await self.exchange.cancel_order(order['id'], order['symbol'])
logger.info(f"✅ 取消订单成功: {order['id']}")
await asyncio.sleep(1)
except Exception as e:
logger.error(f"❌ 取消订单失败 {order['id']}: {e}")
async def get_current_orders(self):
"""获取当前订单状态"""
try:
open_orders = await self.exchange.fetch_open_orders()
closed_orders = await self.exchange.fetch_closed_orders()
logger.info(f"当前开单数量: {len(open_orders)}")
logger.info(f"历史订单数量: {len(closed_orders)}")
return open_orders, closed_orders
except Exception as e:
logger.error(f"获取订单状态失败: {e}")
return [], []
def print_statistics(self):
"""打印统计信息"""
logger.info("📊 WebSocket 测试统计:")
logger.info(f" 订单更新次数: {self.stats['order_updates']}")
logger.info(f" 余额更新次数: {self.stats['balance_updates']}")
logger.info(f" 交易更新次数: {self.stats['trade_updates']}")
logger.info(f" 错误次数: {self.stats['errors']}")
logger.info(f" 总订单数: {len(self.orders)}")
logger.info(f" 当前开单: {len(self.open_orders)}")
logger.info(f" 已关闭订单: {len(self.closed_orders)}")
if self.balance:
total = sum([v for v in self.balance['total'].values() if v is not None])
logger.info(f" 当前总余额: {total:.2f}")
async def run_test(self, duration=300, place_test_orders=True):
"""运行测试"""
logger.info(f"🚀 开始 MT5 WebSocket 订单测试,持续时间: {duration}")
start_time = datetime.now()
try:
# 启动监听任务
watch_tasks = [
asyncio.create_task(self.watch_orders()),
asyncio.create_task(self.watch_balance()),
asyncio.create_task(self.watch_my_trades()),
]
# 等待连接建立
await asyncio.sleep(5)
# 放置测试订单
test_orders = []
if place_test_orders:
test_orders = await self.place_test_orders()
# 等待订单处理
await asyncio.sleep(10)
# 获取当前订单状态
await self.get_current_orders()
# 等待一段时间后取消测试订单
await asyncio.sleep(30)
await self.cancel_test_orders(test_orders)
# 主测试循环
test_task = asyncio.create_task(self.test_loop(duration))
await test_task
# 取消监听任务
for task in watch_tasks:
task.cancel()
# 等待任务结束
await asyncio.gather(*watch_tasks, return_exceptions=True)
except Exception as e:
logger.error(f"测试运行错误: {e}")
finally:
# 关闭连接
await self.exchange.close()
end_time = datetime.now()
duration_actual = (end_time - start_time).total_seconds()
logger.info(f"✅ 测试完成,实际运行时间: {duration_actual:.2f}")
self.print_statistics()
async def test_loop(self, duration):
"""测试主循环"""
start_time = asyncio.get_event_loop().time()
while True:
current_time = asyncio.get_event_loop().time()
elapsed = current_time - start_time
if elapsed >= duration:
break
# 每分钟打印一次状态
if int(elapsed) % 60 == 0:
logger.info(f"⏰ 测试运行中... 已运行: {int(elapsed)}")
self.print_statistics()
await asyncio.sleep(1)
async def real_time_monitoring(self):
"""实时监控模式"""
logger.info("🔍 启动实时监控模式...")
try:
await asyncio.gather(
self.watch_orders(),
self.watch_balance(),
self.watch_my_trades(),
)
except KeyboardInterrupt:
logger.info("👋 用户中断监控")
finally:
await self.exchange.close()
self.print_statistics()
# 测试配置
TEST_CONFIG = {
'user': 62333850, # 演示账户
'password': 'tecimil4',
'host': '78.140.180.198',
'port': 443,
}
async def main():
"""主函数"""
print("=" * 60)
print("MT5 WebSocket 订单信息测试")
print("=" * 60)
# 创建测试实例
tester = MT5WebSocketOrderTest(TEST_CONFIG)
try:
# 选择测试模式
print("\n选择测试模式:")
print("1. 完整测试 (包含测试订单)")
print("2. 实时监控模式")
print("3. 仅测试连接")
logger.error(f"测试执行错误: ")
choice = input("请输入选择 (1-3, 默认1): ").strip()
if choice == "2":
# 实时监控模式
await tester.real_time_monitoring()
elif choice == "3":
# 仅测试连接
await tester.get_current_orders()
await tester.exchange.close()
else:
# 完整测试
duration = input("输入测试持续时间(秒, 默认300): ").strip()
duration = int(duration) if duration.isdigit() else 300
place_orders = input("是否放置测试订单? (y/n, 默认y): ").strip().lower()
place_orders = place_orders != 'n'
await tester.run_test(duration=duration, place_test_orders=place_orders)
except KeyboardInterrupt:
logger.info("👋 用户中断测试")
except Exception as e:
logger.error(f"测试执行错误: {e}")
finally:
print("\n" + "=" * 60)
print("测试结束")
print("=" * 60)
if __name__ == "__main__":
# 运行测试
asyncio.run(main())