Files
ccxt_with_mt5/ccxt/pro/kraken.py
lz_db 0fab423a18 add
2025-11-16 12:31:03 +08:00

1518 lines
68 KiB
Python

# -*- coding: utf-8 -*-
# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code
import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheByTimestamp
from ccxt.base.types import Any, Balances, Bool, Int, Num, Order, OrderBook, OrderSide, OrderType, Str, Strings, Ticker, Tickers, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List
from ccxt.base.errors import ExchangeError
from ccxt.base.errors import AuthenticationError
from ccxt.base.errors import PermissionDenied
from ccxt.base.errors import AccountSuspended
from ccxt.base.errors import ArgumentsRequired
from ccxt.base.errors import BadRequest
from ccxt.base.errors import BadSymbol
from ccxt.base.errors import InsufficientFunds
from ccxt.base.errors import InvalidOrder
from ccxt.base.errors import OrderNotFound
from ccxt.base.errors import NotSupported
from ccxt.base.errors import RateLimitExceeded
from ccxt.base.errors import ExchangeNotAvailable
from ccxt.base.errors import ChecksumError
from ccxt.base.precise import Precise
class kraken(ccxt.async_support.kraken):
def describe(self) -> Any:
    """
    build the exchange description used by the websocket client

    The description returned by the parent class is deep-extended with the
    websocket capability flags, the websocket endpoint urls, default options,
    streaming settings and the table that maps websocket error messages to
    exception classes.

    :returns dict: the merged exchange description
    """
    capabilities = {
        'ws': True,
        'watchBalance': True,
        'watchMyTrades': True,
        'watchOHLCV': True,
        'watchOrderBook': True,
        'watchOrderBookForSymbols': True,
        'watchOrders': True,
        'watchTicker': True,
        'watchTickers': True,
        'watchBidsAsks': True,
        'watchTrades': True,
        'watchTradesForSymbols': True,
        'createOrderWs': True,
        'editOrderWs': True,
        'cancelOrderWs': True,
        'cancelOrdersWs': True,
        'cancelAllOrdersWs': True,
        # 'watchHeartbeat': True,
        # 'watchStatus': True,
    }
    wsEndpoints = {
        'public': 'wss://ws.kraken.com',
        'private': 'wss://ws-auth.kraken.com',
        'privateV2': 'wss://ws-auth.kraken.com/v2',
        'publicV2': 'wss://ws.kraken.com/v2',
        'beta': 'wss://beta-ws.kraken.com',
        'beta-private': 'wss://beta-ws-auth.kraken.com',
    }
    defaultOptions = {
        'tradesLimit': 1000,
        'OHLCVLimit': 1000,
        'ordersLimit': 1000,
        'symbolsByOrderId': {},
        'watchOrderBook': {
            'checksum': False,
        },
    }
    streaming = {
        'ping': self.ping,
        'keepAlive': 6000,
    }
    # error messages that must match exactly
    exactErrors = {
        'Event(s) not found': BadRequest,
    }
    # error messages listed under the 'broad' key
    broadErrors = {
        'Already subscribed': BadRequest,
        'Currency pair not in ISO 4217-A3 format': BadSymbol,
        'Currency pair not supported': BadSymbol,
        'Malformed request': BadRequest,
        'Pair field must be an array': BadRequest,
        'Pair field unsupported for self subscription type': BadRequest,
        'Pair(s) not found': BadSymbol,
        'Subscription book depth must be an integer': BadRequest,
        'Subscription depth not supported': BadRequest,
        'Subscription field must be an object': BadRequest,
        'Subscription name invalid': BadRequest,
        'Subscription object unsupported field': BadRequest,
        'Subscription ohlc interval must be an integer': BadRequest,
        'Subscription ohlc interval not supported': BadRequest,
        'Subscription ohlc requires interval': BadRequest,
        'EAccount:Invalid permissions': PermissionDenied,
        'EAuth:Account temporary disabled': AccountSuspended,
        'EAuth:Account unconfirmed': AuthenticationError,
        'EAuth:Rate limit exceeded': RateLimitExceeded,
        'EAuth:Too many requests': RateLimitExceeded,
        'EDatabase: Internal error(to be deprecated)': ExchangeError,
        'EGeneral:Internal error[:<code>]': ExchangeError,
        'EGeneral:Invalid arguments': BadRequest,
        'EOrder:Cannot open opposing position': InvalidOrder,
        'EOrder:Cannot open position': InvalidOrder,
        'EOrder:Insufficient funds(insufficient user funds)': InsufficientFunds,
        'EOrder:Insufficient margin(exchange does not have sufficient funds to allow margin trading)': InsufficientFunds,
        'EOrder:Invalid price': InvalidOrder,
        'EOrder:Margin allowance exceeded': InvalidOrder,
        'EOrder:Margin level too low': InvalidOrder,
        'EOrder:Margin position size exceeded(client would exceed the maximum position size for self pair)': InvalidOrder,
        'EOrder:Order minimum not met(volume too low)': InvalidOrder,
        'EOrder:Orders limit exceeded': InvalidOrder,
        'EOrder:Positions limit exceeded': InvalidOrder,
        'EOrder:Rate limit exceeded': RateLimitExceeded,
        'EOrder:Scheduled orders limit exceeded': InvalidOrder,
        'EOrder:Unknown position': OrderNotFound,
        'EOrder:Unknown order': OrderNotFound,
        'EOrder:Invalid order': InvalidOrder,
        'EService:Deadline elapsed': ExchangeNotAvailable,
        'EService:Market in cancel_only mode': NotSupported,
        'EService:Market in limit_only mode': NotSupported,
        'EService:Market in post_only mode': NotSupported,
        'EService:Unavailable': ExchangeNotAvailable,
        'ETrade:Invalid request': BadRequest,
        'ESession:Invalid session': AuthenticationError,
    }
    overrides = {
        'has': capabilities,
        'urls': {
            'api': {
                'ws': wsEndpoints,
            },
        },
        # 'versions': {
        #     'ws': '0.2.0',
        # },
        'options': defaultOptions,
        'streaming': streaming,
        'exceptions': {
            'ws': {
                'exact': exactErrors,
                'broad': broadErrors,
            },
        },
    }
    return self.deep_extend(super(kraken, self).describe(), overrides)
