# -*- coding: utf-8 -*- # PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN: # https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code import ccxt.async_support from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheByTimestamp from ccxt.base.types import Any, Balances, Bool, Int, Order, OrderBook, Str, Strings, Ticker, Tickers, Trade from ccxt.async_support.base.ws.client import Client from typing import List from ccxt.base.errors import ExchangeError from ccxt.base.errors import ArgumentsRequired class kucoin(ccxt.async_support.kucoin): def describe(self) -> Any: return self.deep_extend(super(kucoin, self).describe(), { 'has': { 'ws': True, 'createOrderWs': False, 'editOrderWs': False, 'fetchOpenOrdersWs': False, 'fetchOrderWs': False, 'cancelOrderWs': False, 'cancelOrdersWs': False, 'cancelAllOrdersWs': False, 'watchBidsAsks': True, 'watchOrderBook': True, 'watchOrders': True, 'watchMyTrades': True, 'watchTickers': True, 'watchTicker': True, 'watchTrades': True, 'watchTradesForSymbols': True, 'watchOrderBookForSymbols': True, 'watchBalance': True, 'watchOHLCV': True, 'unWatchTicker': True, 'unWatchOHLCV': True, 'unWatchOrderBook': True, 'unWatchTrades': True, 'unWatchhTradesForSymbols': True, }, 'options': { 'tradesLimit': 1000, 'watchTicker': { 'name': 'market/snapshot', # market/ticker }, 'watchOrderBook': { 'snapshotDelay': 5, 'snapshotMaxRetries': 3, 'method': '/market/level2', # '/spotMarket/level2Depth5' or '/spotMarket/level2Depth50' }, 'watchMyTrades': { 'method': '/spotMarket/tradeOrders', # or '/spot/tradeFills' }, }, 'streaming': { # kucoin does not support built-in ws protocol-level ping-pong # instead it requires a custom json-based text ping-pong # https://docs.kucoin.com/#ping 'ping': self.ping, }, }) async def negotiate(self, privateChannel, params={}): connectId = 'private' if privateChannel else 'public' urls = self.safe_value(self.options, 'urls', {}) future = self.safe_value(urls, connectId) if future is not None: return await future # we store an awaitable to the url # so that multiple calls don't asynchronously # fetch different urls and overwrite each other urls[connectId] = self.spawn(self.negotiate_helper, privateChannel, params) self.options['urls'] = urls future = urls[connectId] return await future async def negotiate_helper(self, privateChannel, params={}): response = None connectId = 'private' if privateChannel else 'public' try: if privateChannel: response = await self.privatePostBulletPrivate(params) # # { # "code": "200000", # "data": { # "instanceServers": [ # { # "pingInterval": 50000, # "endpoint": "wss://push-private.kucoin.com/endpoint", # "protocol": "websocket", # "encrypt": True, # "pingTimeout": 10000 # } # ], # "token": "2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJ1UQy47YbpY4zVdzilNP-Bj3iXzrjjGlWtiYB9J6i9GjsxUuhPw3BlrzazF6ghq4Lzf7scStOz3KkxjwpsOBCH4=.WNQmhZQeUKIkh97KYgU0Lg==" # } # } # else: response = await self.publicPostBulletPublic(params) data = self.safe_value(response, 'data', {}) instanceServers = self.safe_value(data, 'instanceServers', []) firstInstanceServer = self.safe_value(instanceServers, 0) pingInterval = self.safe_integer(firstInstanceServer, 'pingInterval') endpoint = self.safe_string(firstInstanceServer, 'endpoint') token = self.safe_string(data, 'token') result = endpoint + '?' + self.urlencode({ 'token': token, 'privateChannel': privateChannel, 'connectId': connectId, }) client = self.client(result) client.keepAlive = pingInterval return result except Exception as e: future = self.safe_value(self.options['urls'], connectId) future.reject(e) del self.options['urls'][connectId] return None def request_id(self): requestId = self.sum(self.safe_integer(self.options, 'requestId', 0), 1) self.options['requestId'] = requestId return requestId async def subscribe(self, url, messageHash, subscriptionHash, params={}, subscription=None): requestId = str(self.request_id()) request: dict = { 'id': requestId, 'type': 'subscribe', 'topic': subscriptionHash, 'response': True, } message = self.extend(request, params) client = self.client(url) if not (subscriptionHash in client.subscriptions): client.subscriptions[requestId] = subscriptionHash return await self.watch(url, messageHash, message, subscriptionHash, subscription) async def un_subscribe(self, url, messageHash, topic, subscriptionHash, params={}, subscription: dict = None): return await self.un_subscribe_multiple(url, [messageHash], topic, [subscriptionHash], params, subscription) async def subscribe_multiple(self, url, messageHashes, topic, subscriptionHashes, params={}, subscription=None): requestId = str(self.request_id()) request: dict = { 'id': requestId, 'type': 'subscribe', 'topic': topic, 'response': True, } message = self.extend(request, params) client = self.client(url) for i in range(0, len(subscriptionHashes)): subscriptionHash = subscriptionHashes[i] if not (subscriptionHash in client.subscriptions): client.subscriptions[requestId] = subscriptionHash return await self.watch_multiple(url, messageHashes, message, subscriptionHashes, subscription) async def un_subscribe_multiple(self, url, messageHashes, topic, subscriptionHashes, params={}, subscription: dict = None): requestId = str(self.request_id()) request: dict = { 'id': requestId, 'type': 'unsubscribe', 'topic': topic, 'response': True, } message = self.extend(request, params) if subscription is not None: subscription[requestId] = requestId client = self.client(url) for i in range(0, len(subscriptionHashes)): subscriptionHash = subscriptionHashes[i] if not (subscriptionHash in client.subscriptions): client.subscriptions[requestId] = subscriptionHash return await self.watch_multiple(url, messageHashes, message, subscriptionHashes, subscription) async def watch_ticker(self, symbol: str, params={}) -> Ticker: """ watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market https://www.kucoin.com/docs/websocket/spot-trading/public-channels/market-snapshot :param str symbol: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `ticker structure ` """ await self.load_markets() market = self.market(symbol) symbol = market['symbol'] url = await self.negotiate(False) method, query = self.handle_option_and_params(params, 'watchTicker', 'method', '/market/snapshot') topic = method + ':' + market['id'] messageHash = 'ticker:' + symbol return await self.subscribe(url, messageHash, topic, query) async def un_watch_ticker(self, symbol: str, params={}) -> Ticker: """ unWatches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market https://www.kucoin.com/docs/websocket/spot-trading/public-channels/market-snapshot :param str symbol: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `ticker structure ` """ await self.load_markets() market = self.market(symbol) symbol = market['symbol'] url = await self.negotiate(False) method = None method, params = self.handle_option_and_params(params, 'watchTicker', 'method', '/market/snapshot') topic = method + ':' + market['id'] messageHash = 'unsubscribe:ticker:' + symbol subMessageHash = 'ticker:' + symbol subscription = { 'messageHashes': [messageHash], 'subMessageHashes': [subMessageHash], 'topic': 'trades', 'unsubscribe': True, 'symbols': [symbol], } return await self.un_subscribe(url, messageHash, topic, subMessageHash, params, subscription) async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers: """ https://www.kucoin.com/docs/websocket/spot-trading/public-channels/ticker watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for all markets of a specific list :param str[] symbols: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.method]: either '/market/snapshot' or '/market/ticker' default is '/market/ticker' :returns dict: a `ticker structure ` """ await self.load_markets() symbols = self.market_symbols(symbols) messageHash = 'tickers' method = None method, params = self.handle_option_and_params(params, 'watchTickers', 'method', '/market/ticker') messageHashes = [] topics = [] if symbols is not None: for i in range(0, len(symbols)): symbol = symbols[i] messageHashes.append('ticker:' + symbol) market = self.market(symbol) topics.append(method + ':' + market['id']) url = await self.negotiate(False) tickers = None if symbols is None: allTopic = method + ':all' tickers = await self.subscribe(url, messageHash, allTopic, params) if self.newUpdates: return tickers else: marketIds = self.market_ids(symbols) symbolsTopic = method + ':' + ','.join(marketIds) tickers = await self.subscribe_multiple(url, messageHashes, symbolsTopic, topics, params) if self.newUpdates: newDict: dict = {} newDict[tickers['symbol']] = tickers return newDict return self.filter_by_array(self.tickers, 'symbol', symbols) def handle_ticker(self, client: Client, message): # # market/snapshot # # updates come in every 2 sec unless there # were no changes since the previous update # # { # "data": { # "sequence": "1545896669291", # "data": { # "trading": True, # "symbol": "KCS-BTC", # "buy": 0.00011, # "sell": 0.00012, # "sort": 100, # "volValue": 3.13851792584, # total # "baseCurrency": "KCS", # "market": "BTC", # "quoteCurrency": "BTC", # "symbolCode": "KCS-BTC", # "datetime": 1548388122031, # "high": 0.00013, # "vol": 27514.34842, # "low": 0.0001, # "changePrice": -1.0e-5, # "changeRate": -0.0769, # "lastTradedPrice": 0.00012, # "board": 0, # "mark": 0 # } # }, # "subject": "trade.snapshot", # "topic": "/market/snapshot:KCS-BTC", # "type": "message" # } # # market/ticker # # { # "type": "message", # "topic": "/market/ticker:BTC-USDT", # "subject": "trade.ticker", # "data": { # "bestAsk": "62163", # "bestAskSize": "0.99011388", # "bestBid": "62162.9", # "bestBidSize": "0.04794181", # "price": "62162.9", # "sequence": "1621383371852", # "size": "0.00832274", # "time": 1634641987564 # } # } # topic = self.safe_string(message, 'topic') market = None if topic is not None: parts = topic.split(':') first = self.safe_string(parts, 1) marketId = None if first == 'all': marketId = self.safe_string(message, 'subject') else: marketId = first market = self.safe_market(marketId, market, '-') data = self.safe_value(message, 'data', {}) rawTicker = self.safe_value(data, 'data', data) ticker = self.parse_ticker(rawTicker, market) symbol = ticker['symbol'] self.tickers[symbol] = ticker messageHash = 'ticker:' + symbol client.resolve(ticker, messageHash) # watchTickers allTickers: dict = {} allTickers[symbol] = ticker client.resolve(allTickers, 'tickers') async def watch_bids_asks(self, symbols: Strings = None, params={}) -> Tickers: """ https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level1-bbo-market-data watches best bid & ask for symbols :param str[] symbols: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `ticker structure ` """ ticker = await self.watch_multi_helper('watchBidsAsks', '/spotMarket/level1:', symbols, params) if self.newUpdates: tickers: dict = {} tickers[ticker['symbol']] = ticker return tickers return self.filter_by_array(self.bidsasks, 'symbol', symbols) async def watch_multi_helper(self, methodName, channelName: str, symbols: Strings = None, params={}): await self.load_markets() symbols = self.market_symbols(symbols, None, False, True, False) length = len(symbols) if length > 100: raise ArgumentsRequired(self.id + ' ' + methodName + '() accepts a maximum of 100 symbols') messageHashes = [] for i in range(0, len(symbols)): symbol = symbols[i] market = self.market(symbol) messageHashes.append('bidask@' + market['symbol']) url = await self.negotiate(False) marketIds = self.market_ids(symbols) joined = ','.join(marketIds) requestId = str(self.request_id()) request: dict = { 'id': requestId, 'type': 'subscribe', 'topic': channelName + joined, 'response': True, } message = self.extend(request, params) return await self.watch_multiple(url, messageHashes, message, messageHashes) def handle_bid_ask(self, client: Client, message): # # arrives one symbol dict # # { # topic: '/spotMarket/level1:ETH-USDT', # type: 'message', # data: { # asks: ['3347.42', '2.0778387'], # bids: ['3347.41', '6.0411697'], # timestamp: 1712231142085 # }, # subject: 'level1' # } # parsedTicker = self.parse_ws_bid_ask(message) symbol = parsedTicker['symbol'] self.bidsasks[symbol] = parsedTicker messageHash = 'bidask@' + symbol client.resolve(parsedTicker, messageHash) def parse_ws_bid_ask(self, ticker, market=None): topic = self.safe_string(ticker, 'topic') parts = topic.split(':') marketId = parts[1] market = self.safe_market(marketId, market) symbol = self.safe_string(market, 'symbol') data = self.safe_dict(ticker, 'data', {}) ask = self.safe_list(data, 'asks', []) bid = self.safe_list(data, 'bids', []) timestamp = self.safe_integer(data, 'timestamp') return self.safe_ticker({ 'symbol': symbol, 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), 'ask': self.safe_number(ask, 0), 'askVolume': self.safe_number(ask, 1), 'bid': self.safe_number(bid, 0), 'bidVolume': self.safe_number(bid, 1), 'info': ticker, }, market) async def watch_ohlcv(self, symbol: str, timeframe: str = '1m', since: Int = None, limit: Int = None, params={}) -> List[list]: """ watches historical candlestick data containing the open, high, low, and close price, and the volume of a market https://www.kucoin.com/docs/websocket/spot-trading/public-channels/klines :param str symbol: unified symbol of the market to fetch OHLCV data for :param str timeframe: the length of time each candle represents :param int [since]: timestamp in ms of the earliest candle to fetch :param int [limit]: the maximum amount of candles to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :returns int[][]: A list of candles ordered, open, high, low, close, volume """ await self.load_markets() url = await self.negotiate(False) market = self.market(symbol) symbol = market['symbol'] period = self.safe_string(self.timeframes, timeframe, timeframe) topic = '/market/candles:' + market['id'] + '_' + period messageHash = 'candles:' + symbol + ':' + timeframe ohlcv = await self.subscribe(url, messageHash, topic, params) if self.newUpdates: limit = ohlcv.getLimit(symbol, limit) return self.filter_by_since_limit(ohlcv, since, limit, 0, True) async def un_watch_ohlcv(self, symbol: str, timeframe: str = '1m', params={}) -> List[list]: """ unWatches historical candlestick data containing the open, high, low, and close price, and the volume of a market https://www.kucoin.com/docs/websocket/spot-trading/public-channels/klines :param str symbol: unified symbol of the market to fetch OHLCV data for :param str timeframe: the length of time each candle represents :param dict [params]: extra parameters specific to the exchange API endpoint :returns int[][]: A list of candles ordered, open, high, low, close, volume """ await self.load_markets() url = await self.negotiate(False) market = self.market(symbol) symbol = market['symbol'] period = self.safe_string(self.timeframes, timeframe, timeframe) topic = '/market/candles:' + market['id'] + '_' + period messageHash = 'unsubscribe:candles:' + symbol + ':' + timeframe subMessageHash = 'candles:' + symbol + ':' + timeframe subscription = { 'messageHashes': [messageHash], 'subMessageHashes': [subMessageHash], 'topic': 'ohlcv', 'unsubscribe': True, 'symbols': [symbol], } return await self.un_subscribe(url, messageHash, topic, messageHash, params, subscription) def handle_ohlcv(self, client: Client, message): # # { # "data": { # "symbol": "BTC-USDT", # "candles": [ # "1624881240", # "34138.8", # "34121.6", # "34138.8", # "34097.9", # "3.06097133", # "104430.955068564" # ], # "time": 1624881284466023700 # }, # "subject": "trade.candles.update", # "topic": "/market/candles:BTC-USDT_1min", # "type": "message" # } # data = self.safe_value(message, 'data', {}) marketId = self.safe_string(data, 'symbol') candles = self.safe_value(data, 'candles', []) topic = self.safe_string(message, 'topic') parts = topic.split('_') interval = self.safe_string(parts, 1) # use a reverse lookup in a static map instead timeframe = self.find_timeframe(interval) market = self.safe_market(marketId) symbol = market['symbol'] messageHash = 'candles:' + symbol + ':' + timeframe self.ohlcvs[symbol] = self.safe_value(self.ohlcvs, symbol, {}) stored = self.safe_value(self.ohlcvs[symbol], timeframe) if stored is None: limit = self.safe_integer(self.options, 'OHLCVLimit', 1000) stored = ArrayCacheByTimestamp(limit) self.ohlcvs[symbol][timeframe] = stored ohlcv = self.parse_ohlcv(candles, market) stored.append(ohlcv) client.resolve(stored, messageHash) async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ get the list of most recent trades for a particular symbol https://www.kucoin.com/docs/websocket/spot-trading/public-channels/match-execution-data :param str symbol: unified symbol of the market to fetch trades for :param int [since]: timestamp in ms of the earliest trade to fetch :param int [limit]: the maximum amount of trades to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `trade structures ` """ return await self.watch_trades_for_symbols([symbol], since, limit, params) async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ get the list of most recent trades for a particular symbol https://www.kucoin.com/docs/websocket/spot-trading/public-channels/match-execution-data :param str[] symbols: :param int [since]: timestamp in ms of the earliest trade to fetch :param int [limit]: the maximum amount of trades to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `trade structures ` """ symbolsLength = len(symbols) if symbolsLength == 0: raise ArgumentsRequired(self.id + ' watchTradesForSymbols() requires a non-empty array of symbols') await self.load_markets() symbols = self.market_symbols(symbols) marketIds = self.market_ids(symbols) url = await self.negotiate(False) messageHashes = [] subscriptionHashes = [] topic = '/market/match:' + ','.join(marketIds) for i in range(0, len(symbols)): symbol = symbols[i] messageHashes.append('trades:' + symbol) marketId = marketIds[i] subscriptionHashes.append('/market/match:' + marketId) trades = await self.subscribe_multiple(url, messageHashes, topic, subscriptionHashes, params) if self.newUpdates: first = self.safe_value(trades, 0) tradeSymbol = self.safe_string(first, 'symbol') limit = trades.getLimit(tradeSymbol, limit) return self.filter_by_since_limit(trades, since, limit, 'timestamp', True) async def un_watch_trades_for_symbols(self, symbols: List[str], params={}) -> Any: """ unWatches trades stream https://www.kucoin.com/docs/websocket/spot-trading/public-channels/match-execution-data :param str symbols: :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `trade structures ` """ await self.load_markets() symbols = self.market_symbols(symbols, None, False) marketIds = self.market_ids(symbols) url = await self.negotiate(False) messageHashes = [] subscriptionHashes = [] topic = '/market/match:' + ','.join(marketIds) for i in range(0, len(symbols)): symbol = symbols[i] messageHashes.append('unsubscribe:trades:' + symbol) subscriptionHashes.append('trades:' + symbol) subscription = { 'messageHashes': messageHashes, 'subMessageHashes': subscriptionHashes, 'topic': 'trades', 'unsubscribe': True, 'symbols': symbols, } return await self.un_subscribe_multiple(url, messageHashes, topic, messageHashes, params, subscription) async def un_watch_trades(self, symbol: str, params={}) -> Any: """ unWatches trades stream https://www.kucoin.com/docs/websocket/spot-trading/public-channels/match-execution-data :param str symbol: unified symbol of the market to fetch trades for :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `trade structures ` """ return await self.un_watch_trades_for_symbols([symbol], params) def handle_trade(self, client: Client, message): # # { # "data": { # "sequence": "1568787654360", # "symbol": "BTC-USDT", # "side": "buy", # "size": "0.00536577", # "price": "9345", # "takerOrderId": "5e356c4a9f1a790008f8d921", # "time": "1580559434436443257", # "type": "match", # "makerOrderId": "5e356bffedf0010008fa5d7f", # "tradeId": "5e356c4aeefabd62c62a1ece" # }, # "subject": "trade.l3match", # "topic": "/market/match:BTC-USDT", # "type": "message" # } # data = self.safe_value(message, 'data', {}) trade = self.parse_trade(data) symbol = trade['symbol'] messageHash = 'trades:' + symbol trades = self.safe_value(self.trades, symbol) if trades is None: limit = self.safe_integer(self.options, 'tradesLimit', 1000) trades = ArrayCache(limit) self.trades[symbol] = trades trades.append(trade) client.resolve(trades, messageHash) async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook: """ https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level1-bbo-market-data https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level2-market-data https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level2-5-best-ask-bid-orders https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level2-50-best-ask-bid-orders watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data :param str symbol: unified symbol of the market to fetch the order book for :param int [limit]: the maximum amount of order book entries to return :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.method]: either '/market/level2' or '/spotMarket/level2Depth5' or '/spotMarket/level2Depth50' default is '/market/level2' :returns dict: A dictionary of `order book structures ` indexed by market symbols """ # # https://docs.kucoin.com/#level-2-market-data # # 1. After receiving the websocket Level 2 data flow, cache the data. # 2. Initiate a REST request to get the snapshot data of Level 2 order book. # 3. Playback the cached Level 2 data flow. # 4. Apply the new Level 2 data flow to the local snapshot to ensure that # the sequence of the new Level 2 update lines up with the sequence of # the previous Level 2 data. Discard all the message prior to that # sequence, and then playback the change to snapshot. # 5. Update the level2 full data based on sequence according to the # size. If the price is 0, ignore the messages and update the sequence. # If the size=0, update the sequence and remove the price of which the # size is 0 out of level 2. Fr other cases, please update the price. # return await self.watch_order_book_for_symbols([symbol], limit, params) async def un_watch_order_book(self, symbol: str, params={}) -> Any: """ https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level1-bbo-market-data https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level2-market-data https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level2-5-best-ask-bid-orders https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level2-50-best-ask-bid-orders unWatches information on open orders with bid(buy) and ask(sell) prices, volumes and other data :param str symbol: unified symbol of the market to fetch the order book for :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.method]: either '/market/level2' or '/spotMarket/level2Depth5' or '/spotMarket/level2Depth50' default is '/market/level2' :returns dict: A dictionary of `order book structures ` indexed by market symbols """ return await self.un_watch_order_book_for_symbols([symbol], params) async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook: """ https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level1-bbo-market-data https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level2-market-data https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level2-5-best-ask-bid-orders https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level2-50-best-ask-bid-orders watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data :param str[] symbols: unified array of symbols :param int [limit]: the maximum amount of order book entries to return :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.method]: either '/market/level2' or '/spotMarket/level2Depth5' or '/spotMarket/level2Depth50' default is '/market/level2' :returns dict: A dictionary of `order book structures ` indexed by market symbols """ symbolsLength = len(symbols) if symbolsLength == 0: raise ArgumentsRequired(self.id + ' watchOrderBookForSymbols() requires a non-empty array of symbols') if limit is not None: if (limit != 20) and (limit != 100) and (limit != 50) and (limit != 5): raise ExchangeError(self.id + " watchOrderBook 'limit' argument must be None, 5, 20, 50 or 100") await self.load_markets() symbols = self.market_symbols(symbols) marketIds = self.market_ids(symbols) url = await self.negotiate(False) method: Str = None method, params = self.handle_option_and_params(params, 'watchOrderBook', 'method', '/market/level2') if (limit == 5) or (limit == 50): method = '/spotMarket/level2Depth' + str(limit) topic = method + ':' + ','.join(marketIds) messageHashes = [] subscriptionHashes = [] for i in range(0, len(symbols)): symbol = symbols[i] messageHashes.append('orderbook:' + symbol) marketId = marketIds[i] subscriptionHashes.append(method + ':' + marketId) subscription = {} if method == '/market/level2': # other streams return the entire orderbook, so we don't need to fetch the snapshot through REST subscription = { 'method': self.handle_order_book_subscription, 'symbols': symbols, 'limit': limit, } orderbook = await self.subscribe_multiple(url, messageHashes, topic, subscriptionHashes, params, subscription) return orderbook.limit() async def un_watch_order_book_for_symbols(self, symbols: List[str], params={}) -> Any: """ https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level1-bbo-market-data https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level2-market-data https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level2-5-best-ask-bid-orders https://www.kucoin.com/docs/websocket/spot-trading/public-channels/level2-50-best-ask-bid-orders unWatches information on open orders with bid(buy) and ask(sell) prices, volumes and other data :param str[] symbols: unified array of symbols :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.method]: either '/market/level2' or '/spotMarket/level2Depth5' or '/spotMarket/level2Depth50' default is '/market/level2' :returns dict: A dictionary of `order book structures ` indexed by market symbols """ limit = self.safe_integer(params, 'limit') params = self.omit(params, 'limit') await self.load_markets() symbols = self.market_symbols(symbols, None, False) marketIds = self.market_ids(symbols) url = await self.negotiate(False) method: Str = None method, params = self.handle_option_and_params(params, 'watchOrderBook', 'method', '/market/level2') if (limit == 5) or (limit == 50): method = '/spotMarket/level2Depth' + str(limit) topic = method + ':' + ','.join(marketIds) messageHashes = [] subscriptionHashes = [] for i in range(0, len(symbols)): symbol = symbols[i] messageHashes.append('unsubscribe:orderbook:' + symbol) subscriptionHashes.append('orderbook:' + symbol) subscription = { 'messageHashes': messageHashes, 'symbols': symbols, 'unsubscribe': True, 'topic': 'orderbook', 'subMessageHashes': subscriptionHashes, } return await self.un_subscribe_multiple(url, messageHashes, topic, messageHashes, params, subscription) def handle_order_book(self, client: Client, message): # # initial snapshot is fetched with ccxt's fetchOrderBook # the feed does not include a snapshot, just the deltas # # { # "type":"message", # "topic":"/market/level2:BTC-USDT", # "subject":"trade.l2update", # "data":{ # "sequenceStart":1545896669105, # "sequenceEnd":1545896669106, # "symbol":"BTC-USDT", # "changes": { # "asks": [["6","1","1545896669105"]], # price, size, sequence # "bids": [["4","1","1545896669106"]] # } # } # } # # { # "topic": "/spotMarket/level2Depth5:BTC-USDT", # "type": "message", # "data": { # "asks": [ # [ # "42815.6", # "1.24016245" # ] # ], # "bids": [ # [ # "42815.5", # "0.08652716" # ] # ], # "timestamp": 1707204474018 # }, # "subject": "level2" # } # data = self.safe_value(message, 'data') subject = self.safe_string(message, 'subject') topic = self.safe_string(message, 'topic') topicParts = topic.split(':') topicSymbol = self.safe_string(topicParts, 1) topicChannel = self.safe_string(topicParts, 0) marketId = self.safe_string(data, 'symbol', topicSymbol) symbol = self.safe_symbol(marketId, None, '-') messageHash = 'orderbook:' + symbol # orderbook = self.safe_dict(self.orderbooks, symbol) if subject == 'level2': if not (symbol in self.orderbooks): self.orderbooks[symbol] = self.order_book() else: orderbook = self.orderbooks[symbol] orderbook.reset() self.orderbooks[symbol]['symbol'] = symbol else: if not (symbol in self.orderbooks): self.orderbooks[symbol] = self.order_book() orderbook = self.orderbooks[symbol] nonce = self.safe_integer(orderbook, 'nonce') deltaEnd = self.safe_integer_2(data, 'sequenceEnd', 'timestamp') if nonce is None: cacheLength = len(orderbook.cache) subscriptions = list(client.subscriptions.keys()) subscription = None for i in range(0, len(subscriptions)): key = subscriptions[i] if (key.find(topicSymbol) >= 0) and (key.find(topicChannel) >= 0): subscription = client.subscriptions[key] break limit = self.safe_integer(subscription, 'limit') snapshotDelay = self.handle_option('watchOrderBook', 'snapshotDelay', 5) if cacheLength == snapshotDelay: self.spawn(self.load_order_book, client, messageHash, symbol, limit, {}) orderbook.cache.append(data) return elif nonce >= deltaEnd: return self.handle_delta(self.orderbooks[symbol], data) client.resolve(self.orderbooks[symbol], messageHash) def get_cache_index(self, orderbook, cache): firstDelta = self.safe_value(cache, 0) nonce = self.safe_integer(orderbook, 'nonce') firstDeltaStart = self.safe_integer(firstDelta, 'sequenceStart') if nonce < firstDeltaStart - 1: return -1 for i in range(0, len(cache)): delta = cache[i] deltaStart = self.safe_integer(delta, 'sequenceStart') deltaEnd = self.safe_integer(delta, 'sequenceEnd') if (nonce >= deltaStart - 1) and (nonce < deltaEnd): return i return len(cache) def handle_delta(self, orderbook, delta): timestamp = self.safe_integer_2(delta, 'time', 'timestamp') orderbook['nonce'] = self.safe_integer(delta, 'sequenceEnd', timestamp) orderbook['timestamp'] = timestamp orderbook['datetime'] = self.iso8601(timestamp) changes = self.safe_value(delta, 'changes', delta) bids = self.safe_value(changes, 'bids', []) asks = self.safe_value(changes, 'asks', []) storedBids = orderbook['bids'] storedAsks = orderbook['asks'] self.handle_bid_asks(storedBids, bids) self.handle_bid_asks(storedAsks, asks) def handle_bid_asks(self, bookSide, bidAsks): for i in range(0, len(bidAsks)): bidAsk = self.parse_bid_ask(bidAsks[i]) bookSide.storeArray(bidAsk) def handle_order_book_subscription(self, client: Client, message, subscription): limit = self.safe_integer(subscription, 'limit') symbols = self.safe_value(subscription, 'symbols') if symbols is None: symbol = self.safe_string(subscription, 'symbol') self.orderbooks[symbol] = self.order_book({}, limit) else: for i in range(0, len(symbols)): symbol = symbols[i] self.orderbooks[symbol] = self.order_book({}, limit) # moved snapshot initialization to handleOrderBook to fix # https://github.com/ccxt/ccxt/issues/6820 # the general idea is to fetch the snapshot after the first delta # but not before, because otherwise we cannot synchronize the feed def handle_subscription_status(self, client: Client, message): # # { # "id": "1578090438322", # "type": "ack" # } # id = self.safe_string(message, 'id') if not (id in client.subscriptions): return subscriptionHash = self.safe_string(client.subscriptions, id) subscription = self.safe_value(client.subscriptions, subscriptionHash) del client.subscriptions[id] method = self.safe_value(subscription, 'method') if method is not None: method(client, message, subscription) isUnSub = self.safe_bool(subscription, 'unsubscribe', False) if isUnSub: messageHashes = self.safe_list(subscription, 'messageHashes', []) subMessageHashes = self.safe_list(subscription, 'subMessageHashes', []) for i in range(0, len(messageHashes)): messageHash = messageHashes[i] subHash = subMessageHashes[i] self.clean_unsubscription(client, subHash, messageHash) self.clean_cache(subscription) def handle_system_status(self, client: Client, message): # # todo: answer the question whether handleSystemStatus should be renamed # and unified for any usage pattern that # involves system status and maintenance updates # # { # "id": "1578090234088", # connectId # "type": "welcome", # } # return message async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]: """ watches information on multiple orders made by the user https://www.kucoin.com/docs/websocket/spot-trading/private-channels/private-order-change https://www.kucoin.com/docs/websocket/spot-trading/private-channels/stop-order-event :param str symbol: unified market symbol of the market orders were made in :param int [since]: the earliest time in ms to fetch orders for :param int [limit]: the maximum number of order structures to retrieve :param dict [params]: extra parameters specific to the exchange API endpoint :param boolean [params.trigger]: trigger orders are watched if True :returns dict[]: a list of `order structures ` """ await self.load_markets() trigger = self.safe_value_2(params, 'stop', 'trigger') params = self.omit(params, ['stop', 'trigger']) url = await self.negotiate(True) topic = '/spotMarket/advancedOrders' if trigger else '/spotMarket/tradeOrders' request: dict = { 'privateChannel': True, } messageHash = 'orders' if symbol is not None: market = self.market(symbol) symbol = market['symbol'] messageHash = messageHash + ':' + symbol orders = await self.subscribe(url, messageHash, topic, self.extend(request, params)) if self.newUpdates: limit = orders.getLimit(symbol, limit) return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True) def parse_ws_order_status(self, status): statuses: dict = { 'open': 'open', 'filled': 'closed', 'match': 'open', 'update': 'open', 'canceled': 'canceled', 'cancel': 'canceled', 'TRIGGERED': 'triggered', } return self.safe_string(statuses, status, status) def parse_ws_order(self, order, market=None): # # /spotMarket/tradeOrders # # { # "symbol": "XCAD-USDT", # "orderType": "limit", # "side": "buy", # "orderId": "6249167327218b000135e749", # "type": "canceled", # "orderTime": 1648957043065280224, # "size": "100.452", # "filledSize": "0", # "price": "2.9635", # "clientOid": "buy-XCAD-USDT-1648957043010159", # "remainSize": "0", # "status": "done", # "ts": 1648957054031001037 # } # # /spotMarket/advancedOrders # # { # "createdAt": 1589789942337, # "orderId": "5ec244f6a8a75e0009958237", # "orderPrice": "0.00062", # "orderType": "stop", # "side": "sell", # "size": "1", # "stop": "entry", # "stopPrice": "0.00062", # "symbol": "KCS-BTC", # "tradeType": "TRADE", # "triggerSuccess": True, # "ts": 1589790121382281286, # "type": "triggered" # } # rawType = self.safe_string(order, 'type') status = self.parse_ws_order_status(rawType) timestamp = self.safe_integer_2(order, 'orderTime', 'createdAt') marketId = self.safe_string(order, 'symbol') market = self.safe_market(marketId, market) triggerPrice = self.safe_string(order, 'stopPrice') triggerSuccess = self.safe_value(order, 'triggerSuccess') triggerFail = (triggerSuccess is not True) and (triggerSuccess is not None) # TODO: updated to triggerSuccess == False once transpiler transpiles it correctly if (status == 'triggered') and triggerFail: status = 'canceled' return self.safe_order({ 'info': order, 'symbol': market['symbol'], 'id': self.safe_string(order, 'orderId'), 'clientOrderId': self.safe_string(order, 'clientOid'), 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), 'lastTradeTimestamp': None, 'type': self.safe_string_lower(order, 'orderType'), 'timeInForce': None, 'postOnly': None, 'side': self.safe_string_lower(order, 'side'), 'price': self.safe_string_2(order, 'price', 'orderPrice'), 'stopPrice': triggerPrice, 'triggerPrice': triggerPrice, 'amount': self.safe_string(order, 'size'), 'cost': None, 'average': None, 'filled': self.safe_string(order, 'filledSize'), 'remaining': None, 'status': status, 'fee': None, 'trades': None, }, market) def handle_order(self, client: Client, message): # # Trigger Orders # # { # "createdAt": 1692745706437, # "error": "Balance insufficient!", # not always there # "orderId": "vs86kp757vlda6ni003qs70v", # "orderPrice": "0.26", # "orderType": "stop", # "side": "sell", # "size": "5", # "stop": "loss", # "stopPrice": "0.26", # "symbol": "ADA-USDT", # "tradeType": "TRADE", # "triggerSuccess": False, # not always there # "ts": "1692745706442929298", # "type": "open" # } # messageHash = 'orders' data = self.safe_value(message, 'data') tradeId = self.safe_string(data, 'tradeId') if tradeId is not None: self.handle_my_trade(client, message) parsed = self.parse_ws_order(data) symbol = self.safe_string(parsed, 'symbol') orderId = self.safe_string(parsed, 'id') triggerPrice = self.safe_value(parsed, 'triggerPrice') isTriggerOrder = (triggerPrice is not None) if self.orders is None: limit = self.safe_integer(self.options, 'ordersLimit', 1000) self.orders = ArrayCacheBySymbolById(limit) self.triggerOrders = ArrayCacheBySymbolById(limit) cachedOrders = self.triggerOrders if isTriggerOrder else self.orders orders = self.safe_value(cachedOrders.hashmap, symbol, {}) order = self.safe_value(orders, orderId) if order is not None: # todo add others to calculate average etc if order['status'] == 'closed': parsed['status'] = 'closed' cachedOrders.append(parsed) client.resolve(cachedOrders, messageHash) symbolSpecificMessageHash = messageHash + ':' + symbol client.resolve(cachedOrders, symbolSpecificMessageHash) async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ watches information on multiple trades made by the user https://www.kucoin.com/docs/websocket/spot-trading/private-channels/private-order-change :param str symbol: unified market symbol of the market trades were made in :param int [since]: the earliest time in ms to fetch trades for :param int [limit]: the maximum number of trade structures to retrieve :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.method]: '/spotMarket/tradeOrders' or '/spot/tradeFills' default is '/spotMarket/tradeOrders' :returns dict[]: a list of `trade structures ` """ await self.load_markets() url = await self.negotiate(True) topic: Str = None topic, params = self.handle_option_and_params(params, 'watchMyTrades', 'method', '/spotMarket/tradeOrders') request: dict = { 'privateChannel': True, } messageHash = 'myTrades' if symbol is not None: market = self.market(symbol) symbol = market['symbol'] messageHash = messageHash + ':' + market['symbol'] trades = await self.subscribe(url, messageHash, topic, self.extend(request, params)) if self.newUpdates: limit = trades.getLimit(symbol, limit) return self.filter_by_symbol_since_limit(trades, symbol, since, limit, True) def handle_my_trade(self, client: Client, message): # # { # "type": "message", # "topic": "/spotMarket/tradeOrders", # "subject": "orderChange", # "channelType": "private", # "data": { # "symbol": "KCS-USDT", # "orderType": "limit", # "side": "sell", # "orderId": "5efab07953bdea00089965fa", # "liquidity": "taker", # "type": "match", # "feeType": "takerFee", # "orderTime": 1670329987026, # "size": "0.1", # "filledSize": "0.1", # "price": "0.938", # "matchPrice": "0.96738", # "matchSize": "0.1", # "tradeId": "5efab07a4ee4c7000a82d6d9", # "clientOid": "1593487481000313", # "remainSize": "0", # "status": "match", # "ts": 1670329987311000000 # } # } # if self.myTrades is None: limit = self.safe_integer(self.options, 'tradesLimit', 1000) self.myTrades = ArrayCacheBySymbolById(limit) data = self.safe_dict(message, 'data') parsed = self.parse_ws_trade(data) myTrades = self.myTrades myTrades.append(parsed) messageHash = 'myTrades' client.resolve(self.myTrades, messageHash) symbolSpecificMessageHash = messageHash + ':' + parsed['symbol'] client.resolve(self.myTrades, symbolSpecificMessageHash) def parse_ws_trade(self, trade, market=None): # # /spotMarket/tradeOrders # # { # "symbol": "KCS-USDT", # "orderType": "limit", # "side": "sell", # "orderId": "5efab07953bdea00089965fa", # "liquidity": "taker", # "type": "match", # "feeType": "takerFee", # "orderTime": 1670329987026, # "size": "0.1", # "filledSize": "0.1", # "price": "0.938", # "matchPrice": "0.96738", # "matchSize": "0.1", # "tradeId": "5efab07a4ee4c7000a82d6d9", # "clientOid": "1593487481000313", # "remainSize": "0", # "status": "match", # "ts": 1670329987311000000 # } # # /spot/tradeFills # # { # "fee": 0.00262148, # "feeCurrency": "USDT", # "feeRate": 0.001, # "orderId": "62417436b29df8000183df2f", # "orderType": "market", # "price": 131.074, # "side": "sell", # "size": 0.02, # "symbol": "LTC-USDT", # "time": "1648456758734571745", # "tradeId": "624174362e113d2f467b3043" # } # marketId = self.safe_string(trade, 'symbol') market = self.safe_market(marketId, market, '-') symbol = market['symbol'] type = self.safe_string(trade, 'orderType') side = self.safe_string(trade, 'side') tradeId = self.safe_string(trade, 'tradeId') price = self.safe_string(trade, 'matchPrice') amount = self.safe_string(trade, 'matchSize') if price is None: # /spot/tradeFills price = self.safe_string(trade, 'price') amount = self.safe_string(trade, 'size') order = self.safe_string(trade, 'orderId') timestamp = self.safe_integer_product_2(trade, 'ts', 'time', 0.000001) feeCurrency = market['quote'] feeRate = self.safe_string(trade, 'feeRate') feeCost = self.safe_string(trade, 'fee') return self.safe_trade({ 'info': trade, 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), 'symbol': symbol, 'id': tradeId, 'order': order, 'type': type, 'takerOrMaker': self.safe_string(trade, 'liquidity'), 'side': side, 'price': price, 'amount': amount, 'cost': None, 'fee': { 'cost': feeCost, 'rate': feeRate, 'currency': feeCurrency, }, }, market) async def watch_balance(self, params={}) -> Balances: """ watch balance and get the amount of funds available for trading or funds locked in orders https://www.kucoin.com/docs/websocket/spot-trading/private-channels/account-balance-change :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `balance structure ` """ await self.load_markets() url = await self.negotiate(True) topic = '/account/balance' request: dict = { 'privateChannel': True, } messageHash = 'balance' return await self.subscribe(url, messageHash, topic, self.extend(request, params)) def handle_balance(self, client: Client, message): # # { # "id":"6217a451294b030001e3a26a", # "type":"message", # "topic":"/account/balance", # "userId":"6217707c52f97f00012a67db", # "channelType":"private", # "subject":"account.balance", # "data":{ # "accountId":"62177fe67810720001db2f18", # "available":"89", # "availableChange":"-30", # "currency":"USDT", # "hold":"0", # "holdChange":"0", # "relationContext":{ # }, # "relationEvent":"main.transfer", # "relationEventId":"6217a451294b030001e3a26a", # "time":"1645716561816", # "total":"89" # } # data = self.safe_value(message, 'data', {}) messageHash = 'balance' currencyId = self.safe_string(data, 'currency') relationEvent = self.safe_string(data, 'relationEvent') requestAccountType = None if relationEvent is not None: relationEventParts = relationEvent.split('.') requestAccountType = self.safe_string(relationEventParts, 0) selectedType = self.safe_string_2(self.options, 'watchBalance', 'defaultType', 'trade') # trade, main, margin or other accountsByType = self.safe_value(self.options, 'accountsByType') uniformType = self.safe_string(accountsByType, requestAccountType, 'trade') if not (uniformType in self.balance): self.balance[uniformType] = {} self.balance[uniformType]['info'] = data timestamp = self.safe_integer(data, 'time') self.balance[uniformType]['timestamp'] = timestamp self.balance[uniformType]['datetime'] = self.iso8601(timestamp) code = self.safe_currency_code(currencyId) account = self.account() account['free'] = self.safe_string(data, 'available') account['used'] = self.safe_string(data, 'hold') account['total'] = self.safe_string(data, 'total') self.balance[uniformType][code] = account self.balance[uniformType] = self.safe_balance(self.balance[uniformType]) if uniformType == selectedType: client.resolve(self.balance[uniformType], messageHash) def handle_subject(self, client: Client, message): # # { # "type":"message", # "topic":"/market/level2:BTC-USDT", # "subject":"trade.l2update", # "data":{ # "sequenceStart":1545896669105, # "sequenceEnd":1545896669106, # "symbol":"BTC-USDT", # "changes": { # "asks": [["6","1","1545896669105"]], # price, size, sequence # "bids": [["4","1","1545896669106"]] # } # } # } # topic = self.safe_string(message, 'topic') if topic == '/market/ticker:all': self.handle_ticker(client, message) return subject = self.safe_string(message, 'subject') methods: dict = { 'level1': self.handle_bid_ask, 'level2': self.handle_order_book, 'trade.l2update': self.handle_order_book, 'trade.ticker': self.handle_ticker, 'trade.snapshot': self.handle_ticker, 'trade.l3match': self.handle_trade, 'trade.candles.update': self.handle_ohlcv, 'account.balance': self.handle_balance, 'orderChange': self.handle_order, 'stopOrder': self.handle_order, '/spot/tradeFills': self.handle_my_trade, } method = self.safe_value(methods, subject) if method is not None: method(client, message) def ping(self, client: Client): # kucoin does not support built-in ws protocol-level ping-pong # instead it requires a custom json-based text ping-pong # https://docs.kucoin.com/#ping id = str(self.request_id()) return { 'id': id, 'type': 'ping', } def handle_pong(self, client: Client, message): client.lastPong = self.milliseconds() # https://docs.kucoin.com/#ping def handle_error_message(self, client: Client, message) -> Bool: # # { # "id": "1", # "type": "error", # "code": 415, # "data": "type is not supported" # } # data = self.safe_string(message, 'data', '') if data == 'token is expired': type = 'public' if client.url.find('connectId=private') >= 0: type = 'private' self.options['urls'][type] = None self.handle_errors(1, '', client.url, '', {}, data, message, {}, {}) return False def handle_message(self, client: Client, message): type = self.safe_string(message, 'type') methods: dict = { # 'heartbeat': self.handleHeartbeat, 'welcome': self.handle_system_status, 'ack': self.handle_subscription_status, 'message': self.handle_subject, 'pong': self.handle_pong, 'error': self.handle_error_message, } method = self.safe_value(methods, type) if method is not None: method(client, message)