Files
ccxt_with_mt5/ccxt/pro/cryptocom.py
lz_db 0fab423a18 add
2025-11-16 12:31:03 +08:00

1351 lines
60 KiB
Python

# -*- coding: utf-8 -*-
# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code
import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp
import hashlib
from ccxt.base.types import Any, Balances, Bool, Int, Market, Num, Order, OrderBook, OrderSide, OrderType, Position, Str, Strings, Ticker, Tickers, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List
from ccxt.base.errors import ExchangeError
from ccxt.base.errors import AuthenticationError
from ccxt.base.errors import NetworkError
from ccxt.base.errors import ChecksumError
class cryptocom(ccxt.async_support.cryptocom):
def describe(self) -> Any:
    """
    build the exchange description by layering the websocket-specific entries on top of the parent description
    :returns dict: the parent `describe()` result deep-extended with the ws capabilities, stream urls and default options
    """
    capabilities = {
        'ws': True,
        'watchBalance': True,
        'watchTicker': True,
        'watchTickers': True,
        'watchBidsAsks': True,
        'watchMyTrades': True,
        'watchTrades': True,
        'watchTradesForSymbols': True,
        'watchOrderBook': True,
        'watchOrderBookForSymbols': True,
        'watchOrders': True,
        'watchOHLCV': True,
        'watchPositions': True,
        'createOrderWs': True,
        'cancelOrderWs': True,
        'cancelAllOrders': True,
        'editOrderWs': True,
    }
    endpoints = {
        'api': {
            'ws': {
                'public': 'wss://stream.crypto.com/exchange/v1/market',
                'private': 'wss://stream.crypto.com/exchange/v1/user',
            },
        },
        'test': {
            'public': 'wss://uat-stream.3ona.co/exchange/v1/market',
            'private': 'wss://uat-stream.3ona.co/exchange/v1/user',
        },
    }
    defaults = {
        'watchPositions': {
            'fetchPositionsSnapshot': True,  # or False
            'awaitPositionsSnapshot': True,  # whether to wait for the positions snapshot before providing updates
        },
        'watchOrderBook': {
            'checksum': True,
        },
    }
    overrides = {
        'has': capabilities,
        'urls': endpoints,
        'options': defaults,
        'streaming': {
        },
    }
    return self.deep_extend(super(cryptocom, self).describe(), overrides)
async def pong(self, client, message):
    """
    reply to a server heartbeat with a 'public/respond-heartbeat' message carrying the same id
    :param client: the websocket client the heartbeat was received on
    :param dict message: the heartbeat message
    """
    #
    #     {
    #         "id": 1587523073344,
    #         "method": "public/heartbeat",
    #         "code": 0
    #     }
    #
    try:
        reply = {
            'id': self.safe_integer(message, 'id'),
            'method': 'public/respond-heartbeat',
        }
        await client.send(reply)
    except Exception as e:
        # any failure while answering is reported as a NetworkError and the client is reset with it
        failure = NetworkError(self.id + ' pong failed with error ' + self.json(e))
        client.reset(failure)
async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
    """
    watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data for one market
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#book-instrument_name
    :param str symbol: unified symbol of the market to watch the order book for
    :param int [limit]: the maximum amount of order book entries to return
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.bookSubscriptionType]: the subscription type, SNAPSHOT(full snapshot) or SNAPSHOT_AND_UPDATE(delta updates)
    :param int [params.bookUpdateFrequency]: book update interval in ms, 100 for snapshot subscription, 10 for delta subscription
    :returns dict: an `order book structure <https://docs.ccxt.com/#/?id=order-book-structure>`
    """
    # single-symbol convenience wrapper around the multi-symbol implementation
    singleSymbolList = [symbol]
    return await self.watch_order_book_for_symbols(singleSymbolList, limit, params)
async def un_watch_order_book(self, symbol: str, params={}) -> Any:
    """
    stops watching the order book of one market
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#book-instrument_name
    :param str symbol: unified symbol of the market to stop watching the order book for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.bookSubscriptionType]: the subscription type, SNAPSHOT(full snapshot) or SNAPSHOT_AND_UPDATE(delta updates)
    :param int [params.bookUpdateFrequency]: book update interval in ms, 100 for snapshot subscription, 10 for delta subscription
    :returns: whatever `un_watch_order_book_for_symbols` returns for the single symbol
    """
    # single-symbol convenience wrapper around the multi-symbol implementation
    singleSymbolList = [symbol]
    return await self.un_watch_order_book_for_symbols(singleSymbolList, params)
async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook:
    """
    watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#book-instrument_name
    :param str[] symbols: unified array of symbols
    :param int [limit]: the maximum amount of order book entries to return, 50 when omitted
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param dict [params.params]: topic parameters forwarded with the subscription, the two options below are written into it
    :param str [params.bookSubscriptionType]: the subscription type, SNAPSHOT(full snapshot) or SNAPSHOT_AND_UPDATE(delta updates), this method defaults to SNAPSHOT_AND_UPDATE
    :param int [params.bookUpdateFrequency]: book update interval in ms, 100 for snapshot subscription, 10 for delta subscription
    :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols)
    if not limit:
        limit = 50
    # work on copies: `params` defaults to a dict shared between calls and the caller may reuse
    # the dict that was passed in, so neither it nor its nested 'params' entry may be mutated
    params = dict(params)
    topicParams = self.safe_value(params, 'params')
    params['params'] = {} if topicParams is None else dict(topicParams)
    # the option is looked up for watchOrderBook first and that result is the fallback for watchOrderBookForSymbols
    bookSubscriptionType = None
    bookSubscriptionType2 = None
    bookSubscriptionType, params = self.handle_option_and_params(params, 'watchOrderBook', 'bookSubscriptionType', 'SNAPSHOT_AND_UPDATE')
    bookSubscriptionType2, params = self.handle_option_and_params(params, 'watchOrderBookForSymbols', 'bookSubscriptionType', bookSubscriptionType)
    params['params']['bookSubscriptionType'] = bookSubscriptionType2
    bookUpdateFrequency = None
    bookUpdateFrequency2 = None
    bookUpdateFrequency, params = self.handle_option_and_params(params, 'watchOrderBook', 'bookUpdateFrequency')
    bookUpdateFrequency2, params = self.handle_option_and_params(params, 'watchOrderBookForSymbols', 'bookUpdateFrequency', bookUpdateFrequency)
    if bookUpdateFrequency2 is not None:
        # stored under its own key, it used to overwrite 'bookSubscriptionType'
        params['params']['bookUpdateFrequency'] = bookUpdateFrequency2
    topics = []
    messageHashes = []
    for symbol in symbols:
        market = self.market(symbol)
        topics.append('book' + '.' + market['id'] + '.' + str(limit))
        messageHashes.append('orderbook:' + market['symbol'])
    orderbook = await self.watch_public_multiple(messageHashes, topics, params)
    return orderbook.limit()
async def un_watch_order_book_for_symbols(self, symbols: List[str], params={}) -> OrderBook:
    """
    unWatches information on open orders with bid(buy) and ask(sell) prices, volumes and other data
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#book-instrument_name
    :param str[] symbols: unified array of symbols
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param int [params.limit]: orderbook limit, default is 50
    :param dict [params.params]: topic parameters forwarded with the request, the two options below are written into it
    :param str [params.bookSubscriptionType]: the subscription type, SNAPSHOT(full snapshot) or SNAPSHOT_AND_UPDATE(delta updates), this method defaults to SNAPSHOT_AND_UPDATE
    :param int [params.bookUpdateFrequency]: book update interval in ms, 100 for snapshot subscription, 10 for delta subscription
    :returns: the result of `un_watch_public_multiple` for the 'orderbook' topic
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols)
    limit = self.safe_integer(params, 'limit', 50)
    # work on copies: `params` defaults to a dict shared between calls and the caller may reuse
    # the dict that was passed in, so neither it nor its nested 'params' entry may be mutated
    params = dict(params)
    topicParams = self.safe_value(params, 'params')
    params['params'] = {} if topicParams is None else dict(topicParams)
    # the option is looked up for watchOrderBook first and that result is the fallback for watchOrderBookForSymbols
    bookSubscriptionType = None
    bookSubscriptionType2 = None
    bookSubscriptionType, params = self.handle_option_and_params(params, 'watchOrderBook', 'bookSubscriptionType', 'SNAPSHOT_AND_UPDATE')
    bookSubscriptionType2, params = self.handle_option_and_params(params, 'watchOrderBookForSymbols', 'bookSubscriptionType', bookSubscriptionType)
    params['params']['bookSubscriptionType'] = bookSubscriptionType2
    bookUpdateFrequency = None
    bookUpdateFrequency2 = None
    bookUpdateFrequency, params = self.handle_option_and_params(params, 'watchOrderBook', 'bookUpdateFrequency')
    bookUpdateFrequency2, params = self.handle_option_and_params(params, 'watchOrderBookForSymbols', 'bookUpdateFrequency', bookUpdateFrequency)
    if bookUpdateFrequency2 is not None:
        # stored under its own key, it used to overwrite 'bookSubscriptionType'
        params['params']['bookUpdateFrequency'] = bookUpdateFrequency2
    topics = []
    subMessageHashes = []
    messageHashes = []
    for symbol in symbols:
        market = self.market(symbol)
        subMessageHash = 'orderbook:' + market['symbol']
        subMessageHashes.append(subMessageHash)
        messageHashes.append('unsubscribe:' + subMessageHash)
        topics.append('book' + '.' + market['id'] + '.' + str(limit))
    return await self.un_watch_public_multiple('orderbook', symbols, messageHashes, subMessageHashes, topics, params)