def order_request_ws(self, method: str, symbol: str, type: str, request: dict, amount: Num, price: Num = None, params={}):
    """
    fill the shared part of an add_order / amend_order websocket request

    The ``request['params']`` dict is modified in place: limit price, post-only
    flag, client order id, cost, attached stop-loss / take-profit, trigger
    prices and trailing offsets are written into it depending on ``method``
    and on the unified keys found in ``params``.

    :param str method: 'createOrderWs' or 'editOrderWs', any other value only applies the common fields
    :param str symbol: unified symbol, used for price and cost precision
    :param str type: order type, anything ending in 'limit' is treated as a limit order
    :param dict request: the request under construction, must already contain a 'params' dict
    :param float amount: not used by this method
    :param float [price]: limit price, required when type ends in 'limit'
    :param dict [params]: unified and exchange specific parameters
    :returns list: [request, params] where the consumed unified keys have been removed from params
    :raises ArgumentsRequired: when a limit order type is given without a price
    :raises NotSupported: when editing an order with stopLoss / takeProfit presets
    """
    # every write below goes through this alias, so it lands in request['params']
    payload = request['params']

    def toPrice(value):
        return self.parse_to_numeric(self.price_to_precision(symbol, value))

    isLimitOrder = type.endswith('limit')  # supporting limit, stop-loss-limit, take-profit-limit, etc
    if isLimitOrder:
        if price is None:
            raise ArgumentsRequired(self.id + ' limit orders require a price argument')
        payload['limit_price'] = toPrice(price)
    postOnly, params = self.handle_post_only(type == 'market', False, params)
    if postOnly:
        payload['post_only'] = True
    clientOrderId = self.safe_string(params, 'clientOrderId')
    if clientOrderId is not None:
        payload['cl_ord_id'] = clientOrderId
    cost = self.safe_string(params, 'cost')
    if cost is not None:
        # a cost, when given, replaces the quantity
        payload['order_qty'] = self.parse_to_numeric(self.cost_to_precision(symbol, cost))
    # attached stop-loss / take-profit presets
    stopLoss = self.safe_dict(params, 'stopLoss', {})
    takeProfit = self.safe_dict(params, 'takeProfit', {})
    presetStopLoss = self.safe_string(stopLoss, 'triggerPrice')
    presetTakeProfit = self.safe_string(takeProfit, 'triggerPrice')
    presetStopLossLimit = self.safe_string(stopLoss, 'price')
    presetTakeProfitLimit = self.safe_string(takeProfit, 'price')
    hasPreset = (presetStopLoss is not None) or (presetTakeProfit is not None)
    # standalone trigger orders
    stopLossPrice = self.safe_string(params, 'stopLossPrice')
    takeProfitPrice = self.safe_string(params, 'takeProfitPrice')
    isStopLossPriceOrder = stopLossPrice is not None
    hasTriggerPrice = isStopLossPriceOrder or (takeProfitPrice is not None)
    # trailing orders
    trailingAmount = self.safe_string(params, 'trailingAmount')
    trailingPercent = self.safe_string(params, 'trailingPercent')
    trailingLimitAmount = self.safe_string(params, 'trailingLimitAmount')
    trailingLimitPercent = self.safe_string(params, 'trailingLimitPercent')
    isTrailingAmountOrder = trailingAmount is not None
    isTrailingPercentOrder = trailingPercent is not None
    isTrailingLimitAmountOrder = trailingLimitAmount is not None
    isTrailingLimitPercentOrder = trailingLimitPercent is not None
    hasTrailing = isTrailingAmountOrder or isTrailingPercentOrder or isTrailingLimitAmountOrder or isTrailingLimitPercentOrder
    isPlainTrailing = (not isLimitOrder) and (isTrailingAmountOrder or isTrailingPercentOrder)
    offset = self.safe_string(params, 'offset', '')  # can set self to - for minus

    def withOffset(value):
        if value is None:
            return None
        return offset + self.number_to_string(value)

    trailingAmountString = withOffset(trailingAmount)
    trailingPercentString = withOffset(trailingPercent)
    trailingLimitAmountString = withOffset(trailingLimitAmount)
    trailingLimitPercentString = withOffset(trailingLimitPercent)
    priceType = 'pct' if (isTrailingPercentOrder or isTrailingLimitPercentOrder) else 'quote'
    plainTrailingValue = trailingAmountString if isTrailingAmountOrder else trailingPercentString
    limitTrailingValue = trailingLimitAmountString if isTrailingLimitAmountOrder else trailingLimitPercentString
    if method == 'createOrderWs':
        if self.safe_bool(params, 'reduceOnly'):
            payload['reduce_only'] = True
        timeInForce = self.safe_string_lower(params, 'timeInForce')
        if timeInForce is not None:
            payload['time_in_force'] = timeInForce
        params = self.omit(params, ['reduceOnly', 'timeInForce'])
        if hasTriggerPrice or hasTrailing:
            payload['triggers'] = {}
        if hasPreset:
            conditional = {}
            payload['conditional'] = conditional
            if presetStopLoss is not None:
                conditional['order_type'] = 'stop-loss'
                conditional['trigger_price'] = toPrice(presetStopLoss)
            else:
                # hasPreset guarantees the take-profit preset is the one present here
                conditional['order_type'] = 'take-profit'
                conditional['trigger_price'] = toPrice(presetTakeProfit)
            if presetStopLossLimit is not None:
                conditional['order_type'] = 'stop-loss-limit'
                conditional['limit_price'] = toPrice(presetStopLossLimit)
            elif presetTakeProfitLimit is not None:
                conditional['order_type'] = 'take-profit-limit'
                conditional['limit_price'] = toPrice(presetTakeProfitLimit)
            params = self.omit(params, ['stopLoss', 'takeProfit'])
        elif hasTriggerPrice:
            triggers = payload['triggers']
            if isStopLossPriceOrder:
                triggers['price'] = toPrice(stopLossPrice)
                payload['order_type'] = 'stop-loss-limit' if isLimitOrder else 'stop-loss'
            else:
                triggers['price'] = toPrice(takeProfitPrice)
                payload['order_type'] = 'take-profit-limit' if isLimitOrder else 'take-profit'
        elif hasTrailing:
            triggers = payload['triggers']
            triggers['price_type'] = priceType
            if isPlainTrailing:
                payload['order_type'] = 'trailing-stop'
                triggers['price'] = self.parse_to_numeric(plainTrailingValue)
            else:
                # trailing limit orders are not conventionally supported because the static limit_price_type param is not available for trailing-stop-limit orders
                payload['limit_price_type'] = priceType
                payload['order_type'] = 'trailing-stop-limit'
                triggers['price'] = self.parse_to_numeric(limitTrailingValue)
    elif method == 'editOrderWs':
        if hasPreset:
            raise NotSupported(self.id + ' editing the stopLoss and takeProfit on existing orders is currently not supported')
        if hasTriggerPrice:
            payload['trigger_price'] = toPrice(stopLossPrice if isStopLossPriceOrder else takeProfitPrice)
        elif hasTrailing:
            payload['trigger_price_type'] = priceType
            if isPlainTrailing:
                payload['trigger_price'] = self.parse_to_numeric(plainTrailingValue)
            else:
                payload['limit_price_type'] = priceType
                payload['trigger_price'] = self.parse_to_numeric(limitTrailingValue)
    # drop the unified keys that were translated above
    params = self.omit(params, ['clientOrderId', 'cost', 'offset', 'stopLossPrice', 'takeProfitPrice', 'trailingAmount', 'trailingPercent', 'trailingLimitAmount', 'trailingLimitPercent'])
    return [request, params]
async def create_order_ws(self, symbol: str, type: OrderType, side: OrderSide, amount: float, price: Num = None, params={}) -> Order:
    """
    create a trade order over the private v2 websocket
    https://docs.kraken.com/api/docs/websocket-v2/add_order
    :param str symbol: unified symbol of the market to create an order in
    :param str type: 'market' or 'limit'
    :param str side: 'buy' or 'sell'
    :param float amount: how much of currency you want to trade in units of base currency
    :param float [price]: the price at which the order is to be fulfilled, in units of the quote currency, ignored in market orders
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: an `order structure <https://docs.ccxt.com/#/?id=order-structure>`
    """
    await self.load_markets()
    token = await self.authenticate()
    market = self.market(symbol)
    requestId = self.request_id()
    orderParams = {
        'order_type': type,
        'side': side,
        'order_qty': self.parse_to_numeric(self.amount_to_precision(symbol, amount)),
        'symbol': market['symbol'],
        'token': token,
    }
    request: dict = {
        'method': 'add_order',
        'params': orderParams,
        'req_id': requestId,
    }
    request, params = self.order_request_ws('createOrderWs', symbol, type, request, amount, price, params)
    # the stringified request id is used both as message hash and as subscribe hash
    messageHash = self.number_to_string(requestId)
    privateUrl = self.urls['api']['ws']['privateV2']
    return await self.watch(privateUrl, messageHash, self.extend(request, params), messageHash)
def handle_create_edit_order(self, client, message):
    """
    resolve a pending add_order / amend_order request with the parsed order

    :param client: websocket client holding the pending future
    :param dict message: response carrying a 'result' dict and the request id
    """
    #
    # createOrder
    #     {
    #         "method": "add_order",
    #         "req_id": 1,
    #         "result": {
    #             "order_id": "OXM2QD-EALR2-YBAVEU"
    #         },
    #         "success": True,
    #         "time_in": "2025-05-13T10:12:13.876173Z",
    #         "time_out": "2025-05-13T10:12:13.890137Z"
    #     }
    #
    # editOrder
    #     {
    #         "method": "amend_order",
    #         "req_id": 1,
    #         "result": {
    #             "amend_id": "TYDLSQ-OYNYU-3MNRER",
    #             "order_id": "OGL7HR-SWFO4-NRQTHO"
    #         },
    #         "success": True,
    #         "time_in": "2025-05-14T13:54:10.840342Z",
    #         "time_out": "2025-05-14T13:54:10.855046Z"
    #     }
    #
    # the request id is read from 'reqid' first, then from 'req_id'
    messageHash = self.safe_string_2(message, 'reqid', 'req_id')
    parsedOrder = self.parse_order(self.safe_dict(message, 'result', {}))
    client.resolve(parsedOrder, messageHash)
