# -*- coding: utf-8 -*- # PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN: # https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code import ccxt.async_support from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp import hashlib from ccxt.base.types import Any, Balances, Int, Market, Order, OrderBook, Position, Str, Strings, Ticker, Tickers, Trade from ccxt.async_support.base.ws.client import Client from typing import List from ccxt.base.errors import ExchangeError from ccxt.base.errors import ArgumentsRequired from ccxt.base.errors import NotSupported class blofin(ccxt.async_support.blofin): def describe(self) -> Any: return self.deep_extend(super(blofin, self).describe(), { 'has': { 'ws': True, 'watchTrades': True, 'watchTradesForSymbols': True, 'watchOrderBook': True, 'watchOrderBookForSymbols': True, 'watchTicker': True, 'watchTickers': True, 'watchBidsAsks': True, 'watchOHLCV': True, 'watchOHLCVForSymbols': True, 'watchOrders': True, 'watchOrdersForSymbols': True, 'watchPositions': True, }, 'urls': { 'api': { 'ws': { 'swap': { 'public': 'wss://openapi.blofin.com/ws/public', 'private': 'wss://openapi.blofin.com/ws/private', }, }, }, 'test': { 'ws': { 'swap': { 'public': 'wss://demo-trading-openapi.blofin.com/ws/public', 'private': 'wss://demo-trading-openapi.blofin.com/ws/private', }, }, }, }, 'options': { 'defaultType': 'swap', 'tradesLimit': 1000, # orderbook channel can be one from: # - "books": 200 depth levels will be pushed in the initial full snapshot. Incremental data will be pushed every 100 ms for the changes in the order book during that period of time. # - "books5": 5 depth levels snapshot will be pushed every time. Snapshot data will be pushed every 100 ms when there are changes in the 5 depth levels snapshot. 'watchOrderBook': { 'channel': 'books', }, 'watchOrderBookForSymbols': { 'channel': 'books', }, }, 'streaming': { 'ping': self.ping, 'keepAlive': 25000, # 30 seconds max }, }) def ping(self, client): return 'ping' def handle_pong(self, client: Client, message): # # 'pong' # client.lastPong = self.milliseconds() async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ get the list of most recent trades for a particular symbol https://docs.blofin.com/index.html#ws-trades-channel :param str symbol: unified symbol of the market to fetch trades for :param int [since]: timestamp in ms of the earliest trade to fetch :param int [limit]: the maximum amount of trades to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `trade structures ` """ params['callerMethodName'] = 'watchTrades' return await self.watch_trades_for_symbols([symbol], since, limit, params) async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ get the list of most recent trades for a list of symbols https://docs.blofin.com/index.html#ws-trades-channel :param str[] symbols: unified symbol of the market to fetch trades for :param int [since]: timestamp in ms of the earliest trade to fetch :param int [limit]: the maximum amount of trades to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `trade structures ` """ await self.load_markets() trades = await self.watch_multiple_wrapper(True, 'trades', 'watchTradesForSymbols', symbols, params) if self.newUpdates: firstMarket = self.safe_dict(trades, 0) firstSymbol = self.safe_string(firstMarket, 'symbol') limit = trades.getLimit(firstSymbol, limit) return self.filter_by_since_limit(trades, since, limit, 'timestamp', True) def handle_trades(self, client: Client, message): # # { # arg: { # channel: "trades", # instId: "DOGE-USDT", # }, # data : [ # , # ... # ] # } # arg = self.safe_dict(message, 'arg') channelName = self.safe_string(arg, 'channel') data = self.safe_list(message, 'data') if data is None: return for i in range(0, len(data)): rawTrade = data[i] trade = self.parse_ws_trade(rawTrade) symbol = trade['symbol'] stored = self.safe_value(self.trades, symbol) if stored is None: limit = self.safe_integer(self.options, 'tradesLimit', 1000) stored = ArrayCache(limit) self.trades[symbol] = stored stored.append(trade) messageHash = channelName + ':' + symbol client.resolve(stored, messageHash) def parse_ws_trade(self, trade, market: Market = None) -> Trade: return self.parse_trade(trade, market) async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook: """ watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data https://docs.blofin.com/index.html#ws-order-book-channel :param str symbol: unified symbol of the market to fetch the order book for :param int [limit]: the maximum amount of order book entries to return :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: A dictionary of `order book structures ` indexed by market symbols """ params['callerMethodName'] = 'watchOrderBook' return await self.watch_order_book_for_symbols([symbol], limit, params) async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook: """ watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data https://docs.blofin.com/index.html#ws-order-book-channel :param str[] symbols: unified array of symbols :param int [limit]: the maximum amount of order book entries to return :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.depth]: the type of order book to subscribe to, default is 'depth/increase100', also accepts 'depth5' or 'depth20' or depth50 :returns dict: A dictionary of `order book structures ` indexed by market symbols """ await self.load_markets() callerMethodName = None callerMethodName, params = self.handle_param_string(params, 'callerMethodName', 'watchOrderBookForSymbols') channelName = None channelName, params = self.handle_option_and_params(params, callerMethodName, 'channel', 'books') # due to some problem, temporarily disable other channels if channelName != 'books': raise NotSupported(self.id + ' ' + callerMethodName + '() at self moment ' + channelName + ' is not supported, coming soon') orderbook = await self.watch_multiple_wrapper(True, channelName, callerMethodName, symbols, params) return orderbook.limit() def handle_order_book(self, client: Client, message): # # { # arg: { # channel: "books", # instId: "DOGE-USDT", # }, # action: "snapshot", # can be 'snapshot' or 'update' # data: { # asks: [ [0.08096, 1], [0.08097, 123], ... ], # bids: [ [0.08095, 4], [0.08094, 237], ... ], # ts: "1707491587909", # prevSeqId: "0", # in case of 'update' there will be some value, less then seqId # seqId: "3374250786", # }, # } # arg = self.safe_dict(message, 'arg') channelName = self.safe_string(arg, 'channel') data = self.safe_dict(message, 'data') marketId = self.safe_string(arg, 'instId') market = self.safe_market(marketId) symbol = market['symbol'] messageHash = channelName + ':' + symbol if not (symbol in self.orderbooks): self.orderbooks[symbol] = self.order_book() orderbook = self.orderbooks[symbol] timestamp = self.safe_integer(data, 'ts') action = self.safe_string(message, 'action') if action == 'snapshot': orderBookSnapshot = self.parse_order_book(data, symbol, timestamp) orderBookSnapshot['nonce'] = self.safe_integer(data, 'seqId') orderbook.reset(orderBookSnapshot) else: asks = self.safe_list(data, 'asks', []) bids = self.safe_list(data, 'bids', []) self.handle_deltas_with_keys(orderbook['asks'], asks) self.handle_deltas_with_keys(orderbook['bids'], bids) orderbook['timestamp'] = timestamp orderbook['datetime'] = self.iso8601(timestamp) self.orderbooks[symbol] = orderbook client.resolve(orderbook, messageHash) async def watch_ticker(self, symbol: str, params={}) -> Ticker: """ watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market https://docs.blofin.com/index.html#ws-tickers-channel :param str symbol: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `ticker structure ` """ params['callerMethodName'] = 'watchTicker' market = self.market(symbol) symbol = market['symbol'] result = await self.watch_tickers([symbol], params) return result[symbol] async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers: """ watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for all markets of a specific list https://docs.blofin.com/index.html#ws-tickers-channel :param str[] symbols: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `ticker structure ` """ if symbols is None: raise NotSupported(self.id + ' watchTickers() requires a list of symbols') ticker = await self.watch_multiple_wrapper(True, 'tickers', 'watchTickers', symbols, params) if self.newUpdates: tickers = {} tickers[ticker['symbol']] = ticker return tickers return self.filter_by_array(self.tickers, 'symbol', symbols) def handle_ticker(self, client: Client, message): # # message # # { # arg: { # channel: "tickers", # instId: "DOGE-USDT", # }, # data: [ # # ], # } # self.handle_bid_ask(client, message) arg = self.safe_dict(message, 'arg') channelName = self.safe_string(arg, 'channel') data = self.safe_list(message, 'data') for i in range(0, len(data)): ticker = self.parse_ws_ticker(data[i]) symbol = ticker['symbol'] messageHash = channelName + ':' + symbol self.tickers[symbol] = ticker client.resolve(self.tickers[symbol], messageHash) def parse_ws_ticker(self, ticker, market: Market = None) -> Ticker: return self.parse_ticker(ticker, market) async def watch_bids_asks(self, symbols: Strings = None, params={}) -> Tickers: """ watches best bid & ask for symbols https://docs.blofin.com/index.html#ws-tickers-channel :param str[] symbols: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `ticker structure ` """ await self.load_markets() symbols = self.market_symbols(symbols, None, False) firstMarket = self.market(symbols[0]) channel = 'tickers' marketType = None marketType, params = self.handle_market_type_and_params('watchBidsAsks', firstMarket, params) url = self.implode_hostname(self.urls['api']['ws'][marketType]['public']) messageHashes = [] args = [] for i in range(0, len(symbols)): market = self.market(symbols[i]) messageHashes.append('bidask:' + market['symbol']) args.append({ 'channel': channel, 'instId': market['id'], }) request = self.get_subscription_request(args) ticker = await self.watch_multiple(url, messageHashes, self.deep_extend(request, params), messageHashes) if self.newUpdates: tickers = {} tickers[ticker['symbol']] = ticker return tickers return self.filter_by_array(self.bidsasks, 'symbol', symbols) def handle_bid_ask(self, client: Client, message): data = self.safe_list(message, 'data') for i in range(0, len(data)): ticker = self.parse_ws_bid_ask(data[i]) symbol = ticker['symbol'] messageHash = 'bidask:' + symbol self.bidsasks[symbol] = ticker client.resolve(ticker, messageHash) def parse_ws_bid_ask(self, ticker, market=None): marketId = self.safe_string(ticker, 'instId') market = self.safe_market(marketId, market, '-') symbol = self.safe_string(market, 'symbol') timestamp = self.safe_integer(ticker, 'ts') return self.safe_ticker({ 'symbol': symbol, 'timestamp': timestamp, 'datetime': self.iso8601(timestamp), 'ask': self.safe_string(ticker, 'askPrice'), 'askVolume': self.safe_string(ticker, 'askSize'), 'bid': self.safe_string(ticker, 'bidPrice'), 'bidVolume': self.safe_string(ticker, 'bidSize'), 'info': ticker, }, market) async def watch_ohlcv(self, symbol: str, timeframe: str = '1m', since: Int = None, limit: Int = None, params={}) -> List[list]: """ watches historical candlestick data containing the open, high, low, and close price, and the volume of a market :param str symbol: unified symbol of the market to fetch OHLCV data for :param str timeframe: the length of time each candle represents :param int [since]: timestamp in ms of the earliest candle to fetch :param int [limit]: the maximum amount of candles to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :returns int[][]: A list of candles ordered, open, high, low, close, volume """ params['callerMethodName'] = 'watchOHLCV' result = await self.watch_ohlcv_for_symbols([[symbol, timeframe]], since, limit, params) return result[symbol][timeframe] async def watch_ohlcv_for_symbols(self, symbolsAndTimeframes: List[List[str]], since: Int = None, limit: Int = None, params={}): """ watches historical candlestick data containing the open, high, low, and close price, and the volume of a market https://docs.blofin.com/index.html#ws-candlesticks-channel :param str[][] symbolsAndTimeframes: array of arrays containing unified symbols and timeframes to fetch OHLCV data for, example [['BTC/USDT', '1m'], ['LTC/USDT', '5m']] :param int [since]: timestamp in ms of the earliest candle to fetch :param int [limit]: the maximum amount of candles to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :returns int[][]: A list of candles ordered, open, high, low, close, volume """ symbolsLength = len(symbolsAndTimeframes) if symbolsLength == 0 or not isinstance(symbolsAndTimeframes[0], list): raise ArgumentsRequired(self.id + " watchOHLCVForSymbols() requires a an array of symbols and timeframes, like [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]") await self.load_markets() symbol, timeframe, candles = await self.watch_multiple_wrapper(True, 'candle', 'watchOHLCVForSymbols', symbolsAndTimeframes, params) if self.newUpdates: limit = candles.getLimit(symbol, limit) filtered = self.filter_by_since_limit(candles, since, limit, 0, True) return self.create_ohlcv_object(symbol, timeframe, filtered) def handle_ohlcv(self, client: Client, message): # # message # # { # arg: { # channel: "candle1m", # instId: "DOGE-USDT", # }, # data: [ # [same object in REST example] # ], # } # arg = self.safe_dict(message, 'arg') channelName = self.safe_string(arg, 'channel') data = self.safe_list(message, 'data') marketId = self.safe_string(arg, 'instId') market = self.safe_market(marketId) symbol = market['symbol'] interval = channelName.replace('candle', '') unifiedTimeframe = self.find_timeframe(interval) self.ohlcvs[symbol] = self.safe_dict(self.ohlcvs, symbol, {}) stored = self.safe_value(self.ohlcvs[symbol], unifiedTimeframe) if stored is None: limit = self.safe_integer(self.options, 'OHLCVLimit', 1000) stored = ArrayCacheByTimestamp(limit) self.ohlcvs[symbol][unifiedTimeframe] = stored for i in range(0, len(data)): candle = data[i] parsed = self.parse_ohlcv(candle, market) stored.append(parsed) resolveData = [symbol, unifiedTimeframe, stored] messageHash = 'candle' + interval + ':' + symbol client.resolve(resolveData, messageHash) async def watch_balance(self, params={}) -> Balances: """ query for balance and get the amount of funds available for trading or funds locked in orders https://docs.blofin.com/index.html#ws-account-channel :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `balance structure ` """ await self.load_markets() await self.authenticate() marketType = None marketType, params = self.handle_market_type_and_params('watchBalance', None, params) if marketType == 'spot': raise NotSupported(self.id + ' watchBalance() is not supported for spot markets yet') messageHash = marketType + ':balance' sub = { 'channel': 'account', } request = self.get_subscription_request([sub]) url = self.implode_hostname(self.urls['api']['ws'][marketType]['private']) return await self.watch(url, messageHash, self.deep_extend(request, params), messageHash) def handle_balance(self, client: Client, message): # # { # arg: { # channel: "account", # }, # data: , # } # marketType = 'swap' # for now if not (marketType in self.balance): self.balance[marketType] = {} self.balance[marketType] = self.parse_ws_balance(message) messageHash = marketType + ':balance' client.resolve(self.balance[marketType], messageHash) def parse_ws_balance(self, message): return self.parse_balance(message) async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]: """ watches information on multiple orders made by the user https://docs.blofin.com/index.html#ws-order-channel https://docs.blofin.com/index.html#ws-algo-orders-channel :param str symbol: unified market symbol of the market orders were made in :param int [since]: the earliest time in ms to fetch orders for :param int [limit]: the maximum number of order structures to retrieve :param dict [params]: extra parameters specific to the exchange API endpoint :param boolean [params.trigger]: set to True for trigger orders :returns dict[]: a list of [order structures]{@link https://docs.ccxt.com/#/?id=order-structure """ params['callerMethodName'] = 'watchOrders' symbolsArray = [symbol] if (symbol is not None) else [] return await self.watch_orders_for_symbols(symbolsArray, since, limit, params) async def watch_orders_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Order]: """ watches information on multiple orders made by the user across multiple symbols https://docs.blofin.com/index.html#ws-order-channel https://docs.blofin.com/index.html#ws-algo-orders-channel :param str[] symbols: :param int [since]: the earliest time in ms to fetch orders for :param int [limit]: the maximum number of order structures to retrieve :param dict [params]: extra parameters specific to the exchange API endpoint :param boolean [params.trigger]: set to True for trigger orders :returns dict[]: a list of [order structures]{@link https://docs.ccxt.com/#/?id=order-structure """ await self.authenticate() await self.load_markets() trigger = self.safe_value_2(params, 'stop', 'trigger') params = self.omit(params, ['stop', 'trigger']) channel = 'orders-algo' if trigger else 'orders' orders = await self.watch_multiple_wrapper(False, channel, 'watchOrdersForSymbols', symbols, params) if self.newUpdates: first = self.safe_value(orders, 0) tradeSymbol = self.safe_string(first, 'symbol') limit = orders.getLimit(tradeSymbol, limit) return self.filter_by_since_limit(orders, since, limit, 'timestamp', True) def handle_orders(self, client: Client, message): # # { # action: 'update', # arg: {channel: 'orders'}, # data: [ # # ] # } # if self.orders is None: limit = self.safe_integer(self.options, 'ordersLimit', 1000) self.orders = ArrayCacheBySymbolById(limit) orders = self.orders arg = self.safe_dict(message, 'arg') channelName = self.safe_string(arg, 'channel') data = self.safe_list(message, 'data') for i in range(0, len(data)): order = self.parse_ws_order(data[i]) symbol = order['symbol'] messageHash = channelName + ':' + symbol orders.append(order) client.resolve(orders, messageHash) client.resolve(orders, channelName) def parse_ws_order(self, order, market: Market = None) -> Order: return self.parse_order(order, market) async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]: """ https://docs.blofin.com/index.html#ws-positions-channel watch all open positions :param str[]|None symbols: list of unified market symbols :param int [since]: the earliest time in ms to fetch positions for :param int [limit]: the maximum number of positions to retrieve :param dict params: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `position structure ` """ await self.authenticate() await self.load_markets() newPositions = await self.watch_multiple_wrapper(False, 'positions', 'watchPositions', symbols, params) if self.newUpdates: return newPositions return self.filter_by_symbols_since_limit(self.positions, symbols, since, limit) def handle_positions(self, client: Client, message): # # { # arg: {channel: 'positions'}, # data: [ # # ] # } # if self.positions is None: self.positions = ArrayCacheBySymbolBySide() cache = self.positions arg = self.safe_dict(message, 'arg') channelName = self.safe_string(arg, 'channel') data = self.safe_list(message, 'data') newPositions = [] for i in range(0, len(data)): position = self.parse_ws_position(data[i]) newPositions.append(position) cache.append(position) messageHash = channelName + ':' + position['symbol'] client.resolve(position, messageHash) def parse_ws_position(self, position, market: Market = None) -> Position: return self.parse_position(position, market) async def watch_multiple_wrapper(self, isPublic: bool, channelName: str, callerMethodName: str, symbolsArray: List[Any] = None, params={}): # underlier method for all watch-multiple symbols await self.load_markets() callerMethodName, params = self.handle_param_string(params, 'callerMethodName', callerMethodName) # if OHLCV method are being called, then symbols would be symbolsAndTimeframes(multi-dimensional) array isOHLCV = (channelName == 'candle') symbols = self.get_list_from_object_values(symbolsArray, 0) if isOHLCV else symbolsArray symbols = self.market_symbols(symbols, None, True, True) firstMarket = None firstSymbol = self.safe_string(symbols, 0) if firstSymbol is not None: firstMarket = self.market(firstSymbol) marketType = None marketType, params = self.handle_market_type_and_params(callerMethodName, firstMarket, params) if marketType != 'swap': raise NotSupported(self.id + ' ' + callerMethodName + '() does not support ' + marketType + ' markets yet') rawSubscriptions = [] messageHashes = [] if symbols is None: symbols = [] symbolsLength = len(symbols) if symbolsLength > 0: for i in range(0, len(symbols)): current = symbols[i] market = None channel = channelName if isOHLCV: market = self.market(current) tfArray = symbolsArray[i] tf = tfArray[1] interval = self.safe_string(self.timeframes, tf, tf) channel += interval else: market = self.market(current) topic = { 'channel': channel, 'instId': market['id'], } rawSubscriptions.append(topic) messageHashes.append(channel + ':' + market['symbol']) else: rawSubscriptions.append({'channel': channelName}) messageHashes.append(channelName) # private channel are difference, they only need plural channel name for multiple symbols if self.in_array(channelName, ['orders', 'orders-algo', 'positions']): rawSubscriptions = [{'channel': channelName}] request = self.get_subscription_request(rawSubscriptions) privateOrPublic = 'public' if isPublic else 'private' url = self.implode_hostname(self.urls['api']['ws'][marketType][privateOrPublic]) return await self.watch_multiple(url, messageHashes, self.deep_extend(request, params), messageHashes) def get_subscription_request(self, args): return { 'op': 'subscribe', 'args': args, } def handle_message(self, client: Client, message): # # message examples # # { # arg: { # channel: "trades", # instId: "DOGE-USDT", # }, # event: "subscribe" # } # # incoming data updates' examples can be seen under each handler method # methods = { # public 'pong': self.handle_pong, 'trades': self.handle_trades, 'books': self.handle_order_book, 'tickers': self.handle_ticker, 'candle': self.handle_ohlcv, # candle1m, candle5m, etc # private 'account': self.handle_balance, 'orders': self.handle_orders, 'orders-algo': self.handle_orders, 'positions': self.handle_positions, } method = None if message == 'pong': method = self.safe_value(methods, 'pong') else: event = self.safe_string(message, 'event') if event == 'subscribe': return elif event == 'login': future = self.safe_value(client.futures, 'authenticate_hash') future.resolve(True) return elif event == 'error': raise ExchangeError(self.id + ' error: ' + self.json(message)) arg = self.safe_dict(message, 'arg') channelName = self.safe_string(arg, 'channel') method = self.safe_value(methods, channelName) if not method and channelName.find('candle') >= 0: method = methods['candle'] if method: method(client, message) async def authenticate(self, params={}): self.check_required_credentials() milliseconds = self.milliseconds() messageHash = 'authenticate_hash' timestamp = str(milliseconds) nonce = 'n_' + timestamp auth = '/users/self/verify' + 'GET' + timestamp + '' + nonce signature = self.string_to_base64(self.hmac(self.encode(auth), self.encode(self.secret), hashlib.sha256)) request = { 'op': 'login', 'args': [ { 'apiKey': self.apiKey, 'passphrase': self.password, 'timestamp': timestamp, 'nonce': nonce, 'sign': signature, }, ], } marketType = 'swap' # for now url = self.implode_hostname(self.urls['api']['ws'][marketType]['private']) await self.watch(url, messageHash, self.deep_extend(request, params), messageHash)