def handle_delta(self, bookside, delta):
    """
    store one order book level on the given side
    :param bookside: the asks or bids side of a counted order book
    :param list delta: a raw level, read by position as [price, amount, count]
    """
    level = [
        self.safe_float(delta, 0),
        self.safe_float(delta, 1),
        self.safe_integer(delta, 2),
    ]
    bookside.storeArray(level)
def handle_deltas(self, bookside, deltas):
    """
    store every level of `deltas` on the given book side, in order
    :param bookside: the asks or bids side of a counted order book
    :param list deltas: raw levels, each handed to `handle_delta`
    """
    for level in deltas:
        self.handle_delta(bookside, level)
def handle_order_book(self, client: Client, message):
    """
    apply an order book snapshot or delta update to the cached book and resolve 'orderbook:<symbol>'
    :param Client client: the websocket client the message was received on
    :param dict message: a 'book'(snapshot) or 'book.update'(delta) message
    :raises ChecksumError: when an update's 'pu' differs from the cached nonce and the watchOrderBook 'checksum' option is enabled
    """
    #
    # snapshot
    #    {
    #        "instrument_name":"LTC_USDT",
    #        "subscription":"book.LTC_USDT.150",
    #        "channel":"book",
    #        "depth":150,
    #        "data": [
    #             {
    #                 "bids": [
    #                     [122.21, 0.74041, 4]
    #                 ],
    #                 "asks": [
    #                     [122.29, 0.00002, 1]
    #                 ],
    #                 "t": 1648123943803,
    #                 "s":754560122
    #             }
    #         ]
    #    }
    # update
    #    {
    #        "instrument_name":"BTC_USDT",
    #        "subscription":"book.BTC_USDT.50",
    #        "channel":"book.update",
    #        "depth":50,
    #        "data":[
    #           {
    #              "update":{
    #                 "asks":[
    #                    [
    #                       "43755.46",
    #                       "0.10000",
    #                       "1"
    #                    ],
    #                    ...
    #                 ],
    #                 "bids":[
    #                    [
    #                       "43737.46",
    #                       "0.14096",
    #                       "1"
    #                    ],
    #                    ...
    #                 ]
    #              },
    #              "t":1704484068898,
    #              "tt":1704484068892,
    #              "u":78795598253024,
    #              "pu":78795598162080,
    #              "cs":-781431132
    #           }
    #        ]
    #    }
    #
    market = self.safe_market(self.safe_string(message, 'instrument_name'))
    symbol = market['symbol']
    # only the first entry of 'data' is consumed
    payload = self.safe_value(self.safe_value(message, 'data'), 0)
    timestamp = self.safe_integer(payload, 't')
    if symbol not in self.orderbooks:
        depth = self.safe_integer(message, 'depth')
        self.orderbooks[symbol] = self.counted_order_book({}, depth)
    orderbook = self.orderbooks[symbol]
    channel = self.safe_string(message, 'channel')
    # updates carry the sequence under 'u', snapshots under 's'
    nonce = self.safe_integer_2(payload, 'u', 's')
    if channel == 'book':  # snapshot
        levels = payload
        orderbook.reset({})
        orderbook['symbol'] = symbol
        orderbook['timestamp'] = timestamp
        orderbook['datetime'] = self.iso8601(timestamp)
        orderbook['nonce'] = nonce
    else:
        levels = self.safe_value(payload, 'update', {})
        expectedNonce = self.safe_integer(payload, 'pu')
        if orderbook['nonce'] != expectedNonce:
            # the update does not follow the cached state
            if self.handle_option('watchOrderBook', 'checksum', True):
                raise ChecksumError(self.id + ' ' + self.orderbook_checksum_message(symbol))
    for side in ('asks', 'bids'):
        self.handle_deltas(orderbook[side], self.safe_value(levels, side, []))
    orderbook['nonce'] = nonce
    self.orderbooks[symbol] = orderbook
    client.resolve(orderbook, 'orderbook:' + symbol)
async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
    """
    watches the most recent public trades of one market
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#trade-instrument_name
    :param str symbol: unified symbol of the market to watch trades for
    :param int [since]: timestamp in ms of the earliest trade to return
    :param int [limit]: the maximum amount of trades to return
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
    """
    # single-symbol convenience wrapper around the multi-symbol implementation
    singleSymbolList = [symbol]
    return await self.watch_trades_for_symbols(singleSymbolList, since, limit, params)
