# -*- coding: utf-8 -*- # PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN: # https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code import ccxt.async_support from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById import hashlib from ccxt.base.types import Any, Balances, Bool, Int, Order, OrderBook, Position, Str, Strings, Ticker, Tickers, Trade from ccxt.async_support.base.ws.client import Client from typing import List from ccxt.base.errors import ExchangeError from ccxt.base.errors import AuthenticationError from ccxt.base.errors import ArgumentsRequired from ccxt.base.precise import Precise class krakenfutures(ccxt.async_support.krakenfutures): def describe(self) -> Any: return self.deep_extend(super(krakenfutures, self).describe(), { 'has': { 'ws': True, 'cancelAllOrdersWs': False, 'cancelOrdersWs': False, 'cancelOrderWs': False, 'createOrderWs': False, 'editOrderWs': False, 'fetchBalanceWs': False, 'fetchOpenOrdersWs': False, 'fetchOrderWs': False, 'fetchTradesWs': False, 'watchOHLCV': False, 'watchOrderBook': True, 'watchOrderBookForSymbols': True, 'watchTicker': True, 'watchTickers': True, 'watchBidsAsks': True, 'watchTrades': True, 'watchTradesForSymbols': True, 'watchBalance': True, # 'watchStatus': True, # https://docs.futures.kraken.com/#websocket-api-public-feeds-heartbeat 'watchOrders': True, 'watchMyTrades': True, 'watchPositions': True, }, 'urls': { 'api': { 'ws': 'wss://futures.kraken.com/ws/v1', }, 'test': { 'ws': 'wss://demo-futures.kraken.com/ws/v1', }, }, 'options': { 'tradesLimit': 1000, 'ordersLimit': 1000, 'OHLCVLimit': 1000, 'connectionLimit': 100, # https://docs.futures.kraken.com/#websocket-api-websocket-api-introduction-subscriptions-limits 'requestLimit': 100, # per second 'fetchBalance': { 'type': None, }, }, 'streaming': { 'keepAlive': 30000, }, }) async def authenticate(self, params={}): """ @ignore authenticates the user to access private web socket channels https://docs.futures.kraken.com/#websocket-api-public-feeds-challenge :returns dict: response from exchange """ self.check_required_credentials() # Hash the challenge with the SHA-256 algorithm # Base64-decode your api_secret # Use the result of step 2 to hash the result of step 1 with the HMAC-SHA-512 algorithm # Base64-encode the result of step 3 url = self.urls['api']['ws'] messageHash = 'challenge' client = self.client(url) future = client.reusableFuture(messageHash) authenticated = self.safe_value(client.subscriptions, messageHash) if authenticated is None: request: dict = { 'event': 'challenge', 'api_key': self.apiKey, } message = self.extend(request, params) self.watch(url, messageHash, message, messageHash) return await future async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook: """ watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data https://docs.futures.kraken.com/#websocket-api-public-feeds-challenge :param str[] symbols: unified array of symbols :param int [limit]: the maximum amount of order book entries to return :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: A dictionary of `order book structures ` indexed by market symbols """ orderbook = await self.watch_multi_helper('orderbook', 'book', symbols, {'limit': limit}, params) return orderbook.limit() async def subscribe_public(self, name: str, symbols: List[str], params={}): """ @ignore Connects to a websocket channel :param str name: name of the channel :param str[] symbols: CCXT market symbols :param dict [params]: extra parameters specific to the krakenfutures api :returns dict: data from the websocket stream """ await self.load_markets() url = self.urls['api']['ws'] subscribe: dict = { 'event': 'subscribe', 'feed': name, } marketIds = [] messageHash = name if symbols is None: symbols = [] for i in range(0, len(symbols)): symbol = symbols[i] marketIds.append(self.market_id(symbol)) length = len(symbols) if length == 1: market = self.market(marketIds[0]) messageHash = messageHash + ':' + market['symbol'] subscribe['product_ids'] = marketIds request = self.extend(subscribe, params) return await self.watch(url, messageHash, request, messageHash) async def subscribe_private(self, name: str, messageHash: str, params={}): """ @ignore Connects to a websocket channel :param str name: name of the channel :param str messageHash: unique identifier for the message :param dict [params]: extra parameters specific to the krakenfutures api :returns dict: data from the websocket stream """ await self.load_markets() await self.authenticate() url = self.urls['api']['ws'] subscribe: dict = { 'event': 'subscribe', 'feed': name, 'api_key': self.apiKey, 'original_challenge': self.options['challenge'], 'signed_challenge': self.options['signedChallenge'], } request = self.extend(subscribe, params) return await self.watch(url, messageHash, request, messageHash) async def watch_ticker(self, symbol: str, params={}) -> Ticker: """ watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market https://docs.futures.kraken.com/#websocket-api-public-feeds-ticker :param str symbol: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `ticker structure ` """ await self.load_markets() symbol = self.symbol(symbol) tickers = await self.watch_tickers([symbol], params) return tickers[symbol] async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers: """ watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market https://docs.futures.kraken.com/#websocket-api-public-feeds-ticker :param str[] symbols: unified symbols of the markets to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `ticker structure ` """ await self.load_markets() symbols = self.market_symbols(symbols, None, False) ticker = await self.watch_multi_helper('ticker', 'ticker', symbols, None, params) if self.newUpdates: result: dict = {} result[ticker['symbol']] = ticker return result return self.filter_by_array(self.tickers, 'symbol', symbols) async def watch_bids_asks(self, symbols: Strings = None, params={}) -> Tickers: """ https://docs.futures.kraken.com/#websocket-api-public-feeds-ticker-lite watches best bid & ask for symbols :param str[] symbols: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `ticker structure ` """ ticker = await self.watch_multi_helper('bidask', 'ticker_lite', symbols, None, params) if self.newUpdates: result: dict = {} result[ticker['symbol']] = ticker return result return self.filter_by_array(self.bidsasks, 'symbol', symbols) async def watch_trades(self, symbol: Str, since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ get the list of most recent trades for a particular symbol https://docs.futures.kraken.com/#websocket-api-public-feeds-trade :param str symbol: unified symbol of the market to fetch trades for :param int [since]: timestamp in ms of the earliest trade to fetch :param int [limit]: the maximum amount of trades to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `trade structures ` """ return await self.watch_trades_for_symbols([symbol], since, limit, params) async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ https://docs.futures.kraken.com/#websocket-api-public-feeds-trade get the list of most recent trades for a list of symbols :param str[] symbols: unified symbol of the market to fetch trades for :param int [since]: timestamp in ms of the earliest trade to fetch :param int [limit]: the maximum amount of trades to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `trade structures ` """ trades = await self.watch_multi_helper('trade', 'trade', symbols, None, params) if self.newUpdates: first = self.safe_list(trades, 0) tradeSymbol = self.safe_string(first, 'symbol') limit = trades.getLimit(tradeSymbol, limit) return self.filter_by_since_limit(trades, since, limit, 'timestamp', True) async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook: """ watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data https://docs.futures.kraken.com/#websocket-api-public-feeds-book :param str symbol: unified symbol of the market to fetch the order book for :param int [limit]: not used by krakenfutures watchOrderBook :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: A dictionary of `order book structures ` indexed by market symbols """ return await self.watch_order_book_for_symbols([symbol], limit, params) async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]: """ https://docs.futures.kraken.com/#websocket-api-private-feeds-open-positions watch all open positions :param str[]|None symbols: list of unified market symbols @param since @param limit :param dict params: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `position structure ` """ await self.load_markets() messageHash = '' symbols = self.market_symbols(symbols) if not self.is_empty(symbols): messageHash = '::' + ','.join(symbols) messageHash = 'positions' + messageHash newPositions = await self.subscribe_private('open_positions', messageHash, params) if self.newUpdates: return newPositions return self.filter_by_symbols_since_limit(self.positions, symbols, since, limit, True) def handle_positions(self, client, message): # # { # feed: 'open_positions', # account: '3b111acc-4fcc-45be-a622-57e611fe9f7f', # positions: [ # { # instrument: 'PF_LTCUSD', # balance: 0.5, # pnl: -0.8628305877699987, # entry_price: 70.53, # mark_price: 68.80433882446, # index_price: 68.8091, # liquidation_threshold: 0, # effective_leverage: 0.007028866753648637, # return_on_equity: -1.2233525985679834, # unrealized_funding: 0.0000690610530935388, # initial_margin: 0.7053, # initial_margin_with_orders: 0.7053, # maintenance_margin: 0.35265, # pnl_currency: 'USD' # } # ], # seq: 0, # timestamp: 1698608414910 # } # if self.positions is None: self.positions = ArrayCacheBySymbolById() cache = self.positions rawPositions = self.safe_value(message, 'positions', []) newPositions = [] for i in range(0, len(rawPositions)): rawPosition = rawPositions[i] position = self.parse_ws_position(rawPosition) timestamp = self.safe_integer(message, 'timestamp') position['timestamp'] = timestamp position['datetime'] = self.iso8601(timestamp) newPositions.append(position) cache.append(position) messageHashes = self.find_message_hashes(client, 'positions::') for i in range(0, len(messageHashes)): messageHash = messageHashes[i] parts = messageHash.split('::') symbolsString = parts[1] symbols = symbolsString.split(',') positions = self.filter_by_array(newPositions, 'symbol', symbols, False) if not self.is_empty(positions): client.resolve(positions, messageHash) client.resolve(newPositions, 'positions') def parse_ws_position(self, position, market=None): # # { # instrument: 'PF_LTCUSD', # balance: 0.5, # pnl: -0.8628305877699987, # entry_price: 70.53, # mark_price: 68.80433882446, # index_price: 68.8091, # liquidation_threshold: 0, # effective_leverage: 0.007028866753648637, # return_on_equity: -1.2233525985679834, # unrealized_funding: 0.0000690610530935388, # initial_margin: 0.7053, # initial_margin_with_orders: 0.7053, # maintenance_margin: 0.35265, # pnl_currency: 'USD' # } # marketId = self.safe_string(position, 'instrument') hedged = 'both' balance = self.safe_number(position, 'balance') side = 'long' if (balance > 0) else 'short' return self.safe_position({ 'info': position, 'id': None, 'symbol': self.safe_symbol(marketId), 'notional': None, 'marginMode': None, 'liquidationPrice': self.safe_number(position, 'liquidation_threshold'), 'entryPrice': self.safe_number(position, 'entry_price'), 'unrealizedPnl': self.safe_number(position, 'pnl'), 'percentage': self.safe_number(position, 'return_on_equity'), 'contracts': self.parse_number(Precise.string_abs(self.number_to_string(balance))), 'contractSize': None, 'markPrice': self.safe_number(position, 'mark_price'), 'side': side, 'hedged': hedged, 'timestamp': None, 'datetime': None, 'maintenanceMargin': self.safe_number(position, 'maintenance_margin'), 'maintenanceMarginPercentage': None, 'collateral': None, 'initialMargin': self.safe_number(position, 'initial_margin'), 'initialMarginPercentage': None, 'leverage': None, 'marginRatio': None, }) async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]: """ watches information on multiple orders made by the user https://docs.futures.kraken.com/#websocket-api-private-feeds-open-orders https://docs.futures.kraken.com/#websocket-api-private-feeds-open-orders-verbose :param str symbol: not used by krakenfutures watchOrders :param int [since]: not used by krakenfutures watchOrders :param int [limit]: not used by krakenfutures watchOrders :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `order structures ` """ await self.load_markets() name = 'open_orders' messageHash = 'orders' if symbol is not None: market = self.market(symbol) messageHash += ':' + market['symbol'] orders = await self.subscribe_private(name, messageHash, params) if self.newUpdates: limit = orders.getLimit(symbol, limit) return self.filter_by_since_limit(orders, since, limit, 'timestamp', True) async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ watches information on multiple trades made by the user https://docs.futures.kraken.com/#websocket-api-private-feeds-fills :param str symbol: unified market symbol of the market orders were made in :param int [since]: the earliest time in ms to fetch orders for :param int [limit]: the maximum number of order structures to retrieve :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `trade structures ` """ await self.load_markets() name = 'fills' messageHash = 'myTrades' if symbol is not None: market = self.market(symbol) messageHash += ':' + market['symbol'] trades = await self.subscribe_private(name, messageHash, params) if self.newUpdates: limit = trades.getLimit(symbol, limit) return self.filter_by_since_limit(trades, since, limit, 'timestamp', True) async def watch_balance(self, params={}) -> Balances: """ watches information on the user's account balance https://docs.futures.kraken.com/#websocket-api-private-feeds-balances :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.account]: can be either 'futures' or 'flex_futures' :returns dict} a object of wallet types each with a balance structure {@link https://docs.ccxt.com/#/?id=balance-structure: """ await self.load_markets() name = 'balances' messageHash = name account = None account, params = self.handle_option_and_params(params, 'watchBalance', 'account') if account is not None: if account != 'futures' and account != 'flex_futures': raise ArgumentsRequired(self.id + ' watchBalance account must be either \'futures\' or \'flex_futures\'') messageHash += ':' + account return await self.subscribe_private(name, messageHash, params) def handle_trade(self, client: Client, message): # # snapshot # # { # "feed": "trade_snapshot", # "product_id": "PI_XBTUSD", # "trades": [ # { # "feed": "trade", # "product_id": "PI_XBTUSD", # "uid": "caa9c653-420b-4c24-a9f2-462a054d86f1", # "side": "sell", # "type": "fill", # "seq": 655508, # "time": 1612269657781, # "qty": 440, # "price": 34893 # }, # ... # ] # } # # update # # { # "feed": "trade", # "product_id": "PI_XBTUSD", # "uid": "05af78ac-a774-478c-a50c-8b9c234e071e", # "side": "sell", # "type": "fill", # "seq": 653355, # "time": 1612266317519, # "qty": 15000, # "price": 34969.5 # } # channel = self.safe_string(message, 'feed') marketId = self.safe_string(message, 'product_id') if marketId is not None: market = self.market(marketId) symbol = market['symbol'] messageHash = self.get_message_hash('trade', None, symbol) if self.safe_list(self.trades, symbol) is None: tradesLimit = self.safe_integer(self.options, 'tradesLimit', 1000) self.trades[symbol] = ArrayCache(tradesLimit) tradesArray = self.trades[symbol] if channel == 'trade_snapshot': trades = self.safe_list(message, 'trades', []) length = len(trades) for i in range(0, length): index = length - 1 - i # need reverse to correct chronology item = trades[index] trade = self.parse_ws_trade(item) tradesArray.append(trade) else: trade = self.parse_ws_trade(message) tradesArray.append(trade) client.resolve(tradesArray, messageHash) def parse_ws_trade(self, trade, market=None): # # { # "feed": "trade", # "product_id": "PI_XBTUSD", # "uid": "caa9c653-420b-4c24-a9f1-462a054d86f1", # "side": "sell", # "type": "fill", # "seq": 655508, # "time": 1612269657781, # "qty": 440, # "price": 34893 # } # marketId = self.safe_string(trade, 'product_id') market = self.safe_market(marketId, market) timestamp = self.safe_integer(trade, 'time') return self.safe_trade({ 'info': trade, 'id': self.safe_string(trade, 'uid'), 'symbol': self.safe_string(market, 'symbol'), 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), 'order': None, 'type': self.safe_string(trade, 'type'), 'side': self.safe_string(trade, 'side'), 'takerOrMaker': 'taker', 'price': self.safe_string(trade, 'price'), 'amount': self.safe_string(trade, 'qty'), 'cost': None, 'fee': { 'rate': None, 'cost': None, 'currency': None, }, }, market) def parse_ws_order_trade(self, trade, market=None): # # { # "symbol": "BTC_USDT", # "type": "LIMIT", # "quantity": "1", # "orderId": "32471407854219264", # "tradeFee": "0", # "clientOrderId": "", # "accountType": "SPOT", # "feeCurrency": "", # "eventType": "place", # "source": "API", # "side": "BUY", # "filledQuantity": "0", # "filledAmount": "0", # "matchRole": "MAKER", # "state": "NEW", # "tradeTime": 0, # "tradeAmount": "0", # "orderAmount": "0", # "createTime": 1648708186922, # "price": "47112.1", # "tradeQty": "0", # "tradePrice": "0", # "tradeId": "0", # "ts": 1648708187469 # } # timestamp = self.safe_integer(trade, 'tradeTime') marketId = self.safe_string(trade, 'symbol') return self.safe_trade({ 'info': trade, 'id': self.safe_string(trade, 'tradeId'), 'symbol': self.safe_symbol(marketId, market), 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), 'order': self.safe_string(trade, 'orderId'), 'type': self.safe_string_lower(trade, 'type'), 'side': self.safe_string(trade, 'side'), 'takerOrMaker': self.safe_string(trade, 'matchRole'), 'price': self.safe_string(trade, 'price'), 'amount': self.safe_string(trade, 'tradeAmount'), # ? tradeQty? 'cost': None, 'fee': { 'rate': None, 'cost': self.safe_string(trade, 'tradeFee'), 'currency': self.safe_string(trade, 'feeCurrency'), }, }, market) def handle_order(self, client: Client, message): # # update(verbose) # # { # "feed": "open_orders_verbose", # "order": { # "instrument": "PI_XBTUSD", # "time": 1567597581495, # "last_update_time": 1567597581495, # "qty": 102.0, # "filled": 0.0, # "limit_price": 10601.0, # "stop_price": 0.0, # "type": "limit", # "order_id": "fa9806c9-cba9-4661-9f31-8c5fd045a95d", # "direction": 0, # "reduce_only": False # }, # "is_cancel": True, # "reason": "post_order_failed_because_it_would_be_filled" # } # # update # # { # "feed": "open_orders", # "order": { # "instrument": "PI_XBTUSD", # "time": 1567702877410, # "last_update_time": 1567702877410, # "qty": 304.0, # "filled": 0.0, # "limit_price": 10640.0, # "stop_price": 0.0, # "type": "limit", # "order_id": "59302619-41d2-4f0b-941f-7e7914760ad3", # "direction": 1, # "reduce_only": True # }, # "is_cancel": False, # "reason": "new_placed_order_by_user" # } # { # "feed": "open_orders", # "order_id": "ea8a7144-37db-449b-bb4a-b53c814a0f43", # "is_cancel": True, # "reason": "cancelled_by_user" # } # # { # "feed": 'open_orders', # "order": { # "instrument": 'PF_XBTUSD', # "time": 1698159920097, # "last_update_time": 1699835622988, # "qty": 1.1, # "filled": 0, # "limit_price": 20000, # "stop_price": 0, # "type": 'limit', # "order_id": '0eaf02b0-855d-4451-a3b7-e2b3070c1fa4', # "direction": 0, # "reduce_only": False # }, # "is_cancel": False, # "reason": 'edited_by_user' # } # orders = self.orders if orders is None: limit = self.safe_integer(self.options, 'ordersLimit') orders = ArrayCacheBySymbolById(limit) self.orders = orders order = self.safe_value(message, 'order') if order is not None: marketId = self.safe_string(order, 'instrument') messageHash = 'orders' symbol = self.safe_symbol(marketId) orderId = self.safe_string(order, 'order_id') previousOrders = self.safe_value(orders.hashmap, symbol, {}) previousOrder = self.safe_value(previousOrders, orderId) reason = self.safe_string(message, 'reason') if (previousOrder is None) or (reason == 'edited_by_user'): parsed = self.parse_ws_order(order) orders.append(parsed) client.resolve(orders, messageHash) client.resolve(orders, messageHash + ':' + symbol) else: trade = self.parse_ws_trade(order) if previousOrder['trades'] is None: previousOrder['trades'] = [] previousOrder['trades'].append(trade) previousOrder['lastTradeTimestamp'] = trade['timestamp'] totalCost = '0' totalAmount = '0' trades = previousOrder['trades'] for i in range(0, len(trades)): currentTrade = trades[i] totalCost = Precise.string_add(totalCost, self.number_to_string(currentTrade['cost'])) totalAmount = Precise.string_add(totalAmount, self.number_to_string(currentTrade['amount'])) if Precise.string_gt(totalAmount, '0'): previousOrder['average'] = Precise.string_div(totalCost, totalAmount) previousOrder['cost'] = totalCost if previousOrder['filled'] is not None: stringOrderFilled = self.number_to_string(previousOrder['filled']) previousOrder['filled'] = Precise.string_add(stringOrderFilled, self.number_to_string(trade['amount'])) if previousOrder['amount'] is not None: previousOrder['remaining'] = Precise.string_sub(self.number_to_string(previousOrder['amount']), stringOrderFilled) if previousOrder['fee'] is None: previousOrder['fee'] = { 'rate': None, 'cost': '0', 'currency': self.number_to_string(trade['fee']['currency']), } if (previousOrder['fee']['cost'] is not None) and (trade['fee']['cost'] is not None): stringOrderCost = self.number_to_string(previousOrder['fee']['cost']) stringTradeCost = self.number_to_string(trade['fee']['cost']) previousOrder['fee']['cost'] = Precise.string_add(stringOrderCost, stringTradeCost) # update the newUpdates count orders.append(self.safe_order(previousOrder)) client.resolve(orders, messageHash + ':' + symbol) client.resolve(orders, messageHash) else: isCancel = self.safe_value(message, 'is_cancel') if isCancel: # get order without symbol for i in range(0, len(orders)): currentOrder = orders[i] if currentOrder['id'] == message['order_id']: orders[i] = self.extend(currentOrder, { 'status': 'canceled', }) client.resolve(orders, 'orders') client.resolve(orders, 'orders:' + currentOrder['symbol']) break return message def handle_order_snapshot(self, client: Client, message): # # verbose # # { # "feed": "open_orders_verbose_snapshot", # "account": "0f9c23b8-63e2-40e4-9592-6d5aa57c12ba", # "orders": [ # { # "instrument": "PI_XBTUSD", # "time": 1567428848005, # "last_update_time": 1567428848005, # "qty": 100.0, # "filled": 0.0, # "limit_price": 8500.0, # "stop_price": 0.0, # "type": "limit", # "order_id": "566942c8-a3b5-4184-a451-622b09493129", # "direction": 0, # "reduce_only": False # }, # ... # ] # } # # regular # # { # "feed": "open_orders_snapshot", # "account": "e258dba9-4dd4-4da5-bfef-75beb91c098e", # "orders": [ # { # "instrument": "PI_XBTUSD", # "time": 1612275024153, # "last_update_time": 1612275024153, # "qty": 1000, # "filled": 0, # "limit_price": 34900, # "stop_price": 13789, # "type": "stop", # "order_id": "723ba95f-13b7-418b-8fcf-ab7ba6620555", # "direction": 1, # "reduce_only": False, # "triggerSignal": "last" # }, # ... # ] # } orders = self.safe_value(message, 'orders', []) limit = self.safe_integer(self.options, 'ordersLimit') self.orders = ArrayCacheBySymbolById(limit) symbols: dict = {} cachedOrders = self.orders for i in range(0, len(orders)): order = orders[i] parsed = self.parse_ws_order(order) symbol = parsed['symbol'] symbols[symbol] = True cachedOrders.append(parsed) length = len(self.orders) if length > 0: client.resolve(self.orders, 'orders') keys = list(symbols.keys()) for i in range(0, len(keys)): symbol = keys[i] messageHash = 'orders:' + symbol client.resolve(self.orders, messageHash) def parse_ws_order(self, order, market=None): # # update # # { # "feed": "open_orders_verbose", # "order": { # "instrument": "PI_XBTUSD", # "time": 1567597581495, # "last_update_time": 1567597581495, # "qty": 102.0, # "filled": 0.0, # "limit_price": 10601.0, # "stop_price": 0.0, # "type": "limit", # "order_id": "fa9806c9-cba9-4661-9f31-8c5fd045a95d", # "direction": 0, # "reduce_only": False # }, # "is_cancel": True, # "reason": "post_order_failed_because_it_would_be_filled" # } # # snapshot # # { # "instrument": "PI_XBTUSD", # "time": 1567597581495, # "last_update_time": 1567597581495, # "qty": 102.0, # "filled": 0.0, # "limit_price": 10601.0, # "stop_price": 0.0, # "type": "limit", # "order_id": "fa9806c9-cba9-4661-9f31-8c5fd045a95d", # "direction": 0, # "reduce_only": False # } # isCancelled = self.safe_value(order, 'is_cancel') unparsedOrder = order status = None if isCancelled is not None: unparsedOrder = self.safe_value(order, 'order') if isCancelled is True: status = 'cancelled' marketId = self.safe_string(unparsedOrder, 'instrument') timestamp = self.safe_string(unparsedOrder, 'time') direction = self.safe_integer(unparsedOrder, 'direction') return self.safe_order({ 'info': order, 'symbol': self.safe_symbol(marketId, market), 'id': self.safe_string(unparsedOrder, 'order_id'), 'clientOrderId': None, 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), 'lastTradeTimestamp': None, 'type': self.safe_string(unparsedOrder, 'type'), 'timeInForce': None, 'postOnly': None, 'side': 'buy' if (direction == 0) else 'sell', 'price': self.safe_string(unparsedOrder, 'limit_price'), 'stopPrice': self.safe_string(unparsedOrder, 'stop_price'), 'triggerPrice': self.safe_string(unparsedOrder, 'stop_price'), 'amount': self.safe_string(unparsedOrder, 'qty'), 'cost': None, 'average': None, 'filled': self.safe_string(unparsedOrder, 'filled'), 'remaining': None, 'status': status, 'fee': { 'rate': None, 'cost': None, 'currency': None, }, 'trades': None, }) def handle_ticker(self, client: Client, message): # # { # "time": 1680811086487, # "product_id": "PI_XBTUSD", # "funding_rate": 7.792297e-12, # "funding_rate_prediction": -4.2671095e-11, # "relative_funding_rate": 2.18013888889e-7, # "relative_funding_rate_prediction": -0.0000011974, # "next_funding_rate_time": 1680811200000, # "feed": "ticker", # "bid": 28060, # "ask": 28070, # "bid_size": 2844, # "ask_size": 1902, # "volume": 19628180, # "dtm": 0, # "leverage": "50x", # "index": 28062.14, # "premium": 0, # "last": 28053.5, # "change": -0.7710945651981715, # "suspended": False, # "tag": "perpetual", # "pair": "XBT:USD", # "openInterest": 28875946, # "markPrice": 28064.92082724592, # "maturityTime": 0, # "post_only": False, # "volumeQuote": 19628180 # } # marketId = self.safe_string(message, 'product_id') if marketId is not None: ticker = self.parse_ws_ticker(message) symbol = ticker['symbol'] self.tickers[symbol] = ticker messageHash = self.get_message_hash('ticker', None, symbol) client.resolve(ticker, messageHash) def handle_bid_ask(self, client: Client, message): # # { # "feed": "ticker_lite", # "product_id": "FI_ETHUSD_210625", # "bid": 1753.45, # "ask": 1760.35, # "change": 13.448175559936647, # "premium": 9.1, # "volume": 6899673.0, # "tag": "semiannual", # "pair": "ETH:USD", # "dtm": 141, # "maturityTime": 1624633200000, # "volumeQuote": 6899673.0 # } # marketId = self.safe_string(message, 'product_id') if marketId is not None: ticker = self.parse_ws_ticker(message) symbol = ticker['symbol'] self.bidsasks[symbol] = ticker messageHash = self.get_message_hash('bidask', None, symbol) client.resolve(ticker, messageHash) def parse_ws_ticker(self, ticker, market=None): # # { # "time": 1680811086487, # "product_id": "PI_XBTUSD", # "funding_rate": 7.792297e-12, # "funding_rate_prediction": -4.2671095e-11, # "relative_funding_rate": 2.18013888889e-7, # "relative_funding_rate_prediction": -0.0000011974, # "next_funding_rate_time": 1680811200000, # "feed": "ticker", # "bid": 28060, # "ask": 28070, # "bid_size": 2844, # "ask_size": 1902, # "volume": 19628180, # "dtm": 0, # "leverage": "50x", # "index": 28062.14, # "premium": 0, # "last": 28053.5, # "change": -0.7710945651981715, # "suspended": False, # "tag": "perpetual", # "pair": "XBT:USD", # "openInterest": 28875946, # "markPrice": 28064.92082724592, # "maturityTime": 0, # "post_only": False, # "volumeQuote": 19628180 # } # # ticker_lite # # { # "feed": "ticker_lite", # "product_id": "FI_ETHUSD_210625", # "bid": 1753.45, # "ask": 1760.35, # "change": 13.448175559936647, # "premium": 9.1, # "volume": 6899673.0, # "tag": "semiannual", # "pair": "ETH:USD", # "dtm": 141, # "maturityTime": 1624633200000, # "volumeQuote": 6899673.0 # } # marketId = self.safe_string(ticker, 'product_id') market = self.safe_market(marketId, market) symbol = market['symbol'] timestamp = self.parse8601(self.safe_string(ticker, 'lastTime')) last = self.safe_string(ticker, 'last') return self.safe_ticker({ 'info': ticker, 'symbol': symbol, 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), 'high': None, 'low': None, 'bid': self.safe_string(ticker, 'bid'), 'bidVolume': self.safe_string(ticker, 'bid_size'), 'ask': self.safe_string(ticker, 'ask'), 'askVolume': self.safe_string(ticker, 'ask_size'), 'vwap': None, 'open': None, 'close': last, 'last': last, 'previousClose': None, 'change': self.safe_string(ticker, 'change'), 'percentage': None, 'average': None, 'baseVolume': self.safe_string(ticker, 'volume'), 'quoteVolume': self.safe_string(ticker, 'volumeQuote'), 'markPrice': self.safe_string(ticker, 'markPrice'), 'indexPrice': self.safe_string(ticker, 'index'), }) def handle_order_book_snapshot(self, client: Client, message): # # { # "feed": "book_snapshot", # "product_id": "PI_XBTUSD", # "timestamp": 1612269825817, # "seq": 326072249, # "tickSize": null, # "bids": [ # { # "price": 34892.5, # "qty": 6385 # }, # { # "price": 34892, # "qty": 10924 # }, # ], # "asks": [ # { # "price": 34911.5, # "qty": 20598 # }, # { # "price": 34912, # "qty": 2300 # }, # ] # } # marketId = self.safe_string(message, 'product_id') market = self.safe_market(marketId) symbol = market['symbol'] messageHash = self.get_message_hash('orderbook', None, symbol) subscription = self.safe_dict(client.subscriptions, messageHash, {}) limit = self.safe_integer(subscription, 'limit') timestamp = self.safe_integer(message, 'timestamp') self.orderbooks[symbol] = self.order_book({}, limit) orderbook = self.orderbooks[symbol] bids = self.safe_list(message, 'bids') asks = self.safe_list(message, 'asks') for i in range(0, len(bids)): bid = bids[i] price = self.safe_number(bid, 'price') qty = self.safe_number(bid, 'qty') bidsSide = orderbook['bids'] bidsSide.store(price, qty) for i in range(0, len(asks)): ask = asks[i] price = self.safe_number(ask, 'price') qty = self.safe_number(ask, 'qty') asksSide = orderbook['asks'] asksSide.store(price, qty) orderbook['timestamp'] = timestamp orderbook['datetime'] = self.iso8601(timestamp) orderbook['symbol'] = symbol client.resolve(orderbook, messageHash) def handle_order_book(self, client: Client, message): # # { # "feed": "book", # "product_id": "PI_XBTUSD", # "side": "sell", # "seq": 326094134, # "price": 34981, # "qty": 0, # "timestamp": 1612269953629 # } # marketId = self.safe_string(message, 'product_id') market = self.safe_market(marketId) symbol = market['symbol'] messageHash = self.get_message_hash('orderbook', None, symbol) orderbook = self.orderbooks[symbol] side = self.safe_string(message, 'side') price = self.safe_number(message, 'price') qty = self.safe_number(message, 'qty') timestamp = self.safe_integer(message, 'timestamp') if side == 'sell': asks = orderbook['asks'] asks.store(price, qty) else: bids = orderbook['bids'] bids.store(price, qty) orderbook['timestamp'] = timestamp orderbook['datetime'] = self.iso8601(timestamp) client.resolve(orderbook, messageHash) def handle_balance(self, client: Client, message): # # snapshot # # { # "feed": "balances_snapshot", # "account": "4a012c31-df95-484a-9473-d51e4a0c4ae7", # "holding": { # "USDT": 4997.5012493753, # "XBT": 0.1285407184, # ... # }, # "futures": { # "F-ETH:EUR": { # "name": "F-ETH:EUR", # "pair": "ETH/EUR", # "unit": "EUR", # "portfolio_value": 0.0, # "balance": 0.0, # "maintenance_margin": 0.0, # "initial_margin": 0.0, # "available": 0.0, # "unrealized_funding": 0.0, # "pnl": 0.0 # }, # ... # }, # "flex_futures": { # "currencies": { # "USDT": { # "quantity": 0.0, # "value": 0.0, # "collateral_value": 0.0, # "available": 0.0, # "haircut": 0.0, # "conversion_spread": 0.0 # }, # ... # }, # "balance_value":0.0, # "portfolio_value":0.0, # "collateral_value":0.0, # "initial_margin":0.0, # "initial_margin_without_orders":0.0, # "maintenance_margin":0.0, # "pnl":0.0, # "unrealized_funding":0.0, # "total_unrealized":0.0, # "total_unrealized_as_margin":0.0, # "margin_equity":0.0, # "available_margin":0.0 # "isolated":{ # }, # "cross":{ # "balance_value":9963.66, # "portfolio_value":9963.66, # "collateral_value":9963.66, # "initial_margin":0.0, # "initial_margin_without_orders":0.0, # "maintenance_margin":0.0, # "pnl":0.0, # "unrealized_funding":0.0, # "total_unrealized":0.0, # "total_unrealized_as_margin":0.0, # "margin_equity":9963.66, # "available_margin":9963.66, # "effective_leverage":0.0 # }, # }, # "timestamp":1640995200000, # "seq":0 # } # # update # # Holding Wallet # # { # "feed": "balances", # "account": "7a641082-55c7-4411-a85f-930ec2e09617", # "holding": { # "USD": 5000.0 # }, # "futures": {}, # "timestamp": 1640995200000, # "seq": 83 # } # # Multi-Collateral # # { # "feed": "balances" # "account": "7a641082-55c7-4411-a85f-930ec2e09617" # "flex_futures": { # "currencies": { # "USDT": { # "quantity": 0.0, # "value": 0.0, # "collateral_value": 0.0, # "available": 0.0, # "haircut": 0.0, # "conversion_spread": 0.0 # }, # ... # }, # "balance_value": 5000.0, # "portfolio_value": 5000.0, # "collateral_value": 5000.0, # "initial_margin": 0.0, # "initial_margin_without_orders": 0.0, # "maintenance_margin": 0.0, # "pnl": 0.0, # "unrealized_funding": 0.0, # "total_unrealized": 0.0, # "total_unrealized_as_margin": 0.0, # "margin_equity": 5000.0, # "available_margin": 5000.0 # }, # "timestamp": 1640995200000, # "seq": 1 # } # # Sample Single-Collateral Balance Delta # # { # "feed": "balances", # "account": "7a641082-55c7-4411-a85f-930ec2e09617", # "holding": {}, # "futures": { # "F-XBT:USD": { # "name": "F-XBT:USD", # "pair": "XBT/USD", # "unit": "XBT", # "portfolio_value": 0.1219368845, # "balance": 0.1219368845, # "maintenance_margin": 0.0, # "initial_margin": 0.0, # "available": 0.1219368845, # "unrealized_funding": 0.0, # "pnl": 0.0 # } # }, # "timestamp": 1640995200000, # "seq": 2 # } # holding = self.safe_value(message, 'holding') futures = self.safe_value(message, 'futures') flexFutures = self.safe_value(message, 'flex_futures') messageHash = 'balances' timestamp = self.safe_integer(message, 'timestamp') if holding is not None: holdingKeys = list(holding.keys()) # cashAccount holdingResult: dict = { 'info': message, 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), } for i in range(0, len(holdingKeys)): key = holdingKeys[i] code = self.safe_currency_code(key) newAccount = self.account() newAccount['total'] = self.safe_string(holding, key) holdingResult[code] = newAccount self.balance['cash'] = holdingResult self.balance['cash'] = self.safe_balance(self.balance['cash']) client.resolve(holdingResult, messageHash) if futures is not None: futuresKeys = list(futures.keys()) # marginAccount futuresResult: dict = { 'info': message, 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), } for i in range(0, len(futuresKeys)): key = futuresKeys[i] symbol = self.safe_symbol(key) newAccount = self.account() future = self.safe_value(futures, key) currencyId = self.safe_string(future, 'unit') code = self.safe_currency_code(currencyId) newAccount['free'] = self.safe_string(future, 'available') newAccount['used'] = self.safe_string(future, 'initial_margin') newAccount['total'] = self.safe_string(future, 'balance') futuresResult[symbol] = {} futuresResult[symbol][code] = newAccount self.balance['margin'] = futuresResult self.balance['margin'] = self.safe_balance(self.balance['margin']) client.resolve(self.balance['margin'], messageHash + 'futures') if flexFutures is not None: flexFutureCurrencies = self.safe_value(flexFutures, 'currencies', {}) flexFuturesKeys = list(flexFutureCurrencies.keys()) # multi-collateral margin account flexFuturesResult: dict = { 'info': message, 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), } for i in range(0, len(flexFuturesKeys)): key = flexFuturesKeys[i] flexFuture = self.safe_value(flexFutureCurrencies, key) code = self.safe_currency_code(key) newAccount = self.account() newAccount['free'] = self.safe_string(flexFuture, 'available') newAccount['used'] = self.safe_string(flexFuture, 'collateral_value') newAccount['total'] = self.safe_string(flexFuture, 'quantity') flexFuturesResult[code] = newAccount self.balance['flex'] = flexFuturesResult self.balance['flex'] = self.safe_balance(self.balance['flex']) client.resolve(self.balance['flex'], messageHash + 'flex_futures') client.resolve(self.balance, messageHash) def handle_my_trades(self, client: Client, message): # # { # "feed": "fills_snapshot", # "account": "DemoUser", # "fills": [ # { # "instrument": "FI_XBTUSD_200925", # "time": 1600256910739, # "price": 10937.5, # "seq": 36, # "buy": True, # "qty": 5000.0, # "order_id": "9e30258b-5a98-4002-968a-5b0e149bcfbf", # "cli_ord_id": "8b58d9da-fcaf-4f60-91bc-9973a3eba48d", # only on update, not on snapshot # "fill_id": "cad76f07-814e-4dc6-8478-7867407b6bff", # "fill_type": "maker", # "fee_paid": -0.00009142857, # "fee_currency": "BTC", # "taker_order_type": "ioc", # "order_type": "limit" # }, # ... # ] # } # trades = self.safe_value(message, 'fills', []) stored = self.myTrades if stored is None: limit = self.safe_integer(self.options, 'tradesLimit', 1000) stored = ArrayCacheBySymbolById(limit) self.myTrades = stored tradeSymbols: dict = {} for i in range(0, len(trades)): trade = trades[i] parsedTrade = self.parse_ws_my_trade(trade) tradeSymbols[parsedTrade['symbol']] = True stored.append(parsedTrade) tradeSymbolKeys = list(tradeSymbols.keys()) for i in range(0, len(tradeSymbolKeys)): symbol = tradeSymbolKeys[i] messageHash = 'myTrades:' + symbol client.resolve(stored, messageHash) client.resolve(stored, 'myTrades') def parse_ws_my_trade(self, trade, market=None): # # { # "instrument": "FI_XBTUSD_200925", # "time": 1600256910739, # "price": 10937.5, # "seq": 36, # "buy": True, # "qty": 5000.0, # "order_id": "9e30258b-5a98-4002-968a-5b0e149bcfbf", # "cli_ord_id": "8b58d9da-fcaf-4f60-91bc-9973a3eba48d", # only on update, not on snapshot # "fill_id": "cad76f07-814e-4dc6-8478-7867407b6bff", # "fill_type": "maker", # "fee_paid": -0.00009142857, # "fee_currency": "BTC", # "taker_order_type": "ioc", # "order_type": "limit" # } # timestamp = self.safe_integer(trade, 'time') marketId = self.safe_string(trade, 'instrument') market = self.safe_market(marketId, market) isBuy = self.safe_value(trade, 'buy') feeCurrencyId = self.safe_string(trade, 'fee_currency') return self.safe_trade({ 'info': trade, 'id': self.safe_string(trade, 'fill_id'), 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), 'symbol': self.safe_string(market, 'symbol'), 'order': self.safe_string(trade, 'order_id'), 'type': self.safe_string(trade, 'type'), 'side': 'buy' if isBuy else 'sell', 'takerOrMaker': self.safe_string(trade, 'fill_type'), 'price': self.safe_string(trade, 'price'), 'amount': self.safe_string(trade, 'qty'), 'cost': None, 'fee': { 'currency': self.safe_currency_code(feeCurrencyId), 'cost': self.safe_string(trade, 'fee_paid'), 'rate': None, }, }) async def watch_multi_helper(self, unifiedName: str, channelName: str, symbols: Strings = None, subscriptionArgs=None, params={}): await self.load_markets() # symbols are required symbols = self.market_symbols(symbols, None, False, True, False) messageHashes = [] for i in range(0, len(symbols)): messageHashes.append(self.get_message_hash(unifiedName, None, self.symbol(symbols[i]))) marketIds = self.market_ids(symbols) request: dict = { 'event': 'subscribe', 'feed': channelName, 'product_ids': marketIds, } url = self.urls['api']['ws'] return await self.watch_multiple(url, messageHashes, self.extend(request, params), messageHashes, subscriptionArgs) def get_message_hash(self, unifiedElementName: str, subChannelName: Str = None, symbol: Str = None): # unifiedElementName can be : orderbook, trade, ticker, bidask ... # subChannelName only applies to channel that needs specific variation(i.e. depth_50, depth_100..) to be selected withSymbol = symbol is not None messageHash = unifiedElementName if not withSymbol: messageHash += 's' else: messageHash += ':' + symbol if subChannelName is not None: messageHash += '#' + subChannelName return messageHash def handle_error_message(self, client: Client, message) -> Bool: # # { # event: 'alert', # message: 'Failed to subscribe to authenticated feed' # } # errMsg = self.safe_string(message, 'message') try: raise ExchangeError(self.id + ' ' + errMsg) except Exception as error: client.reject(error) return False def handle_message(self, client, message): event = self.safe_string(message, 'event') if event == 'challenge': self.handle_authenticate(client, message) elif event == 'alert': self.handle_error_message(client, message) elif event == 'pong': client.lastPong = self.milliseconds() elif event is None: feed = self.safe_string(message, 'feed') methods: dict = { 'ticker': self.handle_ticker, 'ticker_lite': self.handle_bid_ask, 'trade': self.handle_trade, 'trade_snapshot': self.handle_trade, # 'heartbeat': self.handleStatus, 'book': self.handle_order_book, 'book_snapshot': self.handle_order_book_snapshot, 'open_orders_verbose': self.handle_order, 'open_orders_verbose_snapshot': self.handle_order_snapshot, 'fills': self.handle_my_trades, 'fills_snapshot': self.handle_my_trades, 'open_orders': self.handle_order, 'open_orders_snapshot': self.handle_order_snapshot, 'balances': self.handle_balance, 'balances_snapshot': self.handle_balance, 'open_positions': self.handle_positions, } method = self.safe_value(methods, feed) if method is not None: method(client, message) def handle_authenticate(self, client: Client, message): """ @ignore https://docs.futures.kraken.com/#websocket-api-websocket-api-introduction-sign-challenge-challenge """ # # { # "event": "challenge", # "message": "226aee50-88fc-4618-a42a-34f7709570b2" # } # event = self.safe_value(message, 'event') messageHash = 'challenge' if event != 'error': challenge = self.safe_value(message, 'message') hashedChallenge = self.hash(self.encode(challenge), 'sha256', 'binary') base64Secret = self.base64_to_binary(self.secret) signature = self.hmac(hashedChallenge, base64Secret, hashlib.sha512, 'base64') self.options['challenge'] = challenge self.options['signedChallenge'] = signature future = self.safe_value(client.futures, messageHash) future.resolve(True) else: error = AuthenticationError(self.id + ' ' + self.json(message)) client.reject(error, messageHash) if messageHash in client.subscriptions: del client.subscriptions[messageHash] return message