# -*- coding: utf-8 -*-

# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code

import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp
from ccxt.base.types import Any, Balances, Int, Market, Order, OrderBook, Position, Str, Strings, Ticker, Tickers, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List


class xt(ccxt.async_support.xt):
    """Websocket (streaming) extension of the xt REST exchange class."""

    def describe(self) -> Any:
        """Return the exchange description extended with the websocket capabilities, urls and options."""
        return self.deep_extend(super(xt, self).describe(), {
            'has': {
                'ws': True,
                'watchOHLCV': True,
                'watchOrderBook': True,
                'watchTicker': True,
                'watchTickers': True,
                'watchTrades': True,
                'watchTradesForSymbols': False,
                'watchBalance': True,
                'watchOrders': True,
                'watchMyTrades': True,
                'watchPositions': True,
            },
            'urls': {
                'api': {
                    'ws': {
                        'spot': 'wss://stream.xt.com',
                        'contract': 'wss://fstream.xt.com/ws',
                    },
                },
            },
            'options': {
                'tradesLimit': 1000,
                'ordersLimit': 1000,
                'OHLCVLimit': 1000,
                'watchTicker': {
                    'method': 'ticker',  # agg_ticker(contract only)
                },
                'watchTickers': {
                    'method': 'tickers',  # agg_tickers(contract only)
                },
                'watchPositions': {
                    'type': 'swap',
                    'fetchPositionsSnapshot': True,
                    'awaitPositionsSnapshot': True,
                },
            },
            'streaming': {
                'keepAlive': 20000,
                'ping': self.ping,
            },
            'token': None,
        })

    async def get_listen_key(self, isContract: bool):
        """
 @ignore
        required for private endpoints, the token is fetched once and cached in the subscriptions of the client

        https://doc.xt.com/#websocket_privategetToken
        https://doc.xt.com/#futures_user_websocket_v2base

        :param bool isContract: True for contract trades
        :returns str: listen key / access token
        """
        self.check_required_credentials()
        tradeType = 'contract' if isContract else 'spot'
        url = self.urls['api']['ws'][tradeType]
        if not isContract:
            url = url + '/private'
        client = self.client(url)
        token = self.safe_string(client.subscriptions, 'token')
        if token is None:
            if isContract:
                response = await self.privateLinearGetFutureUserV1UserListenKey()
                #
                #    {
                #        returnCode: '0',
                #        msgInfo: 'success',
                #        error: null,
                #        result: '3BC1D71D6CF96DA3458FC35B05B633351684511731128'
                #    }
                #
                client.subscriptions['token'] = self.safe_string(response, 'result')
            else:
                response = await self.privateSpotPostWsToken()
                #
                #    {
                #        "rc": 0,
                #        "mc": "SUCCESS",
                #        "ma": [],
                #        "result": {
                #            "token": "eyJhbqGciOiJSUzI1NiJ9.eyJhY2NvdW50SWQiOiIyMTQ2Mjg1MzIyNTU5...",
                #            "refreshToken": "eyJhbGciOiqJSUzI1NiJ9.eyJhY2NvdW50SWQiOiIyMTQ2Mjg1MzIyNTU5..."
                #        }
                #    }
                #
                result = self.safe_dict(response, 'result')
                # the documented payload above names the field "token", while the
                # code historically read "accessToken" - accept either of them
                client.subscriptions['token'] = self.safe_string_2(result, 'accessToken', 'token')
        return client.subscriptions['token']

    def get_cache_index(self, orderbook, cache):
        """
        return the first index of the cache that can be applied to the orderbook or -1 if not possible

        :param dict orderbook: the order book, its 'nonce' is compared to the nonces of the cached deltas
        :param list cache: the deltas received before the snapshot, each carrying a nonce under 'i' or 'u'
        :returns int: index of the first applicable delta, len(cache) if all of them are older than the snapshot, -1 if not possible
        """
        nonce = self.safe_integer(orderbook, 'nonce')
        firstDelta = self.safe_value(cache, 0)
        firstDeltaNonce = self.safe_integer_2(firstDelta, 'i', 'u')
        if (nonce is None) or (firstDeltaNonce is None):
            # nothing to compare against(empty cache or no snapshot nonce)
            return -1
        if nonce < firstDeltaNonce - 1:
            return -1
        for i in range(0, len(cache)):
            delta = cache[i]
            deltaNonce = self.safe_integer_2(delta, 'i', 'u')
            if deltaNonce >= nonce:
                return i
        return len(cache)

    def handle_delta(self, orderbook, delta):
        """
        apply one incremental update to the order book in place

        :param dict orderbook: the order book to update, its 'nonce' is set to the nonce of the delta
        :param dict delta: an update with asks under 'a', bids under 'b' and the nonce under 'i' or 'u'
        """
        orderbook['nonce'] = self.safe_integer_2(delta, 'i', 'u')
        obAsks = self.safe_list(delta, 'a', [])
        obBids = self.safe_list(delta, 'b', [])
        bids = orderbook['bids']
        asks = orderbook['asks']
        for i in range(0, len(obBids)):
            bid = obBids[i]
            price = self.safe_number(bid, 0)
            quantity = self.safe_number(bid, 1)
            bids.store(price, quantity)
        for i in range(0, len(obAsks)):
            ask = obAsks[i]
            price = self.safe_number(ask, 0)
            quantity = self.safe_number(ask, 1)
            asks.store(price, quantity)
        # self.handleBidAsks(storedBids, bids)
        # self.handleBidAsks(storedAsks, asks)

    async def subscribe(self, name: str, access: str, methodName: str, market: Market = None, symbols: List[str] = None, params={}):
        """
 @ignore
        Connects to a websocket channel

        https://doc.xt.com/#websocket_privaterequestFormat
        https://doc.xt.com/#futures_market_websocket_v2base

        :param str name: name of the channel
        :param str access: public or private
        :param str methodName: the name of the CCXT class method
        :param dict [market]: CCXT market
        :param str[] [symbols]: unified market symbols
        :param dict params: extra parameters specific to the xt api
        :returns dict: data from the websocket stream
        """
        privateAccess = access == 'private'
        marketType = None
        marketType, params = self.handle_market_type_and_params(methodName, market, params)
        isContract = (marketType != 'spot')
        subscribe = {
            'method': 'SUBSCRIBE' if isContract else 'subscribe',
            'id': self.number_to_string(self.milliseconds()) + name,  # call back ID
        }
        if privateAccess:
            listenKey = await self.get_listen_key(isContract)
            if isContract:
                # contract streams carry the listen key inside the channel name
                subscribe['params'] = [name + '@' + listenKey]
            else:
                subscribe['params'] = [name]
                subscribe['listenKey'] = listenKey
        else:
            subscribe['params'] = [name]
        tradeType = 'contract' if isContract else 'spot'
        messageHash = name + '::' + tradeType
        if symbols is not None:
            messageHash = messageHash + '::' + ','.join(symbols)
        request = self.extend(subscribe, params)
        tail = access
        if isContract:
            tail = 'user' if privateAccess else 'market'
        url = self.urls['api']['ws'][tradeType] + '/' + tail
        return await self.watch(url, messageHash, request, messageHash)

    async def watch_ticker(self, symbol: str, params={}) -> Ticker:
        """
        watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market

        https://doc.xt.com/#websocket_publictickerRealTime
        https://doc.xt.com/#futures_market_websocket_v2tickerRealTime
        https://doc.xt.com/#futures_market_websocket_v2aggTickerRealTime

        :param str symbol: unified symbol of the market to fetch the ticker for
        :param dict params: extra parameters specific to the xt api endpoint
        :param str [params.method]: 'agg_ticker'(contract only) or 'ticker', default = 'ticker' - the endpoint that will be streamed
        :returns dict: a ticker structure
        """
        await self.load_markets()
        market = self.market(symbol)
        options = self.safe_dict(self.options, 'watchTicker')
        defaultMethod = self.safe_string(options, 'method', 'ticker')
        method = self.safe_string(params, 'method', defaultMethod)
        # params are merged into the request, so 'method' must not leak
        # through or it would overwrite the 'subscribe' method of the request
        params = self.omit(params, 'method')
        name = method + '@' + market['id']
        return await self.subscribe(name, 'public', 'watchTicker', market, None, params)

    async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers:
        """
        watches price tickers, a statistical calculation with the information calculated over the past 24 hours for all markets of a type

        https://doc.xt.com/#websocket_publicallTicker
        https://doc.xt.com/#futures_market_websocket_v2allTicker
        https://doc.xt.com/#futures_market_websocket_v2allAggTicker

        :param str[] [symbols]: unified market symbols, the first one determines the market type
        :param dict params: extra parameters specific to the xt api endpoint
        :param str [params.method]: 'agg_tickers'(contract only) or 'tickers', default = 'tickers' - the endpoint that will be streamed
        :returns dict: a dictionary of ticker structures
        """
        await self.load_markets()
        options = self.safe_dict(self.options, 'watchTickers')
        defaultMethod = self.safe_string(options, 'method', 'tickers')
        name = self.safe_string(params, 'method', defaultMethod)
        # see watch_ticker: 'method' must not be merged into the request
        params = self.omit(params, 'method')
        market = None
        if symbols is not None:
            market = self.market(symbols[0])
        tickers = await self.subscribe(name, 'public', 'watchTickers', market, symbols, params)
        if self.newUpdates:
            return tickers
        return self.filter_by_array(self.tickers, 'symbol', symbols)

    async def watch_ohlcv(self, symbol: str, timeframe: str = '1m', since: Int = None, limit: Int = None, params={}) -> List[list]:
        """
        watches historical candlestick data containing the open, high, low, and close price, and the volume of a market

        https://doc.xt.com/#websocket_publicsymbolKline
        https://doc.xt.com/#futures_market_websocket_v2symbolKline

        :param str symbol: unified symbol of the market to fetch OHLCV data for
        :param str timeframe: 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 3d, 1w, or 1M
        :param int [since]: not used by xt watchOHLCV
        :param int [limit]: not used by xt watchOHLCV
        :param dict params: extra parameters specific to the xt api endpoint
        :returns int[][]: A list of candles ordered, open, high, low, close, volume
        """
        await self.load_markets()
        market = self.market(symbol)
        name = 'kline@' + market['id'] + ',' + timeframe
        ohlcv = await self.subscribe(name, 'public', 'watchOHLCV', market, None, params)
        if self.newUpdates:
            limit = ohlcv.getLimit(symbol, limit)
        return self.filter_by_since_limit(ohlcv, since, limit, 0, True)

    async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        get the list of most recent trades for a particular symbol

        https://doc.xt.com/#websocket_publicdealRecord
        https://doc.xt.com/#futures_market_websocket_v2dealRecord

        :param str symbol: unified symbol of the market to fetch trades for
        :param int [since]: timestamp in ms of the earliest trade to fetch
        :param int [limit]: the maximum amount of trades to fetch
        :param dict params: extra parameters specific to the xt api endpoint
        :returns dict[]: a list of trade structures
        """
        await self.load_markets()
        market = self.market(symbol)
        name = 'trade@' + market['id']
        trades = await self.subscribe(name, 'public', 'watchTrades', market, None, params)
        if self.newUpdates:
            limit = trades.getLimit(symbol, limit)
        return self.filter_by_since_limit(trades, since, limit, 'timestamp')

    async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
        """
        watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data

        https://doc.xt.com/#websocket_publiclimitDepth
        https://doc.xt.com/#websocket_publicincreDepth
        https://doc.xt.com/#futures_market_websocket_v2limitDepth
        https://doc.xt.com/#futures_market_websocket_v2increDepth

        :param str symbol: unified symbol of the market to fetch the order book for
        :param int [limit]: not used by xt watchOrderBook
        :param dict params: extra parameters specific to the xt api endpoint
        :param int [params.levels]: 5, 10, 20, or 50
        :returns dict: A dictionary of order book structures indexed by market symbols
        """
        await self.load_markets()
        market = self.market(symbol)
        levels = self.safe_string(params, 'levels')
        params = self.omit(params, 'levels')
        name = 'depth_update@' + market['id']
        if levels is not None:
            name = 'depth@' + market['id'] + ',' + levels
        orderbook = await self.subscribe(name, 'public', 'watchOrderBook', market, None, params)
        return orderbook.limit()

    async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
        """
        watches information on multiple orders made by the user

        https://doc.xt.com/#websocket_privateorderChange
        https://doc.xt.com/#futures_user_websocket_v2order

        :param str [symbol]: unified market symbol
        :param int [since]: not used by xt watchOrders
        :param int [limit]: the maximum number of orders to return
        :param dict params: extra parameters specific to the xt api endpoint
        :returns dict[]: a list of order structures
        """
        await self.load_markets()
        name = 'order'
        market = None
        if symbol is not None:
            market = self.market(symbol)
        orders = await self.subscribe(name, 'private', 'watchOrders', market, None, params)
        if self.newUpdates:
            limit = orders.getLimit(symbol, limit)
        return self.filter_by_since_limit(orders, since, limit, 'timestamp')

    async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        watches information on multiple trades made by the user

        https://doc.xt.com/#websocket_privateorderDeal
        https://doc.xt.com/#futures_user_websocket_v2trade

        :param str [symbol]: unified market symbol of the market the trades were made in
        :param int [since]: the earliest time in ms to return trades for
        :param int [limit]: the maximum number of trade structures to return
        :param dict params: extra parameters specific to the xt api endpoint
        :returns dict[]: a list of trade structures
        """
        await self.load_markets()
        name = 'trade'
        market = None
        if symbol is not None:
            market = self.market(symbol)
        trades = await self.subscribe(name, 'private', 'watchMyTrades', market, None, params)
        if self.newUpdates:
            limit = trades.getLimit(symbol, limit)
        return self.filter_by_since_limit(trades, since, limit, 'timestamp')

    async def watch_balance(self, params={}) -> Balances:
        """
        watches balance updates of the user

        https://doc.xt.com/#websocket_privatebalanceChange
        https://doc.xt.com/#futures_user_websocket_v2balance

        :param dict params: extra parameters specific to the xt api endpoint
        :returns dict: a balance structure
        """
        await self.load_markets()
        name = 'balance'
        return await self.subscribe(name, 'private', 'watchBalance', None, None, params)

    async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]:
        """
        watch all open positions

        https://doc.xt.com/#futures_user_websocket_v2position

        :param str[]|None symbols: list of unified market symbols
        :param number [since]: since timestamp
        :param number [limit]: limit
        :param dict params: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of position structures
        """
        await self.load_markets()
        url = self.urls['api']['ws']['contract'] + '/' + 'user'
        client = self.client(url)
        self.set_positions_cache(client)
        fetchPositionsSnapshot = self.handle_option('watchPositions', 'fetchPositionsSnapshot', True)
        awaitPositionsSnapshot = self.handle_option('watchPositions', 'awaitPositionsSnapshot', True)
        cache = self.positions
        if fetchPositionsSnapshot and awaitPositionsSnapshot and self.is_empty(cache):
            # first call: wait for the REST snapshot spawned by set_positions_cache
            snapshot = await client.future('fetchPositionsSnapshot')
            return self.filter_by_symbols_since_limit(snapshot, symbols, since, limit, True)
        name = 'position'
        newPositions = await self.subscribe(name, 'private', 'watchPositions', None, None, params)
        if self.newUpdates:
            return newPositions
        return self.filter_by_symbols_since_limit(cache, symbols, since, limit, True)

    def set_positions_cache(self, client: Client):
        """
        create the positions cache if needed and, when enabled, spawn the loading of the positions snapshot once per client

        :param Client client: the websocket client whose futures hold the 'fetchPositionsSnapshot' future
        """
        if self.positions is None:
            self.positions = ArrayCacheBySymbolBySide()
        fetchPositionsSnapshot = self.handle_option('watchPositions', 'fetchPositionsSnapshot')
        if fetchPositionsSnapshot:
            messageHash = 'fetchPositionsSnapshot'
            if not (messageHash in client.futures):
                client.future(messageHash)
                self.spawn(self.load_positions_snapshot, client, messageHash)

    async def load_positions_snapshot(self, client, messageHash):
        """
        fetch the positions over REST, store those with contracts > 0 in a fresh cache and resolve the snapshot future

        :param Client client: the websocket client holding the snapshot future
        :param str messageHash: the key of the snapshot future in client.futures
        """
        positions = await self.fetch_positions(None)
        self.positions = ArrayCacheBySymbolBySide()
        cache = self.positions
        for i in range(0, len(positions)):
            position = positions[i]
            contracts = self.safe_number(position, 'contracts', 0)
            if contracts > 0:
                cache.append(position)
        # don't remove the future from the .futures cache
        future = client.futures[messageHash]
        future.resolve(cache)
        client.resolve(cache, 'position::contract')

    def handle_position(self, client, message):
        #
        #    {
        #      topic: 'position',
        #      event: 'position',
        #      data: {
        #        accountId: 245296,
        #        accountType: 0,
        #        symbol: 'eth_usdt',
        #        contractType: 'PERPETUAL',
        #        positionType: 'CROSSED',
        #        positionSide: 'LONG',
        #        positionSize: '1',
        #        closeOrderSize: '0',
        #        availableCloseSize: '1',
        #        realizedProfit: '-0.0121',
        #        entryPrice: '2637.87',
        #        openOrderSize: '1',
        #        isolatedMargin: '2.63787',
        #        openOrderMarginFrozen: '2.78832014',
        #        underlyingType: 'U_BASED',
        #        leverage: 10,
        #        welfareAccount: False,
        #        profitFixedLatest: {},
        #        closeProfit: '0.0000',
        #        totalFee: '-0.0158',
        #        totalFundFee: '0.0037',
        #        markPrice: '2690.96'
        #      }
        #    }
        #
        if self.positions is None:
            self.positions = ArrayCacheBySymbolBySide()
        cache = self.positions
        data = self.safe_dict(message, 'data', {})
        position = self.parse_position(data)
        cache.append(position)
        messageHashStart = 'position::contract'
        # hashes have the shape 'position::contract::<symbol,symbol,...>', so
        # the symbols are the third '::'-separated part(as in handle_tickers)
        messageHashes = self.find_message_hashes(client, messageHashStart + '::')
        for i in range(0, len(messageHashes)):
            messageHash = messageHashes[i]
            parts = messageHash.split('::')
            symbolsString = self.safe_string(parts, 2)
            if symbolsString is None:
                continue
            symbols = symbolsString.split(',')
            positions = self.filter_by_array([position], 'symbol', symbols, False)
            if not self.is_empty(positions):
                client.resolve(positions, messageHash)
        client.resolve([position], messageHashStart)

    def handle_ticker(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        topic: 'ticker',
        #        event: 'ticker@btc_usdt',
        #        data: {
        #           s: 'btc_usdt',            # symbol
        #           t: 1683501935877,         # time(Last transaction time)
        #           cv: '-82.67',             # priceChangeValue(24 hour price change)
        #           cr: '-0.0028',            # priceChangeRate 24-hour price change(percentage)
        #           o: '28823.87',            # open price
        #           c: '28741.20',            # close price
        #           h: '29137.64',            # highest price
        #           l: '28660.93',            # lowest price
        #           q: '6372.601573',         # quantity
        #           v: '184086075.2772391'    # volume
        #        }
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "ticker",
        #        "event": "ticker@btc_usdt",
        #        "data": {
        #            "s": "btc_index",  # trading pair
        #            "o": "49000",      # opening price
        #            "c": "50000",      # closing price
        #            "h": "0.1",        # highest price
        #            "l": "0.1",        # lowest price
        #            "a": "0.1",        # volume
        #            "v": "0.1",        # turnover
        #            "ch": "0.21",      # quote change
        #            "t": 123124124     # timestamp
        #       }
        #    }
        #
        # agg_ticker(contract)
        #
        #    {
        #        "topic": "agg_ticker",
        #        "event": "agg_ticker@btc_usdt",
        #        "data": {
        #            "s": "btc_index",          # trading pair
        #            "o": "49000",              # opening price
        #            "c": "50000",              # closing price
        #            "h": "0.1",                # highest price
        #            "l": "0.1",                # lowest price
        #            "a": "0.1",                # volume
        #            "v": "0.1",                # turnover
        #            "ch": "0.21",              # quote change
        #            "i": "0.21" ,              # index price
        #            "m": "0.21",               # mark price
        #            "bp": "0.21",              # bid price
        #            "ap": "0.21" ,             # ask price
        #            "t": 123124124             # timestamp
        #       }
        #    }
        #
        data = self.safe_dict(message, 'data')
        marketId = self.safe_string(data, 's')
        if marketId is not None:
            # only the spot payload carries the 'cv' field
            cv = self.safe_string(data, 'cv')
            isSpot = cv is not None
            ticker = self.parse_ticker(data)
            symbol = ticker['symbol']
            self.tickers[symbol] = ticker
            event = self.safe_string(message, 'event')
            messageHashTail = 'spot' if isSpot else 'contract'
            messageHash = event + '::' + messageHashTail
            client.resolve(ticker, messageHash)
        return message

    def handle_tickers(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        topic: 'tickers',
        #        event: 'tickers',
        #        data: [
        #            {
        #                s: 'elon_usdt',
        #                t: 1683502958381,
        #                cv: '-0.0000000125',
        #                cr: '-0.0495',
        #                o: '0.0000002522',
        #                c: '0.0000002397',
        #                h: '0.0000002690',
        #                l: '0.0000002371',
        #                q: '3803783034.0000000000',
        #                v: '955.3260820022'
        #            },
        #            ...
        #        ]
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "tickers",
        #        "event": "tickers",
        #        "data": [
        #            {
        #                "s": "btc_index",  # trading pair
        #                "o": "49000",      # opening price
        #                "c": "50000",      # closing price
        #                "h": "0.1",        # highest price
        #                "l": "0.1",        # lowest price
        #                "a": "0.1",        # volume
        #                "v": "0.1",        # turnover
        #                "ch": "0.21",      # quote change
        #                "t": 123124124     # timestamp
        #            }
        #        ]
        #    }
        #
        # agg_ticker(contract)
        #
        #    {
        #        "topic": "agg_tickers",
        #        "event": "agg_tickers",
        #        "data": [
        #            {
        #                "s": "btc_index",          # trading pair
        #                "o": "49000",              # opening price
        #                "c": "50000",              # closing price
        #                "h": "0.1",                # highest price
        #                "l": "0.1",                # lowest price
        #                "a": "0.1",                # volume
        #                "v": "0.1",                # turnover
        #                "ch": "0.21",              # quote change
        #                "i": "0.21" ,              # index price
        #                "m": "0.21",               # mark price
        #                "bp": "0.21",              # bid price
        #                "ap": "0.21" ,             # ask price
        #                "t": 123124124             # timestamp
        #            }
        #        ]
        #    }
        #
        data = self.safe_list(message, 'data', [])
        firstTicker = self.safe_dict(data, 0)
        spotTest = self.safe_string_2(firstTicker, 'cv', 'aq')
        tradeType = 'spot' if (spotTest is not None) else 'contract'
        newTickers = []
        for i in range(0, len(data)):
            tickerData = data[i]
            ticker = self.parse_ticker(tickerData)
            symbol = ticker['symbol']
            self.tickers[symbol] = ticker
            newTickers.append(ticker)
        messageHashStart = self.safe_string(message, 'topic') + '::' + tradeType
        messageHashes = self.find_message_hashes(client, messageHashStart + '::')
        for i in range(0, len(messageHashes)):
            messageHash = messageHashes[i]
            parts = messageHash.split('::')
            symbolsString = parts[2]
            symbols = symbolsString.split(',')
            tickers = self.filter_by_array(newTickers, 'symbol', symbols)
            tickersSymbols = list(tickers.keys())
            numTickers = len(tickersSymbols)
            if numTickers > 0:
                client.resolve(tickers, messageHash)
        client.resolve(self.tickers, messageHashStart)
        return message

    def handle_ohlcv(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        "topic": "kline",
        #        "event": "kline@btc_usdt,5m",
        #        "data": {
        #            "s": "btc_usdt",        # symbol
        #            "t": 1656043200000,     # time
        #            "i": "5m",              # interval
        #            "o": "44000",           # open price
        #            "c": "50000",           # close price
        #            "h": "52000",           # highest price
        #            "l": "36000",           # lowest price
        #            "q": "34.2",            # qty(quantity)
        #            "v": "230000"           # volume
        #        }
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "kline",
        #        "event": "kline@btc_usdt,5m",
        #        "data": {
        #            "s": "btc_index",      # trading pair
        #            "o": "49000",          # opening price
        #            "c": "50000",          # closing price
        #            "h": "0.1",            # highest price
        #            "l": "0.1",            # lowest price
        #            "a": "0.1",            # volume
        #            "v": "0.1",            # turnover
        #            "ch": "0.21",          # quote change
        #            "t": 123124124         # timestamp
        #        }
        #    }
        #
        data = self.safe_dict(message, 'data', {})
        marketId = self.safe_string(data, 's')
        if marketId is not None:
            timeframe = self.safe_string(data, 'i')
            # only the spot payload carries the 'q' field
            tradeType = 'spot' if ('q' in data) else 'contract'
            market = self.safe_market(marketId, None, None, tradeType)
            symbol = market['symbol']
            parsed = self.parse_ohlcv(data, market)
            self.ohlcvs[symbol] = self.safe_dict(self.ohlcvs, symbol, {})
            stored = self.safe_value(self.ohlcvs[symbol], timeframe)
            if stored is None:
                limit = self.safe_integer(self.options, 'OHLCVLimit', 1000)
                stored = ArrayCacheByTimestamp(limit)
                self.ohlcvs[symbol][timeframe] = stored
            stored.append(parsed)
            event = self.safe_string(message, 'event')
            messageHash = event + '::' + tradeType
            client.resolve(stored, messageHash)
        return message

    def handle_trade(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        topic: 'trade',
        #        event: 'trade@btc_usdt',
        #        data: {
        #            s: 'btc_usdt',
        #            i: '228825383103928709',
        #            t: 1684258222702,
        #            p: '27003.65',
        #            q: '0.000796',
        #            b: True
        #        }
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "trade",
        #        "event": "trade@btc_usdt",
        #        "data": {
        #            "s": "btc_index",  # trading pair
        #            "p": "50000",      # price
        #            "a": "0.1"         # Quantity
        #            "m": "BID"         # Deal side  BID:Buy ASK:Sell
        #            "t": 123124124     # timestamp
        #        }
        #    }
        #
        data = self.safe_dict(message, 'data')
        marketId = self.safe_string_lower(data, 's')
        if marketId is not None:
            trade = self.parse_trade(data)
            # only the spot payload carries the trade id 'i'
            i = self.safe_string(data, 'i')
            tradeType = 'spot' if (i is not None) else 'contract'
            market = self.safe_market(marketId, None, None, tradeType)
            symbol = market['symbol']
            event = self.safe_string(message, 'event')
            tradesArray = self.safe_value(self.trades, symbol)
            if tradesArray is None:
                tradesLimit = self.safe_integer(self.options, 'tradesLimit', 1000)
                tradesArray = ArrayCache(tradesLimit)
                self.trades[symbol] = tradesArray
            tradesArray.append(trade)
            messageHash = event + '::' + tradeType
            client.resolve(tradesArray, messageHash)
        return message

    def handle_order_book(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        "topic": "depth",
        #        "event": "depth@btc_usdt,20",
        #        "data": {
        #            "s": "btc_usdt",        # symbol
        #            "fi": 1681433733351,    # firstUpdateId = previous lastUpdateId + 1
        #            "i": 1681433733371,     # updateId
        #            "a": [                 # asks(sell order)
        #                [                  # [0]price, [1]quantity
        #                    "34000",        # price
        #                    "1.2"           # quantity
        #                ],
        #                [
        #                    "34001",
        #                    "2.3"
        #                ]
        #            ],
        #            "b": [                  # bids(buy order)
        #                [
        #                    "32000",
        #                    "0.2"
        #                ],
        #                [
        #                    "31000",
        #                    "0.5"
        #                ]
        #            ]
        #        }
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "depth",
        #        "event": "depth@btc_usdt,20",
        #        "data": {
        #            s: "btc_usdt",
        #            pu: "548111455664",
        #            fu: "548111455665",
        #            u: "548111455667",
        #            a: [
        #                [
        #                    "26841.5",
        #                    "50210",
        #                ],
        #            ],
        #            b: [
        #                [
        #                    "26841",
        #                    "67075",
        #                ],
        #            ],
        #            t: 1684530667083,
        #        }
        #    }
        #
        data = self.safe_dict(message, 'data')
        marketId = self.safe_string(data, 's')
        if marketId is None:
            return
        event = self.safe_string(message, 'event')
        splitEvent = event.split(',')
        # NOTE(review): the ',<levels>' suffix is dropped here, while
        # watch_order_book subscribes under 'depth@<id>,<levels>::<type>' when
        # params.levels is given - confirm the hashes match for limited depth
        event = self.safe_string(splitEvent, 0)
        # only the contract payload carries the 'fu' field
        tradeType = 'contract' if ('fu' in data) else 'spot'
        market = self.safe_market(marketId, None, None, tradeType)
        symbol = market['symbol']
        messageHash = event + '::' + tradeType
        if not (symbol in self.orderbooks):
            subscription = self.safe_dict(client.subscriptions, messageHash, {})
            limit = self.safe_integer(subscription, 'limit')
            self.orderbooks[symbol] = self.order_book({}, limit)
        orderbook = self.orderbooks[symbol]
        nonce = self.safe_integer(orderbook, 'nonce')
        if nonce is None:
            # no snapshot yet: buffer the delta and spawn the snapshot loading
            # once snapshotDelay deltas have been cached
            cacheLength = len(orderbook.cache)
            snapshotDelay = self.handle_option('watchOrderBook', 'snapshotDelay', 25)
            if cacheLength == snapshotDelay:
                self.spawn(self.load_order_book, client, messageHash, symbol)
            orderbook.cache.append(data)
            return
        # stores the bids and the asks and sets the nonce from 'i' or 'u'
        self.handle_delta(orderbook, data)
        timestamp = self.safe_integer(data, 't')
        orderbook['timestamp'] = timestamp
        orderbook['datetime'] = self.iso8601(timestamp)
        orderbook['symbol'] = symbol
        client.resolve(orderbook, messageHash)

    def parse_ws_order_trade(self, trade: dict, market: Market = None):
        #
        # spot
        #
        #    {
        #        "s": "btc_usdt",                         # symbol
        #        "t": 1656043204763,                      # time happened time
        #        "i": "6216559590087220004",              # orderId,
        #        "ci": "test123",                         # clientOrderId
        #        "st": "PARTIALLY_FILLED",                # state
        #        "sd": "BUY",                             # side BUY/SELL
        #        "eq": "2",                               # executedQty executed quantity
        #        "ap": "30000",                           # avg price
        #        "f": "0.002"                             # fee
        #    }
        #
        # contract
        #
        #    {
        #        "symbol": "btc_usdt",                    # Trading pair
        #        "orderId": "1234",                       # Order Id
        #        "origQty": "34244",                      # Original Quantity
        #        "avgPrice": "123",                       # Average price
        #        "price": "1111",                         # Price
        #        "executedQty": "34244",                  # Volume(Cont)
        #        "orderSide": "BUY",                      # BUY, SELL
        #        "positionSide": "LONG",                  # LONG, SHORT
        #        "marginFrozen": "123",                   # Occupied margin
        #        "sourceType": "default",                 # DEFAULT:normal order,ENTRUST:plan commission,PROFIR:Take Profit and Stop Loss
        #        "sourceId" : "1231231",                  # Triggering conditions ID
        #        "state": "",                             # state:NEW:New order(unfilled);PARTIALLY_FILLED:Partial deal;PARTIALLY_CANCELED:Partial revocation;FILLED:Filled;CANCELED:Canceled;REJECTED:Order failed;EXPIRED:Expired
        #        "createTime": 1731231231,                # CreateTime
        #        "clientOrderId": "204788317630342726"
        #    }
        #
        # each field is looked up under its spot key and its contract key, the
        # third argument of the non-"_2" safe methods is a default value, not a key
        marketId = self.safe_string_2(trade, 's', 'symbol')
        tradeType = 'contract' if ('symbol' in trade) else 'spot'
        market = self.safe_market(marketId, market, None, tradeType)
        timestamp = self.safe_integer_2(trade, 't', 'createTime')
        return self.safe_trade({
            'info': trade,
            'id': None,
            'timestamp': timestamp,
            'datetime': self.iso8601(timestamp),
            'symbol': market['symbol'],
            'order': self.safe_string_2(trade, 'i', 'orderId'),
            'type': None,  # the payload carries the order state, not the order type
            'side': self.safe_string_lower_2(trade, 'sd', 'orderSide'),
            'takerOrMaker': None,
            'price': self.safe_number_2(trade, 'price', 'ap'),
            'amount': self.safe_string_2(trade, 'origQty', 'eq'),
            'cost': None,
            'fee': {
                'currency': None,
                'cost': self.safe_number(trade, 'f'),
                'rate': None,
            },
        }, market)

    def parse_ws_order(self, order: dict, market: Market = None):
        #
        # spot
        #
        #    {
        #        "s": "btc_usdt",                # symbol
        #        "bc": "btc",                    # base currency
        #        "qc": "usdt",                   # quotation currency
        #        "t": 1656043204763,             # happened time
        #        "ct": 1656043204663,            # create time
        #        "i": "6216559590087220004",     # order id,
        #        "ci": "test123",                # client order id
        #        "st": "PARTIALLY_FILLED",       # state NEW/PARTIALLY_FILLED/FILLED/CANCELED/REJECTED/EXPIRED
        #        "sd": "BUY",                    # side BUY/SELL
        #        "tp": "LIMIT",                  # type LIMIT/MARKET
        #        "oq":  "4"                      # original quantity
        #        "oqq":  48000,                  # original quotation quantity
        #        "eq": "2",                      # executed quantity
        #        "lq": "2",                      # remaining quantity
        #        "p": "4000",                    # price
        #        "ap": "30000",                  # avg price
        #        "f":"0.002"                     # fee
        #    }
        #
        # contract
        #
        #    {
        #        "symbol": "btc_usdt",                    # Trading pair
        #        "orderId": "1234",                       # Order Id
        #        "origQty": "34244",                      # Original Quantity
        #        "avgPrice": "123",                       # Average price
        #        "price": "1111",                         # Price
        #        "executedQty": "34244",                  # Volume(Cont)
        #        "orderSide": "BUY",                      # BUY, SELL
        #        "positionSide": "LONG",                  # LONG, SHORT
        #        "marginFrozen": "123",                   # Occupied margin
        #        "sourceType": "default",                 # DEFAULT:normal order,ENTRUST:plan commission,PROFIR:Take Profit and Stop Loss
        #        "sourceId" : "1231231",                  # Triggering conditions ID
        #        "state": "",                             # state:NEW:New order(unfilled);PARTIALLY_FILLED:Partial deal;PARTIALLY_CANCELED:Partial revocation;FILLED:Filled;CANCELED:Canceled;REJECTED:Order failed;EXPIRED:Expired
        #        "createTime": 1731231231,                # CreateTime
        #        "clientOrderId": "204788317630342726"
        #    }
        #
        marketId = self.safe_string_2(order, 's', 'symbol')
        tradeType = 'contract' if ('symbol' in order) else 'spot'
        market = self.safe_market(marketId, market, None, tradeType)
        timestamp = self.safe_integer_2(order, 'ct', 'createTime')
        return self.safe_order({
            'info': order,
            'id': self.safe_string_2(order, 'i', 'orderId'),
            'clientOrderId': self.safe_string_2(order, 'ci', 'clientOrderId'),
            'timestamp': timestamp,
            'datetime': self.iso8601(timestamp),
            'lastTradeTimestamp': None,
            'symbol': market['symbol'],
            # the order type(limit / market), only present in the spot payload
            'type': self.safe_string_lower(order, 'tp'),
            'timeInForce': None,
            'postOnly': None,
            'side': self.safe_string_lower_2(order, 'sd', 'orderSide'),
            'price': self.safe_number_2(order, 'p', 'price'),
            'stopPrice': None,
            'stopLoss': None,
            'takeProfit': None,
            'amount': self.safe_string_2(order, 'oq', 'origQty'),
            'filled': self.safe_string_2(order, 'eq', 'executedQty'),
            'remaining': self.safe_string(order, 'lq'),
            'cost': None,
            'average': self.safe_string_2(order, 'ap', 'avgPrice'),
            # 'st' is the spot key and 'state' the contract key
            'status': self.parse_order_status(self.safe_string_2(order, 'st', 'state')),
            'fee': {
                'currency': None,
                'cost': self.safe_number(order, 'f'),
            },
            'trades': None,
        }, market)

    def handle_order(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        "topic": "order",
        #        "event": "order",
        #        "data": {
        #            "s": "btc_usdt",                # symbol
        #            "t": 1656043204763,             # time happened time
        #            "i": "6216559590087220004",     # orderId,
        #            "ci": "test123",                # clientOrderId
        #            "st": "PARTIALLY_FILLED",       # state
        #            "sd": "BUY",                    # side BUY/SELL
        #            "eq": "2",                      # executedQty executed quantity
        #            "ap": "30000",                  # avg price
        #            "f": "0.002"                    # fee
        #        }
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "order",
        #        "event": "order@123456",
        #        "data": {
        #             "symbol": "btc_usdt",                    # Trading pair
        #             "orderId": "1234",                       # Order Id
        #             "origQty": "34244",                      # Original Quantity
        #             "avgPrice": "123",                       # Average price
        #             "price": "1111",                         # Price
        #             "executedQty": "34244",                  # Volume(Cont)
        #             "orderSide": "BUY",                      # BUY, SELL
        #             "positionSide": "LONG",                  # LONG, SHORT
        #             "marginFrozen": "123",                   # Occupied margin
        #             "sourceType": "default",                 # DEFAULT:normal order,ENTRUST:plan commission,PROFIR:Take Profit and Stop Loss
        #             "sourceId" : "1231231",                  # Triggering conditions ID
        #             "state": "",                             # state:NEW:New order(unfilled);PARTIALLY_FILLED:Partial deal;PARTIALLY_CANCELED:Partial revocation;FILLED:Filled;CANCELED:Canceled;REJECTED:Order failed;EXPIRED:Expired
        #             "createTime": 1731231231,                # CreateTime
        #             "clientOrderId": "204788317630342726"
        #        }
        #    }
        #
        orders = self.orders
        if orders is None:
            limit = self.safe_integer(self.options, 'ordersLimit')
            orders = ArrayCacheBySymbolById(limit)
            self.orders = orders
        order = self.safe_dict(message, 'data', {})
        marketId = self.safe_string_2(order, 's', 'symbol')
        if marketId is not None:
            tradeType = 'contract' if ('symbol' in order) else 'spot'
            market = self.safe_market(marketId, None, None, tradeType)
            parsed = self.parse_ws_order(order, market)
            orders.append(parsed)
            client.resolve(orders, 'order::' + tradeType)
        return message

    def handle_balance(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        topic: 'balance',
        #        event: 'balance',
        #        data: {
        #            a: 3513677381884,
        #            t: 1684250056775,
        #            c: 'usdt',
        #            b: '7.71000000',
        #            f: '0.00000000',
        #            z: 'SPOT'
        #        }
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "balance",
        #        "event": "balance@123456",
        #        "data": {
        #            "coin": "usdt",
        #            "underlyingType": 1,                          # 1:Coin-M,2:USDT-M
        #            "walletBalance": "123",                       # Balance
        #            "openOrderMarginFrozen": "123",               # Frozen order
        #            "isolatedMargin": "213",                      # Isolated Margin
        #            "crossedMargin": "0"                          # Crossed Margin
        #            "availableBalance": '2.256114450000000000',
        #            "coupon": '0',
        #            "bonus": '0'
        #        }
        #    }
        #
        data = self.safe_dict(message, 'data', {})
        currencyId = self.safe_string_2(data, 'c', 'coin')
        code = self.safe_currency_code(currencyId)
        account = self.account()
        account['free'] = self.safe_string(data, 'availableBalance')
        account['used'] = self.safe_string(data, 'f')
        account['total'] = self.safe_string_2(data, 'b', 'walletBalance')
        self.balance[code] = account
        self.balance = self.safe_balance(self.balance)
        tradeType = 'contract' if ('coin' in data) else 'spot'
        client.resolve(self.balance, 'balance::' + tradeType)

    def handle_my_trades(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        "topic": "trade",
        #        "event": "trade",
        #        "data": {
        #            "s": "btc_usdt",                # symbol
        #            "t": 1656043204763,             # time
        #            "i": "6316559590087251233",     # tradeId
        #            "oi": "6216559590087220004",    # orderId
        #            "p": "30000",                   # trade price
        #            "q": "3",                       # qty quantity
        #            "v": "90000"                    # volume trade amount
        #        }
        #    }
        #
        # contract
        #
        #    {
        #       "topic": "trade",
        #       "event": "trade@123456",
        #       "data": {
        #            "symbol": 'btc_usdt',
        #            "orderSide": 'SELL',
        #            "positionSide": 'LONG',
        #            "orderId": '231485367663419328',
        #            "price": '27152.7',
        #            "quantity": '33',
        #            "marginUnfrozen": '2.85318000',
        #            "timestamp": 1684892412565
        #        }
        #    }
        #
        data = self.safe_dict(message, 'data', {})
        stored = self.myTrades
        if stored is None:
            limit = self.safe_integer(self.options, 'tradesLimit', 1000)
            stored = ArrayCacheBySymbolById(limit)
            self.myTrades = stored
        parsedTrade = self.parse_trade(data)
        market = self.market(parsedTrade['symbol'])
        stored.append(parsedTrade)
        tradeType = 'contract' if market['contract'] else 'spot'
        client.resolve(stored, 'trade::' + tradeType)

    def handle_message(self, client: Client, message):
        """
        dispatch an incoming websocket message to the handler of its topic

        :param Client client: the websocket client that received the message
        :param message: the decoded message, 'event' and 'topic' are read from it
        """
        event = self.safe_string(message, 'event')
        if event == 'pong':
            client.onPong()
        elif event is not None:
            topic = self.safe_string(message, 'topic')
            methods = {
                'kline': self.handle_ohlcv,
                'depth': self.handle_order_book,
                'depth_update': self.handle_order_book,
                'ticker': self.handle_ticker,
                'agg_ticker': self.handle_ticker,
                'tickers': self.handle_tickers,
                'agg_tickers': self.handle_tickers,
                'balance': self.handle_balance,
                'order': self.handle_order,
                'position': self.handle_position,
            }
            method = self.safe_value(methods, topic)
            if topic == 'trade':
                # public and private trades share the topic, only the private
                # ones reference an order id
                data = self.safe_dict(message, 'data', {})
                if ('oi' in data) or ('orderId' in data):
                    method = self.handle_my_trades
                else:
                    method = self.handle_trade
            if method is not None:
                method(client, message)

    def ping(self, client: Client):
        """
        build the keep-alive message and mark the client as alive

        :param Client client: the websocket client to ping
        :returns str: the ping payload
        """
        client.lastPong = self.milliseconds()
        return 'ping'

    def handle_error_message(self, client: Client, message: dict):
        #
        #    {
        #        "id": "123",
        #        "code": 401,
        #        "msg": "token expire"
        #    }
        #
        msg = self.safe_string(message, 'msg')
        if (msg == 'invalid_listen_key') or (msg == 'token expire'):
            client.subscriptions['token'] = None
            contractUrl = self.urls['api']['ws']['contract']
            # assumes the client exposes the url it is connected to, falls back
            # to the contract stream(the former hard-coded behaviour) otherwise
            clientUrl = getattr(client, 'url', None)
            isContract = True if (clientUrl is None) else clientUrl.startswith(contractUrl)
            # get_listen_key caches the token on its own client, not on the
            # '/user' or '/market' client, so the cache is invalidated there too
            tokenUrl = contractUrl if isContract else (self.urls['api']['ws']['spot'] + '/private')
            tokenClient = self.client(tokenUrl)
            tokenClient.subscriptions['token'] = None
            # get_listen_key is a coroutine: calling it from self synchronous
            # handler without scheduling it would never execute it
            self.spawn(self.get_listen_key, isContract)
        return client.reject(message)