async def un_watch_trades(self, symbol: str, params={}) -> List[Trade]:
    """
    stops watching the public trades of one market
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#trade-instrument_name
    :param str symbol: unified symbol of the market to stop watching trades for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns: whatever `un_watch_trades_for_symbols` returns for the single symbol
    """
    # single-symbol convenience wrapper around the multi-symbol implementation
    singleSymbolList = [symbol]
    return await self.un_watch_trades_for_symbols(singleSymbolList, params)
async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]:
    """
    watches the most recent public trades of several markets
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#trade-instrument_name
    :param str[] symbols: unified symbols of the markets to watch trades for
    :param int [since]: timestamp in ms of the earliest trade to return
    :param int [limit]: the maximum amount of trades to return
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols)
    # the 'trade.<marketId>' topics double as message hashes
    topics = ['trade' + '.' + self.market(unifiedSymbol)['id'] for unifiedSymbol in symbols]
    trades = await self.watch_public_multiple(topics, topics, params)
    if self.newUpdates:
        # the symbol of the first cached trade selects which per-symbol limit applies
        firstTrade = self.safe_value(trades, 0)
        limit = trades.getLimit(self.safe_string(firstTrade, 'symbol'), limit)
    return self.filter_by_since_limit(trades, since, limit, 'timestamp', True)
async def un_watch_trades_for_symbols(self, symbols: List[str], params={}) -> Any:
    """
    stops watching the public trades of several markets
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#trade-instrument_name
    :param str[] [symbols]: list of unified market symbols to unwatch trades for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns: the result of `un_watch_public_multiple` for the 'trades' topic
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols)
    topics = []
    messageHashes = []
    for unifiedSymbol in symbols:
        market = self.market(unifiedSymbol)
        messageHashes.append('unsubscribe:trades:' + market['symbol'])
        topics.append('trade' + '.' + market['id'])
    # the topics are passed both as the subscription hashes and as the channels
    return await self.un_watch_public_multiple('trades', symbols, messageHashes, topics, topics, params)
def handle_trades(self, client: Client, message):
    """
    append incoming public trades to the per-symbol cache and resolve the waiting futures
    :param Client client: the websocket client the message was received on
    :param dict message: a 'trade' channel message
    """
    #
    # {
    #     "code": 0,
    #     "method": "subscribe",
    #     "result": {
    #       "instrument_name": "BTC_USDT",
    #       "subscription": "trade.BTC_USDT",
    #       "channel": "trade",
    #       "data": [
    #             {
    #                 "dataTime":1648122434405,
    #                 "d":"2358394540212355488",
    #                 "s":"SELL",
    #                 "p":42980.85,
    #                 "q":0.002325,
    #                 "t":1648122434404,
    #                 "i":"BTC_USDT"
    #              }
    #              (...)
    #       ]
    # }
    #
    channel = self.safe_string(message, 'channel')
    marketId = self.safe_string(message, 'instrument_name')
    subscriptionHash = self.safe_string(message, 'subscription')
    market = self.safe_market(marketId)
    symbol = market['symbol']
    cache = self.safe_value(self.trades, symbol)
    if cache is None:
        # first message for this symbol: create its cache, sized by the 'tradesLimit' option
        cache = ArrayCache(self.safe_integer(self.options, 'tradesLimit', 1000))
        self.trades[symbol] = cache
    rawTrades = self.safe_value(message, 'data', [])
    if len(rawTrades) == 0:
        return
    for trade in self.parse_trades(rawTrades, market):
        cache.append(trade)
    # the channel with any '.<marketId>' suffix removed
    genericHash = channel.replace('.' + marketId, '')
    client.resolve(cache, subscriptionHash)
    client.resolve(cache, genericHash)
async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
    """
    watches the trades made by the user
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#user-trade-instrument_name
    :param str [symbol]: unified market symbol of the market trades were made in, all markets when omitted
    :param int [since]: the earliest time in ms to return trades for
    :param int [limit]: the maximum number of trade structures to return
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=trade-structure>`
    """
    await self.load_markets()
    messageHash = 'user.trade'
    if symbol is not None:
        # narrow the subscription to one instrument
        market = self.market(symbol)
        symbol = market['symbol']
        messageHash = messageHash + '.' + market['id']
    trades = await self.watch_private_subscribe(messageHash, params)
    if self.newUpdates:
        limit = trades.getLimit(symbol, limit)
    return self.filter_by_symbol_since_limit(trades, symbol, since, limit, True)
async def watch_ticker(self, symbol: str, params={}) -> Ticker:
    """
    watches the price ticker of one market
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#ticker-instrument_name
    :param str symbol: unified symbol of the market to watch the ticker for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
    """
    await self.load_markets()
    marketId = self.market(symbol)['id']
    topic = 'ticker' + '.' + marketId
    return await self.watch_public(topic, params)
async def un_watch_ticker(self, symbol: str, params={}) -> Any:
    """
    stops watching the price ticker of one market
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#ticker-instrument_name
    :param str symbol: unified symbol of the market to stop watching the ticker for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns: the result of `un_watch_public_multiple` for the 'ticker' topic
    """
    await self.load_markets()
    market = self.market(symbol)
    unifiedSymbol = market['symbol']
    topic = 'ticker' + '.' + market['id']
    unsubscribeHash = 'unsubscribe:ticker:' + unifiedSymbol
    # the topic is passed both as the subscription hash and as the channel
    return await self.un_watch_public_multiple('ticker', [unifiedSymbol], [unsubscribeHash], [topic], [topic], params)
async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers:
    """
    watches the price tickers of a list of markets
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#ticker-instrument_name
    :param str[] symbols: unified symbols of the markets to watch the tickers for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: `ticker structures <https://docs.ccxt.com/#/?id=ticker-structure>` keyed by symbol
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    # the 'ticker.<marketId>' channels double as message hashes
    channels = ['ticker.' + marketId for marketId in self.market_ids(symbols)]
    url = self.urls['api']['ws']['public']
    subscribe: dict = {
        'method': 'subscribe',
        'params': {
            'channels': channels,
        },
        'nonce': self.nonce(),
    }
    ticker = await self.watch_multiple(url, channels, self.extend(subscribe, params), channels)
    if not self.newUpdates:
        return self.filter_by_array(self.tickers, 'symbol', symbols)
    # with newUpdates only the ticker that just arrived is returned
    return {ticker['symbol']: ticker}
async def un_watch_tickers(self, symbols: Strings = None, params={}) -> Any:
    """
    stops watching the price tickers of a list of markets
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#ticker-instrument_name
    :param str[] symbols: unified symbols of the markets to stop watching the tickers for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns: the result of `un_watch_public_multiple` for the 'ticker' topic
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    unsubscribeHashes = []
    topics = []
    for index, marketId in enumerate(self.market_ids(symbols)):
        topics.append('ticker.' + marketId)
        unsubscribeHashes.append('unsubscribe:ticker:' + symbols[index])
    # the topics are passed both as the subscription hashes and as the channels
    return await self.un_watch_public_multiple('ticker', symbols, unsubscribeHashes, topics, topics, params)