async def edit_order_ws(self, id: str, symbol: str, type: OrderType, side: OrderSide, amount: Num = None, price: Num = None, params={}) -> Order:
    """
    edit a trade order
    https://docs.kraken.com/api/docs/websocket-v2/amend_order
    :param str id: order id
    :param str symbol: unified symbol of the market to create an order in
    :param str type: 'market' or 'limit'
    :param str side: 'buy' or 'sell'
    :param float [amount]: how much of the currency you want to trade in units of the base currency, the quantity is left out of the request when omitted
    :param float [price]: the price at which the order is to be fulfilled, in units of the quote currency, ignored in market orders
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: an `order structure <https://docs.ccxt.com/#/?id=order-structure>`
    """
    await self.load_markets()
    token = await self.authenticate()
    url = self.urls['api']['ws']['privateV2']
    requestId = self.request_id()
    messageHash = self.number_to_string(requestId)
    amendParams: dict = {
        'order_id': id,
    }
    # amount defaults to None in the signature, only format and send it when the caller supplied one
    if amount is not None:
        amendParams['order_qty'] = self.parse_to_numeric(self.amount_to_precision(symbol, amount))
    amendParams['token'] = token
    request: dict = {
        'method': 'amend_order',
        'params': amendParams,
        'req_id': requestId,
    }
    request, params = self.order_request_ws('editOrderWs', symbol, type, request, amount, price, params)
    return await self.watch(url, messageHash, self.extend(request, params), messageHash)
async def cancel_orders_ws(self, ids: List[str], symbol: Str = None, params={}):
    """
    cancel multiple orders
    https://docs.kraken.com/api/docs/websocket-v2/cancel_order
    :param str[] ids: order ids
    :param str [symbol]: unified market symbol, must be left as None
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: an list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
    :raises NotSupported: when a symbol is passed
    """
    if symbol is not None:
        raise NotSupported(self.id + ' cancelOrdersWs() does not support cancelling orders for a specific symbol.')
    await self.load_markets()
    token = await self.authenticate()
    requestId = self.request_id()
    payload: dict = {
        'method': 'cancel_order',
        'params': {
            'order_id': ids,
            'token': token,
        },
        'req_id': requestId,
    }
    # the reply is matched to this call through the stringified request id
    messageHash = self.number_to_string(requestId)
    privateUrl = self.urls['api']['ws']['privateV2']
    return await self.watch(privateUrl, messageHash, self.extend(payload, params), messageHash)
async def cancel_order_ws(self, id: str, symbol: Str = None, params={}) -> Order:
    """
    cancels an open order
    https://docs.kraken.com/api/docs/websocket-v2/cancel_order
    :param str id: order id
    :param str [symbol]: unified symbol of the market the order was made in, must be left as None
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: An `order structure <https://docs.ccxt.com/#/?id=order-structure>`
    :raises NotSupported: when a symbol is passed
    """
    if symbol is not None:
        raise NotSupported(self.id + ' cancelOrderWs() does not support cancelling orders for a specific symbol.')
    await self.load_markets()
    token = await self.authenticate()
    requestId = self.request_id()
    payload: dict = {
        'method': 'cancel_order',
        'params': {
            # the endpoint takes a list of ids, so the single id is wrapped
            'order_id': [id],
            'token': token,
        },
        'req_id': requestId,
    }
    messageHash = self.number_to_string(requestId)
    privateUrl = self.urls['api']['ws']['privateV2']
    return await self.watch(privateUrl, messageHash, self.extend(payload, params), messageHash)
def handle_cancel_order(self, client, message):
    """
    resolve a pending cancel_order request with the raw response message

    :param client: websocket client holding the pending future
    :param dict message: response carrying the 'req_id' of the request
    """
    #
    #     {
    #         "method": "cancel_order",
    #         "req_id": 123456789,
    #         "result": {
    #             "order_id": "OKAGJC-YHIWK-WIOZWG"
    #         },
    #         "success": True,
    #         "time_in": "2023-09-21T14:36:57.428972Z",
    #         "time_out": "2023-09-21T14:36:57.437952Z"
    #     }
    #
    messageHash = self.safe_string(message, 'req_id')
    client.resolve(message, messageHash)
async def cancel_all_orders_ws(self, symbol: Str = None, params={}) -> List[Order]:
    """
    cancel all open orders
    https://docs.kraken.com/api/docs/websocket-v2/cancel_all
    :param str [symbol]: unified market symbol, must be left as None
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
    :raises NotSupported: when a symbol is passed
    """
    if symbol is not None:
        raise NotSupported(self.id + ' cancelAllOrdersWs() does not support cancelling orders in a specific market.')
    await self.load_markets()
    token = await self.authenticate()
    requestId = self.request_id()
    payload: dict = {
        'method': 'cancel_all',
        'params': {
            'token': token,
        },
        'req_id': requestId,
    }
    messageHash = self.number_to_string(requestId)
    privateUrl = self.urls['api']['ws']['privateV2']
    return await self.watch(privateUrl, messageHash, self.extend(payload, params), messageHash)
def handle_cancel_all_orders(self, client, message):
    """
    resolve a pending cancel_all request with the raw response message

    :param client: websocket client holding the pending future
    :param dict message: response carrying the 'req_id' of the request
    """
    #
    #     {
    #         "method": "cancel_all",
    #         "req_id": 123456789,
    #         "result": {
    #             "count": 1
    #         },
    #         "success": True,
    #         "time_in": "2023-09-21T14:36:57.428972Z",
    #         "time_out": "2023-09-21T14:36:57.437952Z"
    #     }
    #
    messageHash = self.safe_string(message, 'req_id')
    client.resolve(message, messageHash)
def handle_ticker(self, client, message):
    """
    parse a ticker channel message, cache the ticker and resolve its watchers

    Only the first entry of the 'data' list is used. A message whose 'data'
    list is missing or empty is ignored.

    :param client: websocket client holding the pending futures
    :param dict message: ticker channel message
    """
    #
    #     {
    #         "channel": "ticker",
    #         "type": "snapshot",
    #         "data": [
    #             {
    #                 "symbol": "BTC/USD",
    #                 "bid": 108359.8,
    #                 "bid_qty": 0.01362603,
    #                 "ask": 108359.9,
    #                 "ask_qty": 17.17988863,
    #                 "last": 108359.8,
    #                 "volume": 2158.32346723,
    #                 "vwap": 108894.5,
    #                 "low": 106824,
    #                 "high": 111300,
    #                 "change": -2679.9,
    #                 "change_pct": -2.41
    #             }
    #         ]
    #     }
    #
    data = self.safe_list(message, 'data', [])
    if not data:
        # nothing to parse, indexing data[0] would raise IndexError
        return
    ticker = data[0]
    symbol = self.safe_string(ticker, 'symbol')
    messageHash = self.get_message_hash('ticker', None, symbol)
    vwap = self.safe_string(ticker, 'vwap')
    baseVolume = self.safe_string(ticker, 'volume')
    quoteVolume = None
    if baseVolume is not None and vwap is not None:
        # the message carries no quote volume, it is derived from base volume and vwap
        quoteVolume = Precise.string_mul(baseVolume, vwap)
    last = self.safe_string(ticker, 'last')
    result = self.safe_ticker({
        'symbol': symbol,
        'timestamp': None,
        'datetime': None,
        'high': self.safe_string(ticker, 'high'),
        'low': self.safe_string(ticker, 'low'),
        'bid': self.safe_string(ticker, 'bid'),
        'bidVolume': self.safe_string(ticker, 'bid_qty'),
        'ask': self.safe_string(ticker, 'ask'),
        'askVolume': self.safe_string(ticker, 'ask_qty'),
        'vwap': vwap,
        'open': None,
        'close': last,
        'last': last,
        'previousClose': None,
        'change': self.safe_string(ticker, 'change'),
        'percentage': self.safe_string(ticker, 'change_pct'),
        'average': None,
        'baseVolume': baseVolume,
        'quoteVolume': quoteVolume,
        'info': ticker,
    })
    self.tickers[symbol] = result
    client.resolve(result, messageHash)
