935 lines
36 KiB
Python
935 lines
36 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
from ccxt.async_support.mt5 import mt5 as mt5Parent
|
||
from ccxt.base.errors import ExchangeError, ArgumentsRequired
|
||
from ccxt.async_support.base.ws.client import Client
|
||
import asyncio
|
||
from typing import Optional, Dict, Any, List
|
||
import urllib.parse
|
||
|
||
|
||
class mt5(mt5Parent):
|
||
def describe(self):
    """Return the exchange description extended with WebSocket settings.

    The parent description is merged (via ``deep_extend``) with the
    streaming capability flags, the WebSocket URL template, the cache size
    limits and the keep-alive configuration defined here.
    """
    capabilities = {
        'ws': True,
        'watchBalance': True,
        'watchOrders': True,
        'watchPositions': True,
        'watchTicker': True,
        'watchOHLCV': True,
        'watchAll': True,
    }
    endpoints = {
        'api': {
            'ws': 'ws://{hostname}',
        },
    }
    cache_limits = {
        'tradesLimit': 1000,
        'ordersLimit': 1000,
        'OHLCVLimit': 1000,
        'positionsLimit': 1000,
    }
    keep_alive = {
        'ping': True,
        'maxPingPongMisses': 2,
    }
    parent_description = super(mt5, self).describe()
    overrides = {
        'has': capabilities,
        'urls': endpoints,
        'options': cache_limits,
        'streaming': keep_alive,
    }
    return self.deep_extend(parent_description, overrides)
|
||
|
||
async def watch_all(self, params={}):
    """Watch the combined account stream (orders, positions, balance).

    A session token is requested when none is cached. The connection is
    then probed with ``check_connect``; if that probe raises, a fresh token
    is requested before subscribing.

    :param dict params: accepted for interface compatibility; not used here
    :returns: whatever ``self.watch`` resolves for the ``all_data`` hash
    """
    if not getattr(self, 'token', None):
        await self.get_token()

    # Probe the connection; any failure is answered with a new token.
    try:
        await self.check_connect()
    except Exception as exc:
        if self.verbose:
            print(f"连接检查失败,重新连接: {exc}")
        await self.get_token()

    stream_url = self.implode_hostname(self.urls['api']['ws']) + '/OnOrderUpdate?id=' + self.token
    return await self.watch(stream_url, 'all_data')
|
||
|
||
async def watch_balance(self, params={}):
    """Watch balance updates on the order-update stream.

    Requests a session token when none is cached, then awaits the next
    payload published under the ``balance`` message hash.

    :param dict params: accepted for interface compatibility; not used here
    :returns: whatever ``self.watch`` resolves for the ``balance`` hash
    """
    if not getattr(self, 'token', None):
        await self.get_token()
    stream_url = self.implode_hostname(self.urls['api']['ws']) + '/OnOrderUpdate?id=' + self.token
    return await self.watch(stream_url, 'balance')
|
||
|
||
async def watch_orders(self, symbol: Optional[str] = None, since: Optional[int] = None, limit: int = 1, params={}):
    """Watch order updates, optionally filtered by symbol, time and count.

    :param str symbol: unified symbol to keep; ``None`` keeps every symbol
    :param int since: keep only orders whose ``timestamp`` is >= this value
    :param int limit: keep only the last ``limit`` orders (``None`` disables
        the cap, ``0`` or a negative value yields an empty list)
    :param dict params: accepted for interface compatibility; not used here
    :returns: list of order dicts from the locally cached order list
    """
    wanted_symbol = None
    if symbol is not None:
        # Resolving a unified symbol needs the market table.
        await self.load_markets()
        wanted_symbol = self.symbol(symbol)

    if not getattr(self, 'token', None):
        await self.get_token()
    url = self.implode_hostname(self.urls['api']['ws']) + '/OnOrderUpdate?id=' + self.token

    # The order-update handler only ever resolves the plain 'orders' hash.
    # Waiting on a per-symbol hash would never complete, so symbol
    # filtering is done locally on the resolved list instead.
    orders = await self.watch(url, 'orders')

    if wanted_symbol is not None:
        orders = [order for order in orders if order['symbol'] == wanted_symbol]
    if since is not None:
        orders = [order for order in orders if order['timestamp'] >= since]
    if limit is not None:
        # orders[-0:] would return the whole list, so handle 0 explicitly.
        orders = orders[-limit:] if limit > 0 else []
    return orders