def handle_ticker(self, client: Client, message):
    """
    cache every ticker of a 'ticker' channel message and resolve the subscription's future with each of them
    :param Client client: the websocket client the message was received on
    :param dict message: a 'ticker' channel message
    """
    #
    #     {
    #         "instrument_name": "ETHUSD-PERP",
    #         "subscription": "ticker.ETHUSD-PERP",
    #         "channel": "ticker",
    #         "data": [
    #             {
    #                 "h": "2400.20",
    #                 "l": "2277.10",
    #                 "a": "2335.25",
    #                 "c": "-0.0022",
    #                 "b": "2335.10",
    #                 "bs": "5.4000",
    #                 "k": "2335.16",
    #                 "ks": "1.9970",
    #                 "i": "ETHUSD-PERP",
    #                 "v": "1305697.6462",
    #                 "vv": "3058704939.17",
    #                 "oi": "161646.3614",
    #                 "t": 1726069647560
    #             }
    #         ]
    #     }
    #
    # the same message also feeds the bids/asks watchers
    self.handle_bid_ask(client, message)
    subscriptionHash = self.safe_string(message, 'subscription')
    market = self.safe_market(self.safe_string(message, 'instrument_name'))
    for rawTicker in self.safe_value(message, 'data', []):
        parsedTicker = self.parse_ws_ticker(rawTicker, market)
        self.tickers[parsedTicker['symbol']] = parsedTicker
        client.resolve(parsedTicker, subscriptionHash)
def parse_ws_ticker(self, ticker: dict, market: Market = None) -> Ticker:
    """
    convert a raw websocket ticker into a unified ticker structure
    :param dict ticker: one entry of a 'ticker' channel message's data
    :param dict [market]: market used when the ticker's own 'i' id cannot be resolved
    :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
    """
    #
    #     {
    #         "h": "2400.20",
    #         "l": "2277.10",
    #         "a": "2335.25",
    #         "c": "-0.0022",
    #         "b": "2335.10",
    #         "bs": "5.4000",
    #         "k": "2335.16",
    #         "ks": "1.9970",
    #         "i": "ETHUSD-PERP",
    #         "v": "1305697.6462",
    #         "vv": "3058704939.17",
    #         "oi": "161646.3614",
    #         "t": 1726069647560
    #     }
    #
    timestamp = self.safe_integer(ticker, 't')
    market = self.safe_market(self.safe_string(ticker, 'i'), market, '_')
    lastPrice = self.safe_string(ticker, 'a')
    # 'vv' is only reported as quote volume for USD-quoted markets
    quoteVolume = None
    if self.safe_string(market, 'quote') == 'USD':
        quoteVolume = self.safe_string(ticker, 'vv')
    unified = {
        'symbol': market['symbol'],
        'timestamp': timestamp,
        'datetime': self.iso8601(timestamp),
        'high': self.safe_number(ticker, 'h'),
        'low': self.safe_number(ticker, 'l'),
        'bid': self.safe_number(ticker, 'b'),
        'bidVolume': self.safe_number(ticker, 'bs'),
        'ask': self.safe_number(ticker, 'k'),
        'askVolume': self.safe_number(ticker, 'ks'),
        'vwap': None,
        'open': None,
        'close': lastPrice,
        'last': lastPrice,
        'previousClose': None,
        'change': None,
        'percentage': self.safe_string(ticker, 'c'),
        'average': None,
        'baseVolume': self.safe_string(ticker, 'v'),
        'quoteVolume': quoteVolume,
        'info': ticker,
    }
    return self.safe_ticker(unified, market)