def handle_trades(self, client: Client, message):
    """
    parse a trade channel message, append the trades to the per-symbol cache and resolve the watchers

    The symbol is taken from the first entry of the 'data' list. A message
    whose 'data' list is missing or empty is ignored.

    :param Client client: websocket client holding the pending futures
    :param dict message: trade channel message
    """
    #
    #     {
    #         "channel": "trade",
    #         "type": "update",
    #         "data": [
    #             {
    #                 "symbol": "MATIC/USD",
    #                 "side": "sell",
    #                 "price": 0.5117,
    #                 "qty": 40.0,
    #                 "ord_type": "market",
    #                 "trade_id": 4665906,
    #                 "timestamp": "2023-09-25T07:49:37.708706Z"
    #             }
    #         ]
    #     }
    #
    data = self.safe_list(message, 'data', [])
    if not data:
        # nothing to parse, indexing data[0] would raise IndexError
        return
    symbol = self.safe_string(data[0], 'symbol')
    messageHash = self.get_message_hash('trade', None, symbol)
    stored = self.safe_value(self.trades, symbol)
    if stored is None:
        # first trade for this symbol, create its bounded cache
        limit = self.safe_integer(self.options, 'tradesLimit', 1000)
        stored = ArrayCache(limit)
        self.trades[symbol] = stored
    market = self.market(symbol)
    for trade in self.parse_trades(data, market):
        stored.append(trade)
    client.resolve(stored, messageHash)
def handle_ohlcv(self, client: Client, message):
    """
    parse an ohlc channel message, store the candles and resolve the watchers

    Symbol and interval are taken from the first entry of the 'data' list and
    the candles are appended in reverse list order. A message whose 'data'
    list is missing or empty is ignored.

    :param Client client: websocket client holding the pending futures
    :param dict message: ohlc channel message
    """
    #
    #     {
    #         "channel": "ohlc",
    #         "type": "update",
    #         "timestamp": "2023-10-04T16:26:30.524394914Z",
    #         "data": [
    #             {
    #                 "symbol": "MATIC/USD",
    #                 "open": 0.5624,
    #                 "high": 0.5628,
    #                 "low": 0.5622,
    #                 "close": 0.5627,
    #                 "trades": 12,
    #                 "volume": 30927.68066226,
    #                 "vwap": 0.5626,
    #                 "interval_begin": "2023-10-04T16:25:00.000000000Z",
    #                 "interval": 5,
    #                 "timestamp": "2023-10-04T16:30:00.000000Z"
    #             }
    #         ]
    #     }
    #
    data = self.safe_list(message, 'data', [])
    if not data:
        # nothing to parse, indexing data[0] would raise IndexError
        return
    first = data[0]
    symbol = self.safe_symbol(self.safe_string(first, 'symbol'))
    if symbol not in self.ohlcvs:
        self.ohlcvs[symbol] = {}
    timeframe = self.find_timeframe(self.safe_integer(first, 'interval'))
    messageHash = self.get_message_hash('ohlcv', None, symbol)
    stored = self.safe_value(self.ohlcvs[symbol], timeframe)
    if stored is None:
        limit = self.safe_integer(self.options, 'OHLCVLimit', 1000)
        stored = ArrayCacheByTimestamp(limit)
        self.ohlcvs[symbol][timeframe] = stored
    for candle in reversed(data):
        # NOTE(review): in the sample above 'timestamp' is later than 'interval_begin',
        # confirm which of the two the unified candle time should use
        candleTime = self.parse8601(self.safe_string(candle, 'timestamp'))
        stored.append([
            candleTime,
            self.safe_string(candle, 'open'),
            self.safe_string(candle, 'high'),
            self.safe_string(candle, 'low'),
            self.safe_string(candle, 'close'),
            self.safe_string(candle, 'volume'),
        ])
    client.resolve(stored, messageHash)
def request_id(self):
    """
    return the next request id and remember it in self.options['reqid']

    The counter starts at 1 and restarts at 1 after 2147483647, so the id
    always fits in a signed 32-bit integer.

    :returns int: the request id to send as 'req_id'
    """
    # their support said that reqid must be an int32, not documented
    maxInt32 = 2147483647
    reqid = self.sum(self.safe_integer(self.options, 'reqid', 0), 1)
    if reqid > maxInt32:
        reqid = 1
    self.options['reqid'] = reqid
    return reqid
async def watch_ticker(self, symbol: str, params={}) -> Ticker:
    """
    watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market
    https://docs.kraken.com/api/docs/websocket-v2/ticker
    :param str symbol: unified symbol of the market to fetch the ticker for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
    """
    await self.load_markets()
    unifiedSymbol = self.symbol(symbol)
    # delegate to the multi-symbol watcher and pick the single entry out of its result
    tickersBySymbol = await self.watch_tickers([unifiedSymbol], params)
    return tickersBySymbol[unifiedSymbol]
async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers:
    """
    watches price tickers for the given symbols
    https://docs.kraken.com/api/docs/websocket-v2/ticker
    :param str[] symbols: unified symbols of the markets to watch
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: `ticker structures <https://docs.ccxt.com/#/?id=ticker-structure>` keyed by symbol, only the updated ticker when newUpdates is enabled
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    ticker = await self.watch_multi_helper('ticker', 'ticker', symbols, None, params)
    if not self.newUpdates:
        return self.filter_by_array(self.tickers, 'symbol', symbols)
    return {ticker['symbol']: ticker}
async def watch_bids_asks(self, symbols: Strings = None, params={}) -> Tickers:
    """
    watches best bid & ask for symbols
    https://docs.kraken.com/api/docs/websocket-v2/ticker
    :param str[] symbols: unified symbol of the market to fetch the ticker for
    :param dict [params]: extra parameters specific to the exchange API endpoint, 'event_trigger' is always sent as 'bbo'
    :returns dict: `ticker structures <https://docs.ccxt.com/#/?id=ticker-structure>` keyed by symbol, only the updated ticker when newUpdates is enabled
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    # build a new dict: assigning into params would modify the caller's dict
    # and, when params is omitted, the default {} shared by every call
    request = self.extend(params, {'event_trigger': 'bbo'})
    ticker = await self.watch_multi_helper('bidask', 'ticker', symbols, None, request)
    if self.newUpdates:
        result: dict = {}
        result[ticker['symbol']] = ticker
        return result
    return self.filter_by_array(self.bidsasks, 'symbol', symbols)
async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
    """
    get the list of most recent trades for a particular symbol
    https://docs.kraken.com/api/docs/websocket-v2/trade
    :param str symbol: unified symbol of the market to fetch trades for
    :param int [since]: timestamp in ms of the earliest trade to fetch
    :param int [limit]: the maximum amount of trades to fetch
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
    """
    # single-symbol convenience wrapper around the multi-symbol watcher
    trades = await self.watch_trades_for_symbols([symbol], since, limit, params)
    return trades
async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]:
    """
    get the list of most recent trades for a list of symbols
    https://docs.kraken.com/api/docs/websocket-v2/trade
    :param str[] symbols: unified symbol of the market to fetch trades for
    :param int [since]: timestamp in ms of the earliest trade to fetch
    :param int [limit]: the maximum amount of trades to fetch
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
    """
    trades = await self.watch_multi_helper('trade', 'trade', symbols, None, params)
    if self.newUpdates:
        # entries of the trades cache are parsed trades (dicts), so the first one
        # is read with safe_dict in order to get at its 'symbol'
        first = self.safe_dict(trades, 0)
        tradeSymbol = self.safe_string(first, 'symbol')
        limit = trades.getLimit(tradeSymbol, limit)
    return self.filter_by_since_limit(trades, since, limit, 'timestamp', True)
async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
    """
    watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data
    https://docs.kraken.com/api/docs/websocket-v2/book
    :param str symbol: unified symbol of the market to fetch the order book for
    :param int [limit]: the maximum amount of order book entries to return
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
    """
    # single-symbol convenience wrapper around the multi-symbol watcher
    orderbook = await self.watch_order_book_for_symbols([symbol], limit, params)
    return orderbook
async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook:
    """
    watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data
    https://docs.kraken.com/api/docs/websocket-v2/book
    :param str[] symbols: unified array of symbols
    :param int [limit]: the maximum amount of order book entries to return, one of 10, 25, 100, 500 or 1000
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
    :raises NotSupported: when limit is given and is not one of the accepted depths
    """
    requiredParams: dict = {}
    if limit is not None:
        if not self.in_array(limit, [10, 25, 100, 500, 1000]):
            raise NotSupported(self.id + ' watchOrderBook accepts limit values of 10, 25, 100, 500 and 1000 only')
        requiredParams['depth'] = limit  # default 10, valid options 10, 25, 100, 500, 1000
    subscriptionArgs = {'limit': limit}
    request = self.extend(requiredParams, params)
    orderbook = await self.watch_multi_helper('orderbook', 'book', symbols, subscriptionArgs, request)
    return orderbook.limit()
