# -*- coding: utf-8 -*- # PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN: # https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code import ccxt.async_support from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp import hashlib from ccxt.base.types import Any, Balances, Bool, Int, Order, OrderBook, Position, Str, Strings, Ticker, Trade from ccxt.async_support.base.ws.client import Client from typing import List from ccxt.base.errors import ExchangeError class arkham(ccxt.async_support.arkham): def describe(self) -> Any: return self.deep_extend(super(arkham, self).describe(), { 'has': { 'ws': True, 'watchTrades': True, 'watchTradesForSymbols': False, 'watchOrderBook': True, 'watchOrderBookForSymbols': False, 'watchOHLCV': True, 'watchOHLCVForSymbols': False, 'watchOrders': True, 'watchMyTrades': False, 'watchTicker': True, 'watchTickers': False, 'watchBalance': True, }, 'urls': { 'api': { 'ws': 'wss://arkm.com/ws', }, }, 'options': { 'watchOrderBook': { 'depth': 100, # 5, 10, 20, 50, 100 'interval': 500, # 100, 200, 500, 1000 }, }, 'streaming': { 'keepAlive': 300000, # 5 minutes }, }) def handle_message(self, client: Client, message): # # confirmation # # {channel: 'confirmations', confirmationId: 'myCustomId-123'} if self.handle_error_message(client, message): return methods: dict = { 'ticker': self.handle_ticker, 'candles': self.handle_ohlcv, 'l2_updates': self.handle_order_book, 'trades': self.handle_trades, 'balances': self.handle_balance, 'positions': self.handle_positions, 'order_statuses': self.handle_order, 'trigger_orders': self.handle_order, # 'confirmations': self.handle_ticker, } channel = self.safe_string(message, 'channel') if channel == 'confirmations': return # type = self.safe_string(message, 'type') # if type != 'update' and type != 'snapshot': # debugger # } method = self.safe_value(methods, channel) if method is not None: method(client, message) async def subscribe(self, messageHash: str, rawChannel: str, params: dict) -> Any: subscriptionHash = messageHash request: dict = { 'args': { 'channel': rawChannel, 'params': params, }, 'confirmationId': self.uuid(), 'method': 'subscribe', } return await self.watch(self.urls['api']['ws'], messageHash, request, subscriptionHash) async def watch_ticker(self, symbol: str, params={}) -> Ticker: """ watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market https://arkm.com/docs#stream/ticker :param str symbol: unified symbol of the market to fetch the ticker for :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: a `ticker structure ` """ await self.load_markets() market = self.market(symbol) requestArg = { 'symbol': market['id'], } messageHash = 'ticker::' + market['symbol'] return await self.subscribe(messageHash, 'ticker', self.extend(params, requestArg)) def handle_ticker(self, client: Client, message): # # { # channel: 'ticker', # type: 'update', # data: { # symbol: 'BTC_USDT', # baseSymbol: 'BTC', # quoteSymbol: 'USDT', # price: '118962.74', # price24hAgo: '118780.42', # high24h: '120327.96', # low24h: '118217.28', # volume24h: '32.89729', # quoteVolume24h: '3924438.7146048', # markPrice: '0', # indexPrice: '118963.080293501', # fundingRate: '0', # nextFundingRate: '0', # nextFundingTime: 0, # productType: 'spot', # openInterest: '0', # indexCurrency: 'USDT', # usdVolume24h: '3924438.7146048', # openInterestUSD: '0' # } # } # data = self.safe_dict(message, 'data', {}) marketId = self.safe_string(data, 'symbol') market = self.safe_market(marketId, None) symbol = market['symbol'] ticker = self.parse_ws_ticker(data, market) self.tickers[symbol] = ticker client.resolve(ticker, 'ticker::' + symbol) # if self.safe_string(message, 'dataType') == 'all@ticker': # client.resolve(ticker, self.getMessageHash('ticker')) # } def parse_ws_ticker(self, message, market=None): # same dict api return self.parse_ticker(message, market) async def watch_ohlcv(self, symbol: str, timeframe='1m', since: Int = None, limit: Int = None, params={}) -> List[list]: """ watches historical candlestick data containing the open, high, low, and close price, and the volume of a market https://arkm.com/docs#stream/candles :param str symbol: unified symbol of the market to fetch OHLCV data for :param str timeframe: the length of time each candle represents :param int [since]: timestamp in ms of the earliest candle to fetch :param int [limit]: the maximum amount of candles to fetch :param dict [params]: extra parameters specific to the exchange API endpoint :returns int[][]: A list of candles ordered, open, high, low, close, volume """ await self.load_markets() market = self.market(symbol) rawTimeframe = self.safe_string(self.timeframes, timeframe, timeframe) requestArg = { 'symbol': market['id'], 'duration': rawTimeframe, } messageHash = 'ohlcv::' + market['symbol'] + '::' + rawTimeframe result = await self.subscribe(messageHash, 'candles', self.extend(requestArg, params)) ohlcv = result if self.newUpdates: limit = ohlcv.getLimit(market['symbol'], limit) return self.filter_by_since_limit(ohlcv, since, limit, 0, True) def handle_ohlcv(self, client: Client, message): # # { # channel: 'candles', # type: 'update', # data: { # symbol: 'BTC_USDT', # time: '1755076380000000', # duration: 60000000, # open: '120073.01', # high: '120073.01', # low: '120073.01', # close: '120073.01', # volume: '0', # quoteVolume: '0' # } # } # data = self.safe_dict(message, 'data', {}) marketId = self.safe_string(data, 'symbol') market = self.safe_market(marketId, None) symbol = market['symbol'] duration = self.safe_integer(data, 'duration') timeframe = self.findTimeframeByDuration(duration) messageHash = 'ohlcv::' + symbol + '::' + timeframe self.ohlcvs[symbol] = self.safe_value(self.ohlcvs, symbol, {}) if not (timeframe in self.ohlcvs[symbol]): limit = self.handle_option('watchOHLCV', 'limit', 1000) self.ohlcvs[symbol][timeframe] = ArrayCacheByTimestamp(limit) stored = self.ohlcvs[symbol][timeframe] parsed = self.parse_ws_ohlcv(data, market) stored.append(parsed) client.resolve(stored, messageHash) return message def parse_ws_ohlcv(self, ohlcv, market=None) -> list: # same api return self.parse_ohlcv(ohlcv, market) async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook: """ watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data https://arkm.com/docs#stream/l2_updates :param str symbol: unified symbol of the market to fetch the order book for :param int [limit]: the maximum amount of order book entries to return :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict: A dictionary of `order book structures ` indexed by market symbols """ await self.load_markets() market = self.market(symbol) requestArg = { 'symbol': market['id'], 'snapshot': True, } messageHash = 'orderBook::' + market['symbol'] orderbook = await self.subscribe(messageHash, 'l2_updates', self.extend(requestArg, params)) return orderbook.limit() def handle_order_book(self, client: Client, message): # # snapshot: # # { # channel: 'l2_updates', # type: 'snapshot', # data: { # symbol: 'BTC_USDT', # group: '0.01', # asks: [ [Object], [Object], ...], # bids: [ [Object], [Object], ...], # lastTime: 1755115180608299 # } # } # # update: # # { # channel: "l2_updates", # type: "update", # data: { # symbol: "BTC_USDT", # group: "0.01", # side: "sell", # size: "0.05295", # price: "122722.76", # revisionId: 2455511217, # time: 1755115736475207, # } # } # data = self.safe_dict(message, 'data') type = self.safe_string(message, 'type') marketId = self.safe_string(data, 'symbol') market = self.safe_market(marketId) symbol = market['symbol'] messageHash = 'orderBook::' + symbol if not (symbol in self.orderbooks): ob = self.order_book({}) ob['symbol'] = symbol self.orderbooks[symbol] = ob orderbook = self.orderbooks[symbol] if type == 'snapshot': timestamp = self.safe_integer_product(data, 'lastTime', 0.001) parsedOrderBook = self.parse_order_book(data, symbol, timestamp, 'bids', 'asks', 'price', 'size') orderbook.reset(parsedOrderBook) elif type == 'update': timestamp = self.safe_integer_product(data, 'time', 0.001) side = self.safe_string(data, 'side') bookside = orderbook['bids'] if (side == 'buy') else orderbook['asks'] self.handle_delta(bookside, data) orderbook['timestamp'] = timestamp orderbook['datetime'] = self.iso8601(timestamp) self.orderbooks[symbol] = orderbook client.resolve(self.orderbooks[symbol], messageHash) def handle_delta(self, bookside, delta): bidAsk = self.parse_bid_ask(delta, 'price', 'size') bookside.storeArray(bidAsk) async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]: """ watches information on multiple trades made in a market https://arkm.com/docs#stream/trades :param str symbol: unified market symbol of the market trades were made in :param int [since]: the earliest time in ms to fetch orders for :param int [limit]: the maximum number of trade structures to retrieve :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `trade structures ` """ await self.load_markets() market = self.market(symbol) requestArg = { 'symbol': market['id'], } messageHash = 'trade::' + market['symbol'] trades = await self.subscribe(messageHash, 'trades', self.extend(requestArg, params)) if self.newUpdates: limit = trades.getLimit(market['symbol'], limit) return self.filter_by_since_limit(trades, since, limit, 'timestamp', True) def handle_trades(self, client: Client, message): # # { # channel: 'trades', # type: 'update', # data: { # symbol: 'BTC_USDT', # revisionId: 2643896903, # size: '0.00261', # price: '118273.2', # takerSide: 'buy', # time: 1755200320146389 # } # } # data = self.safe_dict(message, 'data') marketId = self.safe_string(data, 'symbol') symbol = self.safe_symbol(marketId) if not (symbol in self.trades): limit = self.safe_integer(self.options, 'tradesLimit', 1000) self.trades[symbol] = ArrayCache(limit) parsed = self.parse_ws_trade(data) stored = self.trades[symbol] stored.append(parsed) client.resolve(stored, 'trade::' + symbol) def parse_ws_trade(self, trade, market=None): # same api return self.parse_trade(trade, market) async def authenticate(self, params={}): self.check_required_credentials() expires = (self.milliseconds() + self.safe_integer(self.options, 'requestExpiration', 5000)) * 1000 # need macroseconds wsOptions: dict = self.safe_dict(self.options, 'ws', {}) authenticated = self.safe_string(wsOptions, 'token') if authenticated is None: method = 'GET' bodyStr = '' path = 'ws' payload = self.apiKey + str(expires) + method.upper() + '/' + path + bodyStr decodedSecret = self.base64_to_binary(self.secret) signature = self.hmac(self.encode(payload), decodedSecret, hashlib.sha256, 'base64') defaultOptions: dict = { 'ws': { 'options': { 'headers': { 'Content-Type': 'application/json', 'Accept': 'application/json', 'Arkham-Api-Key': self.apiKey, 'Arkham-Expires': str(expires), 'Arkham-Signature': signature, }, }, }, } self.extend_exchange_options(defaultOptions) self.client(self.urls['api']['ws']) async def watch_balance(self, params={}) -> Balances: """ watch balance and get the amount of funds available for trading or funds locked in orders https://arkm.com/docs#stream/balances :param dict [params]: extra parameters specific to the exchange API endpoint :param str [params.type]: spot or contract if not provided self.options['defaultType'] is used :returns dict: a `balance structure ` """ await self.authenticate() await self.load_markets() requestArg = { 'snapshot': True, } messageHash = 'balances' result = await self.subscribe(messageHash, 'balances', self.extend(requestArg, params)) return result def handle_balance(self, client: Client, message): # # snapshot: # # { # channel: 'balances', # type: 'snapshot', # data: [ # { # subaccountId: 0, # symbol: 'USDT', # balance: '7.035335375', # free: '7.035335375', # priceUSDT: '1', # balanceUSDT: '7.035335375', # freeUSDT: '7.035335375', # lastUpdateReason: 'withdrawalFee', # lastUpdateTime: '1753905990432678', # lastUpdateId: 250483404, # lastUpdateAmount: '-2' # }, # { # subaccountId: 0, # symbol: 'SOL', # balance: '0.03', # free: '0.03', # priceUSDT: '197.37823276', # balanceUSDT: '5.921346982', # freeUSDT: '5.921346982', # lastUpdateReason: 'orderFill', # lastUpdateTime: '1753777760560164', # lastUpdateId: 248588190, # lastUpdateAmount: '0.03' # } # ] # } # # update: # # { # channel: 'balances', # type: 'update', # data: { # subaccountId: 0, # symbol: 'USDT', # balance: '7.028357615', # free: '7.028357615', # priceUSDT: '1', # balanceUSDT: '7.028357615', # freeUSDT: '7.028357615', # lastUpdateReason: 'tradingFee', # lastUpdateTime: '1755240882544056', # lastUpdateId: 2697860787, # lastUpdateAmount: '-0.00697776' # } # } # type = self.safe_string(message, 'type') parsed = {} if type == 'snapshot': # response same api data = self.safe_list(message, 'data') parsed = self.parse_ws_balance(data) parsed['info'] = message self.balance = parsed else: data = self.safe_dict(message, 'data') balancesArray = [data] parsed = self.parse_ws_balance(balancesArray) currencyId = self.safe_string(data, 'symbol') code = self.safe_currency_code(currencyId) self.balance[code] = parsed[code] messageHash = 'balances' client.resolve(self.safe_balance(self.balance), messageHash) def parse_ws_balance(self, balance): # same api return self.parse_balance(balance) async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]: """ https://arkm.com/docs#stream/positions watch all open positions :param str[] [symbols]: list of unified market symbols :param int [since]: the earliest time in ms to fetch positions for :param int [limit]: the maximum number of positions to retrieve :param dict params: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `position structure ` """ await self.authenticate() await self.load_markets() messageHash = 'positions' if not self.is_empty(symbols): symbols = self.market_symbols(symbols) messageHash += '::' + ','.join(symbols) self.positions = ArrayCacheBySymbolBySide() requestArg = { 'snapshot': False, # no need for initial snapshot, it's done in REST api } newPositions = await self.subscribe(messageHash, 'positions', self.extend(requestArg, params)) if self.newUpdates: return newPositions return self.filter_by_symbols_since_limit(self.positions, symbols, since, limit, True) def handle_positions(self, client, message): # # snapshot: # # { # channel: 'positions', # type: 'snapshot', # data: [ # { # subaccountId: 0, # symbol: 'SOL_USDT_PERP', # base: '0.059', # quote: '-11.50618', # openBuySize: '0', # openSellSize: '0', # openBuyNotional: '0', # openSellNotional: '0', # lastUpdateReason: 'orderFill', # lastUpdateTime: '1755251065621402', # lastUpdateId: 2709589783, # lastUpdateBaseDelta: '0.059', # lastUpdateQuoteDelta: '-11.50618', # breakEvenPrice: '195.02', # markPrice: '195', # value: '11.505', # pnl: '-0.00118', # initialMargin: '1.1505', # maintenanceMargin: '0.6903', # averageEntryPrice: '195.02' # } # ] # } # newPositions = [] if self.positions is None: self.positions = {} type = self.safe_string(message, 'type') if type == 'snapshot': data = self.safe_list(message, 'data', []) for i in range(0, len(data)): position = self.parse_ws_position(data[i]) if self.safe_integer(position, 'entryPrice') != 0: newPositions.append(position) symbol = self.safe_string(position, 'symbol') self.positions[symbol] = position else: data = self.safe_dict(message, 'data') position = self.parse_ws_position(data) symbol = self.safe_string(position, 'symbol') self.positions[symbol] = position newPositions.append(position) messageHashes = self.find_message_hashes(client, 'positions::') for i in range(0, len(messageHashes)): messageHash = messageHashes[i] parts = messageHash.split('::') symbolsString = parts[1] symbols = symbolsString.split(',') positions = self.filter_by_array(newPositions, 'symbol', symbols, False) if not self.is_empty(positions): client.resolve(positions, messageHash) length = len(newPositions) if length > 0: client.resolve(newPositions, 'positions') def parse_ws_positions(self, positions: List[Any], symbols: List[str] = None, params={}) -> List[Position]: symbols = self.market_symbols(symbols) positions = self.to_array(positions) result = [] for i in range(0, len(positions)): position = self.extend(self.parse_ws_position(positions[i], None), params) result.append(position) return self.filter_by_array_positions(result, 'symbol', symbols, False) def parse_ws_position(self, position, market=None): # same api return self.parse_position(position, market) async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]: """ watches information on multiple orders made by the user https://arkm.com/docs#stream/order_statuses :param str symbol: unified market symbol of the market orders were made in :param int [since]: the earliest time in ms to fetch orders for :param int [limit]: the maximum number of order structures to retrieve :param dict [params]: extra parameters specific to the exchange API endpoint :returns dict[]: a list of `order structures ` """ await self.authenticate() await self.load_markets() market = None if symbol is not None: market = self.market(symbol) requestArg = { 'snapshot': False, } isTriggerOrder = False isTriggerOrder, params = self.handle_option_and_params(params, 'watchOrders', 'trigger', False) rawChannel = 'trigger_orders' if isTriggerOrder else 'order_statuses' messageHash = 'orders' if symbol is not None: messageHash += '::' + market['symbol'] messageHash += '::' + rawChannel orders = await self.subscribe(messageHash, rawChannel, self.extend(requestArg, params)) if self.newUpdates: limit = orders.getLimit(symbol, limit) return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True) def handle_order(self, client: Client, message): # # { # channel: "order_statuses", # type: "update", # data: { # orderId: 4200775347657, # userId: 2959880, # subaccountId: 0, # symbol: "ARKM_USDT_PERP", # time: "1755253639782186", # side: "buy", # type: "limitGtc", # size: "10", # price: "0.5", # postOnly: False, # reduceOnly: False, # executedSize: "0", # status: "cancelled", # avgPrice: "0", # executedNotional: "0", # creditFeePaid: "0", # marginBonusFeePaid: "0", # quoteFeePaid: "0", # arkmFeePaid: "0", # revisionId: 2752963990, # lastTime: "1755272026403545", # clientOrderId: "", # lastSize: "0", # lastPrice: "0", # lastCreditFee: "0", # lastMarginBonusFee: "0", # lastQuoteFee: "0", # lastArkmFee: "0", # } # } # channel = self.safe_string(message, 'channel') data = self.safe_dict(message, 'data') if self.orders is None: limit = self.safe_integer(self.options, 'ordersLimit', 1000) self.orders = ArrayCacheBySymbolById(limit) orders = self.orders order = self.parse_ws_order(data) orders.append(order) client.resolve(orders, 'orders') client.resolve(orders, 'orders::' + order['symbol'] + '::' + channel) client.resolve(orders, 'orders::' + channel) def parse_ws_order(self, order, market=None) -> Order: # same api return self.parse_order(order, market) def handle_error_message(self, client: Client, response) -> Bool: # # error example: # # { # "id": "30005", # "name": "InvalidNotional", # "message": "order validation failed: invalid notional: notional 0.25 is less than min notional 1" # } # message = self.safe_string(response, 'message') if message is not None: body = self.json(response) errorCode = self.safe_string(response, 'id') feedback = self.id + ' ' + body self.throw_exactly_matched_exception(self.exceptions['exact'], errorCode, feedback) self.throw_exactly_matched_exception(self.exceptions['exact'], message, feedback) self.throw_broadly_matched_exception(self.exceptions['broad'], message, feedback) raise ExchangeError(self.id + ' ' + body) return False