Files
ccxt_with_mt5/ccxt/pro/xt.py
lz_db 0fab423a18 add
2025-11-16 12:31:03 +08:00

1174 lines
52 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code
import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp
from ccxt.base.types import Any, Balances, Int, Market, Order, OrderBook, Position, Str, Strings, Ticker, Tickers, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List
class xt(ccxt.async_support.xt):
def describe(self) -> Any:
return self.deep_extend(super(xt, self).describe(), {
'has': {
'ws': True,
'watchOHLCV': True,
'watchOrderBook': True,
'watchTicker': True,
'watchTickers': True,
'watchTrades': True,
'watchTradesForSymbols': False,
'watchBalance': True,
'watchOrders': True,
'watchMyTrades': True,
'watchPositions': True,
},
'urls': {
'api': {
'ws': {
'spot': 'wss://stream.xt.com',
'contract': 'wss://fstream.xt.com/ws',
},
},
},
'options': {
'tradesLimit': 1000,
'ordersLimit': 1000,
'OHLCVLimit': 1000,
'watchTicker': {
'method': 'ticker', # agg_ticker(contract only)
},
'watchTickers': {
'method': 'tickers', # agg_tickers(contract only)
},
'watchPositions': {
'type': 'swap',
'fetchPositionsSnapshot': True,
'awaitPositionsSnapshot': True,
},
},
'streaming': {
'keepAlive': 20000,
'ping': self.ping,
},
'token': None,
})
async def get_listen_key(self, isContract: bool):
"""
@ignore
required for private endpoints
:param str isContract: True for contract trades
https://doc.xt.com/#websocket_privategetToken
https://doc.xt.com/#futures_user_websocket_v2base
:returns str: listen key / access token
"""
self.check_required_credentials()
tradeType = 'contract' if isContract else 'spot'
url = self.urls['api']['ws'][tradeType]
if not isContract:
url = url + '/private'
client = self.client(url)
token = self.safe_string(client.subscriptions, 'token')
if token is None:
if isContract:
response = await self.privateLinearGetFutureUserV1UserListenKey()
#
# {
# returnCode: '0',
# msgInfo: 'success',
# error: null,
# result: '3BC1D71D6CF96DA3458FC35B05B633351684511731128'
# }
#
client.subscriptions['token'] = self.safe_string(response, 'result')
else:
response = await self.privateSpotPostWsToken()
#
# {
# "rc": 0,
# "mc": "SUCCESS",
# "ma": [],
# "result": {
# "token": "eyJhbqGciOiJSUzI1NiJ9.eyJhY2NvdW50SWQiOiIyMTQ2Mjg1MzIyNTU5Iiwic3ViIjoibGh4dDRfMDAwMUBzbmFwbWFpbC5jYyIsInNjb3BlIjoiYXV0aCIsImlzcyI6Inh0LmNvbSIsImxhc3RBdXRoVGltZSI6MTY2MzgxMzY5MDk1NSwic2lnblR5cGUiOiJBSyIsInVzZXJOYW1lIjoibGh4dDRfMDAwMUBzbmFwbWFpbC5jYyIsImV4cCI6MTY2NjQwNTY5MCwiZGV2aWNlIjoidW5rbm93biIsInVzZXJJZCI6MjE0NjI4NTMyMjU1OX0.h3zJlJBQrK2x1HvUxsKivnn6PlSrSDXXXJ7WqHAYSrN2CG5XPTKc4zKnTVoYFbg6fTS0u1fT8wH7wXqcLWXX71vm0YuP8PCvdPAkUIq4-HyzltbPr5uDYd0UByx0FPQtq1exvsQGe7evXQuDXx3SEJXxEqUbq_DNlXPTq_JyScI",
# "refreshToken": "eyJhbGciOiqJSUzI1NiJ9.eyJhY2NvdW50SWQiOiIyMTQ2Mjg1MzIyNTU5Iiwic3ViIjoibGh4dDRfMDAwMUBzbmFwbWFpbC5jYyIsInNjb3BlIjoicmVmcmVzaCIsImlzcyI6Inh0LmNvbSIsImxhc3RBdXRoVGltZSI6MTY2MzgxMzY5MDk1NSwic2lnblR5cGUiOiJBSyIsInVzZXJOYW1lIjoibGh4dDRfMDAwMUBzbmFwbWFpbC5jYyIsImV4cCI6MTY2NjQwNTY5MCwiZGV2aWNlIjoidW5rbm93biIsInVzZXJJZCI6MjE0NjI4NTMyMjU1OX0.Fs3YVm5YrEOzzYOSQYETSmt9iwxUHBovh2u73liv1hLUec683WGfktA_s28gMk4NCpZKFeQWFii623FvdfNoteXR0v1yZ2519uNvNndtuZICDdv3BQ4wzW1wIHZa1skxFfqvsDnGdXpjqu9UFSbtHwxprxeYfnxChNk4ssei430"
# }
# }
#
result = self.safe_dict(response, 'result')
client.subscriptions['token'] = self.safe_string(result, 'accessToken')
return client.subscriptions['token']
def get_cache_index(self, orderbook, cache):
# return the first index of the cache that can be applied to the orderbook or -1 if not possible
nonce = self.safe_integer(orderbook, 'nonce')
firstDelta = self.safe_value(cache, 0)
firstDeltaNonce = self.safe_integer_2(firstDelta, 'i', 'u')
if nonce < firstDeltaNonce - 1:
return -1
for i in range(0, len(cache)):
delta = cache[i]
deltaNonce = self.safe_integer_2(delta, 'i', 'u')
if deltaNonce >= nonce:
return i
return len(cache)
def handle_delta(self, orderbook, delta):
orderbook['nonce'] = self.safe_integer_2(delta, 'i', 'u')
obAsks = self.safe_list(delta, 'a', [])
obBids = self.safe_list(delta, 'b', [])
bids = orderbook['bids']
asks = orderbook['asks']
for i in range(0, len(obBids)):
bid = obBids[i]
price = self.safe_number(bid, 0)
quantity = self.safe_number(bid, 1)
bids.store(price, quantity)
for i in range(0, len(obAsks)):
ask = obAsks[i]
price = self.safe_number(ask, 0)
quantity = self.safe_number(ask, 1)
asks.store(price, quantity)
# self.handleBidAsks(storedBids, bids)
# self.handleBidAsks(storedAsks, asks)
async def subscribe(self, name: str, access: str, methodName: str, market: Market = None, symbols: List[str] = None, params={}):
"""
@ignore
Connects to a websocket channel
https://doc.xt.com/#websocket_privaterequestFormat
https://doc.xt.com/#futures_market_websocket_v2base
:param str name: name of the channel
:param str access: public or private
:param str methodName: the name of the CCXT class method
:param dict [market]: CCXT market
:param str[] [symbols]: unified market symbols
:param dict params: extra parameters specific to the xt api
:returns dict: data from the websocket stream
"""
privateAccess = access == 'private'
type = None
type, params = self.handle_market_type_and_params(methodName, market, params)
isContract = (type != 'spot')
subscribe = {
'method': 'SUBSCRIBE' if isContract else 'subscribe',
'id': self.number_to_string(self.milliseconds()) + name, # call back ID
}
if privateAccess:
if not isContract:
subscribe['params'] = [name]
subscribe['listenKey'] = await self.get_listen_key(isContract)
else:
listenKey = await self.get_listen_key(isContract)
param = name + '@' + listenKey
subscribe['params'] = [param]
else:
subscribe['params'] = [name]
tradeType = 'contract' if isContract else 'spot'
messageHash = name + '::' + tradeType
if symbols is not None:
messageHash = messageHash + '::' + ','.join(symbols)
request = self.extend(subscribe, params)
tail = access
if isContract:
tail = 'user' if privateAccess else 'market'
url = self.urls['api']['ws'][tradeType] + '/' + tail
return await self.watch(url, messageHash, request, messageHash)
async def watch_ticker(self, symbol: str, params={}) -> Ticker:
"""
watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market
https://doc.xt.com/#websocket_publictickerRealTime
https://doc.xt.com/#futures_market_websocket_v2tickerRealTime
https://doc.xt.com/#futures_market_websocket_v2aggTickerRealTime
:param str symbol: unified symbol of the market to fetch the ticker for
:param dict params: extra parameters specific to the xt api endpoint
:param str [params.method]: 'agg_ticker'(contract only) or 'ticker', default = 'ticker' - the endpoint that will be streamed
:returns dict: a `ticker structure <https://docs.ccxt.com/en/latest/manual.html#ticker-structure>`
"""
await self.load_markets()
market = self.market(symbol)
options = self.safe_dict(self.options, 'watchTicker')
defaultMethod = self.safe_string(options, 'method', 'ticker')
method = self.safe_string(params, 'method', defaultMethod)
name = method + '@' + market['id']
return await self.subscribe(name, 'public', 'watchTicker', market, None, params)
async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers:
"""
watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market
https://doc.xt.com/#websocket_publicallTicker
https://doc.xt.com/#futures_market_websocket_v2allTicker
https://doc.xt.com/#futures_market_websocket_v2allAggTicker
:param str [symbols]: unified market symbols
:param dict params: extra parameters specific to the xt api endpoint
:param str [params.method]: 'agg_tickers'(contract only) or 'tickers', default = 'tickers' - the endpoint that will be streamed
:returns dict: a `ticker structure <https://docs.ccxt.com/en/latest/manual.html#ticker-structure>`
"""
await self.load_markets()
options = self.safe_dict(self.options, 'watchTickers')
defaultMethod = self.safe_string(options, 'method', 'tickers')
name = self.safe_string(params, 'method', defaultMethod)
market = None
if symbols is not None:
market = self.market(symbols[0])
tickers = await self.subscribe(name, 'public', 'watchTickers', market, symbols, params)
if self.newUpdates:
return tickers
return self.filter_by_array(self.tickers, 'symbol', symbols)
async def watch_ohlcv(self, symbol: str, timeframe: str = '1m', since: Int = None, limit: Int = None, params={}) -> List[list]:
"""
watches historical candlestick data containing the open, high, low, and close price, and the volume of a market
https://doc.xt.com/#websocket_publicsymbolKline
https://doc.xt.com/#futures_market_websocket_v2symbolKline
:param str symbol: unified symbol of the market to fetch OHLCV data for
:param str timeframe: 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 3d, 1w, or 1M
:param int [since]: not used by xt watchOHLCV
:param int [limit]: not used by xt watchOHLCV
:param dict params: extra parameters specific to the xt api endpoint
:returns int[][]: A list of candles ordered, open, high, low, close, volume
"""
await self.load_markets()
market = self.market(symbol)
name = 'kline@' + market['id'] + ',' + timeframe
ohlcv = await self.subscribe(name, 'public', 'watchOHLCV', market, None, params)
if self.newUpdates:
limit = ohlcv.getLimit(symbol, limit)
return self.filter_by_since_limit(ohlcv, since, limit, 0, True)
async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
"""
get the list of most recent trades for a particular symbol
https://doc.xt.com/#websocket_publicdealRecord
https://doc.xt.com/#futures_market_websocket_v2dealRecord
:param str symbol: unified symbol of the market to fetch trades for
:param int [since]: timestamp in ms of the earliest trade to fetch
:param int [limit]: the maximum amount of trades to fetch
:param dict params: extra parameters specific to the xt api endpoint
:returns dict[]: a list of `trade structures <https://docs.ccxt.com/en/latest/manual.html?#public-trades>`
"""
await self.load_markets()
market = self.market(symbol)
name = 'trade@' + market['id']
trades = await self.subscribe(name, 'public', 'watchTrades', market, None, params)
if self.newUpdates:
limit = trades.getLimit(symbol, limit)
return self.filter_by_since_limit(trades, since, limit, 'timestamp')
async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
"""
watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data
https://doc.xt.com/#websocket_publiclimitDepth
https://doc.xt.com/#websocket_publicincreDepth
https://doc.xt.com/#futures_market_websocket_v2limitDepth
https://doc.xt.com/#futures_market_websocket_v2increDepth
:param str symbol: unified symbol of the market to fetch the order book for
:param int [limit]: not used by xt watchOrderBook
:param dict params: extra parameters specific to the xt api endpoint
:param int [params.levels]: 5, 10, 20, or 50
:returns dict: A dictionary of `order book structures <https://docs.ccxt.com/en/latest/manual.html#order-book-structure>` indexed by market symbols
"""
await self.load_markets()
market = self.market(symbol)
levels = self.safe_string(params, 'levels')
params = self.omit(params, 'levels')
name = 'depth_update@' + market['id']
if levels is not None:
name = 'depth@' + market['id'] + ',' + levels
orderbook = await self.subscribe(name, 'public', 'watchOrderBook', market, None, params)
return orderbook.limit()
async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
"""
watches information on multiple orders made by the user
https://doc.xt.com/#websocket_privateorderChange
https://doc.xt.com/#futures_user_websocket_v2order
:param str [symbol]: unified market symbol
:param int [since]: not used by xt watchOrders
:param int [limit]: the maximum number of orders to return
:param dict params: extra parameters specific to the xt api endpoint
:returns dict[]: a list of `order structures <https://docs.ccxt.com/en/latest/manual.html#order-structure>`
"""
await self.load_markets()
name = 'order'
market = None
if symbol is not None:
market = self.market(symbol)
orders = await self.subscribe(name, 'private', 'watchOrders', market, None, params)
if self.newUpdates:
limit = orders.getLimit(symbol, limit)
return self.filter_by_since_limit(orders, since, limit, 'timestamp')
async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
"""
watches information on multiple trades made by the user
https://doc.xt.com/#websocket_privateorderDeal
https://doc.xt.com/#futures_user_websocket_v2trade
:param str symbol: unified market symbol of the market orders were made in
:param int [since]: the earliest time in ms to fetch orders for
:param int [limit]: the maximum number of orde structures to retrieve
:param dict params: extra parameters specific to the kucoin api endpoint
:returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=trade-structure>`
"""
await self.load_markets()
name = 'trade'
market = None
if symbol is not None:
market = self.market(symbol)
trades = await self.subscribe(name, 'private', 'watchMyTrades', market, None, params)
if self.newUpdates:
limit = trades.getLimit(symbol, limit)
return self.filter_by_since_limit(trades, since, limit, 'timestamp')
async def watch_balance(self, params={}) -> Balances:
"""
watches information on multiple orders made by the user
https://doc.xt.com/#websocket_privatebalanceChange
https://doc.xt.com/#futures_user_websocket_v2balance
:param dict params: extra parameters specific to the xt api endpoint
:returns dict[]: a list of `balance structures <https://docs.ccxt.com/#/?id=balance-structure>`
"""
await self.load_markets()
name = 'balance'
return await self.subscribe(name, 'private', 'watchBalance', None, None, params)
async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]:
"""
https://doc.xt.com/#futures_user_websocket_v2position
watch all open positions
:param str[]|None symbols: list of unified market symbols
:param number [since]: since timestamp
:param number [limit]: limit
:param dict params: extra parameters specific to the exchange API endpoint
:returns dict[]: a list of `position structure <https://docs.ccxt.com/en/latest/manual.html#position-structure>`
"""
await self.load_markets()
url = self.urls['api']['ws']['contract'] + '/' + 'user'
client = self.client(url)
self.set_positions_cache(client)
fetchPositionsSnapshot = self.handle_option('watchPositions', 'fetchPositionsSnapshot', True)
awaitPositionsSnapshot = self.handle_option('watchPositions', 'awaitPositionsSnapshot', True)
cache = self.positions
if fetchPositionsSnapshot and awaitPositionsSnapshot and self.is_empty(cache):
snapshot = await client.future('fetchPositionsSnapshot')
return self.filter_by_symbols_since_limit(snapshot, symbols, since, limit, True)
name = 'position'
newPositions = await self.subscribe(name, 'private', 'watchPositions', None, None, params)
if self.newUpdates:
return newPositions
return self.filter_by_symbols_since_limit(cache, symbols, since, limit, True)
def set_positions_cache(self, client: Client):
if self.positions is None:
self.positions = ArrayCacheBySymbolBySide()
fetchPositionsSnapshot = self.handle_option('watchPositions', 'fetchPositionsSnapshot')
if fetchPositionsSnapshot:
messageHash = 'fetchPositionsSnapshot'
if not (messageHash in client.futures):
client.future(messageHash)
self.spawn(self.load_positions_snapshot, client, messageHash)
async def load_positions_snapshot(self, client, messageHash):
positions = await self.fetch_positions(None)
self.positions = ArrayCacheBySymbolBySide()
cache = self.positions
for i in range(0, len(positions)):
position = positions[i]
contracts = self.safe_number(position, 'contracts', 0)
if contracts > 0:
cache.append(position)
# don't remove the future from the .futures cache
future = client.futures[messageHash]
future.resolve(cache)
client.resolve(cache, 'position::contract')
def handle_position(self, client, message):
#
# {
# topic: 'position',
# event: 'position',
# data: {
# accountId: 245296,
# accountType: 0,
# symbol: 'eth_usdt',
# contractType: 'PERPETUAL',
# positionType: 'CROSSED',
# positionSide: 'LONG',
# positionSize: '1',
# closeOrderSize: '0',
# availableCloseSize: '1',
# realizedProfit: '-0.0121',
# entryPrice: '2637.87',
# openOrderSize: '1',
# isolatedMargin: '2.63787',
# openOrderMarginFrozen: '2.78832014',
# underlyingType: 'U_BASED',
# leverage: 10,
# welfareAccount: False,
# profitFixedLatest: {},
# closeProfit: '0.0000',
# totalFee: '-0.0158',
# totalFundFee: '0.0037',
# markPrice: '2690.96'
# }
# }
#
if self.positions is None:
self.positions = ArrayCacheBySymbolBySide()
cache = self.positions
data = self.safe_dict(message, 'data', {})
position = self.parse_position(data)
cache.append(position)
messageHashes = self.find_message_hashes(client, 'position::contract')
for i in range(0, len(messageHashes)):
messageHash = messageHashes[i]
parts = messageHash.split('::')
symbolsString = parts[1]
symbols = symbolsString.split(',')
positions = self.filter_by_array([position], 'symbol', symbols, False)
if not self.is_empty(positions):
client.resolve(positions, messageHash)
client.resolve([position], 'position::contract')
def handle_ticker(self, client: Client, message: dict):
#
# spot
#
# {
# topic: 'ticker',
# event: 'ticker@btc_usdt',
# data: {
# s: 'btc_usdt', # symbol
# t: 1683501935877, # time(Last transaction time)
# cv: '-82.67', # priceChangeValue(24 hour price change)
# cr: '-0.0028', # priceChangeRate 24-hour price change(percentage)
# o: '28823.87', # open price
# c: '28741.20', # close price
# h: '29137.64', # highest price
# l: '28660.93', # lowest price
# q: '6372.601573', # quantity
# v: '184086075.2772391' # volume
# }
# }
#
# contract
#
# {
# "topic": "ticker",
# "event": "ticker@btc_usdt",
# "data": {
# "s": "btc_index", # trading pair
# "o": "49000", # opening price
# "c": "50000", # closing price
# "h": "0.1", # highest price
# "l": "0.1", # lowest price
# "a": "0.1", # volume
# "v": "0.1", # turnover
# "ch": "0.21", # quote change
# "t": 123124124 # timestamp
# }
# }
#
# agg_ticker(contract)
#
# {
# "topic": "agg_ticker",
# "event": "agg_ticker@btc_usdt",
# "data": {
# "s": "btc_index", # trading pair
# "o": "49000", # opening price
# "c": "50000", # closing price
# "h": "0.1", # highest price
# "l": "0.1", # lowest price
# "a": "0.1", # volume
# "v": "0.1", # turnover
# "ch": "0.21", # quote change
# "i": "0.21" , # index price
# "m": "0.21", # mark price
# "bp": "0.21", # bid price
# "ap": "0.21" , # ask price
# "t": 123124124 # timestamp
# }
# }
#
data = self.safe_dict(message, 'data')
marketId = self.safe_string(data, 's')
if marketId is not None:
cv = self.safe_string(data, 'cv')
isSpot = cv is not None
ticker = self.parse_ticker(data)
symbol = ticker['symbol']
self.tickers[symbol] = ticker
event = self.safe_string(message, 'event')
messageHashTail = 'spot' if isSpot else 'contract'
messageHash = event + '::' + messageHashTail
client.resolve(ticker, messageHash)
return message
def handle_tickers(self, client: Client, message: dict):
#
# spot
#
# {
# topic: 'tickers',
# event: 'tickers',
# data: [
# {
# s: 'elon_usdt',
# t: 1683502958381,
# cv: '-0.0000000125',
# cr: '-0.0495',
# o: '0.0000002522',
# c: '0.0000002397',
# h: '0.0000002690',
# l: '0.0000002371',
# q: '3803783034.0000000000',
# v: '955.3260820022'
# },
# ...
# ]
# }
#
# contract
#
# {
# "topic": "tickers",
# "event": "tickers",
# "data": [
# {
# "s": "btc_index", # trading pair
# "o": "49000", # opening price
# "c": "50000", # closing price
# "h": "0.1", # highest price
# "l": "0.1", # lowest price
# "a": "0.1", # volume
# "v": "0.1", # turnover
# "ch": "0.21", # quote change
# "t": 123124124 # timestamp
# }
# ]
# }
#
# agg_ticker(contract)
#
# {
# "topic": "agg_tickers",
# "event": "agg_tickers",
# "data": [
# {
# "s": "btc_index", # trading pair
# "o": "49000", # opening price
# "c": "50000", # closing price
# "h": "0.1", # highest price
# "l": "0.1", # lowest price
# "a": "0.1", # volume
# "v": "0.1", # turnover
# "ch": "0.21", # quote change
# "i": "0.21" , # index price
# "m": "0.21", # mark price
# "bp": "0.21", # bid price
# "ap": "0.21" , # ask price
# "t": 123124124 # timestamp
# }
# ]
# }
#
data = self.safe_list(message, 'data', [])
firstTicker = self.safe_dict(data, 0)
spotTest = self.safe_string_2(firstTicker, 'cv', 'aq')
tradeType = 'spot' if (spotTest is not None) else 'contract'
newTickers = []
for i in range(0, len(data)):
tickerData = data[i]
ticker = self.parse_ticker(tickerData)
symbol = ticker['symbol']
self.tickers[symbol] = ticker
newTickers.append(ticker)
messageHashStart = self.safe_string(message, 'topic') + '::' + tradeType
messageHashes = self.find_message_hashes(client, messageHashStart + '::')
for i in range(0, len(messageHashes)):
messageHash = messageHashes[i]
parts = messageHash.split('::')
symbolsString = parts[2]
symbols = symbolsString.split(',')
tickers = self.filter_by_array(newTickers, 'symbol', symbols)
tickersSymbols = list(tickers.keys())
numTickers = len(tickersSymbols)
if numTickers > 0:
client.resolve(tickers, messageHash)
client.resolve(self.tickers, messageHashStart)
return message
def handle_ohlcv(self, client: Client, message: dict):
#
# spot
#
# {
# "topic": "kline",
# "event": "kline@btc_usdt,5m",
# "data": {
# "s": "btc_usdt", # symbol
# "t": 1656043200000, # time
# "i": "5m", # interval
# "o": "44000", # open price
# "c": "50000", # close price
# "h": "52000", # highest price
# "l": "36000", # lowest price
# "q": "34.2", # qty(quantity)
# "v": "230000" # volume
# }
# }
#
# contract
#
# {
# "topic": "kline",
# "event": "kline@btc_usdt,5m",
# "data": {
# "s": "btc_index", # trading pair
# "o": "49000", # opening price
# "c": "50000", # closing price
# "h": "0.1", # highest price
# "l": "0.1", # lowest price
# "a": "0.1", # volume
# "v": "0.1", # turnover
# "ch": "0.21", # quote change
# "t": 123124124 # timestamp
# }
# }
#
data = self.safe_dict(message, 'data', {})
marketId = self.safe_string(data, 's')
if marketId is not None:
timeframe = self.safe_string(data, 'i')
tradeType = 'spot' if ('q' in data) else 'contract'
market = self.safe_market(marketId, None, None, tradeType)
symbol = market['symbol']
parsed = self.parse_ohlcv(data, market)
self.ohlcvs[symbol] = self.safe_dict(self.ohlcvs, symbol, {})
stored = self.safe_value(self.ohlcvs[symbol], timeframe)
if stored is None:
limit = self.safe_integer(self.options, 'OHLCVLimit', 1000)
stored = ArrayCacheByTimestamp(limit)
self.ohlcvs[symbol][timeframe] = stored
stored.append(parsed)
event = self.safe_string(message, 'event')
messageHash = event + '::' + tradeType
client.resolve(stored, messageHash)
return message
def handle_trade(self, client: Client, message: dict):
#
# spot
#
# {
# topic: 'trade',
# event: 'trade@btc_usdt',
# data: {
# s: 'btc_usdt',
# i: '228825383103928709',
# t: 1684258222702,
# p: '27003.65',
# q: '0.000796',
# b: True
# }
# }
#
# contract
#
# {
# "topic": "trade",
# "event": "trade@btc_usdt",
# "data": {
# "s": "btc_index", # trading pair
# "p": "50000", # price
# "a": "0.1" # Quantity
# "m": "BID" # Deal side BID:Buy ASK:Sell
# "t": 123124124 # timestamp
# }
# }
#
data = self.safe_dict(message, 'data')
marketId = self.safe_string_lower(data, 's')
if marketId is not None:
trade = self.parse_trade(data)
i = self.safe_string(data, 'i')
tradeType = 'spot' if (i is not None) else 'contract'
market = self.safe_market(marketId, None, None, tradeType)
symbol = market['symbol']
event = self.safe_string(message, 'event')
tradesArray = self.safe_value(self.trades, symbol)
if tradesArray is None:
tradesLimit = self.safe_integer(self.options, 'tradesLimit', 1000)
tradesArray = ArrayCache(tradesLimit)
self.trades[symbol] = tradesArray
tradesArray.append(trade)
messageHash = event + '::' + tradeType
client.resolve(tradesArray, messageHash)
return message
def handle_order_book(self, client: Client, message: dict):
#
# spot
#
# {
# "topic": "depth",
# "event": "depth@btc_usdt,20",
# "data": {
# "s": "btc_usdt", # symbol
# "fi": 1681433733351, # firstUpdateId = previous lastUpdateId + 1
# "i": 1681433733371, # updateId
# "a": [ # asks(sell order)
# [ # [0]price, [1]quantity
# "34000", # price
# "1.2" # quantity
# ],
# [
# "34001",
# "2.3"
# ]
# ],
# "b": [ # bids(buy order)
# [
# "32000",
# "0.2"
# ],
# [
# "31000",
# "0.5"
# ]
# ]
# }
# }
#
# contract
#
# {
# "topic": "depth",
# "event": "depth@btc_usdt,20",
# "data": {
# s: "btc_usdt",
# pu: "548111455664",
# fu: "548111455665",
# u: "548111455667",
# a: [
# [
# "26841.5",
# "50210",
# ],
# ],
# b: [
# [
# "26841",
# "67075",
# ],
# ],
# t: 1684530667083,
# }
# }
#
data = self.safe_dict(message, 'data')
marketId = self.safe_string(data, 's')
if marketId is not None:
event = self.safe_string(message, 'event')
splitEvent = event.split(',')
event = self.safe_string(splitEvent, 0)
tradeType = 'contract' if ('fu' in data) else 'spot'
market = self.safe_market(marketId, None, None, tradeType)
symbol = market['symbol']
obAsks = self.safe_list(data, 'a')
obBids = self.safe_list(data, 'b')
messageHash = event + '::' + tradeType
if not (symbol in self.orderbooks):
subscription = self.safe_dict(client.subscriptions, messageHash, {})
limit = self.safe_integer(subscription, 'limit')
self.orderbooks[symbol] = self.order_book({}, limit)
orderbook = self.orderbooks[symbol]
nonce = self.safe_integer(orderbook, 'nonce')
if nonce is None:
cacheLength = len(orderbook.cache)
snapshotDelay = self.handle_option('watchOrderBook', 'snapshotDelay', 25)
if cacheLength == snapshotDelay:
self.spawn(self.load_order_book, client, messageHash, symbol)
orderbook.cache.append(data)
return
if obAsks is not None:
asks = orderbook['asks']
for i in range(0, len(obAsks)):
ask = obAsks[i]
price = self.safe_number(ask, 0)
quantity = self.safe_number(ask, 1)
asks.store(price, quantity)
if obBids is not None:
bids = orderbook['bids']
for i in range(0, len(obBids)):
bid = obBids[i]
price = self.safe_number(bid, 0)
quantity = self.safe_number(bid, 1)
bids.store(price, quantity)
timestamp = self.safe_integer(data, 't')
orderbook['nonce'] = self.safe_integer_2(data, 'i', 'u')
orderbook['timestamp'] = timestamp
orderbook['datetime'] = self.iso8601(timestamp)
orderbook['symbol'] = symbol
client.resolve(orderbook, messageHash)
def parse_ws_order_trade(self, trade: dict, market: Market = None):
#
# {
# "s": "btc_usdt", # symbol
# "t": 1656043204763, # time happened time
# "i": "6216559590087220004", # orderId,
# "ci": "test123", # clientOrderId
# "st": "PARTIALLY_FILLED", # state
# "sd": "BUY", # side BUY/SELL
# "eq": "2", # executedQty executed quantity
# "ap": "30000", # avg price
# "f": "0.002" # fee
# }
#
# contract
#
# {
# "symbol": "btc_usdt", # Trading pair
# "orderId": "1234", # Order Id
# "origQty": "34244", # Original Quantity
# "avgPrice": "123", # Quantity
# "price": "1111", # Average price
# "executedQty": "34244", # Volume(Cont)
# "orderSide": "BUY", # BUY, SELL
# "positionSide": "LONG", # LONG, SHORT
# "marginFrozen": "123", # Occupied margin
# "sourceType": "default", # DEFAULT:normal order,ENTRUST:plan commission,PROFIR:Take Profit and Stop Loss
# "sourceId" : "1231231", # Triggering conditions ID
# "state": "", # state:NEWNew order(unfilled);PARTIALLY_FILLED:Partial deal;PARTIALLY_CANCELED:Partial revocation;FILLED:Filled;CANCELED:Cancled;REJECTED:Order failed;EXPIREDExpired
# "createTime": 1731231231, # CreateTime
# "clientOrderId": "204788317630342726"
# }
#
marketId = self.safe_string(trade, 's')
tradeType = 'contract' if ('symbol' in trade) else 'spot'
market = self.safe_market(marketId, market, None, tradeType)
timestamp = self.safe_string(trade, 't')
return self.safe_trade({
'info': trade,
'id': None,
'timestamp': timestamp,
'datetime': self.iso8601(timestamp),
'symbol': market['symbol'],
'order': self.safe_string(trade, 'i', 'orderId'),
'type': self.parse_order_status(self.safe_string(trade, 'st', 'state')),
'side': self.safe_string_lower(trade, 'sd', 'orderSide'),
'takerOrMaker': None,
'price': self.safe_number(trade, 'price'),
'amount': self.safe_string(trade, 'origQty'),
'cost': None,
'fee': {
'currency': None,
'cost': self.safe_number(trade, 'f'),
'rate': None,
},
}, market)
def parse_ws_order(self, order: dict, market: Market = None):
#
# spot
#
# {
# "s": "btc_usdt", # symbol
# "bc": "btc", # base currency
# "qc": "usdt", # quotation currency
# "t": 1656043204763, # happened time
# "ct": 1656043204663, # create time
# "i": "6216559590087220004", # order id,
# "ci": "test123", # client order id
# "st": "PARTIALLY_FILLED", # state NEW/PARTIALLY_FILLED/FILLED/CANCELED/REJECTED/EXPIRED
# "sd": "BUY", # side BUY/SELL
# "tp": "LIMIT", # type LIMIT/MARKET
# "oq": "4" # original quantity
# "oqq": 48000, # original quotation quantity
# "eq": "2", # executed quantity
# "lq": "2", # remaining quantity
# "p": "4000", # price
# "ap": "30000", # avg price
# "f":"0.002" # fee
# }
#
# contract
#
# {
# "symbol": "btc_usdt", # Trading pair
# "orderId": "1234", # Order Id
# "origQty": "34244", # Original Quantity
# "avgPrice": "123", # Quantity
# "price": "1111", # Average price
# "executedQty": "34244", # Volume(Cont)
# "orderSide": "BUY", # BUY, SELL
# "positionSide": "LONG", # LONG, SHORT
# "marginFrozen": "123", # Occupied margin
# "sourceType": "default", # DEFAULT:normal order,ENTRUST:plan commission,PROFIR:Take Profit and Stop Loss
# "sourceId" : "1231231", # Triggering conditions ID
# "state": "", # state:NEWNew order(unfilled);PARTIALLY_FILLED:Partial deal;PARTIALLY_CANCELED:Partial revocation;FILLED:Filled;CANCELED:Cancled;REJECTED:Order failed;EXPIREDExpired
# "createTime": 1731231231, # CreateTime
# "clientOrderId": "204788317630342726"
# }
#
marketId = self.safe_string_2(order, 's', 'symbol')
tradeType = 'contract' if ('symbol' in order) else 'spot'
market = self.safe_market(marketId, market, None, tradeType)
timestamp = self.safe_integer_2(order, 'ct', 'createTime')
return self.safe_order({
'info': order,
'id': self.safe_string_2(order, 'i', 'orderId'),
'clientOrderId': self.safe_string_2(order, 'ci', 'clientOrderId'),
'timestamp': timestamp,
'datetime': self.iso8601(timestamp),
'lastTradeTimestamp': None,
'symbol': market['symbol'],
'type': market['type'],
'timeInForce': None,
'postOnly': None,
'side': self.safe_string_lower_2(order, 'sd', 'orderSide'),
'price': self.safe_number_2(order, 'p', 'price'),
'stopPrice': None,
'stopLoss': None,
'takeProfit': None,
'amount': self.safe_string_2(order, 'oq', 'origQty'),
'filled': self.safe_string_2(order, 'eq', 'executedQty'),
'remaining': self.safe_string(order, 'lq'),
'cost': None,
'average': self.safe_string_2(order, 'ap', 'avgPrice'),
'status': self.parse_order_status(self.safe_string(order, 'st', 'state')),
'fee': {
'currency': None,
'cost': self.safe_number(order, 'f'),
},
'trades': None,
}, market)
def handle_order(self, client: Client, message: dict):
#
# spot
#
# {
# "topic": "order",
# "event": "order",
# "data": {
# "s": "btc_usdt", # symbol
# "t": 1656043204763, # time happened time
# "i": "6216559590087220004", # orderId,
# "ci": "test123", # clientOrderId
# "st": "PARTIALLY_FILLED", # state
# "sd": "BUY", # side BUY/SELL
# "eq": "2", # executedQty executed quantity
# "ap": "30000", # avg price
# "f": "0.002" # fee
# }
# }
#
# contract
#
# {
# "topic": "order",
# "event": "order@123456",
# "data": {
# "symbol": "btc_usdt", # Trading pair
# "orderId": "1234", # Order Id
# "origQty": "34244", # Original Quantity
# "avgPrice": "123", # Quantity
# "price": "1111", # Average price
# "executedQty": "34244", # Volume(Cont)
# "orderSide": "BUY", # BUY, SELL
# "positionSide": "LONG", # LONG, SHORT
# "marginFrozen": "123", # Occupied margin
# "sourceType": "default", # DEFAULT:normal order,ENTRUST:plan commission,PROFIR:Take Profit and Stop Loss
# "sourceId" : "1231231", # Triggering conditions ID
# "state": "", # state:NEWNew order(unfilled);PARTIALLY_FILLED:Partial deal;PARTIALLY_CANCELED:Partial revocation;FILLED:Filled;CANCELED:Cancled;REJECTED:Order failed;EXPIREDExpired
# "createTime": 1731231231, # CreateTime
# "clientOrderId": "204788317630342726"
# }
# }
#
orders = self.orders
if orders is None:
limit = self.safe_integer(self.options, 'ordersLimit')
orders = ArrayCacheBySymbolById(limit)
self.orders = orders
order = self.safe_dict(message, 'data', {})
marketId = self.safe_string_2(order, 's', 'symbol')
if marketId is not None:
tradeType = 'contract' if ('symbol' in order) else 'spot'
market = self.safe_market(marketId, None, None, tradeType)
parsed = self.parse_ws_order(order, market)
orders.append(parsed)
client.resolve(orders, 'order::' + tradeType)
return message
def handle_balance(self, client: Client, message: dict):
#
# spot
#
# {
# topic: 'balance',
# event: 'balance',
# data: {
# a: 3513677381884,
# t: 1684250056775,
# c: 'usdt',
# b: '7.71000000',
# f: '0.00000000',
# z: 'SPOT'
# }
# }
#
# contract
#
# {
# "topic": "balance",
# "event": "balance@123456",
# "data": {
# "coin": "usdt",
# "underlyingType": 1, # 1:Coin-M,2:USDT-M
# "walletBalance": "123", # Balance
# "openOrderMarginFrozen": "123", # Frozen order
# "isolatedMargin": "213", # Isolated Margin
# "crossedMargin": "0" # Crossed Margin
# "availableBalance": '2.256114450000000000',
# "coupon": '0',
# "bonus": '0'
# }
# }
#
data = self.safe_dict(message, 'data', {})
currencyId = self.safe_string_2(data, 'c', 'coin')
code = self.safe_currency_code(currencyId)
account = self.account()
account['free'] = self.safe_string(data, 'availableBalance')
account['used'] = self.safe_string(data, 'f')
account['total'] = self.safe_string_2(data, 'b', 'walletBalance')
self.balance[code] = account
self.balance = self.safe_balance(self.balance)
tradeType = 'contract' if ('coin' in data) else 'spot'
client.resolve(self.balance, 'balance::' + tradeType)
def handle_my_trades(self, client: Client, message: dict):
#
# spot
#
# {
# "topic": "trade",
# "event": "trade",
# "data": {
# "s": "btc_usdt", # symbol
# "t": 1656043204763, # time
# "i": "6316559590087251233", # tradeId
# "oi": "6216559590087220004", # orderId
# "p": "30000", # trade price
# "q": "3", # qty quantity
# "v": "90000" # volume trade amount
# }
# }
#
# contract
#
# {
# "topic": "trade",
# "event": "trade@123456",
# "data": {
# "symbol": 'btc_usdt',
# "orderSide": 'SELL',
# "positionSide": 'LONG',
# "orderId": '231485367663419328',
# "price": '27152.7',
# "quantity": '33',
# "marginUnfrozen": '2.85318000',
# "timestamp": 1684892412565
# }
# }
#
data = self.safe_dict(message, 'data', {})
stored = self.myTrades
if stored is None:
limit = self.safe_integer(self.options, 'tradesLimit', 1000)
stored = ArrayCacheBySymbolById(limit)
self.myTrades = stored
parsedTrade = self.parse_trade(data)
market = self.market(parsedTrade['symbol'])
stored.append(parsedTrade)
tradeType = 'contract' if market['contract'] else 'spot'
client.resolve(stored, 'trade::' + tradeType)
def handle_message(self, client: Client, message):
event = self.safe_string(message, 'event')
if event == 'pong':
client.onPong()
elif event is not None:
topic = self.safe_string(message, 'topic')
methods = {
'kline': self.handle_ohlcv,
'depth': self.handle_order_book,
'depth_update': self.handle_order_book,
'ticker': self.handle_ticker,
'agg_ticker': self.handle_ticker,
'tickers': self.handle_tickers,
'agg_tickers': self.handle_tickers,
'balance': self.handle_balance,
'order': self.handle_order,
'position': self.handle_position,
}
method = self.safe_value(methods, topic)
if topic == 'trade':
data = self.safe_dict(message, 'data')
if ('oi' in data) or ('orderId' in data):
method = self.handle_my_trades
else:
method = self.handle_trade
if method is not None:
method(client, message)
def ping(self, client: Client):
client.lastPong = self.milliseconds()
return 'ping'
def handle_error_message(self, client: Client, message: dict):
#
# {
# "id": "123",
# "code": 401,
# "msg": "token expire"
# }
#
msg = self.safe_string(message, 'msg')
if (msg == 'invalid_listen_key') or (msg == 'token expire'):
client.subscriptions['token'] = None
self.get_listen_key(True)
return
client.reject(message)