|
||
|
||
async def watch_positions(self, symbols: Optional[List[str]] = None, since: Optional[int] = None, limit: Optional[int] = None, params={}):
    """Watch position updates, optionally filtered by symbols, time and count.

    :param list symbols: unified symbols to keep; ``None`` keeps everything
    :param int since: keep only positions whose ``timestamp`` is >= this
        value (positions without a timestamp are dropped by this filter)
    :param int limit: keep only the last ``limit`` positions (``None``
        disables the cap, ``0`` or a negative value yields an empty list)
    :param dict params: accepted for interface compatibility; not used here
    :returns: list of position dicts
    """
    wanted_symbols = None
    if symbols is not None:
        # Resolving unified symbols needs the market table.
        await self.load_markets()
        wanted_symbols = [self.symbol(symbol) for symbol in symbols]

    if not getattr(self, 'token', None):
        await self.get_token()
    url = self.implode_hostname(self.urls['api']['ws']) + '/OnOrderUpdate?id=' + self.token

    # The stream handlers only resolve the plain 'positions' hash, so a
    # hash suffixed with symbols would never complete; filter locally.
    positions = await self.watch(url, 'positions')

    if wanted_symbols is not None:
        positions = [position for position in positions if position['symbol'] in wanted_symbols]
    if since is not None:
        positions = [
            position for position in positions
            if position['timestamp'] is not None and position['timestamp'] >= since
        ]
    if limit is not None:
        # positions[-0:] would return the whole list, so handle 0 explicitly.
        positions = positions[-limit:] if limit > 0 else []
    return positions
|
||
|
||
async def watch_ticker(self, symbol: str, params={}):
    """Subscribe to quotes for ``symbol`` (once) and await the next ticker.

    The subscription request is sent only when the market id is not yet in
    ``self.ticker_subscriptions``; afterwards the quote stream is watched.

    :param str symbol: unified symbol of the market
    :param dict params: extra parameters forwarded to the subscription call
    :returns: whatever ``self.watch`` resolves for ``ticker:<symbol>``
    :raises ExchangeError: when the subscription call returns a falsy result
    """
    await self.load_markets()
    market = self.market(symbol)

    if not getattr(self, 'token', None):
        await self.get_token()

    # Stream endpoint and the payload used to request the subscription.
    stream_url = self.implode_hostname(self.urls['api']['ws']) + '/OnQuote?id=' + self.token
    subscription_request = {
        'id': self.token,
        'symbol': market['id'],
    }

    # Messages are routed by unified symbol; subscriptions are tracked by id.
    message_hash = 'ticker:' + market['symbol']
    subscribe_hash = 'ticker:' + market['id']

    try:
        if not hasattr(self, 'ticker_subscriptions'):
            self.ticker_subscriptions = set()

        already_subscribed = subscribe_hash in self.ticker_subscriptions
        if not already_subscribed:
            accepted = await self.send_ticker_subscription(subscription_request, params)
            if not accepted:
                raise ExchangeError("行情订阅失败")
            # Remember the subscription so it is not requested again.
            self.ticker_subscriptions.add(subscribe_hash)
            if self.verbose:
                print(f"✅ 行情订阅成功: {symbol}")

        return await self.watch(stream_url, message_hash, subscribe_hash=subscribe_hash)
    except Exception as exc:
        if self.verbose:
            print(f"❌ 行情监听失败: {exc}")
        raise exc
|
||
|
||
async def watch_ohlcv(self, symbol: str, timeframe='1m', since: Optional[int] = None, limit: Optional[int] = None, params={}):
    """Subscribe to candles for ``symbol``/``timeframe`` (once) and await data.

    The subscription request is sent only when the market id / timeframe
    pair is not yet in ``self.ohlcv_subscriptions``. The resolved candles
    are passed through ``filter_by_since_limit``.

    :param str symbol: unified symbol of the market
    :param str timeframe: key into ``self.timeframes``
    :param int since: forwarded to ``filter_by_since_limit``
    :param int limit: forwarded to ``filter_by_since_limit``
    :param dict params: extra parameters forwarded to the subscription call
    :returns: the filtered candle list
    :raises ExchangeError: when the subscription call returns a falsy result
    """
    await self.load_markets()
    market = self.market(symbol)

    if not getattr(self, 'token', None):
        await self.get_token()

    # Stream endpoint and the payload used to request the subscription.
    stream_url = self.implode_hostname(self.urls['api']['ws']) + '/OnOhlc?id=' + self.token
    subscription_request = {
        'id': self.token,
        'symbol': market['id'],
        'timeframe': self.timeframes[timeframe],
    }

    # Messages are routed by unified symbol; subscriptions are tracked by id.
    message_hash = 'ohlcv:' + market['symbol'] + ':' + timeframe
    subscribe_hash = 'ohlcv:' + market['id'] + ':' + str(self.timeframes[timeframe])

    try:
        if not hasattr(self, 'ohlcv_subscriptions'):
            self.ohlcv_subscriptions = set()

        already_subscribed = subscribe_hash in self.ohlcv_subscriptions
        if not already_subscribed:
            accepted = await self.send_ohlcv_subscription(subscription_request, params)
            if not accepted:
                raise ExchangeError("K线订阅失败")
            # Remember the subscription so it is not requested again.
            self.ohlcv_subscriptions.add(subscribe_hash)
            if self.verbose:
                print(f"✅ K线订阅成功: {symbol} {timeframe}")

        candles = await self.watch(stream_url, message_hash, subscribe_hash=subscribe_hash)
        return self.filter_by_since_limit(candles, since, limit, 0, True)
    except Exception as exc:
        if self.verbose:
            print(f"❌ K线监听失败: {exc}")
        raise exc
