Files
ccxt_with_mt5/ccxt/pro/toobit.py
lz_db 0fab423a18 add
2025-11-16 12:31:03 +08:00

1110 lines
46 KiB
Python

# -*- coding: utf-8 -*-
# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code
import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp
from ccxt.base.types import Any, Balances, Bool, Int, Market, Order, OrderBook, Position, Str, Strings, Ticker, Tickers, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List
from ccxt.base.errors import ExchangeError
from ccxt.base.errors import AuthenticationError
from ccxt.base.errors import NotSupported
class toobit(ccxt.async_support.toobit):
def describe(self) -> Any:
    """
    extend the parent exchange description with the websocket capabilities, endpoint, options and keep-alive settings
    :returns dict: the parent description deep-extended with the websocket specific entries
    """
    supportedTimeframes = ['1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '6h', '8h', '12h', '1d', '1w', '1M']
    wsOptions: dict = {
        # every unified timeframe maps to the identical exchange interval
        'timeframes': {interval: interval for interval in supportedTimeframes},
        'watchOrderBook': {
            'channel': 'depth',  # depth, diffDepth
        },
        'listenKeyRefreshRate': 1200000,  # 20 mins
    }
    capabilities: dict = {
        'ws': True,
        'watchBalance': True,
        'watchMyTrades': True,
        'watchOHLCV': True,
        'watchOHLCVForSymbols': True,
        'watchOrderBook': True,
        'watchOrderBookForSymbols': True,
        'watchOrders': True,
        'watchTicker': True,
        'watchTickers': True,
        'watchTrades': True,
        'watchTradesForSymbols': True,
        # 'watchPosition': False,
    }
    overrides: dict = {
        'has': capabilities,
        'urls': {
            'api': {
                'ws': {
                    'common': 'wss://stream.toobit.com',
                },
            },
        },
        'options': {
            'ws': wsOptions,
        },
        'streaming': {
            'keepAlive': (60 - 1) * 5 * 1000,  # 295000 ms, i.e. just under 5 minutes
            'ping': self.ping,
        },
        'exceptions': {
            'ws': {
                'exact': {
                },
            },
        },
    }
    parentDescription = super(toobit, self).describe()
    return self.deep_extend(parentDescription, overrides)
def ping(self, client: Client):
    """
    build the ping payload registered under 'streaming' in describe()
    :param Client client: the websocket client the ping is built for(not read here)
    :returns dict: {'ping': <value of self.milliseconds()>}
    """
    now = self.milliseconds()
    payload = {'ping': now}
    return payload
def handle_message(self, client: Client, message):
    """
    route an incoming websocket message to the matching handler
    public messages are dicts routed by their 'topic', private messages are lists of events routed by each event's 'e' field
    :param Client client: the websocket client that received the message
    :param dict|list message: the decoded message
    """
    #
    # public
    #
    #     {
    #         topic: "trade",
    #         symbol: "DOGEUSDT",
    #         symbolName: "DOGEUSDT",
    #         params: {
    #             realtimeInterval: "24h",
    #             binary: "false",
    #         },
    #         data: [
    #             {
    #                 v: "4864732022868004630",
    #                 t: 1757243788405,
    #                 p: "0.21804",
    #                 q: "80",
    #                 m: True,
    #             },
    #         ],
    #         f: True,  # initial first snapshot or not
    #         sendTime: 1757244002117,
    #         shared: False,
    #     }
    #
    # private
    #
    #     [
    #         {
    #             e: 'outboundContractAccountInfo',
    #             E: '1758228398234',
    #             T: True,
    #             W: True,
    #             D: True,
    #             B: [[Object]]
    #         }
    #     ]
    #
    topic = self.safe_string(message, 'topic')
    if self.handle_error_message(client, message):
        return
    #
    # handle ping-pong: {pong: 1758540450000}
    #
    pongTimestamp = self.safe_integer(message, 'pong')
    if pongTimestamp is not None:
        self.handle_incoming_pong(client, pongTimestamp)
        return
    methods: dict = {
        'trade': self.handle_trades,
        'kline': self.handle_ohlcv,
        'realtimes': self.handle_tickers,
        'depth': self.handle_order_book_partial_snapshot,
        'diffDepth': self.handle_order_book,
        'outboundAccountInfo': self.handle_balance,
        'outboundContractAccountInfo': self.handle_balance,
        'executionReport': self.handle_order,
        'contractExecutionReport': self.handle_order,
        'ticketInfo': self.handle_my_trade,
        'outboundContractPositionInfo': self.handle_positions,
    }
    method = self.safe_value(methods, topic)
    if method is not None:
        method(client, message)
        return
    # check private streams: only a list can be walked by index, a dict with an
    # unknown or missing topic used to raise KeyError on message[0] here
    if not isinstance(message, list):
        return
    for item in message:
        event = self.safe_string(item, 'e')
        eventHandler = self.safe_value(methods, event)
        if eventHandler is not None:
            eventHandler(client, item)
def handle_incoming_pong(self, client: Client, pongTimestamp: Int):
    """
    store the timestamp carried by a pong message on the client
    :param Client client: the websocket client that received the pong
    :param int pongTimestamp: the integer read from the message's 'pong' field
    """
    setattr(client, 'lastPong', pongTimestamp)
async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
    """
    watches information on multiple trades made in a market

    https://toobit-docs.github.io/apidocs/spot/v1/en/#trade-streams

    :param str symbol: unified market symbol of the market trades were made in
    :param int [since]: the earliest time in ms to fetch trades for
    :param int [limit]: the maximum number of trade structures to retrieve
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=trade-structure>`
    """
    # single-symbol convenience wrapper around the multi-symbol implementation
    singleSymbolList = [symbol]
    trades = await self.watch_trades_for_symbols(singleSymbolList, since, limit, params)
    return trades
async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]:
    """
    get the list of most recent trades for a list of symbols

    https://toobit-docs.github.io/apidocs/spot/v1/en/#trade-streams

    :param str[] symbols: unified symbols of the markets to fetch trades for
    :param int [since]: timestamp in ms of the earliest trade to fetch
    :param int [limit]: the maximum amount of trades to fetch
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    # one message hash per symbol, matching what handle_trades resolves
    messageHashes = ['trade::' + symbol for symbol in symbols]
    marketIds = self.market_ids(symbols)
    url = self.urls['api']['ws']['common'] + '/quote/ws/v1'
    request: dict = {
        'symbol': ','.join(marketIds),
        'topic': 'trade',
        'event': 'sub',
    }
    trades = await self.watch_multiple(url, messageHashes, self.extend(request, params), messageHashes)
    if self.newUpdates:
        # the resolved cache belongs to a single symbol, read it from the first trade
        first = self.safe_value(trades, 0)
        tradeSymbol = self.safe_string(first, 'symbol')
        limit = trades.getLimit(tradeSymbol, limit)
    return self.filter_by_since_limit(trades, since, limit, 'timestamp', True)
def handle_trades(self, client: Client, message):
    """
    cache the trades carried by a 'trade' topic message and resolve the per-symbol message hash
    :param Client client: the websocket client that received the message
    :param dict message: the decoded 'trade' message
    """
    #
    #     {
    #         symbol: "DOGEUSDT",
    #         symbolName: "DOGEUSDT",
    #         topic: "trade",
    #         params: {
    #             realtimeInterval: "24h",
    #             binary: "false",
    #         },
    #         data: [
    #             {
    #                 v: "4864732022868004630",
    #                 t: 1757243788405,
    #                 p: "0.21804",
    #                 q: "80",
    #                 m: True,
    #             },
    #         ],
    #         f: True,  # initial first snapshot or not
    #         sendTime: 1757244002117,
    #         shared: False,
    #     }
    #
    market = self.safe_market(self.safe_string(message, 'symbol'))
    symbol = market['symbol']
    if symbol not in self.trades:
        cacheSize = self.safe_integer(self.options, 'tradesLimit', 1000)
        self.trades[symbol] = ArrayCache(cacheSize)
    cache = self.trades[symbol]
    rawTrades = self.safe_list(message, 'data', [])
    for parsedTrade in self.parse_ws_trades(rawTrades, market):
        # the symbol is taken from the message envelope
        parsedTrade['symbol'] = symbol
        cache.append(parsedTrade)
    client.resolve(cache, 'trade::' + symbol)
def parse_ws_trade(self, trade: dict, market: Market = None) -> Trade:
    """
    parse a websocket trade by delegating to parse_trade
    :param dict trade: the raw trade entry
    :param dict [market]: the market the trade belongs to
    :returns dict: whatever parse_trade returns for the entry
    """
    parsed = self.parse_trade(trade, market)
    return parsed
async def watch_ohlcv(self, symbol: str, timeframe='1m', since: Int = None, limit: Int = None, params={}) -> List[list]:
    """
    watches historical candlestick data containing the open, high, low, and close price, and the volume of a market

    https://toobit-docs.github.io/apidocs/spot/v1/en/#kline-candlestick-streams

    :param str symbol: unified symbol of the market to fetch OHLCV data for
    :param str timeframe: the length of time each candle represents
    :param int [since]: timestamp in ms of the earliest candle to fetch
    :param int [limit]: the maximum amount of candles to fetch
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns int[][]: A list of candles ordered, open, high, low, close, volume
    """
    # build a new dict instead of assigning into params: the {} default is shared
    # between calls and a dict supplied by the caller must not be modified
    forwardedParams = self.extend(params, {'callerMethodName': 'watchOHLCV'})
    result = await self.watch_ohlcv_for_symbols([[symbol, timeframe]], since, limit, forwardedParams)
    return result[symbol][timeframe]
async def watch_ohlcv_for_symbols(self, symbolsAndTimeframes: List[List[str]], since: Int = None, limit: Int = None, params={}):
    """
    watches historical candlestick data containing the open, high, low, and close price, and the volume of a market

    https://toobit-docs.github.io/apidocs/spot/v1/en/#kline-candlestick-streams

    :param str[][] symbolsAndTimeframes: array of arrays containing unified symbols and timeframes to fetch OHLCV data for, example [['BTC/USDT', '1m'], ['LTC/USDT', '5m']], all entries must use the same timeframe
    :param int [since]: timestamp in ms of the earliest candle to fetch
    :param int [limit]: the maximum amount of candles to fetch
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: the result of create_ohlcv_object for the symbol and timeframe that received an update
    :raises NotSupported: when the entries do not all share one timeframe
    """
    await self.load_markets()
    # 'callerMethodName' is an internal marker added by watch_ohlcv, it must not
    # be merged into the subscription request sent to the exchange
    params = self.omit(params, 'callerMethodName')
    url = self.urls['api']['ws']['common'] + '/quote/ws/v1'
    messageHashes = []
    timeframes = self.safe_dict(self.options['ws'], 'timeframes', {})
    marketIds = []
    selectedTimeframe: Str = None
    for i in range(0, len(symbolsAndTimeframes)):
        data = symbolsAndTimeframes[i]
        symbolStr = self.safe_string(data, 0)
        market = self.market(symbolStr)
        marketId = market['id']
        unifiedTimeframe = self.safe_string(data, 1, '1m')
        rawTimeframe = self.safe_string(timeframes, unifiedTimeframe, unifiedTimeframe)
        # a single subscription carries one 'kline_<interval>' topic for all symbols
        if selectedTimeframe is not None and selectedTimeframe != rawTimeframe:
            raise NotSupported(self.id + ' watchOHLCVForSymbols() only supports a single timeframe for all symbols')
        selectedTimeframe = rawTimeframe
        marketIds.append(marketId)
        messageHashes.append('ohlcv::' + symbolStr + '::' + unifiedTimeframe)
    request: dict = {
        'symbol': ','.join(marketIds),
        'topic': 'kline_' + selectedTimeframe,
        'event': 'sub',
    }
    # handle_ohlcv resolves a [symbol, timeframe, cache] triple
    symbol, timeframe, stored = await self.watch_multiple(url, messageHashes, self.extend(request, params), messageHashes)
    if self.newUpdates:
        limit = stored.getLimit(symbol, limit)
    filtered = self.filter_by_since_limit(stored, since, limit, 0, True)
    return self.create_ohlcv_object(symbol, timeframe, filtered)
def handle_ohlcv(self, client: Client, message):
    """
    cache the candles carried by a 'kline' topic message and resolve the matching message hash
    :param Client client: the websocket client that received the message
    :param dict message: the decoded 'kline' message
    """
    #
    #     {
    #         symbol: 'DOGEUSDT',
    #         symbolName: 'DOGEUSDT',
    #         klineType: '1m',
    #         topic: 'kline',
    #         params: {realtimeInterval: '24h', klineType: '1m', binary: 'false'},
    #         data: [
    #             {
    #                 t: 1757251200000,
    #                 s: 'DOGEUSDT',
    #                 sn: 'DOGEUSDT',
    #                 c: '0.21889',
    #                 h: '0.21898',
    #                 l: '0.21889',
    #                 o: '0.21897',
    #                 v: '5247',
    #                 st: 0
    #             }
    #         ],
    #         f: True,
    #         sendTime: 1757251217643,
    #         shared: False
    #     }
    #
    market = self.market(self.safe_string(message, 'symbol'))
    symbol = market['symbol']
    subscriptionParams = self.safe_dict(message, 'params', {})
    timeframe = self.find_timeframe(self.safe_string(subscriptionParams, 'klineType'))
    if symbol not in self.ohlcvs:
        self.ohlcvs[symbol] = {}
    cachesByTimeframe = self.ohlcvs[symbol]
    if timeframe not in cachesByTimeframe:
        cacheSize = self.safe_integer(self.options['ws'], 'OHLCVLimit', 1000)
        cachesByTimeframe[timeframe] = ArrayCacheByTimestamp(cacheSize)
    cache = cachesByTimeframe[timeframe]
    for rawCandle in self.safe_list(message, 'data', []):
        cache.append(self.parse_ws_ohlcv(rawCandle, market))
    # watch_ohlcv_for_symbols unpacks this triple
    payload = [symbol, timeframe, cache]
    client.resolve(payload, 'ohlcv::' + symbol + '::' + timeframe)
def parse_ws_ohlcv(self, ohlcv, market=None) -> list:
    """
    parse a websocket candle by delegating to parse_ohlcv
    :param dict ohlcv: the raw candle entry
    :param dict [market]: the market the candle belongs to
    :returns list: whatever parse_ohlcv returns for the entry
    """
    #
    #     {
    #         t: 1757251200000,
    #         o: '0.21897',
    #         h: '0.21898',
    #         l: '0.21889',
    #         c: '0.21889',
    #         v: '5247',
    #         s: 'DOGEUSDT',
    #         sn: 'DOGEUSDT',
    #         st: 0
    #     }
    #
    return self.parse_ohlcv(ohlcv, market)
async def watch_ticker(self, symbol: str, params={}) -> Ticker:
    """
    watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market

    https://toobit-docs.github.io/apidocs/spot/v1/en/#individual-symbol-ticker-streams

    :param str symbol: unified symbol of the market to fetch the ticker for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
    """
    await self.load_markets()
    unifiedSymbol = self.symbol(symbol)
    # watch_tickers returns a dict keyed by symbol, pick the requested entry
    tickersBySymbol = await self.watch_tickers([unifiedSymbol], params)
    return tickersBySymbol[unifiedSymbol]
async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers:
    """
    watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for all markets of a specific list

    https://toobit-docs.github.io/apidocs/spot/v1/en/#individual-symbol-ticker-streams

    :param str[] symbols: unified symbols of the markets to fetch the ticker for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: a dictionary of `ticker structures <https://docs.ccxt.com/#/?id=ticker-structure>` indexed by symbol
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    # one message hash per symbol, matching what handle_tickers resolves
    messageHashes = ['ticker::' + symbol for symbol in symbols]
    marketIds = self.market_ids(symbols)
    url = self.urls['api']['ws']['common'] + '/quote/ws/v1'
    request: dict = {
        'symbol': ','.join(marketIds),
        'topic': 'realtimes',
        'event': 'sub',
    }
    ticker = await self.watch_multiple(url, messageHashes, self.extend(request, params), messageHashes)
    if self.newUpdates:
        # only the ticker that just changed
        result: dict = {}
        result[ticker['symbol']] = ticker
        return result
    return self.filter_by_array(self.tickers, 'symbol', symbols)