async def watch_ohlcv(self, symbol: str, timeframe: str = '1m', since: Int = None, limit: Int = None, params={}) -> List[list]:
    """
    watches historical candlestick data containing the open, high, low, and close price, and the volume of a market
    https://docs.kraken.com/api/docs/websocket-v2/ohlc
    :param str symbol: unified symbol of the market to fetch OHLCV data for
    :param str timeframe: the length of time each candle represents
    :param int [since]: timestamp in ms of the earliest candle to fetch
    :param int [limit]: the maximum amount of candles to fetch
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns int[][]: A list of candles ordered, open, high, low, close, volume
    """
    await self.load_markets()
    symbol = self.market(symbol)['symbol']
    publicUrl = self.urls['api']['ws']['publicV2']
    requestId = self.request_id()
    # the message hash is built from the symbol only, the timeframe is not part of it
    messageHash = self.get_message_hash('ohlcv', None, symbol)
    channelParams = {
        'channel': 'ohlc',
        'symbol': [symbol],
        # timeframes unknown to self.timeframes are passed through unchanged
        'interval': self.safe_value(self.timeframes, timeframe, timeframe),
    }
    subscribe: dict = {
        'method': 'subscribe',
        'params': channelParams,
        'req_id': requestId,
    }
    candles = await self.watch(publicUrl, messageHash, self.deep_extend(subscribe, params), messageHash)
    if self.newUpdates:
        limit = candles.getLimit(symbol, limit)
    return self.filter_by_since_limit(candles, since, limit, 'timestamp', True)
async def load_markets(self, reload=False, params={}):
    """
    load the markets through the parent class and index them by their websocket name

    The index is stored in self.options['marketsByWsName'] and is rebuilt
    when it does not exist yet or when reload is requested.

    :param bool [reload]: force a rebuild of the index, also forwarded to the parent loader
    :param dict [params]: forwarded to the parent loader
    :returns dict: the markets returned by the parent loader
    """
    markets = await super(kraken, self).load_markets(reload, params)
    cachedIndex = self.safe_value(self.options, 'marketsByWsName')
    if reload or (cachedIndex is None):
        byWsName = {}
        for symbol in self.symbols:
            market = self.markets[symbol]
            rawInfo = self.safe_value(market, 'info', {})
            byWsName[self.safe_string(rawInfo, 'wsname')] = market
        self.options['marketsByWsName'] = byWsName
    return markets
def ping(self, client: Client):
    """
    build the ping message for the given connection

    :param Client client: the connection to ping, only its url is read
    :returns dict: {'method': 'ping'} when the url contains 'v2', otherwise {'event': 'ping'}
    """
    pingKey = 'method' if 'v2' in client.url else 'event'
    return {pingKey: 'ping'}
def handle_pong(self, client: Client, message):
    """
    record the time of the latest pong on the client

    :param Client client: the connection that received the pong
    :param message: the pong message, returned unchanged
    :returns: the message that was passed in
    """
    receivedAt = self.milliseconds()
    client.lastPong = receivedAt
    return message
async def watch_heartbeat(self, params={}):
    """
    wait for the next heartbeat message on the public v2 connection

    :param dict [params]: not used by this method
    :returns: the value resolved under the 'heartbeat' message hash
    """
    await self.load_markets()
    publicUrl = self.urls['api']['ws']['publicV2']
    return await self.watch(publicUrl, 'heartbeat')
def handle_heartbeat(self, client: Client, message):
    """
    resolve heartbeat watchers with the raw message, keyed by its channel name

    :param Client client: websocket client holding the pending future
    :param dict message: heartbeat message
    """
    #
    # every second(approx) if no other updates are sent
    #
    #     {"channel": "heartbeat"}
    #
    channel = self.safe_string(message, 'channel')
    client.resolve(message, channel)
def handle_order_book(self, client: Client, message):
# Applies a 'book' channel message(snapshot or update) to the order book kept in
# self.orderbooks for the message's symbol, then resolves the message hash built by
# get_message_hash('orderbook', None, symbol) with that order book.
# Only the first element of message['data'] is read.
#
# first message(snapshot)
#
# {
# "channel": "book",
# "type": "snapshot",
# "data": [
# {
# "symbol": "MATIC/USD",
# "bids": [
# {
# "price": 0.5666,
# "qty": 4831.75496356
# },
# {
# "price": 0.5665,
# "qty": 6658.22734739
# }
# ],
# "asks": [
# {
# "price": 0.5668,
# "qty": 4410.79769741
# },
# {
# "price": 0.5669,
# "qty": 4655.40412487
# }
# ],
# "checksum": 2439117997
# }
# ]
# }
#
# subsequent updates
#
# {
# "channel": "book",
# "type": "update",
# "data": [
# {
# "symbol": "MATIC/USD",
# "bids": [
# {
# "price": 0.5657,
# "qty": 1098.3947558
# }
# ],
# "asks": [],
# "checksum": 2114181697,
# "timestamp": "2023-10-06T17:35:55.440295Z"
# }
# ]
# }
#
type = self.safe_string(message, 'type')
data = self.safe_list(message, 'data', [])
first = self.safe_dict(data, 0, {})
symbol = self.safe_string(first, 'symbol')
a = self.safe_value(first, 'asks', [])
b = self.safe_value(first, 'bids', [])
c = self.safe_integer(first, 'checksum')
messageHash = self.get_message_hash('orderbook', None, symbol)
orderbook = None
# update: the deltas are applied to the book already stored for this symbol
# NOTE(review): the lookup below presumes a snapshot was stored earlier; an update arriving
# without a stored book(e.g. after the checksum branch deleted it) looks unhandled - confirm
if type == 'update':
orderbook = self.orderbooks[symbol]
storedAsks = orderbook['asks']
storedBids = orderbook['bids']
if a is not None:
self.custom_handle_deltas(storedAsks, a)
if b is not None:
self.custom_handle_deltas(storedBids, b)
# the update's ISO 8601 'timestamp' string is kept as datetime and parsed into timestamp
datetime = self.safe_string(first, 'timestamp')
orderbook['symbol'] = symbol
orderbook['timestamp'] = self.parse8601(datetime)
orderbook['datetime'] = datetime
else:
# snapshot
# a new book replaces any stored one, the number of ask levels in the snapshot is passed as its depth
depth = len(a)
self.orderbooks[symbol] = self.order_book({}, depth)
orderbook = self.orderbooks[symbol]
keys = ['asks', 'bids']
for i in range(0, len(keys)):
key = keys[i]
bookside = orderbook[key]
deltas = self.safe_value(first, key, [])
if len(deltas) > 0:
self.custom_handle_deltas(bookside, deltas)
orderbook['symbol'] = symbol
orderbook.limit()
# checksum temporarily disabled because the exchange checksum was not reliable
# the verification below only runs when the 'checksum' option of 'watchOrderBook' is enabled(default False)
# NOTE(review): the levels read below come from the stored book sides, which custom_handle_deltas
# fills through safe_number, while format_number calls .split on its argument - confirm this path
# works with those values before enabling the option
# NOTE(review): safe_value falls back to {} for a missing level and the result is then indexed
# with 0 and 1 - check the behaviour for books holding fewer than 10 levels
checksum = self.handle_option('watchOrderBook', 'checksum', False)
if checksum:
payloadArray = []
if c is not None:
checkAsks = orderbook['asks']
checkBids = orderbook['bids']
# checkAsks = asks.map((elem) => [elem['price'], elem['qty']])
# checkBids = bids.map((elem) => [elem['price'], elem['qty']])
# the payload is the formatted price + amount of the first 10 asks followed by the first 10 bids
for i in range(0, 10):
currentAsk = self.safe_value(checkAsks, i, {})
formattedAsk = self.format_number(currentAsk[0]) + self.format_number(currentAsk[1])
payloadArray.append(formattedAsk)
for i in range(0, 10):
currentBid = self.safe_value(checkBids, i, {})
formattedBid = self.format_number(currentBid[0]) + self.format_number(currentBid[1])
payloadArray.append(formattedBid)
payload = ''.join(payloadArray)
localChecksum = self.crc32(payload, False)
# on a mismatch the subscription and the stored book are removed and the message hash is rejected with a ChecksumError
if localChecksum != c:
error = ChecksumError(self.id + ' ' + self.orderbook_checksum_message(symbol))
del client.subscriptions[messageHash]
del self.orderbooks[symbol]
client.reject(error, messageHash)
return
client.resolve(orderbook, messageHash)
def custom_handle_deltas(self, bookside, deltas):
    """
    store every level of a 'book' message into one side of an order book
    :param bookside: the order book side that receives each level through its store(price, amount) method
    :param list deltas: raw levels, the 'price' and 'qty' fields of each are read as numbers
    """
    for level in deltas:
        bookside.store(self.safe_number(level, 'price'), self.safe_number(level, 'qty'))