|
||
|
||
async def close_ticker(self, symbol: str, params={}):
    """Cancel the quote subscription for ``symbol``.

    Sends the ``UnSubscribe`` request and, on success, removes the market's
    entry from ``self.ticker_subscriptions`` when that set exists.

    :param str symbol: unified symbol of the market
    :param dict params: extra parameters merged into the request
    :returns: the response of the ``UnSubscribe`` call
    """
    await self.load_markets()
    market = self.market(symbol)

    if not getattr(self, 'token', None):
        await self.get_token()

    payload = {
        'id': self.token,
        'symbol': market['id'],
    }

    try:
        response = await self.private_get('UnSubscribe', self.extend(payload, params))

        # Drop the local subscription marker, if one is tracked.
        if hasattr(self, 'ticker_subscriptions'):
            self.ticker_subscriptions.discard('ticker:' + market['id'])

        if self.verbose:
            print(f"✅ 取消行情订阅成功: {symbol}")
        return response
    except Exception as exc:
        if self.verbose:
            print(f"❌ 取消行情订阅失败: {exc}")
        raise exc
|
||
|
||
async def close_ohlcv(self, symbol: Optional[str] = None, timeframe: Optional[str] = None, params={}):
    """Cancel candle subscriptions and update the local subscription set.

    The ``UnsubscribeOhlc`` request carries the market id only when
    ``symbol`` is given. After a successful call the local markers are
    removed: all of them without a symbol, one marker with symbol and
    timeframe, or every timeframe of the symbol otherwise.

    :param str symbol: unified symbol, or ``None`` for all subscriptions
    :param str timeframe: key into ``self.timeframes``; only used for the
        local bookkeeping when ``symbol`` is given
    :param dict params: extra parameters merged into the request
    :returns: the response of the ``UnsubscribeOhlc`` call
    """
    if not getattr(self, 'token', None):
        await self.get_token()

    payload = {
        'id': self.token,
    }

    # Narrow the request to one market when a symbol was supplied.
    if symbol is not None:
        await self.load_markets()
        market = self.market(symbol)
        payload['symbol'] = market['id']

    try:
        response = await self.private_get('UnsubscribeOhlc', self.extend(payload, params))

        if hasattr(self, 'ohlcv_subscriptions'):
            if symbol is None:
                # Everything was cancelled.
                self.ohlcv_subscriptions.clear()
            elif timeframe is not None:
                # One specific symbol/timeframe pair.
                marker = 'ohlcv:' + market['id'] + ':' + str(self.timeframes[timeframe])
                self.ohlcv_subscriptions.discard(marker)
            else:
                # Every timeframe of the given symbol.
                prefix = 'ohlcv:' + market['id'] + ':'
                stale = [entry for entry in self.ohlcv_subscriptions if entry.startswith(prefix)]
                for entry in stale:
                    self.ohlcv_subscriptions.remove(entry)

        if self.verbose:
            if symbol is None:
                print(f"✅ 取消所有K线订阅成功")
            elif timeframe is not None:
                print(f"✅ 取消K线订阅成功: {symbol} {timeframe}")
            else:
                print(f"✅ 取消K线订阅成功: {symbol} (所有时间帧)")

        return response
    except Exception as exc:
        if self.verbose:
            print(f"❌ 取消K线订阅失败: {exc}")
        raise exc
|
||
|
||
async def close(self):
    """Cancel subscriptions, clear local caches, disconnect and close.

    Every cleanup step is best-effort and isolated: a failure in one step
    is reported when ``self.verbose`` is set and does not prevent the
    remaining steps (including ``disconnect`` and the parent ``close``)
    from running. Local subscription markers are reset afterwards so a
    reused instance subscribes again instead of assuming a live
    subscription.
    """
    # 1. Cancel every candle subscription in one request.
    if getattr(self, 'ohlcv_subscriptions', None):
        try:
            await self.close_ohlcv()
        except Exception as e:
            if self.verbose:
                print(f"关闭连接时出错: {e}")

    # 2. Cancel ticker subscriptions one by one.
    prefix = 'ticker:'
    for subscribe_hash in list(getattr(self, 'ticker_subscriptions', None) or ()):
        # Strip only the leading prefix; the remainder is the market id.
        if subscribe_hash.startswith(prefix):
            market_id = subscribe_hash[len(prefix):]
        else:
            market_id = subscribe_hash
        try:
            await self.close_ticker(market_id)
        except Exception as e:
            if self.verbose:
                print(f"关闭连接时出错: {e}")

    # 3. Forget subscription markers and cached stream data.
    self.ohlcv_subscriptions = set()
    self.ticker_subscriptions = set()
    self.orders_list = []
    self.positions_list = []
    self.ohlcv_cache = {}
    self.ticker_cache = {}

    # 4. Drop the connection.
    try:
        await self.disconnect()
    except Exception as e:
        if self.verbose:
            print(f"关闭连接时出错: {e}")

    await super().close()