def handle_tickers(self, client: Client, message):
    """
    parse the tickers carried by a 'realtimes' topic message, store them and resolve the waiting futures
    :param Client client: the websocket client that received the message
    :param dict message: the decoded 'realtimes' message
    """
    #
    #     {
    #         "symbol": "DOGEUSDT",
    #         "symbolName": "DOGEUSDT",
    #         "topic": "realtimes",
    #         "params": {
    #             "realtimeInterval": "24h"
    #         },
    #         "data": [
    #             {
    #                 "t": 1757257643683,
    #                 "s": "DOGEUSDT",
    #                 "o": "0.21462",
    #                 "h": "0.22518",
    #                 "l": "0.21229",
    #                 "c": "0.2232",
    #                 "v": "283337017",
    #                 "qv": "62063771.42702",
    #                 "sn": "DOGEUSDT",
    #                 "m": "0.04",
    #                 "e": 301,
    #                 "c24h": "0.2232",
    #                 "h24h": "0.22518",
    #                 "l24h": "0.21229",
    #                 "o24h": "0.21462",
    #                 "v24h": "283337017",
    #                 "qv24h": "62063771.42702",
    #                 "m24h": "0.04"
    #             }
    #         ],
    #         "f": False,
    #         "sendTime": 1757257643751,
    #         "shared": False
    #     }
    #
    # default to an empty list so a message without 'data' does not raise on len(None),
    # the other handlers in this class read 'data' the same way
    data = self.safe_list(message, 'data', [])
    newTickers = {}
    for rawTicker in data:
        parsed = self.parse_ws_ticker(rawTicker)
        symbol = parsed['symbol']
        self.tickers[symbol] = parsed
        newTickers[symbol] = parsed
        client.resolve(parsed, 'ticker::' + symbol)
    client.resolve(newTickers, 'tickers')