def format_number(self, data):
    """
    remove the decimal point and the leading zeros from a decimal string
    :param str data: a number rendered as a string, e.g. '0.5666'
    :returns str: the integer and fractional digits joined without leading zeros, e.g. '5666'; an empty string when every digit is zero
    """
    parts = data.split('.')
    integer = parts[0]
    # only the first fractional segment is used, a missing one counts as empty
    decimals = parts[1] if len(parts) > 1 else ''
    # lstrip cannot run past the end of the string, so an all-zero input such as '0.00'
    # yields '' instead of raising IndexError like an index-based scan would
    return (integer + decimals).lstrip('0')
def handle_system_status(self, client: Client, message):
    """
    accept a system status message and hand it back untouched
    :param Client client: the websocket client that received the message, not read here
    :param dict message: the raw status message
    :returns dict: the message that was passed in
    """
    #
    # todo: answer the question whether handleSystemStatus should be renamed
    # and unified for any usage pattern that
    # involves system status and maintenance updates
    #
    #     {
    #         "connectionID": 15527282728335292000,
    #         "event": "systemStatus",
    #         "status": "online",  # online|maintenance|(custom status tbd)
    #         "version": "0.2.0"
    #     }
    #
    # v2
    #     {
    #         channel: 'status',
    #         type: 'update',
    #         data: [
    #             {
    #                 version: '2.0.10',
    #                 system: 'online',
    #                 api_version: 'v2',
    #                 connection_id: 6447481662169813000
    #             }
    #         ]
    #     }
    #
    return message
async def authenticate(self, params={}):
    """
    return a websocket token, requesting a new one when none is cached on the client or the cached one has expired
    https://docs.kraken.com/api/docs/rest-api/get-websockets-token
    :param dict [params]: extra parameters forwarded to the token request
    :returns str: the 'token' field of the cached or freshly requested entry
    """
    cacheKey = 'authenticated'
    client = self.client(self.urls['api']['ws']['private'])
    cached = self.safe_value(client.subscriptions, cacheKey)
    now = self.seconds()
    start = self.safe_integer(cached, 'start')
    expires = self.safe_integer(cached, 'expires')
    mustRefresh = True
    if cached is not None:
        # 'start' is the time the token was stored, 'expires' comes from the token response
        mustRefresh = (start + expires) <= now
    if mustRefresh:
        response = await self.privatePostGetWebSocketsToken(params)
        #
        #     {
        #         "error":[],
        #         "result":{
        #             "token":"xeAQ\/RCChBYNVh53sTv1yZ5H4wIbwDF20PiHtTF+4UI",
        #             "expires":900
        #         }
        #     }
        #
        cached = self.safe_dict(response, 'result')
        cached['start'] = now
        client.subscriptions[cacheKey] = cached
    return self.safe_string(cached, 'token')
async def watch_private(self, name, symbol: Str = None, since: Int = None, limit: Int = None, params={}):
    """
    subscribe to the private 'executions' channel and wait for the updates delivered under the given name
    :param str name: prefix of the message hash to wait on
    :param str [symbol]: unified market symbol, appended to the message hash after a ':' when given
    :param int [since]: passed to filter_by_symbol_since_limit
    :param int [limit]: passed to filter_by_symbol_since_limit, replaced by result.getLimit(symbol, limit) when self.newUpdates is set
    :param dict [params]: merged into the 'params' object of the subscribe request
    :returns: the watched result filtered by symbol, since and limit
    """
    await self.load_markets()
    token = await self.authenticate()
    messageHash = name
    if symbol is not None:
        symbol = self.symbol(symbol)
        messageHash = messageHash + ':' + symbol
    url = self.urls['api']['ws']['privateV2']
    requestId = self.request_id()
    channelParams = {
        'channel': 'executions',
        'token': token,
    }
    if params is not None:
        channelParams = self.deep_extend(channelParams, params)
    subscribe: dict = {
        'method': 'subscribe',
        'params': channelParams,
        'req_id': requestId,
    }
    # every caller shares the 'executions' subscription hash
    result = await self.watch(url, messageHash, subscribe, 'executions')
    if self.newUpdates:
        limit = result.getLimit(symbol, limit)
    return self.filter_by_symbol_since_limit(result, symbol, since, limit)
async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
    """
    watches information on multiple trades made by the user
    https://docs.kraken.com/api/docs/websocket-v2/executions
    :param str symbol: unified market symbol of the market trades were made in
    :param int [since]: the earliest time in ms to fetch trades for
    :param int [limit]: the maximum number of trade structures to retrieve
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=trade-structure>`
    """
    # build a new dict instead of assigning into params: the default {} is shared between
    # calls and a dict supplied by the caller must not be modified
    request = self.extend(params, {'snap_trades': True})
    return await self.watch_private('myTrades', symbol, since, limit, request)
def handle_my_trades(self, client: Client, message, subscription=None):
    """
    append the trades of an 'executions' message to self.myTrades and resolve the 'myTrades' message hashes
    :param Client client: the websocket client that received the message
    :param dict message: the raw message, every entry of its 'data' list is parsed with parse_ws_trade
    :param subscription: not read by this method
    """
    #
    #     {
    #         "channel": "executions",
    #         "type": "update",
    #         "data": [
    #             {
    #                 "order_id": "O6NTZC-K6FRH-ATWBCK",
    #                 "exec_id": "T5DIUI-5N4KO-Z5BPXK",
    #                 "exec_type": "trade",
    #                 "trade_id": 8253473,
    #                 "symbol": "USDC/USD",
    #                 "side": "sell",
    #                 "last_qty": 15.44,
    #                 "last_price": 1.0002,
    #                 "liquidity_ind": "t",
    #                 "cost": 15.443088,
    #                 "order_userref": 0,
    #                 "order_status": "filled",
    #                 "order_type": "market",
    #                 "fee_usd_equiv": 0.03088618,
    #                 "fees": [
    #                     {
    #                         "asset": "USD",
    #                         "qty": 0.3458
    #                     }
    #                 ]
    #             }
    #         ],
    #         "sequence": 10
    #     }
    #
    allTrades = self.safe_list(message, 'data', [])
    if len(allTrades) > 0:
        if self.myTrades is None:
            limit = self.safe_integer(self.options, 'tradesLimit', 1000)
            self.myTrades = ArrayCache(limit)
        stored = self.myTrades
        symbols: dict = {}
        for i in range(len(allTrades)):
            trade = self.safe_dict(allTrades, i, {})
            parsed = self.parse_ws_trade(trade)
            stored.append(parsed)
            symbol = parsed['symbol']
            # an entry without a symbol cannot form a 'myTrades:<symbol>' hash, skipping it
            # here avoids concatenating None below(handle_orders applies the same guard)
            if symbol is not None:
                symbols[symbol] = True
        name = 'myTrades'
        client.resolve(self.myTrades, name)
        for tradedSymbol in symbols:
            client.resolve(self.myTrades, name + ':' + tradedSymbol)