|
||
|
||
def handle_message(self, client: Client, message):
    """Dispatch an incoming WebSocket message to its type-specific handler.

    A message whose ``errorCode`` is present and not ``'0'`` goes to
    ``handle_error_message``. Otherwise the ``type`` field selects the
    handler; unknown types are only reported when ``self.verbose`` is set.
    Exceptions raised while dispatching are caught and, when verbose,
    printed.
    """
    try:
        error_code = self.safe_string(message, 'errorCode')
        if not (error_code is None or error_code == '0'):
            self.handle_error_message(client, message)
            return

        message_type = self.safe_string(message, 'type')
        handlers = {
            'OrderUpdate': self.handle_order_update_message,
            'OpenedOrders': self.handle_opened_orders_message,
            'Ohlc': self.handle_ohlc_message,
            'Quote': self.handle_ticker_message,
        }
        handler = handlers.get(message_type)
        if handler is not None:
            handler(client, message)
        elif self.verbose:
            print(f"未知消息类型: {message_type}")
    except Exception as exc:
        if self.verbose:
            print(f"处理WebSocket消息失败: {exc}, message: {message}")
|
||
|
||
def handle_order_update_message(self, client: Client, message):
    """Handle an ``OrderUpdate`` message (balance, order and positions).

    In order, this resolves the ``balance``, ``orders``, ``positions`` and
    ``all_data`` message hashes from the message's ``data`` field. If
    anything raises, all four hashes are rejected with an ExchangeError.
    """
    try:
        payload = self.safe_value(message, 'data', {})
        timestamp = self.safe_integer(message, 'timestampUTC')

        # 1. Balance: stamp it, store it, publish it.
        balance = self.parse_ws_balance_from_data(payload)
        if not balance:
            # Publish an empty balance when nothing could be parsed.
            client.resolve(self.safe_balance({}), 'balance')
        else:
            balance['timestamp'] = timestamp
            balance['datetime'] = self.iso8601(timestamp)
            self.balance = balance
            client.resolve(balance, 'balance')

        # 2. Order carried in data.update.order (if any).
        update = self.safe_value(payload, 'update', {})
        raw_order = self.safe_value(update, 'order')
        if raw_order:
            # The update type tells whether this is an open or a close.
            raw_order['update_type'] = self.safe_value(update, 'type', None)
            parsed_order = self.parse_ws_order(raw_order)
            if parsed_order:
                if not hasattr(self, 'orders_list'):
                    self.orders_list = []
                self.update_or_add_order(parsed_order)
                client.resolve(self.orders_list.copy(), 'orders')
        else:
            # No order in this update: publish the current list unchanged.
            if not hasattr(self, 'orders_list'):
                self.orders_list = []
            client.resolve(self.orders_list.copy(), 'orders')

        # 3. Positions: the snapshot in data.openedOrders replaces the cache.
        opened = self.safe_value(payload, 'openedOrders', [])
        self.positions_list = self.parse_ws_positions_from_orders(opened)
        client.resolve(self.positions_list.copy(), 'positions')

        # 4. Combined snapshot for watch_all.
        combined = {
            'balance': balance if balance else self.safe_balance({}),
            'orders': self.orders_list.copy() if hasattr(self, 'orders_list') else [],
            'positions': self.positions_list.copy(),
            'timestamp': timestamp,
        }
        client.resolve(combined, 'all_data')

    except Exception as exc:
        failure = ExchangeError(f"解析OrderUpdate消息失败: {exc}")
        for message_hash in ('balance', 'orders', 'positions', 'all_data'):
            client.reject(failure, message_hash)
|
||
|
||
def handle_opened_orders_message(self, client: Client, message):
    """Handle an ``OpenedOrders`` message, which carries positions only.

    The parsed positions replace ``self.positions_list``; a copy is
    resolved under ``positions`` and a combined snapshot (no balance, no
    orders) under ``all_data``. On failure both hashes are rejected.
    """
    try:
        raw_orders = self.safe_value(message, 'data', [])
        timestamp = self.safe_integer(message, 'timestampUTC')

        # The snapshot fully replaces the cached position list.
        self.positions_list = self.parse_ws_positions_from_orders(raw_orders)
        client.resolve(self.positions_list.copy(), 'positions')

        snapshot = {
            'balance': None,
            'orders': [],
            'positions': self.positions_list.copy(),
            'timestamp': timestamp,
        }
        client.resolve(snapshot, 'all_data')

    except Exception as exc:
        failure = ExchangeError(f"解析OpenedOrders消息失败: {exc}")
        for message_hash in ('positions', 'all_data'):
            client.reject(failure, message_hash)
