# -*- coding: utf-8 -*- # PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN: # https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code import ccxt.async_support from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp from ccxt.base.types import Any, Balances, Bool, Int, Market, Order, OrderBook, Position, Str, Strings, Ticker, Trade from ccxt.async_support.base.ws.client import Client from typing import List class hashkey(ccxt.async_support.hashkey): def describe(self) -> Any: return self.deep_extend(super(hashkey, self).describe(), { 'has': { 'ws': True, 'watchBalance': True, 'watchMyTrades': True, 'watchOHLCV': True, 'watchOrderBook': True, 'watchOrders': True, 'watchTicker': True, 'watchTrades': True, 'watchTradesForSymbols': False, 'watchPositions': False, }, 'urls': { 'api': { 'ws': { 'public': 'wss://stream-glb.hashkey.com/quote/ws/v1', 'private': 'wss://stream-glb.hashkey.com/api/v1/ws', }, 'test': { 'ws': { 'public': 'wss://stream-glb.sim.hashkeydev.com/quote/ws/v1', 'private': 'wss://stream-glb.sim.hashkeydev.com/api/v1/ws', }, }, }, }, 'options': { 'listenKeyRefreshRate': 3600000, 'listenKey': None, 'watchBalance': { 'fetchBalanceSnapshot': True, # or False 'awaitBalanceSnapshot': False, # whether to wait for the balance snapshot before providing updates }, }, 'streaming': { 'keepAlive': 10000, }, }) async def wath_public(self, market: Market, topic: str, messageHash: str, params={}): request: dict = { 'symbol': market['id'], 'topic': topic, 'event': 'sub', } url = self.urls['api']['ws']['public'] return await self.watch(url, messageHash, self.deep_extend(request, params), messageHash) async def watch_private(self, messageHash): listenKey = await self.authenticate() url = self.get_private_url(listenKey) return await self.watch(url, messageHash, None, messageHash) def get_private_url(self, listenKey): return self.urls['api']['ws']['private'] + '/' + listenKey async def watch_ohlcv(self, symbol: str, timeframe: str = '1m', since: Int = None, limit: Int = None, params={}) -> List[list]: """ watches historical candlestick data containing the open, high, low, and close price, and the volume of a market https://hashkeyglobal-apidoc.readme.io/reference/websocket-api#public-stream :param str symbol: unified symbol of the market to fetch OHLCV data for :param str timeframe: the length of time each candle represents :param int [since]: timestamp in ms of the earliest candle to fetch :param int [limit]: the maximum amount of candles to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :param bool [params.binary]: True or False - default False :returns int[][]: A list of candles ordered, open, high, low, close, volume """ await self.load_markets() market = self.market(symbol) symbol = market['symbol'] interval = self.safe_string(self.timeframes, timeframe, timeframe) topic = 'kline_' + interval messageHash = 'ohlcv:' + symbol + ':' + timeframe ohlcv = await self.wath_public(market, topic, messageHash, params) if self.newUpdates: limit = ohlcv.getLimit(symbol, limit) return self.filter_by_since_limit(ohlcv, since, limit, 0, True) def handle_ohlcv(self, client: Client, message): # # { # "symbol": "DOGEUSDT", # "symbolName": "DOGEUSDT", # "topic": "kline", # "params": { # "realtimeInterval": "24h", # "klineType": "1m" # }, # "data": [ # { # "t": 1722861660000, # "s": "DOGEUSDT", # "sn": "DOGEUSDT", # "c": "0.08389", # "h": "0.08389", # "l": "0.08389", # "o": "0.08389", # "v": "0" # } # ], # "f": True, # "sendTime": 1722861664258, # "shared": False # } # marketId = self.safe_string(message, 'symbol') market = self.safe_market(marketId) symbol = self.safe_symbol(marketId, market) if not (symbol in self.ohlcvs): self.ohlcvs[symbol] = {} params = self.safe_dict(message, 'params') klineType = self.safe_string(params, 'klineType') timeframe = self.find_timeframe(klineType) if not (timeframe in self.ohlcvs[symbol]): limit = self.safe_integer(self.options, 'OHLCVLimit', 1000) self.ohlcvs[symbol][timeframe] = ArrayCacheByTimestamp(limit) data = self.safe_list(message, 'data', []) stored = self.ohlcvs[symbol][timeframe] for i in range(0, len(data)): candle = self.safe_dict(data, i, {}) parsed = self.parse_ws_ohlcv(candle, market) stored.append(parsed) messageHash = 'ohlcv:' + symbol + ':' + timeframe client.resolve(stored, messageHash) def parse_ws_ohlcv(self, ohlcv, market: Market = None) -> list: # # { # "t": 1722861660000, # "s": "DOGEUSDT", # "sn": "DOGEUSDT", # "c": "0.08389", # "h": "0.08389", # "l": "0.08389", # "o": "0.08389", # "v": "0" # } # return [ self.safe_integer(ohlcv, 't'), self.safe_number(ohlcv, 'o'), self.safe_number(ohlcv, 'h'), self.safe_number(ohlcv, 'l'), self.safe_number(ohlcv, 'c'), self.safe_number(ohlcv, 'v'), ] async def watch_ticker(self, symbol: str, params={}) -> Ticker: """ watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market https://hashkeyglobal-apidoc.readme.io/reference/websocket-api#public-stream :param str symbol: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :param bool [params.binary]: True or False - default False :returns dict: a `ticker structure ` """ await self.load_markets() market = self.market(symbol) symbol = market['symbol'] topic = 'realtimes' messageHash = 'ticker:' + symbol return await self.wath_public(market, topic, messageHash, params) def handle_ticker(self, client: Client, message): # # { # "symbol": "ETHUSDT", # "symbolName": "ETHUSDT", # "topic": "realtimes", # "params": { # "realtimeInterval": "24h" # }, # "data": [ # { # "t": 1722864411064, # "s": "ETHUSDT", # "sn": "ETHUSDT", # "c": "2195", # "h": "2918.85", # "l": "2135.5", # "o": "2915.78", # "v": "666.5019", # "qv": "1586902.757079", # "m": "-0.2472", # "e": 301 # } # ], # "f": False, # "sendTime": 1722864411086, # "shared": False # } # data = self.safe_list(message, 'data', []) ticker = self.parse_ticker(self.safe_dict(data, 0)) symbol = ticker['symbol'] messageHash = 'ticker:' + symbol self.tickers[symbol] = ticker client.resolve(self.tickers[symbol], messageHash) async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ watches information on multiple trades made in a market https://hashkeyglobal-apidoc.readme.io/reference/websocket-api#public-stream :param str symbol: unified market symbol of the market trades were made in :param int [since]: the earliest time in ms to fetch orders for :param int [limit]: the maximum number of trade structures to retrieve :param dict [params]: extra parameters specific to the exchange API endpoint :param bool [params.binary]: True or False - default False :returns dict[]: a list of `trade structures ` """ await self.load_markets() market = self.market(symbol) symbol = market['symbol'] topic = 'trade' messageHash = 'trades:' + symbol trades = await self.wath_public(market, topic, messageHash, params) if self.newUpdates: limit = trades.getLimit(symbol, limit) return self.filter_by_since_limit(trades, since, limit, 'timestamp', True) def handle_trades(self, client: Client, message): # # { # "symbol": "ETHUSDT", # "symbolName": "ETHUSDT", # "topic": "trade", # "params": { # "realtimeInterval": "24h" # }, # "data": [ # { # "v": "1745922896272048129", # "t": 1722866228075, # "p": "2340.41", # "q": "0.0132", # "m": True # }, # ... # ], # "f": True, # "sendTime": 1722869464248, # "channelId": "668498fffeba4108-00000001-00113184-562e27d215e43f9c-c188b319", # "shared": False # } # marketId = self.safe_string(message, 'symbol') market = self.safe_market(marketId) symbol = market['symbol'] if not (symbol in self.trades): limit = self.safe_integer(self.options, 'tradesLimit', 1000) self.trades[symbol] = ArrayCache(limit) stored = self.trades[symbol] data = self.safe_list(message, 'data') if data is not None: data = self.sort_by(data, 't') for i in range(0, len(data)): trade = self.safe_dict(data, i) parsed = self.parse_ws_trade(trade, market) stored.append(parsed) messageHash = 'trades' + ':' + symbol client.resolve(stored, messageHash) async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook: """ watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data https://hashkeyglobal-apidoc.readme.io/reference/websocket-api#public-stream :param str symbol: unified symbol of the market to fetch the order book for :param int [limit]: the maximum amount of order book entries to return. :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: A dictionary of `order book structures ` indexed by market symbols """ await self.load_markets() market = self.market(symbol) symbol = market['symbol'] topic = 'depth' messageHash = 'orderbook:' + symbol orderbook = await self.wath_public(market, topic, messageHash, params) return orderbook.limit() def handle_order_book(self, client: Client, message): # # { # "symbol": "ETHUSDT", # "symbolName": "ETHUSDT", # "topic": "depth", # "params": {"realtimeInterval": "24h"}, # "data": [ # { # "e": 301, # "s": "ETHUSDT", # "t": 1722873144371, # "v": "84661262_18", # "b": [ # ["1650", "0.0864"], # ... # ], # "a": [ # ["4085", "0.0074"], # ... # ], # "o": 0 # } # ], # "f": False, # "sendTime": 1722873144589, # "channelId": "2265aafffe68b588-00000001-0011510c-9e9ca710b1500854-551830bd", # "shared": False # } # marketId = self.safe_string(message, 'symbol') symbol = self.safe_symbol(marketId) messageHash = 'orderbook:' + symbol if not (symbol in self.orderbooks): self.orderbooks[symbol] = self.order_book({}) orderbook = self.orderbooks[symbol] data = self.safe_list(message, 'data', []) dataEntry = self.safe_dict(data, 0) timestamp = self.safe_integer(dataEntry, 't') snapshot = self.parse_order_book(dataEntry, symbol, timestamp, 'b', 'a') orderbook.reset(snapshot) orderbook['nonce'] = self.safe_integer(message, 'id') self.orderbooks[symbol] = orderbook client.resolve(orderbook, messageHash) async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]: """ watches information on multiple orders made by the user https://hashkeyglobal-apidoc.readme.io/reference/websocket-api#private-stream :param str symbol: unified market symbol of the market orders were made in :param int [since]: the earliest time in ms to fetch orders for :param int [limit]: the maximum number of order structures to retrieve :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `order structures ` """ await self.load_markets() messageHash = 'orders' if symbol is not None: symbol = self.symbol(symbol) messageHash = messageHash + ':' + symbol orders = await self.watch_private(messageHash) if self.newUpdates: limit = orders.getLimit(symbol, limit) return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True) def handle_order(self, client: Client, message): # # swap # { # "e": "contractExecutionReport", # "E": "1723037391181", # "s": "ETHUSDT-PERPETUAL", # "c": "1723037389677", # "S": "BUY_OPEN", # "o": "LIMIT", # "f": "IOC", # "q": "1", # "p": "2561.75", # "X": "FILLED", # "i": "1747358716129257216", # "l": "1", # "z": "1", # "L": "2463.36", # "n": "0.001478016", # "N": "USDT", # "u": True, # "w": True, # "m": False, # "O": "1723037391140", # "Z": "2463.36", # "C": False, # "v": "5", # "reqAmt": "0", # "d": "1747358716255075840", # "r": "0", # "V": "2463.36", # "P": "0", # "lo": False, # "lt": "" # } # if self.orders is None: limit = self.safe_integer(self.options, 'ordersLimit', 1000) self.orders = ArrayCacheBySymbolById(limit) parsed = self.parse_ws_order(message) orders = self.orders orders.append(parsed) messageHash = 'orders' client.resolve(orders, messageHash) symbol = parsed['symbol'] symbolSpecificMessageHash = messageHash + ':' + symbol client.resolve(orders, symbolSpecificMessageHash) def parse_ws_order(self, order: dict, market: Market = None) -> Order: marketId = self.safe_string(order, 's') market = self.safe_market(marketId, market) timestamp = self.safe_integer(order, 'O') side = self.safe_string_lower(order, 'S') reduceOnly: Bool = None side, reduceOnly = self.parseOrderSideAndReduceOnly(side) type = self.parseOrderType(self.safe_string(order, 'o')) timeInForce = self.safe_string(order, 'f') postOnly: Bool = None type, timeInForce, postOnly = self.parseOrderTypeTimeInForceAndPostOnly(type, timeInForce) if market['contract']: # swap orders are always have type 'LIMIT', thus we can not define the correct type type = None return self.safe_order({ 'id': self.safe_string(order, 'i'), 'clientOrderId': self.safe_string(order, 'c'), 'datetime': self.iso8601(timestamp), 'timestamp': timestamp, 'lastTradeTimestamp': None, 'lastUpdateTimestamp': None, 'status': self.parse_order_status(self.safe_string(order, 'X')), 'symbol': market['symbol'], 'type': type, 'timeInForce': timeInForce, 'side': side, 'price': self.safe_string(order, 'p'), 'average': self.safe_string(order, 'V'), 'amount': self.omit_zero(self.safe_string(order, 'q')), 'filled': self.safe_string(order, 'z'), 'remaining': self.safe_string(order, 'r'), 'stopPrice': None, 'triggerPrice': None, 'takeProfitPrice': None, 'stopLossPrice': None, 'cost': self.omit_zero(self.safe_string(order, 'Z')), 'trades': None, 'fee': { 'currency': self.safe_currency_code(self.safe_string(order, 'N')), 'amount': self.omit_zero(self.safe_string(order, 'n')), }, 'reduceOnly': reduceOnly, 'postOnly': postOnly, 'info': order, }, market) async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ watches information on multiple trades made by the user https://hashkeyglobal-apidoc.readme.io/reference/websocket-api#private-stream :param str symbol: unified market symbol of the market trades were made in :param int [since]: the earliest time in ms to fetch trades for :param int [limit]: the maximum number of trade structures to retrieve :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `trade structures ` """ await self.load_markets() messageHash = 'myTrades' if symbol is not None: symbol = self.symbol(symbol) messageHash += ':' + symbol trades = await self.watch_private(messageHash) if self.newUpdates: limit = trades.getLimit(symbol, limit) return self.filter_by_since_limit(trades, since, limit, 'timestamp', True) def handle_my_trade(self, client: Client, message, subscription={}): # # { # "e": "ticketInfo", # "E": "1723037391156", # "s": "ETHUSDT-PERPETUAL", # "q": "1.00", # "t": "1723037391147", # "p": "2463.36", # "T": "1747358716187197441", # "o": "1747358716129257216", # "c": "1723037389677", # "a": "1735619524953226496", # "m": False, # "S": "BUY" # } # if self.myTrades is None: limit = self.safe_integer(self.options, 'tradesLimit', 1000) self.myTrades = ArrayCacheBySymbolById(limit) tradesArray = self.myTrades parsed = self.parse_ws_trade(message) tradesArray.append(parsed) self.myTrades = tradesArray messageHash = 'myTrades' client.resolve(tradesArray, messageHash) symbol = parsed['symbol'] symbolSpecificMessageHash = messageHash + ':' + symbol client.resolve(tradesArray, symbolSpecificMessageHash) def parse_ws_trade(self, trade, market=None) -> Trade: # # watchTrades # { # "v": "1745922896272048129", # "t": 1722866228075, # "p": "2340.41", # "q": "0.0132", # "m": True # } # # watchMyTrades # { # "e": "ticketInfo", # "E": "1723037391156", # "s": "ETHUSDT-PERPETUAL", # "q": "1.00", # "t": "1723037391147", # "p": "2463.36", # "T": "1747358716187197441", # "o": "1747358716129257216", # "c": "1723037389677", # "a": "1735619524953226496", # "m": False, # "S": "BUY" # } # marketId = self.safe_string(trade, 's') market = self.safe_market(marketId, market) timestamp = self.safe_integer(trade, 't') isMaker = self.safe_bool(trade, 'm') takerOrMaker: Str = None if isMaker is not None: if isMaker: takerOrMaker = 'maker' else: takerOrMaker = 'taker' return self.safe_trade({ 'id': self.safe_string_2(trade, 'v', 'T'), 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), 'symbol': market['symbol'], 'side': self.safe_string_lower(trade, 'S'), 'price': self.safe_string(trade, 'p'), 'amount': self.safe_string(trade, 'q'), 'cost': None, 'takerOrMaker': takerOrMaker, 'type': None, 'order': self.safe_string(trade, 'o'), 'fee': None, 'info': trade, }, market) async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]: """ https://hashkeyglobal-apidoc.readme.io/reference/websocket-api#private-stream watch all open positions :param str[] [symbols]: list of unified market symbols to watch positions for :param int [since]: the earliest time in ms to fetch positions for :param int [limit]: the maximum number of positions to retrieve :param dict params: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `position structure ` """ await self.load_markets() listenKey = await self.authenticate() symbols = self.market_symbols(symbols) messageHash = 'positions' messageHashes = [] if symbols is None: messageHashes.append(messageHash) else: for i in range(0, len(symbols)): symbol = symbols[i] messageHashes.append(messageHash + ':' + symbol) url = self.get_private_url(listenKey) positions = await self.watch_multiple(url, messageHashes, None, messageHashes) if self.newUpdates: return positions return self.filter_by_symbols_since_limit(self.positions, symbols, since, limit, True) def handle_position(self, client: Client, message): # # { # "e": "outboundContractPositionInfo", # "E": "1723084699801", # "A": "1735619524953226496", # "s": "ETHUSDT-PERPETUAL", # "S": "LONG", # "p": "2429.6", # "P": "2", # "a": "2", # "f": "10760.14", # "m": "1.0085", # "r": "-0.0029", # "up": "0.0478", # "pr": "0.0492", # "pv": "4.8592", # "v": "5.00", # "mt": "CROSS", # "mm": "0.0367" # } # if self.positions is None: self.positions = ArrayCacheBySymbolBySide() positions = self.positions parsed = self.parse_ws_position(message) positions.append(parsed) messageHash = 'positions' client.resolve(parsed, messageHash) symbol = parsed['symbol'] client.resolve(parsed, messageHash + ':' + symbol) def parse_ws_position(self, position, market: Market = None) -> Position: marketId = self.safe_string(position, 's') market = self.safe_market(marketId) timestamp = self.safe_integer(position, 'E') return self.safe_position({ 'symbol': market['symbol'], 'id': None, 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), 'contracts': self.safe_number(position, 'P'), 'contractSize': None, 'side': self.safe_string_lower(position, 'S'), 'notional': self.safe_number(position, 'pv'), 'leverage': self.safe_integer(position, 'v'), 'unrealizedPnl': self.safe_number(position, 'up'), 'realizedPnl': self.safe_number(position, 'r'), 'collateral': None, 'entryPrice': self.safe_number(position, 'p'), 'markPrice': None, 'liquidationPrice': self.safe_number(position, 'f'), 'marginMode': self.safe_string_lower(position, 'mt'), 'hedged': True, 'maintenanceMargin': self.safe_number(position, 'mm'), 'maintenanceMarginPercentage': None, 'initialMargin': self.safe_number(position, 'm'), # todo check 'initialMarginPercentage': None, 'marginRatio': None, 'lastUpdateTimestamp': None, 'lastPrice': None, 'stopLossPrice': None, 'takeProfitPrice': None, 'percentage': None, 'info': position, }) async def watch_balance(self, params={}) -> Balances: """ watch balance and get the amount of funds available for trading or funds locked in orders https://hashkeyglobal-apidoc.readme.io/reference/websocket-api#private-stream :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.type]: 'spot' or 'swap' - the type of the market to watch balance for(default 'spot') :returns dict: a `balance structure ` """ listenKey = await self.authenticate() await self.load_markets() type = 'spot' type, params = self.handle_market_type_and_params('watchBalance', None, params, type) messageHash = 'balance:' + type url = self.get_private_url(listenKey) client = self.client(url) self.set_balance_cache(client, type, messageHash) fetchBalanceSnapshot = None awaitBalanceSnapshot = None fetchBalanceSnapshot, params = self.handle_option_and_params(self.options, 'watchBalance', 'fetchBalanceSnapshot', True) awaitBalanceSnapshot, params = self.handle_option_and_params(self.options, 'watchBalance', 'awaitBalanceSnapshot', False) if fetchBalanceSnapshot and awaitBalanceSnapshot: await client.future(type + ':fetchBalanceSnapshot') return await self.watch(url, messageHash, None, messageHash) def set_balance_cache(self, client: Client, type, subscribeHash): if subscribeHash in client.subscriptions: return options = self.safe_dict(self.options, 'watchBalance') snapshot = self.safe_bool(options, 'fetchBalanceSnapshot', True) if snapshot: messageHash = type + ':' + 'fetchBalanceSnapshot' if not (messageHash in client.futures): client.future(messageHash) self.spawn(self.load_balance_snapshot, client, messageHash, type) self.balance[type] = {} # without self comment, transpilation breaks for some reason... async def load_balance_snapshot(self, client, messageHash, type): response = await self.fetch_balance({'type': type}) self.balance[type] = self.extend(response, self.safe_value(self.balance, type, {})) # don't remove the future from the .futures cache future = client.futures[messageHash] future.resolve() client.resolve(self.balance[type], 'balance:' + type) def handle_balance(self, client: Client, message): # # { # "e": "outboundContractAccountInfo", # event type # # outboundContractAccountInfo # "E": "1714717314118", # event time # "T": True, # can trade # "W": True, # can withdraw # "D": True, # can deposit # "B": [ # balances changed # { # "a": "USDT", # asset # "f": "474960.65", # free amount # "l": "24835.178056020383226869", # locked amount # "r": "" # to be released # } # ] # } # event = self.safe_string(message, 'e') data = self.safe_list(message, 'B', []) balanceUpdate = self.safe_dict(data, 0) isSpot = event == 'outboundAccountInfo' type = 'spot' if isSpot else 'swap' if not (type in self.balance): self.balance[type] = {} self.balance[type]['info'] = message currencyId = self.safe_string(balanceUpdate, 'a') code = self.safe_currency_code(currencyId) account = self.account() account['free'] = self.safe_string(balanceUpdate, 'f') account['used'] = self.safe_string(balanceUpdate, 'l') self.balance[type][code] = account self.balance[type] = self.safe_balance(self.balance[type]) messageHash = 'balance:' + type client.resolve(self.balance[type], messageHash) async def authenticate(self, params={}): listenKey = self.safe_string(self.options, 'listenKey') if listenKey is not None: return listenKey response = await self.privatePostApiV1UserDataStream(params) # # { # "listenKey": "atbNEcWnBqnmgkfmYQeTuxKTpTStlZzgoPLJsZhzAOZTbAlxbHqGNWiYaUQzMtDz" # } # listenKey = self.safe_string(response, 'listenKey') self.options['listenKey'] = listenKey listenKeyRefreshRate = self.safe_integer(self.options, 'listenKeyRefreshRate', 3600000) self.delay(listenKeyRefreshRate, self.keep_alive_listen_key, listenKey, params) return listenKey async def keep_alive_listen_key(self, listenKey, params={}): if listenKey is None: return request: dict = { 'listenKey': listenKey, } try: await self.privatePutApiV1UserDataStream(self.extend(request, params)) listenKeyRefreshRate = self.safe_integer(self.options, 'listenKeyRefreshRate', 1200000) self.delay(listenKeyRefreshRate, self.keep_alive_listen_key, listenKey, params) except Exception as error: url = self.get_private_url(listenKey) client = self.client(url) self.options['listenKey'] = None client.reject(error) del self.clients[url] def handle_message(self, client: Client, message): if isinstance(message, list): message = self.safe_dict(message, 0, {}) topic = self.safe_string_2(message, 'topic', 'e') if topic == 'kline': self.handle_ohlcv(client, message) elif topic == 'realtimes': self.handle_ticker(client, message) elif topic == 'trade': self.handle_trades(client, message) elif topic == 'depth': self.handle_order_book(client, message) elif (topic == 'contractExecutionReport') or (topic == 'executionReport'): self.handle_order(client, message) elif topic == 'ticketInfo': self.handle_my_trade(client, message) elif topic == 'outboundContractPositionInfo': self.handle_position(client, message) elif (topic == 'outboundAccountInfo') or (topic == 'outboundContractAccountInfo'): self.handle_balance(client, message)