From 42a0391eeb7c2aafe22935fd52cb29ebc4fe08bf Mon Sep 17 00:00:00 2001 From: lz_db Date: Mon, 17 Nov 2025 17:18:51 +0800 Subject: [PATCH] 1 --- ccxt/async_support/mt5.py | 131 +++++- ccxt/mt5.py | 72 +++- ccxt/pro/mt5.py | 871 ++++++++++++++++++++++++++++++++------ test/test_mt5_tools.py | 28 +- 4 files changed, 956 insertions(+), 146 deletions(-) diff --git a/ccxt/async_support/mt5.py b/ccxt/async_support/mt5.py index 65b9bb6..59ece59 100644 --- a/ccxt/async_support/mt5.py +++ b/ccxt/async_support/mt5.py @@ -107,6 +107,10 @@ class mt5(Exchange, ImplicitAPI): 'OrderSend': 1, 'OrderModify': 1, 'OrderClose': 1, + 'SubscribeOhlc': 1, # K线订阅 + 'UnsubscribeOhlc': 1, # K线取消订阅 + 'Subscribe': 1, # 行情订阅 + 'UnSubscribe': 1, # 行情取消订阅 }, }, }, @@ -297,11 +301,13 @@ class mt5(Exchange, ImplicitAPI): balance = self.safe_number(response, 'balance', 0.0) margin = self.safe_number(response, 'margin', 0.0) free_margin = self.safe_number(response, 'freeMargin', 0.0) + equity = self.safe_number(response, 'equity', 0.0) result[currency] = { 'free': free_margin, 'used': margin, 'total': balance, + 'equity': equity, } return self.safe_balance(result) @@ -532,6 +538,7 @@ class mt5(Exchange, ImplicitAPI): 'datetime': self.iso8601(timestamp), 'timestamp': timestamp, 'lastTradeTimestamp': last_trade_timestamp, + 'lastUpdateTimestamp': last_trade_timestamp, 'status': status, 'symbol': symbol, 'type': type, @@ -539,7 +546,9 @@ class mt5(Exchange, ImplicitAPI): 'postOnly': None, 'side': side, 'price': price, - 'stopPrice': None, + 'stopLossPrice': self.safe_number(order, 'stopLoss'), + 'takeProfitPrice': self.safe_number(order, 'takeProfit'), + 'reduceOnly':None, 'triggerPrice': None, 'amount': amount, 'filled': filled, @@ -547,7 +556,7 @@ class mt5(Exchange, ImplicitAPI): 'cost': cost, 'trades': None, 'fee': fee, - 'info': order, + # 'info': order, 'average': None, }) except Exception as e: @@ -589,6 +598,68 @@ class mt5(Exchange, ImplicitAPI): } return self.safe_string(types, type, type) + def parse_position(self, order_data, market: Market = None): + """从订单数据解析持仓""" + # 只有状态为 Filled 的订单才是持仓 + state = self.safe_string(order_data, 'state') + if state != 'Filled': + return None + + symbol = self.safe_string(order_data, 'symbol') + if symbol and len(symbol) >= 6: + base = symbol[:3] + quote = symbol[3:] + symbol = base + '/' + quote + + timestamp = self.parse8601(self.safe_string(order_data, 'openTime')) + open_timestamp_utc = self.safe_integer(order_data, 'openTimestampUTC') + if open_timestamp_utc: + timestamp = open_timestamp_utc + + # 确定持仓方向 + order_type = self.safe_string(order_data, 'orderType') + side = 'long' if order_type == 'Buy' else 'short' + + # 计算持仓价值 + contracts = self.safe_number(order_data, 'lots', 0) + entry_price = self.safe_number(order_data, 'openPrice', 0) + current_price = self.safe_number(order_data, 'closePrice', entry_price) + notional = contracts * entry_price if contracts and entry_price else None + + # 计算盈亏百分比 + percentage = None + if entry_price and current_price and entry_price != 0: + if side == 'long': + percentage = (current_price - entry_price) / entry_price + else: + percentage = (entry_price - current_price) / entry_price + + return { + 'id': self.safe_string(order_data, 'ticket'), + 'symbol': symbol, + 'timestamp': timestamp, + 'datetime': self.iso8601(timestamp), + 'side': side, + 'contracts': contracts, + 'contractSize': self.safe_number(order_data, 'contractSize', 1.0), + 'entryPrice': entry_price, + 'markPrice': current_price, # 使用当前价格作为标记价格 + 'notional': notional, + 'leverage': 1, # MT5 可能需要从账户信息获取 + 'unrealizedPnl': self.safe_number(order_data, 'profit', 0), + 'realizedPnl': 0, # 对于持仓,已实现盈亏为0 + 'liquidationPrice': None, # MT5 可能不提供 + 'marginMode': 'cross', + 'percentage': percentage, + 'marginRatio': None, + 'collateral': None, + 'initialMargin': None, # 可能需要计算 + 'initialMarginPercentage': None, + 'maintenanceMargin': None, + 'maintenanceMarginPercentage': None, + # 'info': order_data, + } + async def create_order(self, symbol, type, side, amount, price=None, params={}): """创建订单""" await self.load_markets() @@ -650,6 +721,62 @@ class mt5(Exchange, ImplicitAPI): response = await self.private_get_orderclose(self.extend(request, params)) return self.parse_order(response) + async def private_get(self, endpoint, params={}): + """发送私有 GET 请求""" + return await self.fetch_private(endpoint, 'GET', params) + + async def fetch_private(self, path, method='GET', params={}, headers=None, body=None): + """发送私有 API 请求""" + url = self.urls['api']['private'] + '/' + path + query = self.omit(params, self.extract_params(path)) + + if method == 'GET' and query: + url += '?' + self.urlencode(query) + + if self.verbose: + print(f"🔧 发送请求: {url}") + + import aiohttp + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + content = await response.text() + # 直接返回内容,让 parse_json 处理 + return self.parse_json(content) + else: + error_text = await response.text() + raise ExchangeError(f"HTTP {response.status}: {error_text}") + except Exception as e: + raise ExchangeError(f"请求失败: {e}") + + def parse_json(self, response): + """解析响应,支持多种格式""" + if not response: + return response + + response = response.strip() + + # 处理常见的成功响应 + if response in ['OK', 'SUCCESS', 'True', 'true']: + return True + + # 处理常见的失败响应 + if response in ['FAIL', 'ERROR', 'False', 'false']: + return False + + # 尝试解析 JSON + try: + import json + return json.loads(response) + except json.JSONDecodeError: + # 不是 JSON,返回原始响应 + return response + except Exception as e: + if self.verbose: + print(f"响应解析失败: {e}, 响应: {response}") + return response + def sign(self, path, api='public', method='GET', params={}, headers=None, body=None): """签名请求""" base_url = self.urls['api'][api] diff --git a/ccxt/mt5.py b/ccxt/mt5.py index c223275..1f6c501 100644 --- a/ccxt/mt5.py +++ b/ccxt/mt5.py @@ -112,6 +112,7 @@ class mt5(Exchange, ImplicitAPI): 'OrderSend': 1, 'OrderModify': 1, 'OrderClose': 1, + 'SubscribeOhlc': 1, # 添加订阅端点 }, }, }, @@ -488,11 +489,13 @@ class mt5(Exchange, ImplicitAPI): balance = self.safe_number(response, 'balance', 0.0) margin = self.safe_number(response, 'margin', 0.0) free_margin = self.safe_number(response, 'freeMargin', 0.0) + equity = self.safe_number(response, 'equity', 0.0) result[currency] = { 'free': free_margin, 'used': margin, 'total': balance, + 'equity': equity, } return self.safe_balance(result) @@ -614,13 +617,14 @@ class mt5(Exchange, ImplicitAPI): 'cost': fee_cost, 'currency': None, } - + return self.safe_order({ 'id': id, 'clientOrderId': None, 'datetime': self.iso8601(timestamp), 'timestamp': timestamp, 'lastTradeTimestamp': last_trade_timestamp, + 'lastUpdateTimestamp': last_trade_timestamp, 'status': status, 'symbol': symbol, 'type': type, @@ -628,7 +632,9 @@ class mt5(Exchange, ImplicitAPI): 'postOnly': None, 'side': side, 'price': price, - 'stopPrice': None, + 'stopLossPrice': self.safe_number(order, 'stopLoss'), + 'takeProfitPrice': self.safe_number(order, 'takeProfit'), + 'reduceOnly':None, 'triggerPrice': None, 'amount': amount, 'filled': filled, @@ -678,6 +684,68 @@ class mt5(Exchange, ImplicitAPI): } return self.safe_string(types, type, type) + def parse_position(self, order_data, market: Market = None): + """从订单数据解析持仓""" + # 只有状态为 Filled 的订单才是持仓 + state = self.safe_string(order_data, 'state') + if state != 'Filled': + return None + + symbol = self.safe_string(order_data, 'symbol') + if symbol and len(symbol) >= 6: + base = symbol[:3] + quote = symbol[3:] + symbol = base + '/' + quote + + timestamp = self.parse8601(self.safe_string(order_data, 'openTime')) + open_timestamp_utc = self.safe_integer(order_data, 'openTimestampUTC') + if open_timestamp_utc: + timestamp = open_timestamp_utc + + # 确定持仓方向 + order_type = self.safe_string(order_data, 'orderType') + side = 'long' if order_type == 'Buy' else 'short' + + # 计算持仓价值 + contracts = self.safe_number(order_data, 'lots', 0) + entry_price = self.safe_number(order_data, 'openPrice', 0) + current_price = self.safe_number(order_data, 'closePrice', entry_price) + notional = contracts * entry_price if contracts and entry_price else None + + # 计算盈亏百分比 + percentage = None + if entry_price and current_price and entry_price != 0: + if side == 'long': + percentage = (current_price - entry_price) / entry_price + else: + percentage = (entry_price - current_price) / entry_price + + return { + 'id': self.safe_string(order_data, 'ticket'), + 'symbol': symbol, + 'timestamp': timestamp, + 'datetime': self.iso8601(timestamp), + 'side': side, + 'contracts': contracts, + 'contractSize': self.safe_number(order_data, 'contractSize', 1.0), + 'entryPrice': entry_price, + 'markPrice': current_price, # 使用当前价格作为标记价格 + 'notional': notional, + 'leverage': 1, # MT5 可能需要从账户信息获取 + 'unrealizedPnl': self.safe_number(order_data, 'profit', 0), + 'realizedPnl': 0, # 对于持仓,已实现盈亏为0 + 'liquidationPrice': None, # MT5 可能不提供 + 'marginMode': 'cross', + 'percentage': percentage, + 'marginRatio': None, + 'collateral': None, + 'initialMargin': None, # 可能需要计算 + 'initialMarginPercentage': None, + 'maintenanceMargin': None, + 'maintenanceMarginPercentage': None, + 'info': order_data, + } + def create_order(self, symbol, type, side, amount, price=None, params={}): """创建订单""" self.load_token() diff --git a/ccxt/pro/mt5.py b/ccxt/pro/mt5.py index 01b6ea3..9b229d9 100644 --- a/ccxt/pro/mt5.py +++ b/ccxt/pro/mt5.py @@ -5,6 +5,7 @@ from ccxt.base.errors import ExchangeError, ArgumentsRequired from ccxt.async_support.base.ws.client import Client import asyncio from typing import Optional, Dict, Any, List +import urllib.parse class mt5(mt5Parent): @@ -14,8 +15,10 @@ class mt5(mt5Parent): 'ws': True, 'watchBalance': True, 'watchOrders': True, + 'watchPositions': True, 'watchTicker': True, 'watchOHLCV': True, + 'watchAll': True, }, 'urls': { 'api': { @@ -26,188 +29,798 @@ class mt5(mt5Parent): 'tradesLimit': 1000, 'ordersLimit': 1000, 'OHLCVLimit': 1000, + 'positionsLimit': 1000, }, 'streaming': { 'ping': True, 'maxPingPongMisses': 2, }, }) - + + async def watch_all(self, params={}): + """监听所有数据(订单、持仓、钱包)""" + if not hasattr(self, 'token') or not self.token: + await self.get_token() + + url = self.urls['api']['ws'] + '/OnOrderUpdate?id=' + self.token + message_hash = 'all_data' + return await self.watch(url, message_hash) + async def watch_balance(self, params={}): - """ - 监听余额变化 - """ + """监听余额变化""" if not hasattr(self, 'token') or not self.token: await self.get_token() - url = self.urls['api']['ws'] + '/OnOrderProfit' - request = { - 'id': self.token, - } + url = self.urls['api']['ws'] + '/OnOrderUpdate?id=' + self.token message_hash = 'balance' - return await self.watch(url, message_hash, request, params) + return await self.watch(url, message_hash) - async def watch_orders(self, symbol: Optional[str] = None, since: Optional[int] = None, limit: Optional[int] = None, params={}): - """ - 监听订单变化 - """ + async def watch_orders(self, symbol: Optional[str] = None, since: Optional[int] = None, limit: int = 1, params={}): + """监听订单变化""" if not hasattr(self, 'token') or not self.token: await self.get_token() - url = self.urls['api']['ws'] + '/OnOrderUpdate' - request = { - 'id': self.token, - } + url = self.urls['api']['ws'] + '/OnOrderUpdate?id=' + self.token message_hash = 'orders' if symbol is not None: symbol = self.symbol(symbol) message_hash += ':' + symbol - orders = await self.watch(url, message_hash, request, params) - if self.newUpdates: - limit = orders.getLimit(symbol, limit) - return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True) - + orders = await self.watch(url, message_hash) + + # 过滤订单 + if symbol is not None: + orders = [order for order in orders if order['symbol'] == symbol] + if since is not None: + orders = [order for order in orders if order['timestamp'] >= since] + + # 限制返回数量 + if limit is not None: + orders = orders[-limit:] + + return orders + + async def watch_positions(self, symbols: Optional[List[str]] = None, since: Optional[int] = None, limit: Optional[int] = None, params={}): + """监听持仓变化""" + if not hasattr(self, 'token') or not self.token: + await self.get_token() + + url = self.urls['api']['ws'] + '/OnOrderUpdate?id=' + self.token + message_hash = 'positions' + if symbols is not None: + symbols = [self.symbol(symbol) for symbol in symbols] + message_hash += ':' + ':'.join(symbols) + + positions = await self.watch(url, message_hash) + + # 过滤持仓 + if symbols is not None: + positions = [position for position in positions if position['symbol'] in symbols] + if since is not None: + positions = [position for position in positions if position['timestamp'] >= since] + if limit is not None: + positions = positions[-limit:] + + return positions + async def watch_ticker(self, symbol: str, params={}): - """ - 监听行情变化 - """ await self.load_markets() market = self.market(symbol) if not hasattr(self, 'token') or not self.token: await self.get_token() - - url = self.urls['api']['ws'] + '/OnQuote' + + # WebSocket 监听地址 + url = self.urls['api']['ws'] + '/OnQuote?id=' + self.token + + # 订阅参数 request = { 'id': self.token, 'symbol': market['id'], } + + # 消息哈希 message_hash = 'ticker:' + market['symbol'] - return await self.watch(url, message_hash, request, params) + + # 订阅哈希(用于管理订阅状态) + subscribe_hash = 'ticker:' + market['id'] + + try: + # 检查是否已经订阅 + if not hasattr(self, 'ticker_subscriptions'): + self.ticker_subscriptions = set() + + # 如果尚未订阅,发送订阅请求 + if subscribe_hash not in self.ticker_subscriptions: + subscription_result = await self.send_ticker_subscription(request, params) + + if not subscription_result: + raise ExchangeError("行情订阅失败") + + # 记录订阅状态 + self.ticker_subscriptions.add(subscribe_hash) + + if self.verbose: + print(f"✅ 行情订阅成功: {symbol}") + + # 监听 WebSocket 数据 + ticker = await self.watch(url, message_hash, subscribe_hash=subscribe_hash) + + return ticker + + except Exception as e: + if self.verbose: + print(f"❌ 行情监听失败: {e}") + raise e - async def watch_ohlcv(self, symbol: str, timeframe='1m', since: Optional[int] = None, limit: Optional[int] = None, params={}): - """ - 监听K线数据 - """ + async def close_ticker(self, symbol: str, params={}): + """取消行情订阅""" await self.load_markets() market = self.market(symbol) if not hasattr(self, 'token') or not self.token: await self.get_token() + + # 取消订阅参数 + request = { + 'id': self.token, + 'symbol': market['id'], + } + + try: + # 发送取消订阅请求 + result = await self.private_get('UnSubscribe', self.extend(request, params)) - url = self.urls['api']['ws'] + '/OnOhlc' + # 更新订阅状态 + if hasattr(self, 'ticker_subscriptions'): + subscribe_hash = 'ticker:' + market['id'] + if subscribe_hash in self.ticker_subscriptions: + self.ticker_subscriptions.remove(subscribe_hash) + + if self.verbose: + print(f"✅ 取消行情订阅成功: {symbol}") + + return result + + except Exception as e: + if self.verbose: + print(f"❌ 取消行情订阅失败: {e}") + raise e + + async def watch_ohlcv(self, symbol: str, timeframe='1m', since: Optional[int] = None, limit: Optional[int] = None, params={}): + await self.load_markets() + market = self.market(symbol) + + if not hasattr(self, 'token') or not self.token: + await self.get_token() + + # WebSocket 监听地址 + url = self.urls['api']['ws'] + '/OnOhlc?id=' + self.token + + # 订阅参数 request = { 'id': self.token, 'symbol': market['id'], 'timeframe': self.timeframes[timeframe], } + + # 消息哈希 message_hash = 'ohlcv:' + market['symbol'] + ':' + timeframe - ohlcv = await self.watch(url, message_hash, request, params) - if self.newUpdates: - limit = ohlcv.getLimit(symbol, limit) - return self.filter_by_since_limit(ohlcv, since, limit, 0, True) - - def handle_balance(self, client: Client, message): - """处理余额更新""" - message_hash = 'balance' - balance = self.parse_balance(message) - self.balance = balance - client.resolve(balance, message_hash) - - def handle_orders(self, client: Client, message): - """处理订单更新""" - message_hash = 'orders' - if self.orders is None: - self.orders = ArrayCacheBySymbolById() - - orders = self.parse_ws_orders(message) - for order in orders: - self.orders.append(order) - - client.resolve(self.orders, message_hash) - - def handle_ticker(self, client: Client, message): - """处理行情更新""" - ticker = self.parse_ws_ticker(message) - symbol = ticker['symbol'] - message_hash = 'ticker:' + symbol - self.tickers[symbol] = ticker - client.resolve(ticker, message_hash) - - def handle_ohlcv(self, client: Client, message): - """处理K线更新""" - symbol = self.safe_string(message, 'symbol') - timeframe = self.safe_string(message, 'timeframe', '1m') - message_hash = 'ohlcv:' + symbol + ':' + timeframe + # 订阅哈希(用于管理订阅状态) + subscribe_hash = 'ohlcv:' + market['id'] + ':' + str(self.timeframes[timeframe]) - if self.ohlcvs is None: - self.ohlcvs = {} - if symbol not in self.ohlcvs: - self.ohlcvs[symbol] = {} + try: + # 检查是否已经订阅 + if not hasattr(self, 'ohlcv_subscriptions'): + self.ohlcv_subscriptions = set() - stored = self.ohlcvs[symbol][timeframe] - if stored is None: - limit = self.safe_integer(self.options, 'OHLCVLimit', 1000) - stored = ArrayCacheByTimestamp(limit) - self.ohlcvs[symbol][timeframe] = stored + # 如果尚未订阅,发送订阅请求 + if subscribe_hash not in self.ohlcv_subscriptions: + subscription_result = await self.send_ohlcv_subscription(request, params) + + if not subscription_result: + raise ExchangeError("K线订阅失败") + + # 记录订阅状态 + self.ohlcv_subscriptions.add(subscribe_hash) + + if self.verbose: + print(f"✅ K线订阅成功: {symbol} {timeframe}") - ohlcv = self.parse_ws_ohlcv(message) - stored.append(ohlcv) - client.resolve(stored, message_hash) + # 监听 WebSocket 数据 + ohlcv = await self.watch(url, message_hash, subscribe_hash=subscribe_hash) + + return self.filter_by_since_limit(ohlcv, since, limit, 0, True) + + except Exception as e: + if self.verbose: + print(f"❌ K线监听失败: {e}") + raise e - def parse_ws_orders(self, message): - """解析WebSocket订单数据""" - orders = self.safe_value(message, 'data', []) - result = [] - for order_data in orders: - order = self.parse_ws_order(order_data) - result.append(order) - return result + async def close_ohlcv(self, symbol: Optional[str] = None, timeframe: Optional[str] = None, params={}): + """取消K线订阅""" + if not hasattr(self, 'token') or not self.token: + await self.get_token() + + # 取消订阅参数 + request = { + 'id': self.token, + } + + # 如果指定了交易对,添加到参数中 + if symbol is not None: + await self.load_markets() + market = self.market(symbol) + request['symbol'] = market['id'] + + try: + # 发送取消订阅请求 + result = await self.private_get('UnsubscribeOhlc', self.extend(request, params)) + + # 更新订阅状态 + if hasattr(self, 'ohlcv_subscriptions'): + if symbol is not None: + # 取消指定交易对的所有时间帧订阅 + if timeframe is not None: + # 取消指定交易对和指定时间帧的订阅 + subscribe_hash = 'ohlcv:' + market['id'] + ':' + str(self.timeframes[timeframe]) + if subscribe_hash in self.ohlcv_subscriptions: + self.ohlcv_subscriptions.remove(subscribe_hash) + else: + # 取消指定交易对的所有时间帧订阅 + to_remove = [h for h in self.ohlcv_subscriptions if h.startswith('ohlcv:' + market['id'] + ':')] + for hash_to_remove in to_remove: + self.ohlcv_subscriptions.remove(hash_to_remove) + else: + # 取消所有K线订阅 + self.ohlcv_subscriptions.clear() + + if self.verbose: + if symbol is not None: + if timeframe is not None: + print(f"✅ 取消K线订阅成功: {symbol} {timeframe}") + else: + print(f"✅ 取消K线订阅成功: {symbol} (所有时间帧)") + else: + print(f"✅ 取消所有K线订阅成功") + + return result + + except Exception as e: + if self.verbose: + print(f"❌ 取消K线订阅失败: {e}") + raise e + + async def send_ticker_subscription(self, request, params={}): + """发送行情数据订阅请求""" + try: + # 使用 private_get 发送订阅 + response = await self.private_get('Subscribe', self.extend(request, params)) + + if self.verbose: + print(f"✅ 行情订阅响应: {response}") + + # 检查订阅是否成功 + if response in [True, 'OK', 'SUCCESS']: + return True + else: + raise ExchangeError(f"行情订阅失败: {response}") + + except Exception as e: + if self.verbose: + print(f"private_get 失败,尝试直接HTTP请求: {e}") + + # 直接HTTP请求 + return await self.send_direct_subscription('Subscribe', request, params) - def parse_ws_order(self, order_data): - """解析单个WebSocket订单""" - return self.parse_order(order_data) - - def parse_ws_ticker(self, message): - """解析WebSocket行情数据""" - return self.parse_ticker(message) - - def parse_ws_ohlcv(self, message): - """解析WebSocket K线数据""" - return [ - self.parse8601(self.safe_string(message, 'time')), - self.safe_number(message, 'open'), - self.safe_number(message, 'high'), - self.safe_number(message, 'low'), - self.safe_number(message, 'close'), - self.safe_number(message, 'volume', 0), - ] def handle_message(self, client: Client, message): - """处理所有WebSocket消息""" - error_code = self.safe_string(message, 'errorCode') - if error_code is not None: - self.handle_error_message(client, message) - return - - # 根据消息类型路由处理 - message_type = self.safe_string(message, 'type') - - if message_type == 'OnOrderProfit': - self.handle_balance(client, message) - elif message_type == 'OnOrderUpdate': - self.handle_orders(client, message) - elif message_type == 'OnQuote': - self.handle_ticker(client, message) - elif message_type == 'OnOhlc': - self.handle_ohlcv(client, message) - elif 'OpenedOrders' in message: - self.handle_orders(client, message) + """处理WebSocket消息 - 根据类型分发""" + try: + # 检查错误 + error_code = self.safe_string(message, 'errorCode') + if error_code is not None and error_code != '0': + self.handle_error_message(client, message) + return + + # 解析消息类型 + message_type = self.safe_string(message, 'type') + + if message_type == 'OrderUpdate': + self.handle_order_update_message(client, message) + elif message_type == 'OpenedOrders': + self.handle_opened_orders_message(client, message) + elif message_type == 'Ohlc': + self.handle_ohlc_message(client, message) + elif message_type == 'Quote': + self.handle_ticker_message(client, message) + else: + if self.verbose: + print(f"未知消息类型: {message_type}") + + except Exception as e: + if self.verbose: + print(f"处理WebSocket消息失败: {e}, message: {message}") + def handle_order_update_message(self, client: Client, message): + """处理 OrderUpdate 类型消息(包含订单、持仓、余额)""" + try: + data = self.safe_value(message, 'data', {}) + timestamp = self.safe_integer(message, 'timestampUTC') + + # 1. 解析余额信息 + balance_data = self.parse_ws_balance_from_data(data) + if balance_data: + balance_data['timestamp'] = timestamp + balance_data['datetime'] = self.iso8601(timestamp) + self.balance = balance_data + client.resolve(balance_data, 'balance') + else: + # 即使没有余额数据,也返回空余额 + client.resolve(self.safe_balance({}), 'balance') + + # 2. 解析订单信息(从 update.order) + update_data = self.safe_value(data, 'update', {}) + order_data = self.safe_value(update_data, 'order') + if order_data: + order = self.parse_ws_order(order_data) + if order: + # 使用简单的列表而不是 ArrayCacheBySymbolById + if not hasattr(self, 'orders_list'): + self.orders_list = [] + # 更新或添加订单 + self.update_or_add_order(order) + client.resolve(self.orders_list.copy(), 'orders') + else: + # 即使没有订单数据,也返回空列表 + if not hasattr(self, 'orders_list'): + self.orders_list = [] + client.resolve(self.orders_list.copy(), 'orders') + + # 3. 解析持仓信息(从 openedOrders) + opened_orders = self.safe_value(data, 'openedOrders', []) + positions = self.parse_ws_positions_from_orders(opened_orders) + # 总是返回持仓列表,即使为空 + if not hasattr(self, 'positions_list'): + self.positions_list = [] + # 完全替换持仓列表 + self.positions_list = positions + client.resolve(self.positions_list.copy(), 'positions') + + # 4. 返回所有数据(用于 watch_all) + all_data = { + 'balance': balance_data if balance_data else self.safe_balance({}), + 'orders': self.orders_list.copy() if hasattr(self, 'orders_list') else [], + 'positions': self.positions_list.copy() if hasattr(self, 'positions_list') else [], + 'timestamp': timestamp, + } + client.resolve(all_data, 'all_data') + + except Exception as e: + error = ExchangeError(f"解析OrderUpdate消息失败: {e}") + client.reject(error, 'balance') + client.reject(error, 'orders') + client.reject(error, 'positions') + client.reject(error, 'all_data') + + def handle_opened_orders_message(self, client: Client, message): + """处理 OpenedOrders 类型消息(只包含持仓)""" + try: + data = self.safe_value(message, 'data', []) + timestamp = self.safe_integer(message, 'timestampUTC') + + # 解析持仓信息 + positions = self.parse_ws_positions_from_orders(data) + # 总是返回持仓列表,即使为空 + if not hasattr(self, 'positions_list'): + self.positions_list = [] + # 完全替换持仓列表 + self.positions_list = positions + client.resolve(self.positions_list.copy(), 'positions') + + # 返回所有数据(持仓数据) + all_data = { + 'balance': None, + 'orders': [], + 'positions': self.positions_list.copy() if hasattr(self, 'positions_list') else [], + 'timestamp': timestamp, + } + client.resolve(all_data, 'all_data') + + except Exception as e: + error = ExchangeError(f"解析OpenedOrders消息失败: {e}") + client.reject(error, 'positions') + client.reject(error, 'all_data') + + def handle_ohlc_message(self, client: Client, message): + """处理K线数据消息""" + try: + data = self.safe_value(message, 'data', {}) + symbol = self.safe_string(data, 'symbol') + + if symbol and len(symbol) >= 6: + base = symbol[:3] + quote = symbol[3:] + symbol = base + '/' + quote + + # 获取时间帧 + timeframe_value = self.safe_integer(data, 'timeframe') + timeframe = self.parse_timeframe_from_value(timeframe_value) + + # 构建消息哈希 + message_hash = 'ohlcv:' + symbol + ':' + timeframe + + # 解析K线数据 + ohlcv = self.parse_ws_ohlcv_message(data) + + # 更新缓存 + if not hasattr(self, 'ohlcv_cache'): + self.ohlcv_cache = {} + if symbol not in self.ohlcv_cache: + self.ohlcv_cache[symbol] = {} + + stored = self.ohlcv_cache[symbol].get(timeframe) + if stored is None: + limit = self.safe_integer(self.options, 'OHLCVLimit', 1000) + # 使用简单的列表代替 ArrayCacheByTimestamp + stored = [] + self.ohlcv_cache[symbol][timeframe] = stored + + # 添加新的K线数据 + stored.append(ohlcv) + + # 限制缓存大小 + if len(stored) > self.options['OHLCVLimit']: + stored = stored[-self.options['OHLCVLimit']:] + self.ohlcv_cache[symbol][timeframe] = stored + + # 返回数据 + client.resolve(stored, message_hash) + + except Exception as e: + if self.verbose: + print(f"处理K线消息失败: {e}") + # 拒绝相关的promise + symbol = self.safe_string(message.get('data', {}), 'symbol') + if symbol and len(symbol) >= 6: + base = symbol[:3] + quote = symbol[3:] + symbol = base + '/' + quote + timeframe_value = self.safe_integer(message.get('data', {}), 'timeframe') + timeframe = self.parse_timeframe_from_value(timeframe_value) + message_hash = 'ohlcv:' + symbol + ':' + timeframe + client.reject(e, message_hash) + + def handle_ticker_message(self, client: Client, message): + """处理行情数据消息""" + try: + data = self.safe_value(message, 'data', {}) + symbol = self.safe_string(data, 'symbol') + + if symbol and len(symbol) >= 6: + base = symbol[:3] + quote = symbol[3:] + symbol = base + '/' + quote + + # 构建消息哈希 + message_hash = 'ticker:' + symbol + + # 解析行情数据 + ticker = self.parse_ws_ticker_message(data) + + # 更新缓存 + if not hasattr(self, 'ticker_cache'): + self.ticker_cache = {} + + self.ticker_cache[symbol] = ticker + + # 返回数据 + client.resolve(ticker, message_hash) + + except Exception as e: + if self.verbose: + print(f"处理行情消息失败: {e}") + # 拒绝相关的promise + symbol = self.safe_string(message.get('data', {}), 'symbol') + if symbol and len(symbol) >= 6: + base = symbol[:3] + quote = symbol[3:] + symbol = base + '/' + quote + message_hash = 'ticker:' + symbol + client.reject(e, message_hash) + + def update_or_add_order(self, order): + """更新或添加订单到列表""" + if not hasattr(self, 'orders_list'): + self.orders_list = [] + + order_id = order['id'] + # 查找是否已存在相同ID的订单 + existing_index = -1 + for i, existing_order in enumerate(self.orders_list): + if existing_order['id'] == order_id: + existing_index = i + break + + if existing_index >= 0: + # 更新现有订单 + self.orders_list[existing_index] = order + else: + # 添加新订单 + self.orders_list.append(order) + + # 限制订单列表长度 + if len(self.orders_list) > self.options['ordersLimit']: + self.orders_list = self.orders_list[-self.options['ordersLimit']:] + + def parse_ws_balance_from_data(self, data): + """从数据中解析余额信息""" + result = { + 'timestamp': None, + 'datetime': None, + } + + currency = 'USDT' + + balance = self.safe_number(data, 'balance', 0) + equity = self.safe_number(data, 'equity', 0) + margin = self.safe_number(data, 'margin', 0) + free_margin = self.safe_number(data, 'freeMargin', 0) + profit = self.safe_number(data, 'profit', 0) + + result[currency] = { + 'free': free_margin, + 'used': margin, + 'total': balance, + 'equity': equity, + 'profit': profit, + } + + return self.safe_balance(result) + + def parse_ws_order(self, order_data): + """解析单个订单数据""" + if not order_data: + return None + + symbol = self.safe_string(order_data, 'symbol') + if symbol and len(symbol) >= 6: + base = symbol[:3] + quote = symbol[3:] + symbol = base + '/' + quote + + # 确定订单状态 + state = self.safe_string(order_data, 'state') + status = self.parse_order_status(state) + + # 确定订单是否已关闭 + close_time = self.safe_string(order_data, 'closeTime') + is_closed = close_time and close_time != "0001-01-01T00:00:00" + + timestamp = self.parse8601(self.safe_string(order_data, 'openTime')) + last_trade_timestamp = None + if is_closed: + last_trade_timestamp = self.parse8601(close_time) + + amount = self.safe_number(order_data, 'lots') + filled = self.safe_number(order_data, 'closeLots') + remaining = amount - filled if amount is not None and filled is not None else None + + return { + 'id': self.safe_string(order_data, 'ticket'), + 'clientOrderId': None, + 'datetime': self.iso8601(timestamp), + 'timestamp': timestamp, + 'lastTradeTimestamp': last_trade_timestamp, + 'lastUpdateTimestamp': last_trade_timestamp, + 'status': status, + 'symbol': symbol, + 'type': self.parse_order_type(self.safe_string(order_data, 'orderType')), + 'timeInForce': None, + 'postOnly': None, + 'side': self.parse_order_side(self.safe_string(order_data, 'orderType')), + 'price': self.safe_number(order_data, 'openPrice'), + 'stopLossPrice': self.safe_number(order_data, 'stopLoss'), + 'takeProfitPrice': self.safe_number(order_data, 'takeProfit'), + 'reduceOnly':None, + 'triggerPrice': None, + 'amount': amount, + 'filled': filled, + 'remaining': remaining, + 'cost': None, + 'trades': None, + 'fee': { + 'cost': self.safe_number(order_data, 'commission', 0) + self.safe_number(order_data, 'fee', 0), + 'currency': None, + }, + 'average': None, + 'info': order_data, + } + + def parse_ws_positions_from_orders(self, orders_data): + """从订单数据解析持仓信息""" + positions = [] + + if not isinstance(orders_data, list): + orders_data = [orders_data] + + for order_data in orders_data: + try: + # 只有状态为 Filled 的订单才被认为是持仓 + state = self.safe_string(order_data, 'state') + if state == 'Filled': + position = self.parse_ws_position(order_data) + if position: + positions.append(position) + + except Exception as e: + if self.verbose: + print(f"解析持仓失败: {e}") + continue + + return positions + + def parse_ws_position(self, order_data): + """解析单个持仓""" + symbol = self.safe_string(order_data, 'symbol') + if symbol and len(symbol) >= 6: + base = symbol[:3] + quote = symbol[3:] + symbol = base + '/' + quote + + timestamp = self.parse8601(self.safe_string(order_data, 'openTime')) + + contracts = self.safe_number(order_data, 'lots') + entry_price = self.safe_number(order_data, 'openPrice') + mark_price = self.safe_number(order_data, 'closePrice') + notional = contracts * entry_price if contracts and entry_price else None + + return { + 'id': self.safe_string(order_data, 'ticket'), + 'symbol': symbol, + 'timestamp': timestamp, + 'datetime': self.iso8601(timestamp), + 'side': self.parse_order_side(self.safe_string(order_data, 'orderType')), + 'contracts': contracts, + 'contractSize': self.safe_number(order_data, 'contractSize', 1.0), + 'entryPrice': entry_price, + 'markPrice': mark_price, + 'notional': notional, + 'leverage': 1, + 'unrealizedPnl': self.safe_number(order_data, 'profit'), + 'realizedPnl': 0, + 'liquidationPrice': None, + 'marginMode': 'cross', + 'percentage': None, + 'marginRatio': None, + 'collateral': None, + 'initialMargin': None, + 'initialMarginPercentage': None, + 'maintenanceMargin': None, + 'maintenanceMarginPercentage': None, + 'info': order_data, + } + + def parse_ws_ohlcv_message(self, data): + """解析WebSocket K线数据""" + timestamp = self.parse8601(self.safe_string(data, 'time')) + + return [ + timestamp, # 时间戳 + self.safe_number(data, 'open'), # 开盘价 + self.safe_number(data, 'high'), # 最高价 + self.safe_number(data, 'low'), # 最低价 + self.safe_number(data, 'close'), # 收盘价 + self.safe_number(data, 'volume', 0) # 成交量 + ] + + def parse_ws_ticker_message(self, data): + """解析WebSocket行情数据""" + symbol = self.safe_string(data, 'symbol') + if symbol and len(symbol) >= 6: + base = symbol[:3] + quote = symbol[3:] + symbol = base + '/' + quote + + timestamp = self.parse8601(self.safe_string(data, 'time')) + + bid = self.safe_number(data, 'bid') + ask = self.safe_number(data, 'ask') + last = self.safe_number(data, 'last') + + # 如果没有 last 价格,使用中间价 + if last is None and bid is not None and ask is not None: + last = (bid + ask) / 2 + + # 计算涨跌幅(需要历史数据,这里设为None) + change = None + percentage = None + + return { + 'symbol': symbol, + 'timestamp': timestamp, + 'datetime': self.iso8601(timestamp), + 'high': None, + 'low': None, + 'bid': bid, + 'bidVolume': None, + 'ask': ask, + 'askVolume': None, + 'vwap': None, + 'open': None, + 'close': None, + 'last': last, + 'previousClose': None, + 'change': change, + 'percentage': percentage, + 'average': None, + 'baseVolume': self.safe_number(data, 'volume'), + 'quoteVolume': None, + 'info': data, + } + + def parse_timeframe_from_value(self, timeframe_value): + """将时间帧数值转换为字符串""" + timeframe_map = { + 1: '1m', + 5: '5m', + 15: '15m', + 30: '30m', + 60: '1h', + 240: '4h', + 1440: '1d', + 10080: '1w', + 43200: '1M' + } + return self.safe_string(timeframe_map, timeframe_value, '1m') + + async def send_ohlcv_subscription(self, request, params={}): + """发送K线数据订阅请求""" + try: + # 方法1:尝试使用 private_get + response = await self.private_get('SubscribeOhlc', self.extend(request, params)) + + if self.verbose: + print(f"✅ K线订阅成功: {response}") + + # 检查订阅是否成功 + if response in [True, 'OK', 'SUCCESS']: + return True + else: + raise ExchangeError(f"订阅失败: {response}") + + except Exception as e: + if self.verbose: + print(f"private_get 失败,尝试直接HTTP请求: {e}") + + # 方法2:直接HTTP请求 + return await self.send_direct_subscription('SubscribeOhlc', request, params) + def handle_error_message(self, client: Client, message): """处理错误消息""" error_code = self.safe_string(message, 'errorCode') - error_message = self.safe_string(message, 'message') - raise ExchangeError(f"MT5 WebSocket error {error_code}: {error_message}") \ No newline at end of file + error_message = self.safe_string(message, 'message', 'Unknown error') + + error = ExchangeError(f"MT5 WebSocket error {error_code}: {error_message}") + + try: + # 拒绝固定消息哈希 + client.reject(error, 'balance') + client.reject(error, 'orders') + client.reject(error, 'positions') + client.reject(error, 'all_data') + + # 安全地拒绝所有K线消息哈希 + if hasattr(client, 'futures') and client.futures: + ohlcv_hashes = [hash for hash in client.futures.keys() if isinstance(hash, str) and hash.startswith('ohlcv:')] + for ohlcv_hash in ohlcv_hashes: + try: + client.reject(error, ohlcv_hash) + except Exception as e: + if self.verbose: + print(f"拒绝K线消息哈希失败 {ohlcv_hash}: {e}") + + except Exception as e: + if self.verbose: + print(f"错误处理失败: {e}") \ No newline at end of file diff --git a/test/test_mt5_tools.py b/test/test_mt5_tools.py index a439895..1eec10b 100644 --- a/test/test_mt5_tools.py +++ b/test/test_mt5_tools.py @@ -107,21 +107,22 @@ async def websocket_quick_test(): 'apiKey': '76888962', 'secret': 'LZ-trade666888', 'verbose': False, # 启用详细日志 + # 'debug': True, # 启用详细调试信息 'hostname': '43.167.188.220:5000', - 'options': { - # 'server': '147.160.254.81:443', # 使用服务器名称 - # 或者 - 'host': '18.163.85.196', - 'port': 443, - }, + 'host': '18.163.85.196', + 'port': 443, }) try: # 监听订单更新 async def order_listener(): - orders = await exchange.watch_orders() - for order in orders: - logger.info(f"📦 订单更新: {order['id']} {order['symbol']} {order['side']} {order['status']}") + while True: + # print("111111111") + res = await exchange.watch_ticker(symbol='BTCUSD') + print("===========================收到信息") + print(res) + # for order in res: + # logger.info(f"📦 订单更新: {order}") # 监听余额更新 async def balance_listener(): @@ -132,16 +133,17 @@ async def websocket_quick_test(): # 运行监听器 await asyncio.gather( order_listener(), - balance_listener(), - return_exceptions=True + # balance_listener(), + # return_exceptions=True ) except Exception as e: - logger.error(f"WebSocket 测试错误: {e}") + # logger.error(f"WebSocket 测试错误: {e}") + logger.exception("WebSocket 测试错误:") finally: await exchange.close() if __name__ == "__main__": # 运行快速测试 - asyncio.run(quick_order_test()) \ No newline at end of file + asyncio.run(websocket_quick_test()) \ No newline at end of file