|
||
|
||
def handle_ohlc_message(self, client: Client, message):
    """Handle a candle message: cache it and resolve its message hash.

    The raw symbol is turned into ``BASE/QUOTE`` by splitting after the
    third character when it has at least six characters. Candles are kept
    per symbol and timeframe in ``self.ohlcv_cache``, capped at the
    ``OHLCVLimit`` option. A candle carrying the same timestamp as the
    last cached one replaces it instead of being appended, so repeated
    updates of one bar do not pile up as duplicates.

    A message without a symbol cannot be routed to a message hash; it is
    reported when ``self.verbose`` is set and otherwise ignored.
    """
    # Known only once the symbol and timeframe were parsed; the error path
    # must not try to rebuild it from data that may be missing.
    message_hash = None
    try:
        data = self.safe_value(message, 'data', {})
        symbol = self.safe_string(data, 'symbol')
        if not symbol:
            if self.verbose:
                print("处理K线消息失败: 缺少 symbol")
            return

        if len(symbol) >= 6:
            symbol = symbol[:3] + '/' + symbol[3:]

        timeframe_value = self.safe_integer(data, 'timeframe')
        timeframe = self.parse_timeframe_from_value(timeframe_value)
        message_hash = 'ohlcv:' + symbol + ':' + timeframe

        ohlcv = self.parse_ws_ohlcv_message(data)

        if not hasattr(self, 'ohlcv_cache'):
            self.ohlcv_cache = {}
        stored = self.ohlcv_cache.setdefault(symbol, {}).setdefault(timeframe, [])

        if stored and stored[-1][0] == ohlcv[0]:
            # Same bar updated again: keep only the latest version.
            stored[-1] = ohlcv
        else:
            stored.append(ohlcv)

        # Cap the cache, dropping the oldest candles in place.
        limit = self.safe_integer(self.options, 'OHLCVLimit', 1000)
        if limit and len(stored) > limit:
            del stored[:-limit]

        client.resolve(stored, message_hash)

    except Exception as e:
        if self.verbose:
            print(f"处理K线消息失败: {e}")
        # Only a successfully built hash identifies a waiter to reject.
        if message_hash is not None:
            client.reject(e, message_hash)
|
||
|
||
def handle_ticker_message(self, client: Client, message):
    """Handle a quote message: cache the ticker and resolve its hash.

    The raw symbol is turned into ``BASE/QUOTE`` by splitting after the
    third character when it has at least six characters. The parsed ticker
    is stored in ``self.ticker_cache`` and resolved under
    ``ticker:<symbol>``.

    A message without a symbol cannot be routed to a message hash; it is
    reported when ``self.verbose`` is set and otherwise ignored.
    """
    # Known only once the symbol was parsed; the error path must not try
    # to rebuild it from data that may be missing.
    message_hash = None
    try:
        data = self.safe_value(message, 'data', {})
        symbol = self.safe_string(data, 'symbol')
        if not symbol:
            if self.verbose:
                print("处理行情消息失败: 缺少 symbol")
            return

        if len(symbol) >= 6:
            symbol = symbol[:3] + '/' + symbol[3:]
        message_hash = 'ticker:' + symbol

        ticker = self.parse_ws_ticker_message(data)

        if not hasattr(self, 'ticker_cache'):
            self.ticker_cache = {}
        self.ticker_cache[symbol] = ticker

        client.resolve(ticker, message_hash)

    except Exception as e:
        if self.verbose:
            print(f"处理行情消息失败: {e}")
        # Only a successfully built hash identifies a waiter to reject.
        if message_hash is not None:
            client.reject(e, message_hash)
|
||
|
||
def update_or_add_order(self, order):
    """Insert ``order`` into ``self.orders_list`` or replace its older copy.

    Orders are matched on their ``id``. After the update the list is cut
    down to its last ``options['ordersLimit']`` entries.

    :param dict order: parsed order containing at least an ``id`` key
    """
    if not hasattr(self, 'orders_list'):
        self.orders_list = []

    cached = self.orders_list
    target_id = order['id']

    # Position of the first cached order with the same id, if any.
    slot = next(
        (index for index, known in enumerate(cached) if known['id'] == target_id),
        None,
    )
    if slot is None:
        cached.append(order)
    else:
        cached[slot] = order

    # Keep only the most recent entries.
    max_orders = self.options['ordersLimit']
    if len(cached) > max_orders:
        self.orders_list = cached[-max_orders:]
|
||
|
||
def parse_ws_balance_from_data(self, data):
    """Build a balance structure from the account fields of ``data``.

    Reads ``balance``, ``equity``, ``margin``, ``freeMargin`` and ``profit``
    (each defaulting to 0) and files them under the hard-coded ``USDT``
    currency key before passing the result through ``safe_balance``.

    :param dict data: the ``data`` payload of an order-update message
    :returns: the value returned by ``self.safe_balance``
    """
    currency = 'USDT'
    account = {
        'free': self.safe_number(data, 'freeMargin', 0),
        'used': self.safe_number(data, 'margin', 0),
        'total': self.safe_number(data, 'balance', 0),
        'equity': self.safe_number(data, 'equity', 0),
        'profit': self.safe_number(data, 'profit', 0),
    }
    result = {
        'timestamp': None,
        'datetime': None,
    }
    result[currency] = account
    return self.safe_balance(result)