def parse_ws_ticker(self, ticker, market=None):
    """
    parse a websocket ticker by delegating to parse_ticker
    :param dict ticker: the raw ticker entry
    :param dict [market]: the market the ticker belongs to
    :returns dict: whatever parse_ticker returns for the entry
    """
    parsed = self.parse_ticker(ticker, market)
    return parsed
async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
    """
    watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data

    https://toobit-docs.github.io/apidocs/spot/v1/en/#partial-book-depth-streams

    :param str symbol: unified symbol of the market to fetch the order book for
    :param int [limit]: the maximum amount of order book entries to return.
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
    """
    # single-symbol convenience wrapper around the multi-symbol implementation
    singleSymbolList = [symbol]
    orderbook = await self.watch_order_book_for_symbols(singleSymbolList, limit, params)
    return orderbook
async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook:
    """
    watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data

    https://toobit-docs.github.io/apidocs/spot/v1/en/#partial-book-depth-streams

    :param str[] symbols: unified array of symbols
    :param int [limit]: the maximum amount of order book entries to return.
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.channel]: 'depth' or 'diffDepth', defaults to the value configured in options['ws']['watchOrderBook']['channel']
    :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    # describe() stores the default channel under options['ws']['watchOrderBook'],
    # pass it on as the default so that configuration is honoured
    wsOptions = self.safe_dict(self.options, 'ws', {})
    orderBookOptions = self.safe_dict(wsOptions, 'watchOrderBook', {})
    defaultChannel = self.safe_string(orderBookOptions, 'channel', 'depth')
    channel: Str = None
    channel, params = self.handle_option_and_params(params, 'watchOrderBook', 'channel', defaultChannel)
    # one message hash per symbol and channel, matching what the order book handlers resolve
    messageHashes = ['orderBook::' + symbol + '::' + channel for symbol in symbols]
    marketIds = self.market_ids(symbols)
    url = self.urls['api']['ws']['common'] + '/quote/ws/v1'
    request: dict = {
        'symbol': ','.join(marketIds),
        'topic': channel,
        'event': 'sub',
    }
    orderbook = await self.watch_multiple(url, messageHashes, self.extend(request, params), messageHashes)
    return orderbook.limit()
def handle_order_book(self, client: Client, message):
    """
    handle a 'diffDepth' topic message: the first snapshot resets the book, later messages are applied as deltas
    :param Client client: the websocket client that received the message
    :param dict message: the decoded 'diffDepth' message
    """
    #
    #     {
    #         symbol: 'DOGEUSDT',
    #         symbolName: 'DOGEUSDT',
    #         topic: 'depth',
    #         params: {realtimeInterval: '24h'},
    #         data: [
    #             {
    #                 e: 301,
    #                 t: 1757304842860,
    #                 v: '9814355_1E-18',
    #                 b: [Array],
    #                 a: [Array],
    #                 o: 0
    #             }
    #         ],
    #         f: False,
    #         sendTime: 1757304843047,
    #         shared: False
    #     }
    #
    if self.safe_bool(message, 'f', False):
        # 'f' flags the initial snapshot
        self.set_order_book_snapshot(client, message, 'diffDepth')
        return
    market = self.safe_market(self.safe_string(message, 'symbol'))
    symbol = market['symbol']
    messageHash = 'orderBook::' + symbol + '::' + 'diffDepth'
    for entry in self.safe_list(message, 'data', []):
        if symbol not in self.orderbooks:
            depthLimit = self.safe_integer(self.options['ws'], 'orderBookLimit', 1000)
            self.orderbooks[symbol] = self.order_book({}, depthLimit)
        book = self.orderbooks[symbol]
        timestamp = self.safe_integer(entry, 't')
        bidDeltas = self.safe_list(entry, 'b', [])
        askDeltas = self.safe_list(entry, 'a', [])
        self.handle_deltas(book['asks'], askDeltas)
        self.handle_deltas(book['bids'], bidDeltas)
        book['timestamp'] = timestamp
        self.orderbooks[symbol] = book
        client.resolve(book, messageHash)
def handle_delta(self, bookside, delta):
    """
    apply one order book delta to one side of the book
    :param bookside: the bids or asks side, must provide storeArray
    :param delta: the raw delta entry, parsed with parse_bid_ask
    """
    bookside.storeArray(self.parse_bid_ask(delta))
def handle_order_book_partial_snapshot(self, client: Client, message):
    """
    handle a 'depth' topic message, every message is treated as a snapshot
    :param Client client: the websocket client that received the message
    :param dict message: the decoded 'depth' message
    """
    #
    #     {
    #         symbol: 'DOGEUSDT',
    #         symbolName: 'DOGEUSDT',
    #         topic: 'depth',
    #         params: {realtimeInterval: '24h'},
    #         data: [
    #             {
    #                 e: 301,
    #                 s: 'DOGEUSDT',
    #                 t: 1757304842860,
    #                 v: '9814355_1E-18',
    #                 b: [Array],
    #                 a: [Array],
    #                 o: 0
    #             }
    #         ],
    #         f: False,
    #         sendTime: 1757304843047,
    #         shared: False
    #     }
    #
    channel = 'depth'
    self.set_order_book_snapshot(client, message, channel)
def set_order_book_snapshot(self, client: Client, message, channel: str):
    """
    reset the cached order book of every entry in the message and resolve the matching message hash
    :param Client client: the websocket client that received the message
    :param dict message: the decoded order book message
    :param str channel: 'depth' or 'diffDepth', used as the last part of the message hash
    """
    data = self.safe_list(message, 'data', [])
    # the diffDepth sample entries carry no 's' field, fall back to the symbol
    # of the message envelope so the message hash can still be built
    fallbackMarketId = self.safe_string(message, 'symbol')
    for entry in data:
        marketId = self.safe_string(entry, 's', fallbackMarketId)
        symbol = self.safe_symbol(marketId)
        messageHash = 'orderBook::' + symbol + '::' + channel
        if symbol not in self.orderbooks:
            limit = self.safe_integer(self.options['ws'], 'orderBookLimit', 1000)
            self.orderbooks[symbol] = self.order_book({}, limit)
        orderbook = self.orderbooks[symbol]
        timestamp = self.safe_integer(entry, 't')
        snapshot = self.parse_order_book(entry, symbol, timestamp, 'b', 'a')
        orderbook.reset(snapshot)
        client.resolve(orderbook, messageHash)
async def watch_balance(self, params={}) -> Balances:
    """
    query for balance and get the amount of funds available for trading or funds locked in orders

    https://toobit-docs.github.io/apidocs/spot/v1/en/#payload-account-update

    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: a `balance structure <https://docs.ccxt.com/#/?id=balance-structure>`
    """
    await self.load_markets()
    await self.authenticate()
    marketType = None
    marketType, params = self.handle_market_type_and_params('watchBalance', None, params)
    # every non-spot market type is served by the 'contract' account
    if marketType == 'spot':
        accountType = 'spot'
        messageHash = 'spot:balance'
        subscriptionHash = 'spot:balance'
    else:
        accountType = 'contract'
        messageHash = 'contract:balance'
        subscriptionHash = 'contract:private'
    url = self.get_user_stream_url()
    client = self.client(url)
    self.set_balance_cache(client, marketType, subscriptionHash, params)
    client.future(accountType + ':fetchBalanceSnapshot')
    return await self.watch(url, messageHash, params, subscriptionHash)
def set_balance_cache(self, client: Client, marketType, subscriptionHash: Str = None, params={}):
    """
    start loading the balance snapshot unless the subscription or the snapshot future already exists
    :param Client client: the user stream client
    :param str marketType: 'spot' selects the spot account, anything else the contract account
    :param str [subscriptionHash]: the subscription hash used by watch_balance
    :param dict [params]: not read here
    """
    if subscriptionHash in client.subscriptions:
        return
    accountType = 'spot' if marketType == 'spot' else 'contract'
    snapshotHash = accountType + ':fetchBalanceSnapshot'
    if snapshotHash in client.futures:
        return
    client.future(snapshotHash)
    self.spawn(self.load_balance_snapshot, client, snapshotHash, marketType)
def handle_balance(self, client: Client, message):
    """
    update the cached balance of the spot or contract account from an account info event and resolve '<type>:balance'
    :param Client client: the websocket client that received the event
    :param dict message: a single 'outboundAccountInfo' or 'outboundContractAccountInfo' event
    """
    #
    # spot
    #
    #     [
    #         {
    #             e: 'outboundAccountInfo',
    #             E: '1758226989725',
    #             T: True,
    #             W: True,
    #             D: True,
    #             B: [
    #                 {
    #                     a: "USDT",
    #                     f: "6.37242839",
    #                     l: "0",
    #                 },
    #             ]
    #         }
    #     ]
    #
    # contract
    #
    #     [
    #         {
    #             e: 'outboundContractAccountInfo',
    #             E: '1758226989742',
    #             T: True,
    #             W: True,
    #             D: True,
    #             B: [[Object]]
    #         }
    #     ]
    #
    channel = self.safe_string(message, 'e')
    rawBalances = self.safe_list(message, 'B', [])
    timestamp = self.safe_integer(message, 'E')
    accountType = 'contract' if (channel == 'outboundContractAccountInfo') else 'spot'
    if accountType not in self.balance:
        self.balance[accountType] = {}
    state = self.balance[accountType]
    state['info'] = rawBalances
    state['timestamp'] = timestamp
    state['datetime'] = self.iso8601(timestamp)
    for entry in rawBalances:
        code = self.safe_currency_code(self.safe_string(entry, 'a'))
        account = self.account()
        account['info'] = entry
        account['used'] = self.safe_string(entry, 'l')
        account['free'] = self.safe_string(entry, 'f')
        state[code] = account
    self.balance[accountType] = self.safe_balance(state)
    client.resolve(self.balance[accountType], accountType + ':balance')
async def load_balance_snapshot(self, client, messageHash, marketType):
    """
    fetch the balance with fetch_balance, merge it with the cached state and resolve the snapshot and balance futures
    :param Client client: the user stream client
    :param str messageHash: key of the snapshot future in client.futures
    :param str marketType: passed to fetch_balance as 'type', 'spot' selects the spot cache, anything else the contract cache
    """
    response = await self.fetch_balance({'type': marketType})
    accountType = 'spot' if (marketType == 'spot') else 'contract'
    # values already present in the cache take precedence over the fetched ones
    cached = self.safe_dict(self.balance, accountType, {})
    self.balance[accountType] = self.extend(response, cached)
    # don't remove the future from the .futures cache
    client.futures[messageHash].resolve()
    merged = self.balance[accountType]
    client.resolve(merged, accountType + ':fetchBalanceSnapshot')
    # we should also resolve right away after snapshot, so user doesn't double-fetch balance
    client.resolve(merged, accountType + ':balance')
async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
    """
    watches information on multiple orders made by the user

    https://toobit-docs.github.io/apidocs/spot/v1/en/#payload-order-update

    :param str symbol: unified market symbol of the market orders were made in
    :param int [since]: the earliest time in ms to fetch orders for
    :param int [limit]: the maximum number of order structures to retrieve
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
    """
    await self.load_markets()
    await self.authenticate()
    market = self.market_or_None(symbol)
    symbol = self.safe_string(market, 'symbol', symbol)
    # 'orders' for all symbols, 'orders:<symbol>' for a single one
    messageHash = 'orders' if symbol is None else 'orders' + ':' + symbol
    url = self.get_user_stream_url()
    orders = await self.watch(url, messageHash, params, messageHash)
    if self.newUpdates:
        limit = orders.getLimit(symbol, limit)
    return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True)
def handle_order(self, client: Client, message):
    """
    cache an order update and resolve both the generic and the per-symbol orders message hash
    :param Client client: the websocket client that received the event
    :param dict message: a single 'executionReport' or 'contractExecutionReport' event
    """
    #
    #     {
    #         "e": "executionReport",
    #         "E": "1758311011844",
    #         "s": "DOGEUSDT",
    #         "c": "1758311011948",
    #         "S": "BUY",
    #         "o": "LIMIT",
    #         "f": "GTC",
    #         "q": "22",
    #         "p": "0.23",
    #         "pt": "INPUT",
    #         "X": "NEW",
    #         "i": "2043255292855185152",
    #         "l": "0",  # Last executed quantity
    #         "z": "0",  # Cumulative filled quantity
    #         "L": "0",  # Last executed price
    #         "n": "0",
    #         "N": "",
    #         "u": True,
    #         "w": True,
    #         "m": False,
    #         "O": "1758311011833",
    #         "U": "1758311011841",
    #         "Z": "0",
    #         "C": False,
    #         "v": "0",
    #         "rp": "0",
    #         "td": "0"
    #     }
    #
    if self.orders is None:
        cacheSize = self.safe_integer(self.options, 'ordersLimit', 1000)
        self.orders = ArrayCacheBySymbolById(cacheSize)
    cache = self.orders
    parsedOrder = self.parse_ws_order(message)
    cache.append(parsedOrder)
    client.resolve(cache, 'orders')
    client.resolve(cache, 'orders:' + self.safe_string(parsedOrder, 'symbol'))
def parse_ws_order(self, order, market=None):
    """
    parse an order update event into a unified order structure
    :param dict order: a single 'executionReport' or 'contractExecutionReport' event
    :param dict [market]: the market the order belongs to
    :returns dict: an `order structure <https://docs.ccxt.com/#/?id=order-structure>`
    """
    timestamp = self.safe_integer(order, 'O')
    marketId = self.safe_string(order, 's')
    symbol = self.safe_symbol(marketId, market)
    priceType = self.safe_string_lower(order, 'pt')
    rawOrderType = self.safe_string_lower(order, 'o')
    # a 'market' price type overrides the raw order type
    orderType: Str = None
    if priceType == 'market':
        orderType = 'market'
    else:
        orderType = rawOrderType
    feeCost = self.safe_number(order, 'n')
    fee = None
    if feeCost is not None:
        fee = {
            'cost': feeCost,
            'currency': None,
        }
    return self.safe_order({
        'info': order,
        'id': self.safe_string(order, 'i'),
        'clientOrderId': self.safe_string(order, 'c'),
        'timestamp': timestamp,
        'datetime': self.iso8601(timestamp),
        'lastUpdateTimestamp': self.safe_integer_2(order, 'U', 'E'),
        'symbol': symbol,
        'type': orderType,
        'timeInForce': self.safe_string_upper(order, 'f'),
        'postOnly': None,
        'side': self.safe_string_lower(order, 'S'),
        # 'p' is the price the order was placed at(0.23 on a NEW limit order),
        # 'L' is only the price of the last execution and is 0 until a fill happens
        'price': self.safe_string(order, 'p'),
        'stopPrice': None,
        'triggerPrice': None,
        'amount': self.safe_string(order, 'q'),
        'cost': None,
        # the event carries no average fill price, the order price must not be reported as one
        'average': None,
        'filled': self.safe_string(order, 'z'),
        'remaining': None,
        'status': self.parse_order_status(self.safe_string(order, 'X')),
        'fee': fee,
        'trades': None,
    }, market)
async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
    """
    watches information on multiple trades made by the user

    https://toobit-docs.github.io/apidocs/spot/v1/en/#payload-ticket-push

    :param str symbol: unified market symbol of the market trades were made in
    :param int [since]: the earliest time in ms to fetch trades for
    :param int [limit]: the maximum number of trade structures to retrieve
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=trade-structure>`
    """
    await self.load_markets()
    await self.authenticate()
    market = self.market_or_None(symbol)
    symbol = self.safe_string(market, 'symbol', symbol)
    messageHash = 'myTrades'
    if symbol is not None:
        messageHash = messageHash + ':' + symbol
    url = self.get_user_stream_url()
    trades = await self.watch(url, messageHash, params, messageHash)
    if self.newUpdates:
        limit = trades.getLimit(symbol, limit)
    # the resolved cache is shared by all symbols, so filter by the requested
    # symbol the same way watch_orders does
    return self.filter_by_symbol_since_limit(trades, symbol, since, limit, True)
def handle_my_trade(self, client: Client, message):
    """
    cache a trade of the user and resolve both the per-symbol and the generic myTrades message hash
    :param Client client: the websocket client that received the event
    :param dict message: a single 'ticketInfo' event
    """
    #
    #     {
    #         "e": "ticketInfo",
    #         "E": "1758314657847",
    #         "s": "DOGEUSDT",
    #         "q": "22.0",
    #         "t": "1758314657842",
    #         "p": "0.26667",
    #         "T": "4864732022877055421",
    #         "o": "2043285877770284800",
    #         "c": "1758314657002",
    #         "a": "1783404067076253952",
    #         "m": False,
    #         "S": "BUY"
    #     }
    #
    if self.myTrades is None:
        limit = self.safe_integer(self.options, 'tradesLimit', 1000)
        # keep the cache on the instance, otherwise a new empty cache is created
        # for every message and earlier trades are lost
        self.myTrades = ArrayCacheBySymbolById(limit)
    myTrades = self.myTrades
    trade = self.parse_my_trade(message)
    myTrades.append(trade)
    messageHash = 'myTrades:' + trade['symbol']
    client.resolve(myTrades, messageHash)
    messageHash = 'myTrades'
    client.resolve(myTrades, messageHash)
def parse_my_trade(self, trade, market=None):
    """
    parse a 'ticketInfo' event into a unified trade structure
    :param dict trade: the raw 'ticketInfo' event
    :param dict [market]: the market the trade belongs to
    :returns dict: a `trade structure <https://docs.ccxt.com/#/?id=trade-structure>`
    """
    marketId = self.safe_string(trade, 's')
    # 't' arrives as a numeric string, read it as an integer so the timestamp can
    # be compared against 'since' and converted by iso8601
    timestamp = self.safe_integer(trade, 't')
    isMaker = self.safe_bool(trade, 'm')
    return self.safe_trade({
        'info': trade,
        'id': self.safe_string(trade, 'T'),
        'timestamp': timestamp,
        'datetime': self.iso8601(timestamp),
        'symbol': self.safe_symbol(marketId, market),
        'order': self.safe_string(trade, 'o'),
        'type': None,
        'side': self.safe_string_lower(trade, 'S'),
        'takerOrMaker': 'maker' if isMaker else 'taker',
        'price': self.safe_string(trade, 'p'),
        'amount': self.safe_string(trade, 'q'),
        'cost': None,
        'fee': None,
    }, market)
async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]:
    """
    watch all open positions

    https://toobit-docs.github.io/apidocs/usdt_swap/v1/en/#event-position-update

    :param str[] [symbols]: list of unified market symbols
    :param int [since]: the earliest time in ms to fetch positions for
    :param int [limit]: the maximum number of positions to retrieve
    :param dict params: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `position structure <https://docs.ccxt.com/en/latest/manual.html#position-structure>`
    """
    await self.load_markets()
    # authenticate once, it takes request params and not a url
    await self.authenticate()
    # position events are contract events, use the same account key as the contract balance
    accountType = 'contract'
    messageHash = accountType + ':positions'
    if not self.is_empty(symbols):
        symbols = self.market_symbols(symbols)
        messageHash = messageHash + '::' + ','.join(symbols)
    url = self.get_user_stream_url()
    client = self.client(url)
    # the second argument of set_positions_cache is the account type, not the symbols
    self.set_positions_cache(client, accountType, symbols)
    fetchPositionsSnapshot = self.handle_option('watchPositions', 'fetchPositionsSnapshot', False)
    cache = self.safe_value(self.positions, accountType)
    if fetchPositionsSnapshot and cache is None:
        # set_positions_cache registers the snapshot future under '<type>:fetchPositionsSnapshot'
        snapshot = await client.future(accountType + ':fetchPositionsSnapshot')
        return self.filter_by_symbols_since_limit(snapshot, symbols, since, limit, True)
    newPositions = await self.watch(url, messageHash, None, messageHash)
    if self.newUpdates:
        return newPositions
    return self.filter_by_symbols_since_limit(cache, symbols, since, limit, True)
def set_positions_cache(self, client: Client, type, symbols: Strings = None, isPortfolioMargin=False):
    """
    make sure a positions cache exists for the account type, optionally starting a snapshot fetch
    :param Client client: the user stream client
    :param str type: the account type used as key of self.positions
    :param str[] [symbols]: not read here
    :param bool [isPortfolioMargin]: not read here
    """
    if self.positions is None:
        self.positions = {}
    if type in self.positions:
        return
    fetchPositionsSnapshot = self.handle_option('watchPositions', 'fetchPositionsSnapshot', False)
    if fetchPositionsSnapshot:
        messageHash = type + ':fetchPositionsSnapshot'
        if not (messageHash in client.futures):
            client.future(messageHash)
            # load_positions_snapshot accepts(client, messageHash, type), passing
            # isPortfolioMargin as a fourth argument raised a TypeError
            self.spawn(self.load_positions_snapshot, client, messageHash, type)
    else:
        self.positions[type] = ArrayCacheBySymbolBySide()
async def load_positions_snapshot(self, client, messageHash, type):
    """
    fetch the positions with fetch_positions, store them in a new cache and resolve the snapshot and positions futures
    :param Client client: the user stream client
    :param str messageHash: key of the snapshot future in client.futures
    :param str type: the account type, passed to fetch_positions as 'type' and used as key of self.positions
    """
    positions = await self.fetch_positions(None, {'type': type})
    cache = ArrayCacheBySymbolBySide()
    self.positions[type] = cache
    for position in positions:
        cache.append(position)
    # don't remove the future from the .futures cache
    client.futures[messageHash].resolve(cache)
    client.resolve(cache, type + ':positions')
def handle_positions(self, client, message):
    """
    cache position updates and resolve the positions message hashes
    :param Client client: the websocket client that received the event
    :param dict|list message: a single 'outboundContractPositionInfo' event or a list of them
    """
    #
    #     [
    #         {
    #             e: 'outboundContractPositionInfo',
    #             E: '1758316454554',
    #             A: '1783404067076253954',
    #             s: 'DOGE-SWAP-USDT',
    #             S: 'LONG',
    #             p: '0',
    #             P: '0',
    #             a: '0',
    #             f: '0.1228',
    #             m: '0',
    #             r: '0',
    #             up: '0',
    #             pr: '0',
    #             pv: '0',
    #             v: '3.0',
    #             mt: 'CROSS',
    #             mm: '0',
    #             mp: '0.265410000000000000'
    #         }
    #     ]
    #
    # the user stream is shared by orders, balances and trades, so the first
    # subscription key says nothing about the account, position events are contract events
    accountType = 'contract'
    if self.positions is None:
        self.positions = {}
    if not (accountType in self.positions):
        self.positions[accountType] = ArrayCacheBySymbolBySide()
    cache = self.positions[accountType]
    # handle_message passes one event dict at a time, indexing it like a list raised KeyError
    rawPositions = message if isinstance(message, list) else [message]
    newPositions = []
    for rawPosition in rawPositions:
        position = self.parse_ws_position(rawPosition)
        timestamp = self.safe_integer(rawPosition, 'E')
        position['timestamp'] = timestamp
        position['datetime'] = self.iso8601(timestamp)
        newPositions.append(position)
        cache.append(position)
    # resolve the symbol specific watchers first: '<type>:positions::SYM1,SYM2'
    messageHashes = self.find_message_hashes(client, accountType + ':positions::')
    for messageHash in messageHashes:
        parts = messageHash.split('::')
        symbols = parts[1].split(',')
        positions = self.filter_by_array(newPositions, 'symbol', symbols, False)
        if not self.is_empty(positions):
            client.resolve(positions, messageHash)
    client.resolve(newPositions, accountType + ':positions')
def parse_ws_position(self, position, market=None):
    """
    parse an 'outboundContractPositionInfo' event into a unified position structure
    :param dict position: the raw position event
    :param dict [market]: not used, the symbol is resolved from the event's 's' field
    :returns dict: the result of safe_position, timestamp and datetime are left empty here
    """
    symbol = self.safe_symbol(self.safe_string(position, 's'), None)
    notional = self.omit_zero(self.safe_string(position, 'pv'))
    initialMargin = self.omit_zero(self.safe_string(position, 'm'))
    unified: dict = {
        'info': position,
        'id': None,
        'symbol': symbol,
        'notional': notional,
        'marginMode': self.safe_string_lower(position, 'mt'),
        'liquidationPrice': self.safe_string(position, 'f'),
        'entryPrice': self.safe_string(position, 'p'),
        'unrealizedPnl': self.safe_string(position, 'up'),
        'realizedPnl': self.safe_number(position, 'r'),
        'percentage': None,
        'contracts': None,
        'contractSize': None,
        'markPrice': self.safe_string(position, 'mp'),
        'side': self.safe_string_lower(position, 'S'),
        'hedged': None,
        'timestamp': None,
        'datetime': None,
        'maintenanceMargin': self.safe_string(position, 'mm'),
        'maintenanceMarginPercentage': None,
        'collateral': None,
        'initialMargin': initialMargin,
        'initialMarginPercentage': None,
        'leverage': self.safe_string(position, 'v'),
        'marginRatio': None,
    }
    return self.safe_position(unified)
async def authenticate(self, params={}):
    """
    make sure a listen key for the user data stream exists, requesting one when the last one is older than the refresh rate
    :param dict [params]: extra parameters passed to the listen key request
    :returns: the result of the 'authenticated' future of the current user stream client
    :raises AuthenticationError: when the listen key request fails
    """
    client = self.client(self.get_user_stream_url())
    messageHash = 'authenticated'
    future = client.reusableFuture(messageHash)
    authenticated = self.safe_value(client.subscriptions, messageHash)
    if authenticated is None:
        self.check_required_credentials()
        time = self.milliseconds()
        lastAuthenticatedTime = self.safe_integer(self.options['ws'], 'lastAuthenticatedTime', 0)
        listenKeyRefreshRate = self.safe_integer(self.options['ws'], 'listenKeyRefreshRate', 1200000)
        delay = self.sum(listenKeyRefreshRate, 10000)
        if time - lastAuthenticatedTime > delay:
            try:
                client.subscriptions[messageHash] = True
                response = await self.privatePostApiV1UserDataStream(params)
                self.options['ws']['listenKey'] = self.safe_string(response, 'listenKey')
                self.options['ws']['lastAuthenticatedTime'] = time
                future.resolve(True)
                self.delay(listenKeyRefreshRate, self.keep_alive_listen_key, params)
            except Exception as e:
                err = AuthenticationError(self.id + ' ' + self.json(e))
                client.reject(err, messageHash)
                if messageHash in client.subscriptions:
                    del client.subscriptions[messageHash]
        else:
            # the user stream url contains the listen key, so once a key is issued
            # the client looked up above is a different one with no 'authenticated'
            # entry and an unresolved future, mark it as authenticated instead of
            # awaiting a future nobody resolves
            client.subscriptions[messageHash] = True
            future.resolve(True)
    return await future
async def keep_alive_listen_key(self, params={}):
    """
    renew the listen key and schedule the next renewal, on failure reject all pending futures of the user stream client
    :param dict [params]: extra parameters passed to the listen key request
    """
    options = self.safe_value(self.options, 'ws', {})
    listenKey = self.safe_string(options, 'listenKey')
    if listenKey is None:
        # A network error happened: we can't renew a listen key that does not exist.
        return
    try:
        response = await self.privatePostApiV1UserDataStream(params)
        self.options['ws']['listenKey'] = self.safe_string(response, 'listenKey')
        self.options['ws']['lastAuthenticatedTime'] = self.milliseconds()
    except Exception as error:
        url = self.get_user_stream_url()
        client = self.client(url)
        messageHashes = list(client.futures.keys())
        for messageHash in messageHashes:
            client.reject(error, messageHash)
        # force the next authenticate() call to request a new key
        self.options['ws']['listenKey'] = None
        self.options['ws']['lastAuthenticatedTime'] = 0
        return
    # whether or not to schedule another listenKey keepAlive request
    # the refresh rate is configured under options['ws'], the same place authenticate() reads it from
    listenKeyRefreshRate = self.safe_integer(options, 'listenKeyRefreshRate', 1200000)
    self.delay(listenKeyRefreshRate, self.keep_alive_listen_key, params)
def get_user_stream_url(self):
    """
    build the url of the user data stream from the common websocket url and the current listen key
    :returns str: the user stream url, ending in '/api/v1/ws/' when no listen key is stored
    """
    # options['ws'] has no 'listenKey' before the first authentication and it is
    # reset to None when a renewal fails, neither case may raise here because
    # authenticate() needs this url before it requests a key
    listenKey = self.safe_string(self.options['ws'], 'listenKey', '')
    return self.urls['api']['ws']['common'] + '/api/v1/ws/' + listenKey
def handle_error_message(self, client: Client, message) -> Bool:
    """
    reject the client's pending futures when the message carries an error code
    :param Client client: the websocket client that received the message
    :param dict|list message: the decoded message
    :returns bool: True when the message was an error, False otherwise
    """
    #
    #     {
    #         "code": '-100010',
    #         "desc": "Invalid Symbols!"
    #     }
    #
    code = self.safe_string(message, 'code')
    if code is None:
        return False
    # default to an empty text so an error without 'desc' is still reported
    # instead of raising on the string concatenation
    desc = self.safe_string(message, 'desc', '')
    msg = self.id + ' code: ' + code + ' message: ' + desc
    exception = ExchangeError(msg)  # c# fix
    client.reject(exception)
    return True