async def watch_bids_asks(self, symbols: Strings = None, params={}) -> Tickers:
    """
    watches best bid & ask for symbols
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#ticker-instrument_name
    :param str[] symbols: unified symbols of the markets to watch the best bid and ask for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: `ticker structures <https://docs.ccxt.com/#/?id=ticker-structure>` keyed by symbol
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    bidAskHashes = []
    channels = []
    for index, marketId in enumerate(self.market_ids(symbols)):
        # futures are keyed by unified symbol while the exchange channel uses the market id
        bidAskHashes.append('bidask.' + symbols[index])
        channels.append('ticker.' + marketId)
    url = self.urls['api']['ws']['public']
    subscribe: dict = {
        'method': 'subscribe',
        'params': {
            'channels': channels,
        },
        'nonce': self.nonce(),
    }
    latest = await self.watch_multiple(url, bidAskHashes, self.extend(subscribe, params), bidAskHashes)
    if not self.newUpdates:
        return self.filter_by_array(self.bidsasks, 'symbol', symbols)
    # with newUpdates only the entry that just arrived is returned
    return {latest['symbol']: latest}
def handle_bid_ask(self, client: Client, message):
    """
    cache the best bid/ask of the first ticker in the message and resolve 'bidask.<symbol>'
    :param Client client: the websocket client the message was received on
    :param dict message: a 'ticker' channel message
    """
    entries = self.safe_list(message, 'data', [])
    firstTicker = self.safe_dict(entries, 0, {})
    bidAsk = self.parse_ws_bid_ask(firstTicker)
    symbol = bidAsk['symbol']
    self.bidsasks[symbol] = bidAsk
    client.resolve(bidAsk, 'bidask.' + symbol)
def parse_ws_bid_ask(self, ticker, market=None):
    """
    convert a raw websocket ticker into a unified ticker carrying only the best bid/ask fields
    :param dict ticker: one entry of a 'ticker' channel message's data
    :param dict [market]: market used when the ticker's own 'i' id cannot be resolved
    :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
    """
    market = self.safe_market(self.safe_string(ticker, 'i'), market)
    timestamp = self.safe_integer(ticker, 't')
    bestLevels = {
        'symbol': self.safe_string(market, 'symbol'),
        'timestamp': timestamp,
        'datetime': self.iso8601(timestamp),
        'ask': self.safe_string(ticker, 'k'),
        'askVolume': self.safe_string(ticker, 'ks'),
        'bid': self.safe_string(ticker, 'b'),
        'bidVolume': self.safe_string(ticker, 'bs'),
        'info': ticker,
    }
    return self.safe_ticker(bestLevels, market)
async def watch_ohlcv(self, symbol: str, timeframe: str = '1m', since: Int = None, limit: Int = None, params={}) -> List[list]:
    """
    watches candlestick data containing the open, high, low, and close price, and the volume of a market
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#candlestick-time_frame-instrument_name
    :param str symbol: unified symbol of the market to watch OHLCV data for
    :param str timeframe: the length of time each candle represents
    :param int [since]: timestamp in ms of the earliest candle to return
    :param int [limit]: the maximum amount of candles to return
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns int[][]: A list of candles ordered, open, high, low, close, volume
    """
    await self.load_markets()
    market = self.market(symbol)
    symbol = market['symbol']
    # timeframes without an entry in self.timeframes are sent to the exchange unchanged
    exchangeInterval = self.safe_string(self.timeframes, timeframe, timeframe)
    topic = 'candlestick' + '.' + exchangeInterval + '.' + market['id']
    candles = await self.watch_public(topic, params)
    if self.newUpdates:
        limit = candles.getLimit(symbol, limit)
    return self.filter_by_since_limit(candles, since, limit, 0, True)
async def un_watch_ohlcv(self, symbol: str, timeframe: str = '1m', params={}) -> Any:
    """
    stops watching the candlestick data of a market
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#candlestick-time_frame-instrument_name
    :param str symbol: unified symbol of the market to stop watching OHLCV data for
    :param str timeframe: the length of time each candle represents
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns: the result of `un_watch_public_multiple` for the 'ohlcv' topic
    """
    await self.load_markets()
    market = self.market(symbol)
    symbol = market['symbol']
    # timeframes without an entry in self.timeframes are sent to the exchange unchanged
    exchangeInterval = self.safe_string(self.timeframes, timeframe, timeframe)
    topic = 'candlestick' + '.' + exchangeInterval + '.' + market['id']
    unsubscribeHash = 'unsubscribe:ohlcv:' + symbol + ':' + timeframe
    subExtend = {
        'symbolsAndTimeframes': [[symbol, timeframe]],
    }
    # the topic is passed both as the subscription hash and as the channel
    return await self.un_watch_public_multiple('ohlcv', [symbol], [unsubscribeHash], [topic], [topic], params, subExtend)
def handle_ohlcv(self, client: Client, message):
    """
    append incoming candles to the per-symbol, per-timeframe cache and resolve the subscription's future
    :param Client client: the websocket client the message was received on
    :param dict message: a 'candlestick' channel message
    """
    #
    #  {
    #       "instrument_name": "BTC_USDT",
    #       "subscription": "candlestick.1m.BTC_USDT",
    #       "channel": "candlestick",
    #       "depth": 300,
    #       "interval": "1m",
    #       "data": [[Object]]
    #   }
    #
    messageHash = self.safe_string(message, 'subscription')
    marketId = self.safe_string(message, 'instrument_name')
    market = self.safe_market(marketId)
    symbol = market['symbol']
    interval = self.safe_string(message, 'interval')
    timeframe = self.find_timeframe(interval)
    self.ohlcvs[symbol] = self.safe_value(self.ohlcvs, symbol, {})
    stored = self.safe_value(self.ohlcvs[symbol], timeframe)
    if stored is None:
        # first message for this symbol/timeframe: create its cache, sized by the 'OHLCVLimit' option
        limit = self.safe_integer(self.options, 'OHLCVLimit', 1000)
        stored = ArrayCacheByTimestamp(limit)
        self.ohlcvs[symbol][timeframe] = stored
    # default to an empty list, like the other handlers do, so a message
    # without a 'data' field does not raise on iteration
    data = self.safe_value(message, 'data', [])
    for tick in data:
        stored.append(self.parse_ohlcv(tick, market))
    client.resolve(stored, messageHash)
async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
    """
    watches the orders made by the user
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#user-order-instrument_name
    :param str [symbol]: unified market symbol of the market orders were made in, all markets when omitted
    :param int [since]: the earliest time in ms to return orders for
    :param int [limit]: the maximum number of order structures to return
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
    """
    await self.load_markets()
    messageHash = 'user.order'
    if symbol is not None:
        # narrow the subscription to one instrument
        market = self.market(symbol)
        symbol = market['symbol']
        messageHash = messageHash + '.' + market['id']
    orders = await self.watch_private_subscribe(messageHash, params)
    if self.newUpdates:
        limit = orders.getLimit(symbol, limit)
    return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True)
def handle_orders(self, client: Client, message, subscription=None):
    """
    append incoming user orders to the orders cache and resolve the waiting futures
    :param Client client: the websocket client the message was received on
    :param dict message: a 'user.order' channel message
    :param subscription: not used by this handler
    """
    #
    #    {
    #        "method": "subscribe",
    #        "result": {
    #          "instrument_name": "ETH_CRO",
    #          "subscription": "user.order.ETH_CRO",
    #          "channel": "user.order",
    #          "data": [
    #            {
    #              "status": "ACTIVE",
    #              "side": "BUY",
    #              "price": 1,
    #              "quantity": 1,
    #              "order_id": "366455245775097673",
    #              "client_oid": "my_order_0002",
    #              "create_time": 1588758017375,
    #              "update_time": 1588758017411,
    #              "type": "LIMIT",
    #              "instrument_name": "ETH_CRO",
    #              "cumulative_quantity": 0,
    #              "cumulative_value": 0,
    #              "avg_price": 0,
    #              "fee_currency": "CRO",
    #              "time_in_force":"GOOD_TILL_CANCEL"
    #            }
    #          ],
    #          "channel": "user.order.ETH_CRO"
    #        }
    #    }
    #
    channel = self.safe_string(message, 'channel')
    subscriptionHash = self.safe_string(message, 'subscription')
    rawOrders = self.safe_value(message, 'data', [])
    if len(rawOrders) == 0:
        # nothing to cache, nothing to resolve
        return
    if self.orders is None:
        # first order message: create the cache, sized by the 'ordersLimit' option
        self.orders = ArrayCacheBySymbolById(self.safe_integer(self.options, 'ordersLimit', 1000))
    cache = self.orders
    for order in self.parse_orders(rawOrders):
        cache.append(order)
    client.resolve(cache, subscriptionHash)
    # non-symbol specific
    client.resolve(cache, channel)  # channel might have a symbol-specific suffix
    client.resolve(cache, 'user.order')
async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]:
    """
    watch all open positions
    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#user-position_balance
    :param str[] [symbols]: list of unified market symbols to watch positions for
    :param int [since]: the earliest time in ms to return positions for
    :param int [limit]: the maximum number of positions to return
    :param dict params: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `position structure <https://docs.ccxt.com/en/latest/manual.html#position-structure>`
    """
    await self.load_markets()
    await self.authenticate()
    url = self.urls['api']['ws']['private']
    request: dict = {
        'method': 'subscribe',
        'params': {
            'channels': ['user.position_balance'],
        },
        'nonce': self.nonce(),
    }
    messageHash = 'positions'
    symbols = self.market_symbols(symbols)
    if not self.is_empty(symbols):
        # append to the 'positions' prefix instead of replacing it: handle_positions finds the
        # symbol-specific waiters by searching the pending hashes for 'positions::' and reads
        # the symbol list from what follows the '::'
        messageHash += '::' + ','.join(symbols)
    client = self.client(url)
    # NOTE(review): set_positions_cache declares (client, type, symbols), so `symbols` lands in its
    # `type` parameter here; neither parameter is read by its body, call kept unchanged
    self.set_positions_cache(client, symbols)
    fetchPositionsSnapshot = self.handle_option('watchPositions', 'fetchPositionsSnapshot', True)
    awaitPositionsSnapshot = self.handle_option('watchPositions', 'awaitPositionsSnapshot', True)
    if fetchPositionsSnapshot and awaitPositionsSnapshot and self.positions is None:
        # no positions cached yet: answer from the snapshot future resolved by load_positions_snapshot
        snapshot = await client.future('fetchPositionsSnapshot')
        return self.filter_by_symbols_since_limit(snapshot, symbols, since, limit, True)
    newPositions = await self.watch(url, messageHash, self.extend(request, params))
    if self.newUpdates:
        return newPositions
    return self.filter_by_symbols_since_limit(self.positions, symbols, since, limit, True)
def set_positions_cache(self, client: Client, type, symbols: Strings = None):
    """
    prepare the positions cache: start loading a snapshot once per client, or create an empty cache
    :param Client client: the websocket client whose futures hold the snapshot future
    :param type: not read by this method
    :param str[] [symbols]: not read by this method
    """
    if not self.handle_option('watchPositions', 'fetchPositionsSnapshot', False):
        # snapshots disabled: begin with an empty cache
        self.positions = ArrayCacheBySymbolBySide()
        return
    snapshotHash = 'fetchPositionsSnapshot'
    if snapshotHash in client.futures:
        # a snapshot was already requested on this client
        return
    client.future(snapshotHash)
    self.spawn(self.load_positions_snapshot, client, snapshotHash)
async def load_positions_snapshot(self, client, messageHash):
    """
    fetch the current positions and seed a fresh positions cache with them

    Only positions whose 'contracts' value is greater than zero are cached.
    The filled cache resolves both the snapshot future stored under
    messageHash and the generic 'positions' hash.

    :param Client client: websocket client holding the snapshot future
    :param str messageHash: key of the snapshot future in client.futures
    """
    fetched = await self.fetch_positions()
    self.positions = ArrayCacheBySymbolBySide()
    snapshotCache = self.positions
    for entry in fetched:
        if self.safe_number(entry, 'contracts', 0) > 0:
            snapshotCache.append(entry)
    # don't remove the future from the .futures cache
    client.futures[messageHash].resolve(snapshotCache)
    client.resolve(snapshotCache, 'positions')
def handle_positions(self, client, message):
    """
    parse a user.position_balance update, cache it and resolve the watchers

    Every parsed position is appended to self.positions. Watchers registered
    under a hash of the form 'positions::<sym1>,<sym2>' receive only the
    positions for their symbols; the plain 'positions' hash receives all of
    the positions parsed from this message.
    """
    #
    #    {
    #        "subscription": "user.position_balance",
    #        "channel": "user.position_balance",
    #        "data": [{
    #            "balances": [{
    #                "instrument_name": "USD",
    #                "quantity": "8.9979961950886",
    #                "update_timestamp_ms": 1695598760597,
    #            }],
    #            "positions": [{
    #                "account_id": "96a0edb1-afb5-4c7c-af89-5cb610319e2c",
    #                "instrument_name": "LTCUSD-PERP",
    #                "type": "PERPETUAL_SWAP",
    #                "quantity": "1.8",
    #                "cost": "114.766",
    #                "open_position_pnl": "-0.0216206",
    #                "session_pnl": "0.00962994",
    #                "update_timestamp_ms": 1695598760597,
    #                "open_pos_cost": "114.766",
    #            }],
    #        }],
    #    }
    #
    # each account is connected to a different endpoint
    # and has exactly one subscriptionhash which is the account type
    payload = self.safe_value(message, 'data', [])
    head = self.safe_value(payload, 0, {})
    rawList = self.safe_value(head, 'positions', [])
    if self.positions is None:
        self.positions = ArrayCacheBySymbolBySide()
    store = self.positions
    parsedPositions = []
    for raw in rawList:
        parsed = self.parse_position(raw)
        parsedPositions.append(parsed)
        store.append(parsed)
    for symbolHash in self.find_message_hashes(client, 'positions::'):
        # the part after '::' is the comma-separated symbol list of that watcher
        wanted = symbolHash.split('::')[1].split(',')
        matched = self.filter_by_array(parsedPositions, 'symbol', wanted, False)
        if not self.is_empty(matched):
            client.resolve(matched, symbolHash)
    client.resolve(parsedPositions, 'positions')
async def watch_balance(self, params={}) -> Balances:
    """
    watch balance and get the amount of funds available for trading or funds locked in orders

    Subscribes to the private 'user.balance' channel.

    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#user-balance

    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: a `balance structure <https://docs.ccxt.com/#/?id=balance-structure>`
    """
    return await self.watch_private_subscribe('user.balance', params)
def handle_balance(self, client: Client, message):
    """
    update self.balance from a user.balance message and resolve the watchers

    The first element of "data" carries "position_balances"; each entry
    becomes an account with 'total' (quantity) and 'used' (reserved_qty).
    An empty "data" array is tolerated: no account is updated and the
    current balance is resolved as is.
    """
    #
    #     {
    #         "id": 1,
    #         "method": "subscribe",
    #         "code": 0,
    #         "result": {
    #             "subscription": "user.balance",
    #             "channel": "user.balance",
    #             "data": [
    #                 {
    #                     "total_available_balance": "5.84684368",
    #                     "total_margin_balance": "5.84684368",
    #                     "total_initial_margin": "0",
    #                     "total_maintenance_margin": "0",
    #                     "total_position_cost": "0",
    #                     "total_cash_balance": "6.44412101",
    #                     "total_collateral_value": "5.846843685",
    #                     "total_session_unrealized_pnl": "0",
    #                     "instrument_name": "USD",
    #                     "total_session_realized_pnl": "0",
    #                     "position_balances": [
    #                         {
    #                             "quantity": "0.0002119875",
    #                             "reserved_qty": "0",
    #                             "collateral_weight": "0.9",
    #                             "collateral_amount": "5.37549592",
    #                             "market_value": "5.97277325",
    #                             "max_withdrawal_balance": "0.00021198",
    #                             "instrument_name": "BTC",
    #                             "hourly_interest_rate": "0"
    #                         },
    #                     ],
    #                     "total_effective_leverage": "0",
    #                     "position_limit": "3000000",
    #                     "used_position_limit": "0",
    #                     "total_borrow": "0",
    #                     "margin_score": "0",
    #                     "is_liquidating": False,
    #                     "has_risk": False,
    #                     "terminatable": True
    #                 }
    #             ]
    #         }
    #     }
    #
    messageHash = self.safe_string(message, 'subscription')
    data = self.safe_value(message, 'data', [])
    # safe lookup instead of data[0]: an empty "data" array must not raise IndexError
    firstData = self.safe_value(data, 0, {})
    positionBalances = self.safe_value(firstData, 'position_balances', [])
    self.balance['info'] = data
    for balance in positionBalances:
        currencyId = self.safe_string(balance, 'instrument_name')
        code = self.safe_currency_code(currencyId)
        account = self.account()
        account['total'] = self.safe_string(balance, 'quantity')
        account['used'] = self.safe_string(balance, 'reserved_qty')
        self.balance[code] = account
    self.balance = self.safe_balance(self.balance)
    client.resolve(self.balance, messageHash)
    # also resolve under the message "id", when the payload carries one
    messageHashRequest = self.safe_string(message, 'id')
    client.resolve(self.balance, messageHashRequest)
async def create_order_ws(self, symbol: str, type: OrderType, side: OrderSide, amount: float, price: Num = None, params={}) -> Order:
    """
    create a trade order over the private websocket

    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#private-create-order

    :param str symbol: unified symbol of the market to create an order in
    :param str type: 'market' or 'limit'
    :param str side: 'buy' or 'sell'
    :param float amount: how much of currency you want to trade in units of base currency
    :param float [price]: the price at which the order is to be fulfilled, in units of the quote currency, ignored in market orders
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: an `order structure <https://docs.ccxt.com/#/?id=order-structure>`
    """
    await self.load_markets()
    orderParams = self.create_order_request(symbol, type, side, amount, price, params)
    payload: dict = {
        'method': 'private/create-order',
        'params': orderParams,
    }
    # the nonce doubles as the request id the response is matched on
    return await self.watch_private_request(self.nonce(), payload)
async def edit_order_ws(self, id: str, symbol: str, type: OrderType, side: OrderSide, amount: Num = None, price: Num = None, params={}) -> Order:
    """
    edit a trade order over the private websocket

    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#private-amend-order

    :param str id: order id
    :param str symbol: unified market symbol of the order to edit
    :param str [type]: not used by cryptocom editOrder
    :param str [side]: not used by cryptocom editOrder
    :param float amount: (mandatory) how much of the currency you want to trade in units of the base currency
    :param float price: (mandatory) the price for the order, in units of the quote currency, ignored in market orders
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.clientOrderId]: the original client order id of the order to edit, required if id is not provided
    :returns dict: an `order structure <https://docs.ccxt.com/#/?id=order-structure>`
    """
    await self.load_markets()
    amendParams = self.edit_order_request(id, symbol, amount, price, params)
    payload: dict = {
        'method': 'private/amend-order',
        'params': amendParams,
    }
    # the nonce doubles as the request id the response is matched on
    return await self.watch_private_request(self.nonce(), payload)
def handle_order(self, client: Client, message):
    """
    parse the "result" of an order response and resolve the request future

    The future is keyed by the string form of the message "id".
    """
    #
    #    {
    #        "id": 1,
    #        "method": "private/create-order",
    #        "code": 0,
    #        "result": {
    #            "client_oid": "c5f682ed-7108-4f1c-b755-972fcdca0f02",
    #            "order_id": "18342311"
    #        }
    #    }
    #
    parsedOrder = self.parse_order(self.safe_value(message, 'result', {}))
    client.resolve(parsedOrder, self.safe_string(message, 'id'))
async def cancel_order_ws(self, id: str, symbol: Str = None, params={}) -> Order:
    """
    cancels an open order over the private websocket

    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#private-cancel-order

    :param str id: the order id of the order to cancel
    :param str [symbol]: unified symbol of the market the order was made in
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: An `order structure <https://docs.ccxt.com/#/?id=order-structure>`
    """
    await self.load_markets()
    # caller-supplied params are merged over the default order_id entry
    cancelParams = self.extend({'order_id': id}, params)
    payload: dict = {
        'method': 'private/cancel-order',
        'params': cancelParams,
    }
    return await self.watch_private_request(self.nonce(), payload)
async def cancel_all_orders_ws(self, symbol: Str = None, params={}):
    """
    cancel all open orders over the private websocket

    https://exchange-docs.crypto.com/exchange/v1/rest-ws/index.html#private-cancel-all-orders

    :param str symbol: unified market symbol of the orders to cancel
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: the raw exchange response message
    """
    await self.load_markets()
    cancelParams = self.extend({}, params)
    payload: dict = {
        'method': 'private/cancel-all-orders',
        'params': cancelParams,
    }
    if symbol is not None:
        # restrict the cancellation to a single instrument
        cancelParams['instrument_name'] = self.market(symbol)['id']
    return await self.watch_private_request(self.nonce(), payload)
def handle_cancel_all_orders(self, client: Client, message):
    """
    resolve the cancel-all-orders request future with the raw response message
    """
    #
    #    {
    #        "id": 1688914586647,
    #        "method": "private/cancel-all-orders",
    #        "code": 0
    #    }
    #
    requestHash = self.safe_string(message, 'id')
    client.resolve(message, requestHash)
async def watch_public(self, messageHash, params={}):
    """
    subscribe to a single public channel and wait for its next update

    :param str messageHash: channel name, also used as message and subscribe hash
    :param dict [params]: extra fields merged into the subscribe request
    """
    endpoint = self.urls['api']['ws']['public']
    subscribeRequest: dict = {
        'method': 'subscribe',
        'params': {
            'channels': [messageHash],
        },
        'nonce': self.nonce(),
    }
    return await self.watch(endpoint, messageHash, self.extend(subscribeRequest, params), messageHash)
async def watch_public_multiple(self, messageHashes, topics, params={}):
    """
    subscribe to several public channels with a single request

    :param str[] messageHashes: hashes to wait on, also used as subscribe hashes
    :param str[] topics: channel names sent in the subscribe request
    :param dict [params]: extra fields deep-merged into the subscribe request
    """
    endpoint = self.urls['api']['ws']['public']
    subscribeRequest: dict = {
        'method': 'subscribe',
        'params': {
            'channels': topics,
        },
        'nonce': self.nonce(),
    }
    outgoing = self.deep_extend(subscribeRequest, params)
    return await self.watch_multiple(endpoint, messageHashes, outgoing, messageHashes)
async def un_watch_public_multiple(self, topic: str, symbols: List[str], messageHashes: List[str], subMessageHashes: List[str], topics: List[str], params={}, subExtend={}):
    """
    unsubscribe from several public channels with a single request

    The subscription record stored with the request carries the request id,
    so that handle_unsubscribe can match the exchange acknowledgement.

    :param str topic: stored in the subscription record
    :param str[] symbols: stored in the subscription record
    :param str[] messageHashes: unsubscribe hashes to wait on
    :param str[] subMessageHashes: hashes of the subscriptions being removed
    :param str[] topics: channel names sent in the unsubscribe request
    :param dict [params]: extra fields deep-merged into the request
    :param dict [subExtend]: extra fields merged into the subscription record
    """
    endpoint = self.urls['api']['ws']['public']
    nonce = self.nonce()
    requestId = str(nonce)
    unsubscribeRequest: dict = {
        'method': 'unsubscribe',
        'params': {
            'channels': topics,
        },
        'nonce': nonce,
        'id': requestId,
    }
    record = {
        'id': requestId,
        'topic': topic,
        'symbols': symbols,
        'subMessageHashes': subMessageHashes,
        'messageHashes': messageHashes,
    }
    outgoing = self.deep_extend(unsubscribeRequest, params)
    return await self.watch_multiple(endpoint, messageHashes, outgoing, messageHashes, self.extend(record, subExtend))
async def watch_private_request(self, nonce, params={}):
    """
    send an authenticated request and wait for the matching response

    :param int nonce: used as both the request "id" and "nonce"; its string form is the message hash
    :param dict [params]: request fields (method, params) merged into the message
    """
    await self.authenticate()
    endpoint = self.urls['api']['ws']['private']
    envelope: dict = {
        'id': nonce,
        'nonce': nonce,
    }
    return await self.watch(endpoint, str(nonce), self.extend(envelope, params), True)
async def watch_private_subscribe(self, messageHash, params={}):
    """
    authenticate, then subscribe to a single private channel

    :param str messageHash: channel name, also used as message and subscribe hash
    :param dict [params]: extra fields merged into the subscribe request
    """
    await self.authenticate()
    endpoint = self.urls['api']['ws']['private']
    subscribeRequest: dict = {
        'method': 'subscribe',
        'params': {
            'channels': [messageHash],
        },
        'nonce': self.nonce(),
    }
    return await self.watch(endpoint, messageHash, self.extend(subscribeRequest, params), messageHash)
def handle_error_message(self, client: Client, message) -> Bool:
    """
    detect an error response and reject the future it belongs to

    A message is an error when its "code" is present and not '0'. An
    AuthenticationError rejects the 'authenticated' future and removes that
    subscription; any other error rejects the future keyed by the message "id".

    :returns bool: True when the message was an error and has been handled
    """
    #
    #    {
    #        "id": 0,
    #        "code": 10004,
    #        "method": "subscribe",
    #        "message": "invalid channel {"channels":["trade.BTCUSD-PERP"]}"
    #    }
    #
    requestId = self.safe_string(message, 'id')
    errorCode = self.safe_string(message, 'code')
    if not errorCode or errorCode == '0':
        return False
    try:
        feedback = self.id + ' ' + self.json(message)
        self.throw_exactly_matched_exception(self.exceptions['exact'], errorCode, feedback)
        errorText = self.safe_value(message, 'message')
        if errorText is not None:
            self.throw_broadly_matched_exception(self.exceptions['broad'], errorText, feedback)
        # nothing matched: fall back to a generic exchange error
        raise ExchangeError(feedback)
    except Exception as e:
        if not isinstance(e, AuthenticationError):
            client.reject(e, requestId)
        else:
            authHash = 'authenticated'
            client.reject(e, authHash)
            if authHash in client.subscriptions:
                del client.subscriptions[authHash]
    return True
def handle_subscribe(self, client: Client, message):
    """
    route a subscription update to the handler for its channel

    The payload is read from "result" (or "info") and dispatched on its
    "channel". Each message is handled exactly once: user trade and user
    order channels are matched by name pattern and return early, instead of
    being dispatched a second time through the lookup table.
    """
    result = self.safe_value_2(message, 'result', 'info')
    channel = self.safe_string(result, 'channel')
    if channel is None:
        return
    if channel.find('user.trade') > -1:
        # channel might be user.trade.BTC_USDT
        self.handle_trades(client, result)
        return
    if channel.startswith('user.order'):
        # channel might be user.order.BTC_USDT
        self.handle_orders(client, result)
        return
    methods: dict = {
        'candlestick': self.handle_ohlcv,
        'ticker': self.handle_ticker,
        'trade': self.handle_trades,
        'book': self.handle_order_book,
        'book.update': self.handle_order_book,
        'user.balance': self.handle_balance,
        'user.position_balance': self.handle_positions,
    }
    method = self.safe_value(methods, channel)
    if method is not None:
        method(client, result)
def handle_message(self, client: Client, message):
    """
    entry point for every incoming websocket message

    Error responses are consumed by handle_error_message; everything else
    is dispatched on the message "method". Unknown methods are ignored.
    """
    #
    # ping
    #    {
    #        "id": 1587523073344,
    #        "method": "public/heartbeat",
    #        "code": 0
    #    }
    # auth
    #     {id: 1648132625434, method: "public/auth", code: 0}
    # ohlcv
    #    {
    #        "code": 0,
    #        "method": "subscribe",
    #        "result": {
    #          "instrument_name": "BTC_USDT",
    #          "subscription": "candlestick.1m.BTC_USDT",
    #          "channel": "candlestick",
    #          "depth": 300,
    #          "interval": "1m",
    #          "data": [[Object]]
    #        }
    #    }
    # ticker
    #    {
    #        "info": {
    #            "instrument_name": "BTC_USDT",
    #            "subscription": "ticker.BTC_USDT",
    #            "channel": "ticker",
    #            "data": [{}]
    #        }
    #    }
    #
    # handle unsubscribe
    # {"id":1725448572836,"method":"unsubscribe","code":0}
    #
    if self.handle_error_message(client, message):
        return
    handlers: dict = {
        '': self.handle_ping,
        'public/heartbeat': self.handle_ping,
        'public/auth': self.handle_authenticate,
        'private/cancel-all-orders': self.handle_cancel_all_orders,
        'subscribe': self.handle_subscribe,
        'unsubscribe': self.handle_unsubscribe,
    }
    # every single-order response shares the same handler
    for orderMethod in ('private/create-order', 'private/amend-order', 'private/cancel-order', 'private/close-position'):
        handlers[orderMethod] = self.handle_order
    handler = self.safe_value(handlers, self.safe_string(message, 'method'))
    if handler is not None:
        handler(client, message)
async def authenticate(self, params={}):
    """
    authenticate the private websocket connection, once per client

    The public/auth request is only sent when the client has no
    'authenticated' subscription yet; every caller then awaits the shared
    'authenticated' future.

    :param dict [params]: extra fields merged into the auth request
    """
    self.check_required_credentials()
    endpoint = self.urls['api']['ws']['private']
    client = self.client(endpoint)
    authHash = 'authenticated'
    future = client.reusableFuture(authHash)
    if self.safe_value(client.subscriptions, authHash) is None:
        method = 'public/auth'
        nonce = str(self.nonce())
        # signed payload: method + id + api key + nonce (id and nonce are the same value)
        payload = ''.join([method, nonce, self.apiKey, nonce])
        signature = self.hmac(self.encode(payload), self.encode(self.secret), hashlib.sha256)
        authRequest: dict = {
            'id': nonce,
            'nonce': nonce,
            'method': method,
            'api_key': self.apiKey,
            'sig': signature,
        }
        # NOTE(review): the watch() result is intentionally not awaited here;
        # completion is observed through the shared future below - confirm
        self.watch(endpoint, authHash, self.extend(authRequest, params), authHash)
    return await future
def handle_ping(self, client: Client, message):
    """
    answer a heartbeat by spawning pong() in the background
    """
    self.spawn(self.pong, client, message)
def handle_authenticate(self, client: Client, message):
    """
    mark the connection as authenticated by resolving the 'authenticated' future
    """
    #
    #  {id: 1648132625434, method: "public/auth", code: 0}
    #
    authFuture = self.safe_value(client.futures, 'authenticated')
    authFuture.resolve(True)
def handle_unsubscribe(self, client: Client, message):
    """
    clean up after the exchange acknowledges an unsubscribe request

    Finds the pending 'unsubscribe...' subscription whose id equals the
    message "id", cleans each (subscribe hash, unsubscribe hash) pair and
    then clears the cache associated with that subscription.
    """
    responseId = self.safe_string(message, 'id')
    for key in list(client.subscriptions.keys()):
        # the previous iteration can have deleted the messageHash from the subscriptions
        if key not in client.subscriptions:
            continue
        if not key.startswith('unsubscribe'):
            continue
        subscription = client.subscriptions[key]
        if responseId != self.safe_string(subscription, 'id'):
            continue
        unsubHashes = self.safe_list(subscription, 'messageHashes', [])
        subHashes = self.safe_list(subscription, 'subMessageHashes', [])
        for j, unsubHash in enumerate(unsubHashes):
            # both lists are paired by index
            self.clean_unsubscription(client, subHashes[j], unsubHash)
        self.clean_cache(subscription)