|
||
|
||
def parse_ws_order(self, order_data):
    """Convert one raw order from the stream into a unified order dict.

    The ``update_type`` field (injected by the order-update handler) tunes
    the result:

    * ``MarketOpen``  - the order counts as fully filled (``filled`` equals
      ``lots``).
    * ``MarketClose`` - the amount comes from ``dealInternalIn.lots``, the
      price is the close price and a known side is inverted, since closing
      a buy is a sell and vice versa. An unknown side stays ``None``
      instead of being reported as ``'buy'``.

    :param dict order_data: raw order; an empty or ``None`` value gives ``None``
    :returns: the unified order dict, or ``None`` when parsing fails
    """
    if not order_data:
        return None
    try:
        # 'EURUSD' -> 'EUR/USD' (split after the third character).
        symbol = self.safe_string(order_data, 'symbol')
        if symbol and len(symbol) >= 6:
            symbol = symbol[:3] + '/' + symbol[3:]

        status = self.parse_order_status(self.safe_string(order_data, 'state'))

        # The all-zero date is the placeholder of an order not closed yet.
        close_time = self.safe_string(order_data, 'closeTime')
        is_closed = bool(close_time) and close_time != "0001-01-01T00:00:00"

        timestamp = self.parse8601(self.safe_string(order_data, 'openTime'))
        if timestamp is None:
            timestamp = self.milliseconds()
        last_trade_timestamp = self.parse8601(close_time) if is_closed else timestamp

        raw_order_type = self.safe_string(order_data, 'orderType')
        update_type = self.safe_string(order_data, 'update_type', None)

        amount = self.safe_number(order_data, 'lots', 0)
        filled = self.safe_number(order_data, 'closeLots', 0)
        price = self.safe_number(order_data, 'openPrice')
        side = self.parse_order_side(raw_order_type)

        if update_type == 'MarketOpen':
            filled = amount
        elif update_type == 'MarketClose':
            amount = self.safe_number(self.safe_dict(order_data, 'dealInternalIn', {}), 'lots', 0)
            price = self.safe_number(order_data, 'closePrice')
            # Invert only a recognised side; leave None untouched.
            if side == 'buy':
                side = 'sell'
            elif side == 'sell':
                side = 'buy'

        if amount is not None and filled is not None:
            remaining = max(amount - filled, 0)
        else:
            remaining = None

        fee_cost = self.safe_number(order_data, 'commission', 0) + self.safe_number(order_data, 'fee', 0)

        return {
            'id': self.safe_string(order_data, 'ticket'),
            'clientOrderId': self.safe_string(order_data, 'comment'),
            'datetime': self.iso8601(timestamp),
            'timestamp': timestamp,
            'lastTradeTimestamp': last_trade_timestamp,
            'lastUpdateTimestamp': last_trade_timestamp,
            'status': status,
            'symbol': symbol,
            'type': self.parse_order_type(raw_order_type),
            'timeInForce': None,
            'postOnly': None,
            'side': side,
            'price': price,
            'stopLossPrice': self.safe_number(order_data, 'stopLoss'),
            'takeProfitPrice': self.safe_number(order_data, 'takeProfit'),
            'reduceOnly': None,
            'triggerPrice': None,
            'amount': amount,
            'filled': filled,
            'remaining': remaining,
            'cost': None,
            'trades': None,
            'fee': {
                'cost': fee_cost,
                'currency': None,
            },
            'average': None,
            'info': order_data,
        }
    except Exception as e:
        # Best effort: a malformed order is reported (when verbose) and skipped.
        if self.verbose:
            print(f"解析订单数据失败: {e}")
        return None
|
||
|
||
def parse_ws_positions_from_orders(self, orders_data):
    """Derive open positions from raw order records.

    Only records whose ``state`` is ``Filled`` are treated as positions. A
    single record is accepted in place of a list. Records that fail to
    parse are skipped (and reported when ``self.verbose`` is set).

    :param orders_data: list of raw orders, a single raw order, or empty
    :returns: list of position dicts (possibly empty)
    """
    parsed = []
    if not orders_data:
        return parsed

    candidates = orders_data if isinstance(orders_data, list) else [orders_data]
    for raw_order in candidates:
        try:
            if self.safe_string(raw_order, 'state') != 'Filled':
                continue
            position = self.parse_ws_position(raw_order)
            if position:
                parsed.append(position)
        except Exception as exc:
            if self.verbose:
                print(f"解析持仓失败: {exc}")
            continue
    return parsed