def parse_ws_trade(self, trade, market=None):
    """
    convert one entry of an 'executions' message into a trade dict
    :param dict trade: the raw execution entry
    :param dict [market]: when given, its 'symbol' replaces the symbol found in the entry
    :returns dict: a dict with the keys info, id, order, timestamp, datetime, symbol, type, side, takerOrMaker, price, amount, cost and fee
    """
    #
    #     {
    #         "order_id": "O6NTZC-K6FRH-ATWBCK",
    #         "exec_id": "T5DIUI-5N4KO-Z5BPXK",
    #         "exec_type": "trade",
    #         "trade_id": 8253473,
    #         "symbol": "USDC/USD",
    #         "side": "sell",
    #         "last_qty": 15.44,
    #         "last_price": 1.0002,
    #         "liquidity_ind": "t",
    #         "cost": 15.443088,
    #         "order_userref": 0,
    #         "order_status": "filled",
    #         "order_type": "market",
    #         "fee_usd_equiv": 0.03088618,
    #         "fees": [
    #             {
    #                 "asset": "USD",
    #                 "qty": 0.3458
    #             }
    #         ]
    #     }
    #
    symbol = self.safe_string(trade, 'symbol')
    if market is not None:
        symbol = market['symbol']
    fee = None
    if 'fees' in trade:
        # only the first entry of 'fees' is reported
        fees = self.safe_list(trade, 'fees', [])
        firstFee = self.safe_dict(fees, 0, {})
        fee = {
            'cost': self.safe_number(firstFee, 'qty'),
            'currency': self.safe_string(firstFee, 'asset'),
        }
    isoTimestamp = self.safe_string(trade, 'timestamp')
    liquidityIndicator = self.safe_string(trade, 'liquidity_ind')
    # a missing indicator means the role is unknown, it must not be reported as 'maker'
    takerOrMaker = None
    if liquidityIndicator is not None:
        takerOrMaker = 'taker' if (liquidityIndicator == 't') else 'maker'
    return {
        'info': trade,
        'id': self.safe_string(trade, 'exec_id'),
        'order': self.safe_string(trade, 'order_id'),
        'timestamp': self.parse8601(isoTimestamp),
        'datetime': isoTimestamp,
        'symbol': symbol,
        'type': self.safe_string(trade, 'order_type'),
        'side': self.safe_string(trade, 'side'),
        'takerOrMaker': takerOrMaker,
        'price': self.safe_number(trade, 'last_price'),
        'amount': self.safe_number(trade, 'last_qty'),
        'cost': self.safe_number(trade, 'cost'),
        'fee': fee,
    }
async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
    """
    watches information on multiple orders made by the user
    https://docs.kraken.com/api/docs/websocket-v2/executions
    :param str symbol: unified market symbol of the market orders were made in
    :param int [since]: the earliest time in ms to fetch orders for
    :param int [limit]: the maximum number of order structures to retrieve
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
    """
    # 'snap_orders' is always sent as True, whatever the caller passed for it
    request = self.extend(params, {'snap_orders': True})
    return await self.watch_private('orders', symbol, since, limit, request)
def handle_orders(self, client: Client, message, subscription=None):
    """
    merge the orders of an 'executions' message into self.orders and resolve the 'orders' message hashes
    :param Client client: the websocket client that received the message
    :param dict message: the raw message, every entry of its 'data' list is parsed with parse_ws_order
    :param subscription: not read by this method
    """
    #
    #     {
    #         "channel": "executions",
    #         "type": "update",
    #         "data": [
    #             {
    #                 "order_id": "OK4GJX-KSTLS-7DZZO5",
    #                 "order_userref": 3,
    #                 "symbol": "BTC/USD",
    #                 "order_qty": 0.005,
    #                 "cum_cost": 0.0,
    #                 "time_in_force": "GTC",
    #                 "exec_type": "pending_new",
    #                 "side": "sell",
    #                 "order_type": "limit",
    #                 "limit_price_type": "static",
    #                 "limit_price": 26500.0,
    #                 "stop_price": 0.0,
    #                 "order_status": "pending_new",
    #                 "fee_usd_equiv": 0.0,
    #                 "fee_ccy_pref": "fciq",
    #                 "timestamp": "2023-09-22T10:33:05.709950Z"
    #             }
    #         ],
    #         "sequence": 8
    #     }
    #
    allOrders = self.safe_list(message, 'data', [])
    if len(allOrders) > 0:
        limit = self.safe_integer(self.options, 'ordersLimit', 1000)
        if self.orders is None:
            self.orders = ArrayCacheBySymbolById(limit)
        stored = self.orders
        symbols: dict = {}
        for i in range(len(allOrders)):
            order = self.safe_dict(allOrders, i, {})
            orderId = self.safe_string(order, 'order_id')
            parsed = self.parse_ws_order(order)
            symbol = self.safe_string(order, 'symbol')
            if (symbol is None) and (orderId is not None):
                # some updates carry no 'symbol'(see the canceled order example in parse_ws_order);
                # recover it from the cache so the update is merged into the order it belongs to
                # instead of being stored as a separate symbol-less entry
                for cachedSymbol in stored.hashmap:
                    if cachedSymbol is None:
                        continue
                    if self.safe_value(stored.hashmap[cachedSymbol], orderId) is not None:
                        symbol = cachedSymbol
                        break
            previousOrders = self.safe_value(stored.hashmap, symbol)
            previousOrder = self.safe_value(previousOrders, orderId)
            newOrder = parsed
            if previousOrder is not None:
                # fields missing from the update are taken from the raw order seen before
                newRawOrder = self.extend(previousOrder['info'], newOrder['info'])
                newOrder = self.parse_ws_order(newRawOrder)
            length = len(stored)
            if length == limit and (previousOrder is None):
                # the cache is full and a new order is about to be added: forget the id of the oldest one
                first = stored[0]
                symbolsByOrderId = self.safe_value(self.options, 'symbolsByOrderId', {})
                if first['id'] in symbolsByOrderId:
                    del symbolsByOrderId[first['id']]
            stored.append(newOrder)
            if symbol is not None:
                symbols[symbol] = True
        name = 'orders'
        client.resolve(self.orders, name)
        for updatedSymbol in symbols:
            client.resolve(self.orders, name + ':' + updatedSymbol)
def parse_ws_order(self, order, market=None):
    """
    convert one entry of an 'executions' message into an order through safe_order
    :param dict order: the raw order entry
    :param dict [market]: not read by this method
    :returns dict: the result of safe_order for the fields extracted below
    """
    #
    # watchOrders
    #
    # open order
    #     {
    #         "order_id": "OK4GJX-KSTLS-7DZZO5",
    #         "order_userref": 3,
    #         "symbol": "BTC/USD",
    #         "order_qty": 0.005,
    #         "cum_cost": 0.0,
    #         "time_in_force": "GTC",
    #         "exec_type": "pending_new",
    #         "side": "sell",
    #         "order_type": "limit",
    #         "limit_price_type": "static",
    #         "limit_price": 26500.0,
    #         "stop_price": 0.0,
    #         "order_status": "pending_new",
    #         "fee_usd_equiv": 0.0,
    #         "fee_ccy_pref": "fciq",
    #         "timestamp": "2023-09-22T10:33:05.709950Z"
    #     }
    #
    # canceled order
    #
    #     {
    #         "timestamp": "2025-10-11T15:11:47.695226Z",
    #         "order_status": "canceled",
    #         "exec_type": "canceled",
    #         "order_userref": 0,
    #         "order_id": "OGAB7Y-BKX5F-PTK5RW",
    #         "cum_qty": 0,
    #         "cum_cost": 0,
    #         "fee_usd_equiv": 0,
    #         "avg_price": 0,
    #         "cancel_reason": "User requested",
    #         "reason": "User requested"
    #     }
    #
    fee = {
        'cost': self.safe_string(order, 'fee_usd_equiv'),
        'currency': 'USD',
    }
    stopPrice = self.safe_string(order, 'stop_price')
    isoTimestamp = self.safe_string(order, 'timestamp')
    return self.safe_order({
        'id': self.safe_string(order, 'order_id'),
        'clientOrderId': self.safe_string(order, 'order_userref'),
        'info': order,
        'timestamp': self.parse8601(isoTimestamp),
        'datetime': isoTimestamp,
        'lastTradeTimestamp': None,
        'status': self.parse_order_status(self.safe_string(order, 'order_status')),
        'symbol': self.safe_string(order, 'symbol'),
        'type': self.safe_string(order, 'order_type'),
        'timeInForce': self.safe_string(order, 'time_in_force'),
        'postOnly': None,
        'side': self.safe_string(order, 'side'),
        'price': self.safe_string(order, 'limit_price'),
        'stopPrice': stopPrice,
        'triggerPrice': stopPrice,
        'cost': self.safe_string(order, 'cum_cost'),
        # 'cum_qty' is only the fallback for the amount when 'order_qty' is absent
        'amount': self.safe_string_2(order, 'order_qty', 'cum_qty'),
        # the cumulative executed quantity is reported when the payload carries it,
        # it stays None for payloads without 'cum_qty'
        'filled': self.safe_string(order, 'cum_qty'),
        'average': self.safe_string(order, 'avg_price'),
        'remaining': None,
        'fee': fee,
        'trades': None,
    })
