# -*- coding: utf-8 -*- # PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN: # https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code import ccxt.async_support from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp import hashlib from ccxt.base.types import Any, Balances, Bool, Int, Liquidation, Num, Order, OrderBook, OrderSide, OrderType, Position, Str, Strings, Ticker, Tickers, FundingRate, FundingRates, Trade from ccxt.async_support.base.ws.client import Client from typing import List from ccxt.base.errors import ExchangeError from ccxt.base.errors import AuthenticationError from ccxt.base.errors import ArgumentsRequired from ccxt.base.errors import BadRequest from ccxt.base.errors import InvalidNonce from ccxt.base.errors import ChecksumError class okx(ccxt.async_support.okx): def describe(self) -> Any: return self.deep_extend(super(okx, self).describe(), { 'has': { 'ws': True, 'watchTicker': True, 'watchMarkPrice': True, 'watchMarkPrices': True, 'watchTickers': True, 'watchBidsAsks': True, 'watchOrderBook': True, 'watchTrades': True, 'watchTradesForSymbols': True, 'watchOrderBookForSymbols': True, 'watchBalance': True, 'watchLiquidations': 'emulated', 'watchLiquidationsForSymbols': True, 'watchMyLiquidations': 'emulated', 'watchMyLiquidationsForSymbols': True, 'watchOHLCV': True, 'watchOHLCVForSymbols': True, 'watchOrders': True, 'watchMyTrades': True, 'watchPositions': True, 'watchFundingRate': True, 'watchFundingRates': True, 'createOrderWs': True, 'editOrderWs': True, 'cancelOrderWs': True, 'cancelOrdersWs': True, 'cancelAllOrdersWs': True, }, 'urls': { 'api': { 'ws': 'wss://ws.okx.com:8443/ws/v5', }, 'test': { 'ws': 'wss://wspap.okx.com:8443/ws/v5', }, }, 'options': { 'watchOrderBook': { 'checksum': True, # # bbo-tbt # 1. Newly added channel that sends tick-by-tick Level 1 data # 2. All API users can subscribe # 3. Public depth channel, verification not required # # books-l2-tbt # 1. Only users who're VIP5 and above can subscribe # 2. Identity verification required before subscription # # books50-l2-tbt # 1. Only users who're VIP4 and above can subscribe # 2. Identity verification required before subscription # # books # 1. All API users can subscribe # 2. Public depth channel, verification not required # # books5 # 1. All API users can subscribe # 2. Public depth channel, verification not required # 3. Data feeds will be delivered every 100ms(vs. every 200ms now) # 'depth': 'books', }, 'watchBalance': 'spot', # margin, futures, swap 'watchTicker': { 'channel': 'tickers', # tickers, sprd-tickers, index-tickers, block-tickers }, 'watchTickers': { 'channel': 'tickers', # tickers, sprd-tickers, index-tickers, block-tickers }, 'watchOrders': { 'type': 'ANY', # SPOT, MARGIN, SWAP, FUTURES, OPTION, ANY }, 'watchMyTrades': { 'type': 'ANY', # SPOT, MARGIN, SWAP, FUTURES, OPTION, ANY }, 'createOrderWs': { 'op': 'batch-orders', # order, batch-orders }, 'editOrderWs': { 'op': 'amend-order', # amend-order, batch-amend-orders }, 'ws': { # 'inflate': True, }, }, 'streaming': { # okex does not support built-in ws protocol-level ping-pong # instead it requires a custom text-based ping-pong 'ping': self.ping, 'keepAlive': 18000, }, }) def get_url(self, channel: str, access='public'): # for context: https://www.okx.com/help-center/changes-to-v5-api-websocket-subscription-parameter-and-url isSandbox = self.options['sandboxMode'] sandboxSuffix = '?brokerId=9999' if isSandbox else '' isBusiness = (access == 'business') isPublic = (access == 'public') url = self.urls['api']['ws'] if isBusiness or (channel.find('candle') > -1) or (channel == 'orders-algo'): return url + '/business' + sandboxSuffix elif isPublic: return url + '/public' + sandboxSuffix return url + '/private' + sandboxSuffix async def subscribe_multiple(self, access, channel, symbols: Strings = None, params={}): await self.load_markets() if symbols is None: symbols = self.symbols symbols = self.market_symbols(symbols) url = self.get_url(channel, access) messageHashes = [] args = [] for i in range(0, len(symbols)): marketId = self.market_id(symbols[i]) arg: dict = { 'channel': channel, 'instId': marketId, } args.append(self.extend(arg, params)) messageHashes.append(channel + '::' + symbols[i]) request: dict = { 'op': 'subscribe', 'args': args, } return await self.watch_multiple(url, messageHashes, request, messageHashes) async def subscribe(self, access, messageHash, channel, symbol, params={}): await self.load_markets() url = self.get_url(channel, access) firstArgument: dict = { 'channel': channel, } if symbol is not None: market = self.market(symbol) messageHash += ':' + market['id'] firstArgument['instId'] = market['id'] request: dict = { 'op': 'subscribe', 'args': [ self.deep_extend(firstArgument, params), ], } return await self.watch(url, messageHash, request, messageHash) async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-trades-channel https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-all-trades-channel get the list of most recent trades for a particular symbol :param str symbol: unified symbol of the market to fetch trades for :param int [since]: timestamp in ms of the earliest trade to fetch :param int [limit]: the maximum amount of trades to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `trade structures ` """ return await self.watch_trades_for_symbols([symbol], since, limit, params) async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-trades-channel https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-all-trades-channel get the list of most recent trades for a particular symbol :param str symbols: :param int [since]: timestamp in ms of the earliest trade to fetch :param int [limit]: the maximum amount of trades to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.channel]: the channel to subscribe to, trades by default. Can be 'trades' and 'trades-all' :returns dict[]: a list of `trade structures ` """ symbolsLength = len(symbols) if symbolsLength == 0: raise ArgumentsRequired(self.id + ' watchTradesForSymbols() requires a non-empty array of symbols') await self.load_markets() symbols = self.market_symbols(symbols) channel = None channel, params = self.handle_option_and_params(params, 'watchTrades', 'channel', 'trades') topics = [] messageHashes = [] for i in range(0, len(symbols)): symbol = symbols[i] messageHashes.append(channel + ':' + symbol) marketId = self.market_id(symbol) topic: dict = { 'channel': channel, 'instId': marketId, } topics.append(topic) request: dict = { 'op': 'subscribe', 'args': topics, } access = 'public' if channel == 'trades-all': access = 'business' await self.authenticate({'access': access}) url = self.get_url(channel, access) trades = await self.watch_multiple(url, messageHashes, request, messageHashes) if self.newUpdates: first = self.safe_value(trades, 0) tradeSymbol = self.safe_string(first, 'symbol') limit = trades.getLimit(tradeSymbol, limit) return self.filter_by_since_limit(trades, since, limit, 'timestamp', True) async def un_watch_trades_for_symbols(self, symbols: List[str], params={}) -> Any: """ unWatches from the stream channel :param str[] symbols: :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.channel]: the channel to subscribe to, trades by default. Can be trades, trades-all :returns dict[]: a list of `trade structures ` """ await self.load_markets() symbols = self.market_symbols(symbols, None, False) channel = None channel, params = self.handle_option_and_params(params, 'watchTrades', 'channel', 'trades') topics = [] messageHashes = [] for i in range(0, len(symbols)): symbol = symbols[i] messageHashes.append('unsubscribe:' + channel + symbol) marketId = self.market_id(symbol) topic: dict = { 'channel': channel, 'instId': marketId, } topics.append(topic) request: dict = { 'op': 'unsubscribe', 'args': topics, } access = 'public' if channel == 'trades-all': access = 'business' await self.authenticate({'access': access}) url = self.get_url(channel, access) return await self.watch_multiple(url, messageHashes, request, messageHashes) async def un_watch_trades(self, symbol: str, params={}) -> Any: """ unWatches from the stream channel :param str symbol: unified symbol of the market to fetch trades for :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `trade structures ` """ return await self.un_watch_trades_for_symbols([symbol], params) def handle_trades(self, client: Client, message): # # { # "arg": {channel: "trades", instId: "BTC-USDT"}, # "data": [ # { # "instId": "BTC-USDT", # "tradeId": "216970876", # "px": "31684.5", # "sz": "0.00001186", # "side": "buy", # "ts": "1626531038288" # } # ] # } # { # "arg": { # "channel": "trades-all", # "instId": "BTC-USDT" # }, # "data": [ # { # "instId": "BTC-USDT", # "tradeId": "130639474", # "px": "42219.9", # "sz": "0.12060306", # "side": "buy", # "source": "0", # "ts": "1630048897897" # } # ] # } # arg = self.safe_value(message, 'arg', {}) channel = self.safe_string(arg, 'channel') marketId = self.safe_string(arg, 'instId') symbol = self.safe_symbol(marketId) data = self.safe_value(message, 'data', []) tradesLimit = self.safe_integer(self.options, 'tradesLimit', 1000) for i in range(0, len(data)): trade = self.parse_trade(data[i]) messageHash = channel + ':' + symbol stored = self.safe_value(self.trades, symbol) if stored is None: stored = ArrayCache(tradesLimit) self.trades[symbol] = stored stored.append(trade) client.resolve(stored, messageHash) async def watch_funding_rate(self, symbol: str, params={}) -> FundingRate: """ watch the current funding rate https://www.okx.com/docs-v5/en/#public-data-websocket-funding-rate-channel :param str symbol: unified market symbol :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `funding rate structure ` """ symbol = self.symbol(symbol) fr = await self.watch_funding_rates([symbol], params) return fr[symbol] async def watch_funding_rates(self, symbols: List[str], params={}) -> FundingRates: """ watch the funding rate for multiple markets https://www.okx.com/docs-v5/en/#public-data-websocket-funding-rate-channel :param str[] symbols: list of unified market symbols :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a dictionary of `funding rates structures `, indexe by market symbols """ await self.load_markets() symbols = self.market_symbols(symbols) channel = 'funding-rate' topics = [] messageHashes = [] for i in range(0, len(symbols)): symbol = symbols[i] messageHashes.append(channel + ':' + symbol) marketId = self.market_id(symbol) topic: dict = { 'channel': channel, 'instId': marketId, } topics.append(topic) request: dict = { 'op': 'subscribe', 'args': topics, } url = self.get_url(channel, 'public') fundingRate = await self.watch_multiple(url, messageHashes, request, messageHashes) if self.newUpdates: symbol = self.safe_string(fundingRate, 'symbol') result: dict = {} result[symbol] = fundingRate return result return self.filter_by_array(self.fundingRates, 'symbol', symbols) def handle_funding_rate(self, client: Client, message): # # "data":[ # { # "fundingRate":"0.0001875391284828", # "fundingTime":"1700726400000", # "instId":"BTC-USD-SWAP", # "instType":"SWAP", # "method": "next_period", # "maxFundingRate":"0.00375", # "minFundingRate":"-0.00375", # "nextFundingRate":"0.0002608059239328", # "nextFundingTime":"1700755200000", # "premium": "0.0001233824646391", # "settFundingRate":"0.0001699799259033", # "settState":"settled", # "ts":"1700724675402" # } # ] # data = self.safe_list(message, 'data', []) for i in range(0, len(data)): rawfr = data[i] fundingRate = self.parse_funding_rate(rawfr) symbol = fundingRate['symbol'] self.fundingRates[symbol] = fundingRate client.resolve(fundingRate, 'funding-rate' + ':' + fundingRate['symbol']) async def watch_ticker(self, symbol: str, params={}) -> Ticker: """ https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market :param str symbol: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.channel]: the channel to subscribe to, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers :returns dict: a `ticker structure ` """ channel = None channel, params = self.handle_option_and_params(params, 'watchTicker', 'channel', 'tickers') params['channel'] = channel market = self.market(symbol) symbol = market['symbol'] ticker = await self.watch_tickers([symbol], params) return self.safe_value(ticker, symbol) async def un_watch_ticker(self, symbol: str, params={}) -> Any: """ https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel unWatches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market :param str symbol: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.channel]: the channel to subscribe to, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers :returns dict: a `ticker structure ` """ return await self.un_watch_tickers([symbol], params) async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers: """ https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for all markets of a specific list :param str[] [symbols]: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.channel]: the channel to subscribe to, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers :returns dict: a `ticker structure ` """ await self.load_markets() symbols = self.market_symbols(symbols, None, False) channel = None channel, params = self.handle_option_and_params(params, 'watchTickers', 'channel', 'tickers') newTickers = await self.subscribe_multiple('public', channel, symbols, params) if self.newUpdates: return newTickers return self.filter_by_array(self.tickers, 'symbol', symbols) async def watch_mark_price(self, symbol: str, params={}) -> Ticker: """ https://www.okx.com/docs-v5/en/#public-data-websocket-mark-price-channel watches a mark price :param str symbol: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.channel]: the channel to subscribe to, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers :returns dict: a `ticker structure ` """ channel = None channel, params = self.handle_option_and_params(params, 'watchMarkPrice', 'channel', 'mark-price') params['channel'] = channel market = self.market(symbol) symbol = market['symbol'] ticker = await self.watch_mark_prices([symbol], params) return ticker[symbol] async def watch_mark_prices(self, symbols: Strings = None, params={}) -> Tickers: """ https://www.okx.com/docs-v5/en/#public-data-websocket-mark-price-channel watches mark prices :param str[] [symbols]: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.channel]: the channel to subscribe to, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers :returns dict: a `ticker structure ` """ await self.load_markets() symbols = self.market_symbols(symbols, None, False) channel = None channel, params = self.handle_option_and_params(params, 'watchMarkPrices', 'channel', 'mark-price') newTickers = await self.subscribe_multiple('public', channel, symbols, params) if self.newUpdates: return newTickers return self.filter_by_array(self.tickers, 'symbol', symbols) async def un_watch_tickers(self, symbols: Strings = None, params={}) -> Any: """ https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel unWatches a price ticker, a statistical calculation with the information calculated over the past 24 hours for all markets of a specific list :param str[] [symbols]: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.channel]: the channel to subscribe to, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers :returns dict: a `ticker structure ` """ await self.load_markets() symbols = self.market_symbols(symbols, None, False) channel = None channel, params = self.handle_option_and_params(params, 'watchTickers', 'channel', 'tickers') topics = [] messageHashes = [] for i in range(0, len(symbols)): symbol = symbols[i] messageHashes.append('unsubscribe:ticker:' + symbol) marketId = self.market_id(symbol) topic: dict = { 'channel': channel, 'instId': marketId, } topics.append(topic) request: dict = { 'op': 'unsubscribe', 'args': topics, } url = self.get_url(channel, 'public') return await self.watch_multiple(url, messageHashes, request, messageHashes) def handle_ticker(self, client: Client, message): # # { # "arg": {channel: "tickers", instId: "BTC-USDT"}, # "data": [ # { # "instType": "SPOT", # "instId": "BTC-USDT", # "last": "31500.1", # "lastSz": "0.00001754", # "askPx": "31500.1", # "askSz": "0.00998144", # "bidPx": "31500", # "bidSz": "3.05652439", # "open24h": "31697", # "high24h": "32248", # "low24h": "31165.6", # "sodUtc0": "31385.5", # "sodUtc8": "32134.9", # "volCcy24h": "503403597.38138519", # "vol24h": "15937.10781721", # "ts": "1626526618762" # } # ] # } # self.handle_bid_ask(client, message) arg = self.safe_value(message, 'arg', {}) marketId = self.safe_string(arg, 'instId') market = self.safe_market(marketId, None, '-') symbol = market['symbol'] channel = self.safe_string(arg, 'channel') data = self.safe_value(message, 'data', []) newTickers: dict = {} for i in range(0, len(data)): ticker = self.parse_ticker(data[i]) self.tickers[symbol] = ticker newTickers[symbol] = ticker messageHash = channel + '::' + symbol client.resolve(newTickers, messageHash) async def watch_bids_asks(self, symbols: Strings = None, params={}) -> Tickers: """ https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel watches best bid & ask for symbols :param str[] symbols: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `ticker structure ` """ await self.load_markets() symbols = self.market_symbols(symbols, None, False) channel = None channel, params = self.handle_option_and_params(params, 'watchBidsAsks', 'channel', 'tickers') url = self.get_url(channel, 'public') messageHashes = [] args = [] for i in range(0, len(symbols)): marketId = self.market_id(symbols[i]) arg: dict = { 'channel': channel, 'instId': marketId, } args.append(self.extend(arg, params)) messageHashes.append('bidask::' + symbols[i]) request: dict = { 'op': 'subscribe', 'args': args, } newTickers = await self.watch_multiple(url, messageHashes, request, messageHashes) if self.newUpdates: tickers: dict = {} tickers[newTickers['symbol']] = newTickers return tickers return self.filter_by_array(self.bidsasks, 'symbol', symbols) def handle_bid_ask(self, client: Client, message): # # { # "arg": {channel: "tickers", instId: "BTC-USDT"}, # "data": [ # { # "instType": "SPOT", # "instId": "BTC-USDT", # "last": "31500.1", # "lastSz": "0.00001754", # "askPx": "31500.1", # "askSz": "0.00998144", # "bidPx": "31500", # "bidSz": "3.05652439", # "open24h": "31697", # "high24h": "32248", # "low24h": "31165.6", # "sodUtc0": "31385.5", # "sodUtc8": "32134.9", # "volCcy24h": "503403597.38138519", # "vol24h": "15937.10781721", # "ts": "1626526618762" # } # ] # } # data = self.safe_list(message, 'data', []) ticker = self.safe_dict(data, 0, {}) parsedTicker = self.parse_ws_bid_ask(ticker) symbol = parsedTicker['symbol'] self.bidsasks[symbol] = parsedTicker messageHash = 'bidask::' + symbol client.resolve(parsedTicker, messageHash) def parse_ws_bid_ask(self, ticker, market=None): marketId = self.safe_string(ticker, 'instId') market = self.safe_market(marketId, market) symbol = self.safe_string(market, 'symbol') timestamp = self.safe_integer(ticker, 'ts') return self.safe_ticker({ 'symbol': symbol, 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), 'ask': self.safe_string(ticker, 'askPx'), 'askVolume': self.safe_string(ticker, 'askSz'), 'bid': self.safe_string(ticker, 'bidPx'), 'bidVolume': self.safe_string(ticker, 'bidSz'), 'info': ticker, }, market) async def watch_liquidations_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Liquidation]: """ watch the public liquidations of a trading pair https://www.okx.com/docs-v5/en/#public-data-websocket-liquidation-orders-channel :param str symbols: :param int [since]: the earliest time in ms to fetch liquidations for :param int [limit]: the maximum number of liquidation structures to retrieve :param dict [params]: exchange specific parameters for the okx api endpoint :returns dict: an array of `liquidation structures ` """ await self.load_markets() symbols = self.market_symbols(symbols, None, True, True) messageHash = 'liquidations' messageHashes = [] if symbols is not None: for i in range(0, len(symbols)): symbol = symbols[i] messageHashes.append(messageHash + '::' + symbol) else: messageHashes.append(messageHash) market = self.get_market_from_symbols(symbols) type = None type, params = self.handle_market_type_and_params('watchliquidationsForSymbols', market, params) channel = 'liquidation-orders' if type == 'spot': type = 'SWAP' elif type == 'future': type = 'futures' uppercaseType = type.upper() request = { 'op': 'subscribe', 'args': [ { 'channel': channel, 'instType': uppercaseType, }, ], } url = self.get_url(channel, 'public') newLiquidations = await self.watch_multiple(url, messageHashes, request, messageHashes) if self.newUpdates: return newLiquidations return self.filter_by_symbols_since_limit(self.liquidations, symbols, since, limit, True) def handle_liquidation(self, client: Client, message): # # { # "arg": { # "channel": "liquidation-orders", # "instType": "SWAP" # }, # "data": [ # { # "details": [ # { # "bkLoss": "0", # "bkPx": "0.007831", # "ccy": "", # "posSide": "short", # "side": "buy", # "sz": "13", # "ts": "1692266434010" # } # ], # "instFamily": "IOST-USDT", # "instId": "IOST-USDT-SWAP", # "instType": "SWAP", # "uly": "IOST-USDT" # } # ] # } # rawLiquidations = self.safe_list(message, 'data', []) for i in range(0, len(rawLiquidations)): rawLiquidation = rawLiquidations[i] liquidation = self.parse_ws_liquidation(rawLiquidation) symbol = self.safe_string(liquidation, 'symbol') liquidations = self.safe_value(self.liquidations, symbol) if liquidations is None: limit = self.safe_integer(self.options, 'liquidationsLimit', 1000) liquidations = ArrayCache(limit) liquidations.append(liquidation) self.liquidations[symbol] = liquidations client.resolve([liquidation], 'liquidations') client.resolve([liquidation], 'liquidations::' + symbol) async def watch_my_liquidations_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Liquidation]: """ watch the private liquidations of a trading pair https://www.okx.com/docs-v5/en/#trading-account-websocket-balance-and-position-channel :param str[] symbols: :param int [since]: the earliest time in ms to fetch liquidations for :param int [limit]: the maximum number of liquidation structures to retrieve :param dict [params]: exchange specific parameters for the okx api endpoint :returns dict: an array of `liquidation structures ` """ await self.load_markets() isTrigger = self.safe_value_2(params, 'stop', 'trigger', False) params = self.omit(params, ['stop', 'trigger']) await self.authenticate({'access': 'business' if isTrigger else 'private'}) symbols = self.market_symbols(symbols, None, True, True) messageHash = 'myLiquidations' messageHashes = [] if symbols is not None: for i in range(0, len(symbols)): symbol = symbols[i] messageHashes.append(messageHash + '::' + symbol) else: messageHashes.append(messageHash) channel = 'balance_and_position' request: dict = { 'op': 'subscribe', 'args': [ { 'channel': channel, }, ], } url = self.get_url(channel, 'private') newLiquidations = await self.watch_multiple(url, messageHashes, self.deep_extend(request, params), messageHashes) if self.newUpdates: return newLiquidations return self.filter_by_symbols_since_limit(self.liquidations, symbols, since, limit, True) def handle_my_liquidation(self, client: Client, message): # # { # "arg": { # "channel": "balance_and_position", # "uid": "77982378738415879" # }, # "data": [{ # "pTime": "1597026383085", # "eventType": "snapshot", # "balData": [{ # "ccy": "BTC", # "cashBal": "1", # "uTime": "1597026383085" # }], # "posData": [{ # "posId": "1111111111", # "tradeId": "2", # "instId": "BTC-USD-191018", # "instType": "FUTURES", # "mgnMode": "cross", # "posSide": "long", # "pos": "10", # "ccy": "BTC", # "posCcy": "", # "avgPx": "3320", # "uTIme": "1597026383085" # }], # "trades": [{ # "instId": "BTC-USD-191018", # "tradeId": "2", # }] # }] # } # rawLiquidations = self.safe_list(message, 'data', []) for i in range(0, len(rawLiquidations)): rawLiquidation = rawLiquidations[i] eventType = self.safe_string(rawLiquidation, 'eventType') if eventType != 'liquidation': return liquidation = self.parse_ws_my_liquidation(rawLiquidation) symbol = self.safe_string(liquidation, 'symbol') liquidations = self.safe_value(self.liquidations, symbol) if liquidations is None: limit = self.safe_integer(self.options, 'myLiquidationsLimit', 1000) liquidations = ArrayCache(limit) liquidations.append(liquidation) self.liquidations[symbol] = liquidations client.resolve([liquidation], 'myLiquidations') client.resolve([liquidation], 'myLiquidations::' + symbol) def parse_ws_my_liquidation(self, liquidation, market=None): # # { # "pTime": "1597026383085", # "eventType": "snapshot", # "balData": [{ # "ccy": "BTC", # "cashBal": "1", # "uTime": "1597026383085" # }], # "posData": [{ # "posId": "1111111111", # "tradeId": "2", # "instId": "BTC-USD-191018", # "instType": "FUTURES", # "mgnMode": "cross", # "posSide": "long", # "pos": "10", # "ccy": "BTC", # "posCcy": "", # "avgPx": "3320", # "uTIme": "1597026383085" # }], # "trades": [{ # "instId": "BTC-USD-191018", # "tradeId": "2", # }] # } # posData = self.safe_list(liquidation, 'posData', []) firstPosData = self.safe_dict(posData, 0, {}) marketId = self.safe_string(firstPosData, 'instId') market = self.safe_market(marketId, market) timestamp = self.safe_integer(firstPosData, 'uTIme') return self.safe_liquidation({ 'info': liquidation, 'symbol': self.safe_symbol(marketId, market), 'contracts': self.safe_number(firstPosData, 'pos'), 'contractSize': self.safe_number(market, 'contractSize'), 'price': self.safe_number(liquidation, 'avgPx'), 'baseValue': None, 'quoteValue': None, 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), }) def parse_ws_liquidation(self, liquidation, market=None): # # public liquidation # { # "details": [ # { # "bkLoss": "0", # "bkPx": "0.007831", # "ccy": "", # "posSide": "short", # "side": "buy", # "sz": "13", # "ts": "1692266434010" # } # ], # "instFamily": "IOST-USDT", # "instId": "IOST-USDT-SWAP", # "instType": "SWAP", # "uly": "IOST-USDT" # } # details = self.safe_list(liquidation, 'details', []) liquidationDetails = self.safe_dict(details, 0, {}) marketId = self.safe_string(liquidation, 'instId') market = self.safe_market(marketId, market) timestamp = self.safe_integer(liquidationDetails, 'ts') return self.safe_liquidation({ 'info': liquidation, 'symbol': self.safe_symbol(marketId, market), 'contracts': self.safe_number(liquidationDetails, 'sz'), 'contractSize': self.safe_number(market, 'contractSize'), 'price': self.safe_number(liquidationDetails, 'bkPx'), 'side': self.safe_string(liquidationDetails, 'side'), 'baseValue': None, 'quoteValue': None, 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), }) async def watch_ohlcv(self, symbol: str, timeframe: str = '1m', since: Int = None, limit: Int = None, params={}) -> List[list]: """ watches historical candlestick data containing the open, high, low, and close price, and the volume of a market :param str symbol: unified symbol of the market to fetch OHLCV data for :param str timeframe: the length of time each candle represents :param int [since]: timestamp in ms of the earliest candle to fetch :param int [limit]: the maximum amount of candles to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :returns int[][]: A list of candles ordered, open, high, low, close, volume """ await self.load_markets() symbol = self.symbol(symbol) interval = self.safe_string(self.timeframes, timeframe, timeframe) name = 'candle' + interval ohlcv = await self.subscribe('public', name, name, symbol, params) if self.newUpdates: limit = ohlcv.getLimit(symbol, limit) return self.filter_by_since_limit(ohlcv, since, limit, 0, True) async def un_watch_ohlcv(self, symbol: str, timeframe: str = '1m', params={}) -> Any: """ watches historical candlestick data containing the open, high, low, and close price, and the volume of a market :param str symbol: unified symbol of the market to fetch OHLCV data for :param str timeframe: the length of time each candle represents :param dict [params]: extra parameters specific to the exchange API endpoint :returns int[][]: A list of candles ordered, open, high, low, close, volume """ await self.load_markets() return await self.un_watch_ohlcv_for_symbols([[symbol, timeframe]], params) async def watch_ohlcv_for_symbols(self, symbolsAndTimeframes: List[List[str]], since: Int = None, limit: Int = None, params={}): """ watches historical candlestick data containing the open, high, low, and close price, and the volume of a market :param str[][] symbolsAndTimeframes: array of arrays containing unified symbols and timeframes to fetch OHLCV data for, example [['BTC/USDT', '1m'], ['LTC/USDT', '5m']] :param int [since]: timestamp in ms of the earliest candle to fetch :param int [limit]: the maximum amount of candles to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :returns int[][]: A list of candles ordered, open, high, low, close, volume """ symbolsLength = len(symbolsAndTimeframes) if symbolsLength == 0 or not isinstance(symbolsAndTimeframes[0], list): raise ArgumentsRequired(self.id + " watchOHLCVForSymbols() requires a an array of symbols and timeframes, like [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]") await self.load_markets() topics = [] messageHashes = [] for i in range(0, len(symbolsAndTimeframes)): symbolAndTimeframe = symbolsAndTimeframes[i] sym = symbolAndTimeframe[0] tf = symbolAndTimeframe[1] marketId = self.market_id(sym) interval = self.safe_string(self.timeframes, tf, tf) channel = 'candle' + interval topic: dict = { 'channel': channel, 'instId': marketId, } topics.append(topic) messageHashes.append('multi:' + channel + ':' + sym) request: dict = { 'op': 'subscribe', 'args': topics, } url = self.get_url('candle', 'public') symbol, timeframe, candles = await self.watch_multiple(url, messageHashes, request, messageHashes) if self.newUpdates: limit = candles.getLimit(symbol, limit) filtered = self.filter_by_since_limit(candles, since, limit, 0, True) return self.create_ohlcv_object(symbol, timeframe, filtered) async def un_watch_ohlcv_for_symbols(self, symbolsAndTimeframes: List[List[str]], params={}) -> Any: """ unWatches historical candlestick data containing the open, high, low, and close price, and the volume of a market :param str[][] symbolsAndTimeframes: array of arrays containing unified symbols and timeframes to fetch OHLCV data for, example [['BTC/USDT', '1m'], ['LTC/USDT', '5m']] :param dict [params]: extra parameters specific to the exchange API endpoint :returns int[][]: A list of candles ordered, open, high, low, close, volume """ symbolsLength = len(symbolsAndTimeframes) if symbolsLength == 0 or not isinstance(symbolsAndTimeframes[0], list): raise ArgumentsRequired(self.id + " watchOHLCVForSymbols() requires a an array of symbols and timeframes, like [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]") await self.load_markets() topics = [] messageHashes = [] for i in range(0, len(symbolsAndTimeframes)): symbolAndTimeframe = symbolsAndTimeframes[i] sym = symbolAndTimeframe[0] tf = symbolAndTimeframe[1] marketId = self.market_id(sym) interval = self.safe_string(self.timeframes, tf, tf) channel = 'candle' + interval topic: dict = { 'channel': channel, 'instId': marketId, } topics.append(topic) messageHashes.append('unsubscribe:multi:' + channel + ':' + sym) request: dict = { 'op': 'unsubscribe', 'args': topics, } url = self.get_url('candle', 'public') return await self.watch_multiple(url, messageHashes, request, messageHashes) def handle_ohlcv(self, client: Client, message): # # { # "arg": {channel: "candle1m", instId: "BTC-USDT"}, # "data": [ # [ # "1626690720000", # "31334", # "31334", # "31334", # "31334", # "0.0077", # "241.2718" # ] # ] # } # arg = self.safe_value(message, 'arg', {}) channel = self.safe_string(arg, 'channel') data = self.safe_value(message, 'data', []) marketId = self.safe_string(arg, 'instId') market = self.safe_market(marketId) symbol = market['symbol'] interval = channel.replace('candle', '') # use a reverse lookup in a static map instead timeframe = self.find_timeframe(interval) for i in range(0, len(data)): parsed = self.parse_ohlcv(data[i], market) self.ohlcvs[symbol] = self.safe_value(self.ohlcvs, symbol, {}) stored = self.safe_value(self.ohlcvs[symbol], timeframe) if stored is None: limit = self.safe_integer(self.options, 'OHLCVLimit', 1000) stored = ArrayCacheByTimestamp(limit) self.ohlcvs[symbol][timeframe] = stored stored.append(parsed) messageHash = channel + ':' + market['id'] client.resolve(stored, messageHash) # for multiOHLCV we need special object, to other "multi" # methods, because OHLCV response item does not contain symbol # or timeframe, thus otherwise it would be unrecognizable messageHashForMulti = 'multi:' + channel + ':' + symbol client.resolve([symbol, timeframe, stored], messageHashForMulti) async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook: """ https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-channel watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data :param str symbol: unified symbol of the market to fetch the order book for :param int [limit]: the maximum amount of order book entries to return :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.depth]: okx order book depth, can be books, books5, books-l2-tbt, books50-l2-tbt, bbo-tbt :returns dict: A dictionary of `order book structures ` indexed by market symbols """ # # bbo-tbt # 1. Newly added channel that sends tick-by-tick Level 1 data # 2. All API users can subscribe # 3. Public depth channel, verification not required # # books-l2-tbt # 1. Only users who're VIP5 and above can subscribe # 2. Identity verification required before subscription # # books50-l2-tbt # 1. Only users who're VIP4 and above can subscribe # 2. Identity verification required before subscription # # books # 1. All API users can subscribe # 2. Public depth channel, verification not required # # books5 # 1. All API users can subscribe # 2. Public depth channel, verification not required # 3. Data feeds will be delivered every 100ms(vs. every 200ms now) # return await self.watch_order_book_for_symbols([symbol], limit, params) async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook: """ https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-channel watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data :param str[] symbols: unified array of symbols :param int [limit]: 1,5, 400, 50(l2-tbt, vip4+) or 40000(vip5+) the maximum amount of order book entries to return :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.depth]: okx order book depth, can be books, books5, books-l2-tbt, books50-l2-tbt, bbo-tbt :returns dict: A dictionary of `order book structures ` indexed by market symbols """ await self.load_markets() symbols = self.market_symbols(symbols) depth = None depth, params = self.handle_option_and_params(params, 'watchOrderBook', 'depth', 'books') if limit is not None: if limit == 1: depth = 'bbo-tbt' elif limit > 1 and limit <= 5: depth = 'books5' elif limit == 50: depth = 'books50-l2-tbt' # Make sure you have VIP4 and above elif limit == 400: depth = 'books' if (depth == 'books-l2-tbt') or (depth == 'books50-l2-tbt'): if not self.check_required_credentials(False): raise AuthenticationError(self.id + ' watchOrderBook/watchOrderBookForSymbols requires authentication for self depth. Add credentials or change the depth option to books or books5') await self.authenticate({'access': 'public'}) topics = [] messageHashes = [] for i in range(0, len(symbols)): symbol = symbols[i] messageHashes.append(depth + ':' + symbol) marketId = self.market_id(symbol) topic: dict = { 'channel': depth, 'instId': marketId, } topics.append(topic) request: dict = { 'op': 'subscribe', 'args': topics, } url = self.get_url(depth, 'public') orderbook = await self.watch_multiple(url, messageHashes, request, messageHashes) return orderbook.limit() async def un_watch_order_book_for_symbols(self, symbols: List[str], params={}) -> Any: """ https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-channel unWatches information on open orders with bid(buy) and ask(sell) prices, volumes and other data :param str[] symbols: unified array of symbols :param dict [params]: extra parameters specific to the exchange API endpoint :param int [params.limit]: the maximum amount of order book entries to return :param str [params.depth]: okx order book depth, can be books, books5, books-l2-tbt, books50-l2-tbt, bbo-tbt :returns dict: A dictionary of `order book structures ` indexed by market symbols """ await self.load_markets() symbols = self.market_symbols(symbols, None, False) depth = None depth, params = self.handle_option_and_params(params, 'watchOrderBook', 'depth', 'books') limit = self.safe_integer(params, 'limit') if limit is not None: if limit == 1: depth = 'bbo-tbt' elif limit > 1 and limit <= 5: depth = 'books5' elif limit == 50: depth = 'books50-l2-tbt' # Make sure you have VIP4 and above elif limit == 400: depth = 'books' topics = [] subMessageHashes = [] messageHashes = [] for i in range(0, len(symbols)): symbol = symbols[i] subMessageHashes.append(depth + ':' + symbol) messageHashes.append('unsubscribe:orderbook:' + symbol) marketId = self.market_id(symbol) topic: dict = { 'channel': depth, 'instId': marketId, } topics.append(topic) request: dict = { 'op': 'unsubscribe', 'args': topics, } url = self.get_url(depth, 'public') return await self.watch_multiple(url, messageHashes, request, messageHashes) async def un_watch_order_book(self, symbol: str, params={}) -> Any: """ https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-channel unWatches information on open orders with bid(buy) and ask(sell) prices, volumes and other data :param str symbol: unified array of symbols :param dict [params]: extra parameters specific to the exchange API endpoint :param int [params.limit]: the maximum amount of order book entries to return :param str [params.depth]: okx order book depth, can be books, books5, books-l2-tbt, books50-l2-tbt, bbo-tbt :returns dict: A dictionary of `order book structures ` indexed by market symbols """ return await self.un_watch_order_book_for_symbols([symbol], params) def handle_delta(self, bookside, delta): # # [ # "31685", # price # "0.78069158", # amount # "0", # liquidated orders # "17" # orders # ] # price = self.safe_float(delta, 0) amount = self.safe_float(delta, 1) bookside.store(price, amount) def handle_deltas(self, bookside, deltas): for i in range(0, len(deltas)): self.handle_delta(bookside, deltas[i]) def handle_order_book_message(self, client: Client, message, orderbook, messageHash, market=None): # # { # "asks": [ # ['31738.3', '0.05973179', "0", "3"], # ['31738.5', '0.11035404', "0", "2"], # ['31739.6', '0.01', "0", "1"], # ], # "bids": [ # ['31738.2', '0.67557666', "0", "9"], # ['31738', '0.02466947', "0", "2"], # ['31736.3', '0.01705046', "0", "2"], # ], # "instId": "BTC-USDT", # "ts": "1626537446491" # "checksum": -855196043, # "prevSeqId": 123456, # "seqId": 123457 # } # asks = self.safe_value(message, 'asks', []) bids = self.safe_value(message, 'bids', []) storedAsks = orderbook['asks'] storedBids = orderbook['bids'] self.handle_deltas(storedAsks, asks) self.handle_deltas(storedBids, bids) marketId = self.safe_string(message, 'instId') symbol = self.safe_symbol(marketId, market) checksum = self.handle_option('watchOrderBook', 'checksum', True) seqId = self.safe_integer(message, 'seqId') if checksum: prevSeqId = self.safe_integer(message, 'prevSeqId') nonce = orderbook['nonce'] asksLength = len(storedAsks) bidsLength = len(storedBids) payloadArray = [] for i in range(0, 25): if i < bidsLength: payloadArray.append(self.number_to_string(storedBids[i][0])) payloadArray.append(self.number_to_string(storedBids[i][1])) if i < asksLength: payloadArray.append(self.number_to_string(storedAsks[i][0])) payloadArray.append(self.number_to_string(storedAsks[i][1])) payload = ':'.join(payloadArray) responseChecksum = self.safe_integer(message, 'checksum') localChecksum = self.crc32(payload, True) error = None if prevSeqId != -1 and nonce != prevSeqId: error = InvalidNonce(self.id + ' watchOrderBook received invalid nonce') if responseChecksum != localChecksum: error = ChecksumError(self.id + ' ' + self.orderbook_checksum_message(symbol)) if error is not None: del client.subscriptions[messageHash] if symbol is not None: del self.orderbooks[symbol] client.reject(error, messageHash) timestamp = self.safe_integer(message, 'ts') orderbook['nonce'] = seqId orderbook['timestamp'] = timestamp orderbook['datetime'] = self.iso8601(timestamp) return orderbook def handle_order_book(self, client: Client, message): # # snapshot # # { # "arg": {channel: 'books-l2-tbt', instId: "BTC-USDT"}, # "action": "snapshot", # "data": [ # { # "asks": [ # ['31685', '0.78069158', "0", "17"], # ['31685.1', '0.0001', "0", "1"], # ['31685.6', '0.04543165', "0", "1"], # ], # "bids": [ # ['31684.9', '0.01', "0", "1"], # ['31682.9', '0.0001', "0", "1"], # ['31680.7', '0.01', "0", "1"], # ], # "ts": "1626532416403", # "checksum": -1023440116 # } # ] # } # # update # # { # "arg": {channel: 'books-l2-tbt', instId: "BTC-USDT"}, # "action": "update", # "data": [ # { # "asks": [ # ['31657.7', '0', "0", "0"], # ['31659.7', '0.01', "0", "1"], # ['31987.3', '0.01', "0", "1"] # ], # "bids": [ # ['31642.9', '0.50296385', "0", "4"], # ['31639.9', '0', "0", "0"], # ['31638.7', '0.01', "0", "1"], # ], # "ts": "1626535709008", # "checksum": 830931827 # } # ] # } # # books5 # # { # "arg": {channel: "books5", instId: "BTC-USDT"}, # "data": [ # { # "asks": [ # ['31738.3', '0.05973179', "0", "3"], # ['31738.5', '0.11035404', "0", "2"], # ['31739.6', '0.01', "0", "1"], # ], # "bids": [ # ['31738.2', '0.67557666', "0", "9"], # ['31738', '0.02466947', "0", "2"], # ['31736.3', '0.01705046', "0", "2"], # ], # "instId": "BTC-USDT", # "ts": "1626537446491" # } # ] # } # # bbo-tbt # # { # "arg":{ # "channel":"bbo-tbt", # "instId":"BTC-USDT" # }, # "data":[ # { # "asks":[["36232.2","1.8826134","0","17"]], # "bids":[["36232.1","0.00572212","0","2"]], # "ts":"1651826598363" # } # ] # } # arg = self.safe_dict(message, 'arg', {}) channel = self.safe_string(arg, 'channel') action = self.safe_string(message, 'action') data = self.safe_list(message, 'data', []) marketId = self.safe_string(arg, 'instId') market = self.safe_market(marketId) symbol = market['symbol'] depths: dict = { 'bbo-tbt': 1, 'books': 400, 'books5': 5, 'books-l2-tbt': 400, 'books50-l2-tbt': 50, } limit = self.safe_integer(depths, channel) messageHash = channel + ':' + symbol if action == 'snapshot': for i in range(0, len(data)): update = data[i] orderbook = self.order_book({}, limit) self.orderbooks[symbol] = orderbook orderbook['symbol'] = symbol self.handle_order_book_message(client, update, orderbook, messageHash) client.resolve(orderbook, messageHash) elif action == 'update': if symbol in self.orderbooks: orderbook = self.orderbooks[symbol] for i in range(0, len(data)): update = data[i] self.handle_order_book_message(client, update, orderbook, messageHash, market) client.resolve(orderbook, messageHash) elif (channel == 'books5') or (channel == 'bbo-tbt'): if not (symbol in self.orderbooks): self.orderbooks[symbol] = self.order_book({}, limit) orderbook = self.orderbooks[symbol] for i in range(0, len(data)): update = data[i] timestamp = self.safe_integer(update, 'ts') snapshot = self.parse_order_book(update, symbol, timestamp, 'bids', 'asks', 0, 1) orderbook.reset(snapshot) client.resolve(orderbook, messageHash) return message async def authenticate(self, params={}): self.check_required_credentials() access = self.safe_string(params, 'access', 'private') params = self.omit(params, ['access']) url = self.get_url('users', access) messageHash = 'authenticated' client = self.client(url) future = client.reusableFuture(messageHash) authenticated = self.safe_value(client.subscriptions, messageHash) if authenticated is None: timestamp = str(self.seconds()) method = 'GET' path = '/users/self/verify' auth = timestamp + method + path signature = self.hmac(self.encode(auth), self.encode(self.secret), hashlib.sha256, 'base64') operation = 'login' request: dict = { 'op': operation, 'args': [ { 'apiKey': self.apiKey, 'passphrase': self.password, 'timestamp': timestamp, 'sign': signature, }, ], } # Only add params['access'] to prevent sending custom parameters, such. if 'access' in params: request['access'] = params['access'] self.watch(url, messageHash, request, messageHash) return await future async def watch_balance(self, params={}) -> Balances: """ watch balance and get the amount of funds available for trading or funds locked in orders :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `balance structure ` """ await self.load_markets() await self.authenticate() return await self.subscribe('private', 'account', 'account', None, params) def handle_balance_and_position(self, client: Client, message): self.handle_my_liquidation(client, message) def handle_balance(self, client: Client, message): # # { # "arg": {channel: "account"}, # "data": [ # { # "adjEq": '', # "details": [ # { # "availBal": '', # "availEq": "8.21009913", # "cashBal": "8.21009913", # "ccy": "USDT", # "coinUsdPrice": "0.99994", # "crossLiab": '', # "disEq": "8.2096065240522", # "eq": "8.21009913", # "eqUsd": "8.2096065240522", # "frozenBal": "0", # "interest": '', # "isoEq": "0", # "isoLiab": '', # "liab": '', # "maxLoan": '', # "mgnRatio": '', # "notionalLever": "0", # "ordFrozen": "0", # "twap": "0", # "uTime": "1621927314996", # "upl": "0" # }, # ], # "imr": '', # "isoEq": "0", # "mgnRatio": '', # "mmr": '', # "notionalUsd": '', # "ordFroz": '', # "totalEq": "22.1930992296832", # "uTime": "1626692120916" # } # ] # } # arg = self.safe_value(message, 'arg', {}) channel = self.safe_string(arg, 'channel') type = 'spot' balance = self.parseTradingBalance(message) oldBalance = self.safe_value(self.balance, type, {}) newBalance = self.deep_extend(oldBalance, balance) self.balance[type] = self.safe_balance(newBalance) client.resolve(self.balance[type], channel) def order_to_trade(self, order, market=None): info = self.safe_value(order, 'info', {}) timestamp = self.safe_integer(info, 'fillTime') feeMarketId = self.safe_string(info, 'fillFeeCcy') isTaker = self.safe_string(info, 'execType', '') == 'T' return self.safe_trade({ 'info': info, 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), 'symbol': self.safe_string(order, 'symbol'), 'id': self.safe_string(info, 'tradeId'), 'order': self.safe_string(order, 'id'), 'type': self.safe_string(order, 'type'), 'takerOrMaker': 'taker' if (isTaker) else 'maker', 'side': self.safe_string(order, 'side'), 'price': self.safe_number(info, 'fillPx'), 'amount': self.safe_number(info, 'fillSz'), 'cost': self.safe_number(order, 'cost'), 'fee': { 'cost': self.safe_number(info, 'fillFee'), 'currency': self.safe_currency_code(feeMarketId), }, }, market) async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ watches information on multiple trades made by the user https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-order-channel :param str [symbol]: unified market symbol of the market trades were made in :param int [since]: the earliest time in ms to fetch trades for :param int [limit]: the maximum number of trade structures to retrieve :param dict [params]: extra parameters specific to the exchange API endpoint :param bool [params.trigger]: True if fetching trigger or conditional trades :param str [params.type]: 'spot', 'swap', 'future', 'option', 'ANY', 'SPOT', 'MARGIN', 'SWAP', 'FUTURES' or 'OPTION' :param str [params.marginMode]: 'cross' or 'isolated', for automatically setting the type to spot margin :returns dict[]: a list of `trade structures ` """ # By default, receive order updates from any instrument type type = None type, params = self.handle_option_and_params(params, 'watchMyTrades', 'type', 'ANY') isTrigger = self.safe_bool_2(params, 'trigger', 'stop', False) params = self.omit(params, ['trigger', 'stop']) await self.load_markets() await self.authenticate({'access': 'business' if isTrigger else 'private'}) channel = 'orders-algo' if isTrigger else 'orders' messageHash = channel + '::myTrades' market = None if symbol is not None: market = self.market(symbol) symbol = market['symbol'] type = market['type'] messageHash = messageHash + '::' + symbol if type == 'future': type = 'futures' uppercaseType = type.upper() marginMode = None marginMode, params = self.handle_margin_mode_and_params('watchMyTrades', params) if uppercaseType == 'SPOT': if marginMode is not None: uppercaseType = 'MARGIN' request: dict = { 'instType': uppercaseType, } orders = await self.subscribe('private', messageHash, channel, None, self.extend(request, params)) if self.newUpdates: limit = orders.getLimit(symbol, limit) return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True) async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]: """ https://www.okx.com/docs-v5/en/#trading-account-websocket-positions-channel watch all open positions :param str[]|None symbols: list of unified market symbols @param since @param limit :param dict params: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `position structure ` """ await self.load_markets() await self.authenticate(params) symbols = self.market_symbols(symbols) request: dict = { 'instType': 'ANY', } channel = 'positions' newPositions = None if symbols is None: arg: dict = { 'channel': 'positions', 'instType': 'ANY', } args = [self.extend(arg, params)] nonSymbolRequest: dict = { 'op': 'subscribe', 'args': args, } url = self.get_url(channel, 'private') newPositions = await self.watch(url, channel, nonSymbolRequest, channel) else: newPositions = await self.subscribe_multiple('private', channel, symbols, self.extend(request, params)) if self.newUpdates: return newPositions return self.filter_by_symbols_since_limit(self.positions, symbols, since, limit, True) def handle_positions(self, client, message): # # { # arg: { # channel: 'positions', # instType: 'ANY', # instId: 'XRP-USDT-SWAP', # uid: '464737184507959869' # }, # data: [{ # adl: '1', # availPos: '', # avgPx: '0.52668', # baseBal: '', # baseBorrowed: '', # baseInterest: '', # bizRefId: '', # bizRefType: '', # cTime: '1693151444408', # ccy: 'USDT', # closeOrderAlgo: [], # deltaBS: '', # deltaPA: '', # gammaBS: '', # gammaPA: '', # idxPx: '0.52683', # imr: '17.564000000000004', # instId: 'XRP-USDT-SWAP', # instType: 'SWAP', # interest: '', # last: '0.52691', # lever: '3', # liab: '', # liabCcy: '', # liqPx: '0.3287514731020614', # margin: '', # markPx: '0.52692', # mgnMode: 'cross', # mgnRatio: '69.00363001456147', # mmr: '0.26346', # notionalUsd: '52.68620388000001', # optVal: '', # pTime: '1693151906023', # pendingCloseOrdLiabVal: '', # pos: '1', # posCcy: '', # posId: '616057041198907393', # posSide: 'net', # quoteBal: '', # quoteBorrowed: '', # quoteInterest: '', # spotInUseAmt: '', # spotInUseCcy: '', # thetaBS: '', # thetaPA: '', # tradeId: '138745402', # uTime: '1693151444408', # upl: '0.0240000000000018', # uplLastPx: '0.0229999999999952', # uplRatio: '0.0013670539986328', # uplRatioLastPx: '0.001310093415356', # usdPx: '', # vegaBS: '', # vegaPA: '' # }] # } # arg = self.safe_value(message, 'arg', {}) marketId = self.safe_string(arg, 'instId') market = self.safe_market(marketId, None, '-') symbol = market['symbol'] channel = self.safe_string(arg, 'channel', '') data = self.safe_value(message, 'data', []) if self.positions is None: self.positions = ArrayCacheBySymbolBySide() cache = self.positions newPositions = [] for i in range(0, len(data)): rawPosition = data[i] position = self.parse_position(rawPosition) if position['contracts'] == 0: position['side'] = 'long' shortPosition = self.clone(position) shortPosition['side'] = 'short' cache.append(shortPosition) newPositions.append(shortPosition) newPositions.append(position) cache.append(position) messageHash = channel if symbol is not None: messageHash = channel + '::' + symbol client.resolve(newPositions, messageHash) async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]: """ watches information on multiple orders made by the user https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-order-channel :param str [symbol]: unified market symbol of the market the orders were made in :param int [since]: the earliest time in ms to fetch orders for :param int [limit]: the maximum number of order structures to retrieve :param dict [params]: extra parameters specific to the exchange API endpoint :param bool [params.trigger]: True if fetching trigger or conditional orders :param str [params.type]: 'spot', 'swap', 'future', 'option', 'ANY', 'SPOT', 'MARGIN', 'SWAP', 'FUTURES' or 'OPTION' :param str [params.marginMode]: 'cross' or 'isolated', for automatically setting the type to spot margin :returns dict[]: a list of `order structures ` """ type = None # By default, receive order updates from any instrument type type, params = self.handle_option_and_params(params, 'watchOrders', 'type', 'ANY') isTrigger = self.safe_value_2(params, 'stop', 'trigger', False) params = self.omit(params, ['stop', 'trigger']) await self.load_markets() await self.authenticate({'access': 'business' if isTrigger else 'private'}) market = None if symbol is not None: market = self.market(symbol) symbol = market['symbol'] type = market['type'] if type == 'future': type = 'futures' uppercaseType = type.upper() marginMode = None marginMode, params = self.handle_margin_mode_and_params('watchOrders', params) if uppercaseType == 'SPOT': if marginMode is not None: uppercaseType = 'MARGIN' request: dict = { 'instType': uppercaseType, } channel = 'orders-algo' if isTrigger else 'orders' orders = await self.subscribe('private', channel, channel, symbol, self.extend(request, params)) if self.newUpdates: limit = orders.getLimit(symbol, limit) return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True) def handle_orders(self, client: Client, message, subscription=None): # # { # "arg":{ # "channel":"orders", # "instType":"SPOT" # }, # "data":[ # { # "accFillSz":"0", # "amendResult":"", # "avgPx":"", # "cTime":"1634548275191", # "category":"normal", # "ccy":"", # "clOrdId":"e847386590ce4dBC330547db94a08ba0", # "code":"0", # "execType":"", # "fee":"0", # "feeCcy":"USDT", # "fillFee":"0", # "fillFeeCcy":"", # "fillNotionalUsd":"", # "fillPx":"", # "fillSz":"0", # "fillTime":"", # "instId":"ETH-USDT", # "instType":"SPOT", # "lever":"", # "msg":"", # "notionalUsd":"451.4516256", # "ordId":"370257534141235201", # "ordType":"limit", # "pnl":"0", # "posSide":"", # "px":"60000", # "rebate":"0", # "rebateCcy":"ETH", # "reqId":"", # "side":"sell", # "slOrdPx":"", # "slTriggerPx":"", # "state":"live", # "sz":"0.007526", # "tag":"", # "tdMode":"cash", # "tgtCcy":"", # "tpOrdPx":"", # "tpTriggerPx":"", # "tradeId":"", # "uTime":"1634548275191" # } # ] # } # self.handle_my_trades(client, message) arg = self.safe_value(message, 'arg', {}) channel = self.safe_string(arg, 'channel') orders = self.safe_value(message, 'data', []) ordersLength = len(orders) if ordersLength > 0: limit = self.safe_integer(self.options, 'ordersLimit', 1000) if self.orders is None: self.orders = ArrayCacheBySymbolById(limit) self.triggerOrders = ArrayCacheBySymbolById(limit) stored = self.triggerOrders if (channel == 'orders-algo') else self.orders marketIds = [] parsed = self.parse_orders(orders) for i in range(0, len(parsed)): order = parsed[i] stored.append(order) symbol = order['symbol'] market = self.market(symbol) marketIds.append(market['id']) client.resolve(stored, channel) for i in range(0, len(marketIds)): messageHash = channel + ':' + marketIds[i] client.resolve(stored, messageHash) def handle_my_trades(self, client: Client, message): # # { # "arg":{ # "channel":"orders", # "instType":"SPOT" # }, # "data":[ # { # "accFillSz":"0", # "amendResult":"", # "avgPx":"", # "cTime":"1634548275191", # "category":"normal", # "ccy":"", # "clOrdId":"e847386590ce4dBC330547db94a08ba0", # "code":"0", # "execType":"", # "fee":"0", # "feeCcy":"USDT", # "fillFee":"0", # "fillFeeCcy":"", # "fillNotionalUsd":"", # "fillPx":"", # "fillSz":"0", # "fillTime":"", # "instId":"ETH-USDT", # "instType":"SPOT", # "lever":"", # "msg":"", # "notionalUsd":"451.4516256", # "ordId":"370257534141235201", # "ordType":"limit", # "pnl":"0", # "posSide":"", # "px":"60000", # "rebate":"0", # "rebateCcy":"ETH", # "reqId":"", # "side":"sell", # "slOrdPx":"", # "slTriggerPx":"", # "state":"live", # "sz":"0.007526", # "tag":"", # "tdMode":"cash", # "tgtCcy":"", # "tpOrdPx":"", # "tpTriggerPx":"", # "tradeId":"", # "uTime":"1634548275191" # } # ] # } # arg = self.safe_value(message, 'arg', {}) channel = self.safe_string(arg, 'channel') rawOrders = self.safe_value(message, 'data', []) filteredOrders = [] # filter orders with no last trade id for i in range(0, len(rawOrders)): rawOrder = rawOrders[i] tradeId = self.safe_string(rawOrder, 'tradeId', '') if len(tradeId) > 0: order = self.parse_order(rawOrder) filteredOrders.append(order) tradesLength = len(filteredOrders) if tradesLength == 0: return if self.myTrades is None: limit = self.safe_integer(self.options, 'tradesLimit', 1000) self.myTrades = ArrayCacheBySymbolById(limit) myTrades = self.myTrades symbols: dict = {} for i in range(0, len(filteredOrders)): rawTrade = filteredOrders[i] trade = self.order_to_trade(rawTrade) myTrades.append(trade) symbol = trade['symbol'] symbols[symbol] = True messageHash = channel + '::myTrades' client.resolve(self.myTrades, messageHash) tradeSymbols = list(symbols.keys()) for i in range(0, len(tradeSymbols)): symbolMessageHash = messageHash + '::' + tradeSymbols[i] client.resolve(self.myTrades, symbolMessageHash) def request_id(self): ts = str(self.milliseconds()) randomNumber = self.rand_number(4) randomPart = str(randomNumber) return ts + randomPart async def create_order_ws(self, symbol: str, type: OrderType, side: OrderSide, amount: float, price: Num = None, params={}) -> Order: """ https://www.okx.com/docs-v5/en/#websocket-api-trade-place-order create a trade order :param str symbol: unified symbol of the market to create an order in :param str type: 'market' or 'limit' :param str side: 'buy' or 'sell' :param float amount: how much of currency you want to trade in units of base currency :param float|None [price]: the price at which the order is to be fulfilled, in units of the quote currency, ignored in market orders :param dict [params]: extra parameters specific to the exchange API endpoint :param boolean params['test']: test order, default False :returns dict: an `order structure ` """ await self.load_markets() await self.authenticate() url = self.get_url('private', 'private') messageHash = self.request_id() op = None op, params = self.handle_option_and_params(params, 'createOrderWs', 'op', 'batch-orders') args = self.create_order_request(symbol, type, side, amount, price, params) ordType = self.safe_string(args, 'ordType') if (ordType == 'trigger') or (ordType == 'conditional') or (type == 'oco') or (type == 'move_order_stop') or (type == 'iceberg') or (type == 'twap'): raise BadRequest(self.id + ' createOrderWs() does not support algo trading. self.options["createOrderWs"]["op"] must be either order or batch-order') if (op != 'order') and (op != 'batch-orders'): raise BadRequest(self.id + ' createOrderWs() does not support algo trading. self.options["createOrderWs"]["op"] must be either order or privatePostTradeOrder or privatePostTradeOrderAlgo') request: dict = { 'id': messageHash, 'op': op, 'args': [args], } return await self.watch(url, messageHash, request, messageHash) def handle_place_orders(self, client: Client, message): # # batch-orders/order/cancel-order # { # "id": "1689281055", # "op": "batch-orders", # "code": "0", # "msg": '', # "data": [{ # "tag": "e847386590ce4dBC", # "ordId": "599823446566084608", # "clOrdId": "e847386590ce4dBCb939511604f394b0", # "sCode": "0", # "sMsg": "Order successfully placed." # }, # ... # ] # } # messageHash = self.safe_string(message, 'id') args = self.safe_value(message, 'data', []) # filter out partial errors args = self.filter_by(args, 'sCode', '0') # if empty means request failed and handle error if self.is_empty(args): method = self.safe_string(message, 'op') stringMsg = self.json(message) self.handle_errors(1, '', client.url, method, {}, stringMsg, message, {}, {}) orders = self.parse_orders(args, None, None, None) first = self.safe_dict(orders, 0, {}) client.resolve(first, messageHash) async def edit_order_ws(self, id: str, symbol: str, type: OrderType, side: OrderSide, amount: Num = None, price: Num = None, params={}) -> Order: """ edit a trade order https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-amend-order https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-amend-multiple-orders :param str id: order id :param str symbol: unified symbol of the market to create an order in :param str type: 'market' or 'limit' :param str side: 'buy' or 'sell' :param float amount: how much of the currency you want to trade in units of the base currency :param float|None [price]: the price at which the order is to be fulfilled, in units of the quote currency, ignored in market orders :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: an `order structure ` """ await self.load_markets() await self.authenticate() url = self.get_url('private', 'private') messageHash = self.request_id() op = None op, params = self.handle_option_and_params(params, 'editOrderWs', 'op', 'amend-order') args = self.edit_order_request(id, symbol, type, side, amount, price, params) request: dict = { 'id': messageHash, 'op': op, 'args': [args], } return await self.watch(url, messageHash, self.extend(request, params), messageHash) async def cancel_order_ws(self, id: str, symbol: Str = None, params={}) -> Order: """ https://okx-docs.github.io/apidocs/websocket_api/en/#cancel-order-trade cancel multiple orders :param str id: order id :param str symbol: unified market symbol, default is None :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.clOrdId]: client order id :returns dict: an list of `order structures ` """ if symbol is None: raise BadRequest(self.id + ' cancelOrderWs() requires a symbol argument') await self.load_markets() await self.authenticate() url = self.get_url('private', 'private') messageHash = self.request_id() clientOrderId = self.safe_string_2(params, 'clOrdId', 'clientOrderId') params = self.omit(params, ['clientOrderId', 'clOrdId']) arg: dict = { 'instId': self.market_id(symbol), } if clientOrderId is not None: arg['clOrdId'] = clientOrderId else: arg['ordId'] = id request: dict = { 'id': messageHash, 'op': 'cancel-order', 'args': [self.extend(arg, params)], } return await self.watch(url, messageHash, request, messageHash) async def cancel_orders_ws(self, ids: List[str], symbol: Str = None, params={}): """ https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-mass-cancel-order cancel multiple orders :param str[] ids: order ids :param str symbol: unified market symbol, default is None :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: an list of `order structures ` """ idsLength: number = len(ids) if idsLength > 20: raise BadRequest(self.id + ' cancelOrdersWs() accepts up to 20 ids at a time') if symbol is None: raise BadRequest(self.id + ' cancelOrdersWs() requires a symbol argument') await self.load_markets() await self.authenticate() url = self.get_url('private', 'private') messageHash = self.request_id() args = [] for i in range(0, idsLength): arg: dict = { 'instId': self.market_id(symbol), 'ordId': ids[i], } args.append(arg) request: dict = { 'id': messageHash, 'op': 'batch-cancel-orders', 'args': args, } return await self.watch(url, messageHash, self.deep_extend(request, params), messageHash) async def cancel_all_orders_ws(self, symbol: Str = None, params={}) -> List[Order]: """ https://docs.okx.com/websockets/#message-cancelAll cancel all open orders of a type. Only applicable to Option in Portfolio Margin mode, and MMP privilege is required. :param str symbol: unified market symbol, only orders in the market of self symbol are cancelled when symbol is not None :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `order structures ` """ if symbol is None: raise BadRequest(self.id + ' cancelAllOrdersWs() requires a symbol argument') await self.load_markets() await self.authenticate() market = self.market(symbol) if market['type'] != 'option': raise BadRequest(self.id + ' cancelAllOrdersWs is only applicable to Option in Portfolio Margin mode, and MMP privilege is required.') url = self.get_url('private', 'private') messageHash = self.request_id() request: dict = { 'id': messageHash, 'op': 'mass-cancel', 'args': [self.extend({ 'instType': 'OPTION', 'instFamily': market['id'], }, params)], } return await self.watch(url, messageHash, request, messageHash) def handle_cancel_all_orders(self, client: Client, message): # # { # "id": "1512", # "op": "mass-cancel", # "data": [ # { # "result": True # } # ], # "code": "0", # "msg": "" # } # messageHash = self.safe_string(message, 'id') data = self.safe_value(message, 'data', []) client.resolve(data, messageHash) def handle_subscription_status(self, client: Client, message): # # {event: 'subscribe', arg: {channel: "tickers", instId: "BTC-USDT"}} # # channel = self.safe_string(message, "channel") # client.subscriptions[channel] = message return message def handle_authenticate(self, client: Client, message): # # {event: "login", success: True} # future = self.safe_value(client.futures, 'authenticated') future.resolve(True) def ping(self, client: Client): # OKX does not support the built-in WebSocket protocol-level ping-pong. # Instead, it requires a custom text-based ping-pong mechanism. return 'ping' def handle_pong(self, client: Client, message): client.lastPong = self.milliseconds() return message def handle_error_message(self, client: Client, message) -> Bool: # # {event: 'error', msg: "Illegal request: {"op":"subscribe","args":["spot/ticker:BTC-USDT"]}", code: "60012"} # {event: 'error", msg: "channel:ticker,instId:BTC-USDT doesn"t exist", code: "60018"} # {"event":"error","msg":"Illegal request: {\\"id\\":\\"17321173472466905\\",\\"op\\":\\"amend-order\\",\\"args\\":[{\\"instId\\":\\"ETH-USDC\\",\\"ordId\\":\\"2000345622407479296\\",\\"newSz\\":\\"0.050857\\",\\"newPx\\":\\"2949.4\\",\\"postOnly\\":true}],\\"postOnly\\":true}","code":"60012","connId":"0808af6c"} # errorCode = self.safe_string(message, 'code') try: if errorCode and errorCode != '0': feedback = self.id + ' ' + self.json(message) if errorCode != '1': self.throw_exactly_matched_exception(self.exceptions['exact'], errorCode, feedback) messageString = self.safe_value(message, 'msg') if messageString is not None: self.throw_broadly_matched_exception(self.exceptions['broad'], messageString, feedback) else: data = self.safe_list(message, 'data', []) for i in range(0, len(data)): d = data[i] errorCode = self.safe_string(d, 'sCode') if errorCode is not None: self.throw_exactly_matched_exception(self.exceptions['exact'], errorCode, feedback) messageString = self.safe_value(message, 'sMsg') if messageString is not None: self.throw_broadly_matched_exception(self.exceptions['broad'], messageString, feedback) raise ExchangeError(feedback) except Exception as e: # if the message contains an id, it means it is a response to a request # so we only reject that promise, instead of deleting all futures, destroying the authentication future id = self.safe_string(message, 'id') if id is None: # try to parse it from the stringified json inside msg msg = self.safe_string(message, 'msg') if msg is not None and msg.startswith('Illegal request: {'): stringifiedJson = msg.replace('Illegal request: ', '') parsedJson = self.parse_json(stringifiedJson) id = self.safe_string(parsedJson, 'id') if id is not None: client.reject(e, id) return False client.reject(e) return False return True def handle_message(self, client: Client, message): if not self.handle_error_message(client, message): return # # {event: 'subscribe', arg: {channel: "tickers", instId: "BTC-USDT"}} # {event: 'login", msg: '", code: "0"} # # { # "arg": {channel: "tickers", instId: "BTC-USDT"}, # "data": [ # { # "instType": "SPOT", # "instId": "BTC-USDT", # "last": "31500.1", # "lastSz": "0.00001754", # "askPx": "31500.1", # "askSz": "0.00998144", # "bidPx": "31500", # "bidSz": "3.05652439", # "open24h": "31697", # "high24h": "32248", # "low24h": "31165.6", # "sodUtc0": "31385.5", # "sodUtc8": "32134.9", # "volCcy24h": "503403597.38138519", # "vol24h": "15937.10781721", # "ts": "1626526618762" # } # ] # } # # {event: 'error', msg: "Illegal request: {"op":"subscribe","args":["spot/ticker:BTC-USDT"]}", code: "60012"} # {event: 'error", msg: "channel:ticker,instId:BTC-USDT doesn"t exist", code: "60018"} # {event: 'error', msg: "Invalid OK_ACCESS_KEY", code: "60005"} # { # "event": "error", # "msg": "Illegal request: {"op":"login","args":["de89b035-b233-44b2-9a13-0ccdd00bda0e","7KUcc8YzQhnxBE3K","1626691289","H57N99mBt5NvW8U19FITrPdOxycAERFMaapQWRqLaSE="]}", # "code": "60012" # } # # # if message == 'pong': self.handle_pong(client, message) return # table = self.safe_string(message, 'table') # if table is None: event = self.safe_string_2(message, 'event', 'op') if event is not None: methods: dict = { # 'info': self.handleSystemStatus, # 'book': 'handleOrderBook', 'login': self.handle_authenticate, 'subscribe': self.handle_subscription_status, 'unsubscribe': self.handle_unsubscription, 'order': self.handle_place_orders, 'batch-orders': self.handle_place_orders, 'amend-order': self.handle_place_orders, 'batch-amend-orders': self.handle_place_orders, 'cancel-order': self.handle_place_orders, 'mass-cancel': self.handle_cancel_all_orders, } method = self.safe_value(methods, event) if method is not None: method(client, message) else: arg = self.safe_value(message, 'arg', {}) channel = self.safe_string(arg, 'channel') methods: dict = { 'bbo-tbt': self.handle_order_book, # newly added channel that sends tick-by-tick Level 1 data, all API users can subscribe, public depth channel, verification not required 'books': self.handle_order_book, # all API users can subscribe, public depth channel, verification not required 'books5': self.handle_order_book, # all API users can subscribe, public depth channel, verification not required, data feeds will be delivered every 100ms(vs. every 200ms now) 'books50-l2-tbt': self.handle_order_book, # only users who're VIP4 and above can subscribe, identity verification required before subscription 'books-l2-tbt': self.handle_order_book, # only users who're VIP5 and above can subscribe, identity verification required before subscription 'tickers': self.handle_ticker, 'mark-price': self.handle_ticker, 'positions': self.handle_positions, 'index-tickers': self.handle_ticker, 'sprd-tickers': self.handle_ticker, 'block-tickers': self.handle_ticker, 'trades': self.handle_trades, 'trades-all': self.handle_trades, 'account': self.handle_balance, 'funding-rate': self.handle_funding_rate, # 'margin_account': self.handle_balance, 'orders': self.handle_orders, 'orders-algo': self.handle_orders, 'liquidation-orders': self.handle_liquidation, 'balance_and_position': self.handle_balance_and_position, } method = self.safe_value(methods, channel) if method is None: if channel.find('candle') == 0: self.handle_ohlcv(client, message) else: method(client, message) def handle_un_subscription_trades(self, client: Client, symbol: str, channel: str): subMessageHash = channel + ':' + symbol messageHash = 'unsubscribe:' + subMessageHash self.clean_unsubscription(client, subMessageHash, messageHash) if symbol in self.trades: del self.trades[symbol] def handle_unsubscription_order_book(self, client: Client, symbol: str, channel: str): subMessageHash = channel + ':' + symbol messageHash = 'unsubscribe:orderbook:' + symbol self.clean_unsubscription(client, subMessageHash, messageHash) if symbol in self.orderbooks: del self.orderbooks[symbol] def handle_unsubscription_ohlcv(self, client: Client, symbol: str, channel: str): tf = channel.replace('candle', '') timeframe = self.find_timeframe(tf) subMessageHash = 'multi:' + channel + ':' + symbol messageHash = 'unsubscribe:' + subMessageHash self.clean_unsubscription(client, subMessageHash, messageHash) if timeframe in self.ohlcvs[symbol]: del self.ohlcvs[symbol][timeframe] def handle_unsubscription_ticker(self, client: Client, symbol: str, channel): subMessageHash = channel + '::' + symbol messageHash = 'unsubscribe:ticker:' + symbol self.clean_unsubscription(client, subMessageHash, messageHash) if symbol in self.tickers: del self.tickers[symbol] def handle_unsubscription(self, client: Client, message): # # { # "event": "unsubscribe", # "arg": { # "channel": "tickers", # "instId": "LTC-USD-200327" # }, # "connId": "a4d3ae55" # } # arg might be an array or list arg = self.safe_dict(message, 'arg', {}) channel = self.safe_string(arg, 'channel', '') marketId = self.safe_string(arg, 'instId') symbol = self.safe_symbol(marketId) if channel == 'trades' or channel == 'trades-all': self.handle_un_subscription_trades(client, symbol, channel) elif channel.startswith('bbo') or channel.startswith('book'): self.handle_unsubscription_order_book(client, symbol, channel) elif channel.find('tickers') > -1: self.handle_unsubscription_ticker(client, symbol, channel) elif channel.startswith('candle'): self.handle_unsubscription_ohlcv(client, symbol, channel)