|
||
|
||
def parse_ws_position(self, order_data):
    """Convert one raw filled order into a position dict.

    The raw symbol is split after its third character when it has at least
    six characters. ``notional`` is ``lots * openPrice`` when both are
    truthy, else ``None``. Leverage, realized PnL and margin mode are
    fixed values (1, 0 and ``'cross'``).

    :param dict order_data: raw order record
    :returns: position dict with the raw record under ``info``
    """
    raw_symbol = self.safe_string(order_data, 'symbol')
    unified_symbol = raw_symbol
    if raw_symbol and len(raw_symbol) >= 6:
        unified_symbol = '/'.join((raw_symbol[:3], raw_symbol[3:]))

    opened_at = self.parse8601(self.safe_string(order_data, 'openTime'))

    lots = self.safe_number(order_data, 'lots')
    open_price = self.safe_number(order_data, 'openPrice')
    close_price = self.safe_number(order_data, 'closePrice')
    if lots and open_price:
        notional = lots * open_price
    else:
        notional = None

    position = {
        'id': self.safe_string(order_data, 'ticket'),
        'symbol': unified_symbol,
        'timestamp': opened_at,
        'datetime': self.iso8601(opened_at),
        'side': self.parse_order_side(self.safe_string(order_data, 'orderType')),
        'contracts': lots,
        'contractSize': self.safe_number(order_data, 'contractSize', 1.0),
        'entryPrice': open_price,
        'markPrice': close_price,
        'notional': notional,
        'leverage': 1,
        'unrealizedPnl': self.safe_number(order_data, 'profit'),
        'realizedPnl': 0,
        'liquidationPrice': None,
        'marginMode': 'cross',
        'percentage': None,
        'marginRatio': None,
        'collateral': None,
        'initialMargin': None,
        'initialMarginPercentage': None,
        'maintenanceMargin': None,
        'maintenanceMarginPercentage': None,
        'info': order_data,
    }
    return position
|
||
|
||
def parse_ws_ohlcv_message(self, data):
    """Convert a raw candle payload into ``[timestamp, o, h, l, c, volume]``.

    The ``time`` field is parsed with ``parse8601``, whose result is used
    as the candle timestamp unchanged. A missing volume is reported as 0.

    :param dict data: the ``data`` payload of a candle message
    :returns: list ``[timestamp, open, high, low, close, volume]``
    """
    # parse8601 returns milliseconds already (see the ccxt reference), so
    # no seconds-to-milliseconds scaling is applied: scaling "small" values
    # would corrupt any candle dated before 2001-09-09.
    timestamp = self.parse8601(self.safe_string(data, 'time'))

    return [
        timestamp,
        self.safe_number(data, 'open'),
        self.safe_number(data, 'high'),
        self.safe_number(data, 'low'),
        self.safe_number(data, 'close'),
        self.safe_number(data, 'volume', 0),
    ]
|
||
|
||
def parse_ws_ticker_message(self, data):
    """Convert a raw quote payload into a ticker dict.

    The raw symbol is split after its third character when it has at least
    six characters. When no ``last`` price is present but both bid and ask
    are, the mid price is used as ``last``. Fields the stream does not
    provide are set to ``None``.

    :param dict data: the ``data`` payload of a quote message
    :returns: ticker dict with the raw payload under ``info``
    """
    raw_symbol = self.safe_string(data, 'symbol')
    unified_symbol = raw_symbol
    if raw_symbol and len(raw_symbol) >= 6:
        unified_symbol = '/'.join((raw_symbol[:3], raw_symbol[3:]))

    quoted_at = self.parse8601(self.safe_string(data, 'time'))

    bid = self.safe_number(data, 'bid')
    ask = self.safe_number(data, 'ask')
    last = self.safe_number(data, 'last')

    # Fall back to the mid price when the feed carries no last trade price.
    have_both_sides = bid is not None and ask is not None
    if last is None and have_both_sides:
        last = (bid + ask) / 2

    ticker = {
        'symbol': unified_symbol,
        'timestamp': quoted_at,
        'datetime': self.iso8601(quoted_at),
        'high': None,
        'low': None,
        'bid': bid,
        'bidVolume': None,
        'ask': ask,
        'askVolume': None,
        'vwap': None,
        'open': None,
        'close': None,
        'last': last,
        'previousClose': None,
        # Change figures would need historical data, which is not available here.
        'change': None,
        'percentage': None,
        'average': None,
        'baseVolume': self.safe_number(data, 'volume'),
        'quoteVolume': None,
        'info': data,
    }
    return ticker
|
||
|
||
def parse_timeframe_from_value(self, timeframe_value):
    """Translate a numeric timeframe (in minutes) into its string label.

    Values not present in the table map to ``'1m'``.

    :param int timeframe_value: timeframe length in minutes
    :returns: timeframe label such as ``'1m'``, ``'1h'`` or ``'1d'``
    """
    minutes_to_label = dict((
        (1, '1m'),
        (5, '5m'),
        (15, '15m'),
        (30, '30m'),
        (60, '1h'),
        (240, '4h'),
        (1440, '1d'),
        (10080, '1w'),
        (43200, '1M'),
    ))
    fallback = '1m'
    return self.safe_string(minutes_to_label, timeframe_value, fallback)