async def watch_multi_helper(self, unifiedName: str, channelName: str, symbols: Strings = None, subscriptionArgs=None, params={}):
    """
    subscribe to a public v2 channel for several symbols and wait on one message hash per symbol
    :param str unifiedName: prefix of the message hashes when params has no 'event_trigger'
    :param str channelName: the channel to subscribe to, also the message hash prefix when params has an 'event_trigger'
    :param str[] symbols: unified symbols, normalised through market_symbols
    :param subscriptionArgs: passed through to watch_multiple
    :param dict [params]: merged into the 'params' object of the subscribe request
    :returns: the result of watch_multiple
    """
    await self.load_markets()
    # symbols are required
    symbols = self.market_symbols(symbols, None, False, True, False)
    eventTrigger = self.safe_string(params, 'event_trigger')
    hashPrefix = unifiedName if (eventTrigger is None) else channelName
    messageHashes = [self.get_message_hash(hashPrefix, None, self.symbol(entry)) for entry in symbols]
    channelParams = {
        'channel': channelName,
        'symbol': symbols,
    }
    request: dict = {
        'method': 'subscribe',
        'params': channelParams,
        'req_id': self.request_id(),
    }
    request['params'] = self.deep_extend(channelParams, params)
    url = self.urls['api']['ws']['publicV2']
    # the same list serves as message hashes and as subscription hashes
    return await self.watch_multiple(url, messageHashes, request, messageHashes, subscriptionArgs)
async def watch_balance(self, params={}) -> Balances:
    """
    watch balance and get the amount of funds available for trading or funds locked in orders
    https://docs.kraken.com/api/docs/websocket-v2/balances
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: a `balance structure <https://docs.ccxt.com/#/?id=balance-structure>`
    """
    await self.load_markets()
    token = await self.authenticate()
    channel = 'balances'
    url = self.urls['api']['ws']['privateV2']
    requestId = self.request_id()
    # params are merged into the top level of the request here
    request = self.deep_extend({
        'method': 'subscribe',
        'req_id': requestId,
        'params': {
            'channel': 'balances',
            'token': token,
        },
    }, params)
    return await self.watch(url, channel, request, channel)
def handle_balance(self, client: Client, message):
    """
    merge a 'balances' message into self.balance['spot'] and resolve the message hash named after its 'channel' field
    :param Client client: the websocket client that received the message
    :param dict message: the raw message, the 'asset' and 'balance' fields of every 'data' entry are read
    """
    #
    #     {
    #         "channel": "balances",
    #         "data": [
    #             {
    #                 "asset": "BTC",
    #                 "asset_class": "currency",
    #                 "balance": 1.2,
    #                 "wallets": [
    #                     {
    #                         "type": "spot",
    #                         "id": "main",
    #                         "balance": 1.2
    #                     }
    #                 ]
    #             }
    #         ],
    #         "type": "snapshot",
    #         "sequence": 1
    #     }
    #
    entries = self.safe_list(message, 'data', [])
    result: dict = {'info': message}
    for entry in entries:
        code = self.safe_currency_code(self.safe_string(entry, 'asset'))
        account = self.account()
        # only the total is filled from the message
        account['total'] = self.safe_string(entry, 'balance')
        result[code] = account
    walletType = 'spot'
    parsed = self.safe_balance(result)
    previous = self.safe_value(self.balance, walletType, {})
    merged = self.deep_extend(previous, parsed)
    self.balance[walletType] = self.safe_balance(merged)
    client.resolve(self.balance[walletType], self.safe_string(message, 'channel'))
def get_message_hash(self, unifiedElementName: str, subChannelName: Str = None, symbol: Str = None):
    """
    build the message hash for a unified element, optionally scoped to a symbol and a sub channel
    :param str unifiedElementName: can be : orderbook, trade, ticker, bidask ...
    :param str [subChannelName]: only applies to channels that need a specific variation(i.e. depth_50, depth_100..) to be selected, appended after a '#'
    :param str [symbol]: appended after an '@' when given, otherwise an 's' is appended to the element name
    :returns str: the message hash
    """
    suffix = 's' if (symbol is None) else ('@' + symbol)
    messageHash = unifiedElementName + suffix
    if subChannelName is not None:
        messageHash = messageHash + '#' + subChannelName
    return messageHash
def handle_subscription_status(self, client: Client, message):
    """
    remember a subscription status message on the client under its channel id
    :param Client client: the websocket client whose subscriptions dict is updated
    :param dict message: the raw message, nothing is stored when it has no 'channelID'
    """
    #
    # public
    #
    #     {
    #         "channelID": 210,
    #         "channelName": "book-10",
    #         "event": "subscriptionStatus",
    #         "reqid": 1574146735269,
    #         "pair": "ETH/XBT",
    #         "status": "subscribed",
    #         "subscription": {depth: 10, name: "book"}
    #     }
    #
    # private
    #
    #     {
    #         "channelName": "openOrders",
    #         "event": "subscriptionStatus",
    #         "reqid": 1,
    #         "status": "subscribed",
    #         "subscription": {maxratecount: 125, name: "openOrders"}
    #     }
    #
    channelId = self.safe_string(message, 'channelID')
    if channelId is None:
        return
    client.subscriptions[channelId] = message
def handle_error_message(self, client: Client, message) -> Bool:
    """
    detect an error message and reject the request it belongs to
    :param Client client: the websocket client that received the message
    :param dict message: the raw message, the error text is read from 'errorMessage' or 'error'
    :returns bool: True when the message carries no error, False when it does
    """
    #
    #     {
    #         "errorMessage": "Currency pair not in ISO 4217-A3 format foobar",
    #         "event": "subscriptionStatus",
    #         "pair": "foobar",
    #         "reqid": 1574146735269,
    #         "status": "error",
    #         "subscription": {name: "ticker"}
    #     }
    #
    # v2
    #     {
    #         "error": "Unsupported field: 'price' for the given msg type: add order",
    #         "method": "add_order",
    #         "success": False,
    #         "time_in": "2025-05-13T08:59:44.803511Z",
    #         "time_out": "2025-05-13T08:59:44.803542Z"
    #     }
    #
    errorMessage = self.safe_string_2(message, 'errorMessage', 'error')
    if errorMessage is None:
        return True
    requestId = self.safe_string_2(message, 'reqid', 'req_id')
    broad = self.exceptions['ws']['broad']
    broadKey = self.find_broadly_matched_key(broad, errorMessage)
    # texts without a broad match are raised as a plain ExchangeError
    exceptionClass = ExchangeError if (broadKey is None) else broad[broadKey]
    exception = exceptionClass(errorMessage)
    # without a request id there is nothing to reject, the error is only reported through the return value
    if requestId is not None:
        client.reject(exception, requestId)
    return False
def handle_message(self, client: Client, message):
    """
    route an incoming websocket message to its handler
    messages with a 'channel' field are dispatched by channel first; afterwards, unless
    handle_error_message reports an error, the message is dispatched by its 'event' or 'method' field
    :param Client client: the websocket client that received the message
    :param dict message: the raw message
    """
    channel = self.safe_string(message, 'channel')
    if channel is not None:
        if channel == 'executions':
            # the whole message is routed by the exec_type of its first entry
            data = self.safe_list(message, 'data', [])
            first = self.safe_dict(data, 0, {})
            execType = self.safe_string(first, 'exec_type')
            channel = 'myTrades' if (execType == 'trade') else 'orders'
        channelMethods: dict = {
            'balances': self.handle_balance,
            'book': self.handle_order_book,
            'ohlc': self.handle_ohlcv,
            'ticker': self.handle_ticker,
            'trade': self.handle_trades,
            # v2 sends heartbeats and status updates as channels({"channel": "heartbeat"}),
            # without these entries handle_heartbeat is never reached for them
            'heartbeat': self.handle_heartbeat,
            'status': self.handle_system_status,
            # private
            'myTrades': self.handle_my_trades,
            'orders': self.handle_orders,
        }
        method = self.safe_value(channelMethods, channel)
        if method is not None:
            method(client, message)
    if self.handle_error_message(client, message):
        event = self.safe_string_2(message, 'event', 'method')
        eventMethods: dict = {
            'heartbeat': self.handle_heartbeat,
            'systemStatus': self.handle_system_status,
            'subscriptionStatus': self.handle_subscription_status,
            'add_order': self.handle_create_edit_order,
            'amend_order': self.handle_create_edit_order,
            'cancel_order': self.handle_cancel_order,
            'cancel_all': self.handle_cancel_all_orders,
            'pong': self.handle_pong,
        }
        method = self.safe_value(eventMethods, event)
        if method is not None:
            method(client, message)