|
||
|
||
async def send_ohlcv_subscription(self, request, params={}):
    """Request a candle subscription, falling back to a direct HTTP call.

    ``private_get('SubscribeOhlc', ...)`` is tried first. Any exception,
    including the one raised here for an unexpected reply, switches to
    ``send_direct_subscription``.

    :param dict request: subscription payload
    :param dict params: extra parameters merged into the payload
    :returns: ``True`` when the first attempt is acknowledged, otherwise
        the result of ``send_direct_subscription``
    """
    try:
        # Attempt 1: the regular private endpoint.
        reply = await self.private_get('SubscribeOhlc', self.extend(request, params))

        if self.verbose:
            print(f"✅ K线订阅成功: {reply}")

        # Anything but an explicit acknowledgement counts as a failure.
        if reply not in [True, 'OK', 'SUCCESS']:
            raise ExchangeError(f"订阅失败: {reply}")
        return True

    except Exception as exc:
        if self.verbose:
            print(f"private_get 失败,尝试直接HTTP请求: {exc}")

        # Attempt 2: plain HTTP request.
        return await self.send_direct_subscription('SubscribeOhlc', request, params)
|
||
|
||
async def send_ticker_subscription(self, request, params={}):
    """Request a quote subscription, falling back to a direct HTTP call.

    ``private_get('Subscribe', ...)`` is tried first. Any exception,
    including the one raised here for an unexpected reply, switches to
    ``send_direct_subscription``.

    :param dict request: subscription payload
    :param dict params: extra parameters merged into the payload
    :returns: ``True`` when the first attempt is acknowledged, otherwise
        the result of ``send_direct_subscription``
    """
    try:
        # Attempt 1: the regular private endpoint.
        reply = await self.private_get('Subscribe', self.extend(request, params))

        if self.verbose:
            print(f"✅ 行情订阅响应: {reply}")

        # Anything but an explicit acknowledgement counts as a failure.
        if reply not in [True, 'OK', 'SUCCESS']:
            raise ExchangeError(f"行情订阅失败: {reply}")
        return True

    except Exception as exc:
        if self.verbose:
            print(f"private_get 失败,尝试直接HTTP请求: {exc}")

        # Attempt 2: plain HTTP request.
        return await self.send_direct_subscription('Subscribe', request, params)
|
||
|
||
async def send_direct_subscription(self, endpoint, request, params={}):
    """Send a subscription request as a plain HTTP GET.

    The URL is built from ``urls['api']['private']``, the endpoint name and
    the URL-encoded merge of ``request`` and ``params``.

    :param str endpoint: endpoint name appended to the private base URL
    :param dict request: subscription payload
    :param dict params: extra parameters merged into the payload
    :returns: the response body passed through ``self.parse_json``
    :raises ExchangeError: on a non-200 status (raised as is, without a
        second wrapper) or on any transport/parsing error (with the
        original exception chained as the cause)
    """
    import aiohttp

    base_url = self.urls['api']['private']
    query_params = self.extend(request, params)
    query_string = urllib.parse.urlencode(query_params)
    url = f"{base_url}/{endpoint}?{query_string}"

    # NOTE(review): the printed URL contains the session token.
    if self.verbose:
        print(f"🔧 直接订阅URL: {url}")

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                body = await response.text()
                if response.status != 200:
                    raise ExchangeError(f"订阅请求失败 {response.status}: {body}")
                if self.verbose:
                    print(f"✅ 直接订阅成功: {body}")
                return self.parse_json(body)
    except ExchangeError:
        # Already the right type and message; do not wrap it again.
        raise
    except Exception as e:
        raise ExchangeError(f"订阅请求错误: {e}") from e
|
||
|
||
def handle_error_message(self, client: Client, message):
    """Reject pending waiters with an error built from an error message.

    The fixed hashes (``balance``, ``orders``, ``positions``, ``all_data``)
    are rejected first, then every pending future whose hash starts with
    ``ohlcv:`` and finally every one starting with ``ticker:``. A failure
    to reject a candle hash is reported when ``self.verbose`` is set; a
    failure to reject a ticker hash is ignored.
    """
    code = self.safe_string(message, 'errorCode')
    text = self.safe_string(message, 'message', 'Unknown error')
    error = ExchangeError(f"MT5 WebSocket error {code}: {text}")

    def pending_with_prefix(prefix):
        # Snapshot the matching keys so rejecting cannot disturb iteration.
        return [key for key in client.futures.keys() if isinstance(key, str) and key.startswith(prefix)]

    try:
        # Fixed message hashes.
        for fixed_hash in ('balance', 'orders', 'positions', 'all_data'):
            client.reject(error, fixed_hash)

        # Candle waiters: report individual failures when verbose.
        if hasattr(client, 'futures') and client.futures:
            for ohlcv_hash in pending_with_prefix('ohlcv:'):
                try:
                    client.reject(error, ohlcv_hash)
                except Exception as exc:
                    if self.verbose:
                        print(f"拒绝K线消息哈希失败 {ohlcv_hash}: {exc}")

        # Ticker waiters: individual failures are ignored.
        if hasattr(client, 'futures') and client.futures:
            for ticker_hash in pending_with_prefix('ticker:'):
                try:
                    client.reject(error, ticker_hash)
                except Exception:
                    pass

    except Exception as exc:
        if self.verbose:
            print(f"错误处理失败: {exc}")