Files
ccxt_with_mt5/ccxt/test/exchange/async/test_fetch_tickers.py
lz_db 0fab423a18 add
2025-11-16 12:31:03 +08:00

60 lines
3.0 KiB
Python

import os
import sys
root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
sys.path.append(root)
# ----------------------------------------------------------------------------
# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code
# ----------------------------------------------------------------------------
# -*- coding: utf-8 -*-
import asyncio
from ccxt.test.exchange.base import test_ticker # noqa E402
from ccxt.test.exchange.base import test_shared_methods # noqa E402
async def test_fetch_tickers(exchange, skipped_properties, symbol):
without_symbol = test_fetch_tickers_helper(exchange, skipped_properties, None)
with_symbol = test_fetch_tickers_helper(exchange, skipped_properties, [symbol])
results = await asyncio.gather(*[without_symbol, with_symbol])
test_fetch_tickers_amounts(exchange, skipped_properties, results[0])
return results
async def test_fetch_tickers_helper(exchange, skipped_properties, arg_symbols, arg_params={}):
method = 'fetchTickers'
response = await exchange.fetch_tickers(arg_symbols, arg_params)
assert isinstance(response, dict), exchange.id + ' ' + method + ' ' + exchange.json(arg_symbols) + ' must return an object. ' + exchange.json(response)
values = list(response.values())
checked_symbol = None
if arg_symbols is not None and len(arg_symbols) == 1:
checked_symbol = arg_symbols[0]
test_shared_methods.assert_non_emtpy_array(exchange, skipped_properties, method, values, checked_symbol)
for i in range(0, len(values)):
# todo: symbol check here
ticker = values[i]
test_ticker(exchange, skipped_properties, method, ticker, checked_symbol)
return response
def test_fetch_tickers_amounts(exchange, skipped_properties, tickers):
tickers_values = list(tickers.values())
if not ('checkActiveSymbols' in skipped_properties):
#
# ensure all "active" symbols have tickers
#
non_inactive_markets = test_shared_methods.get_active_markets(exchange)
not_inactive_symbols_length = len(non_inactive_markets)
obtained_tickers_length = len(tickers_values)
min_ratio = 0.99 # 1.0 - 0.01 = 0.99, hardcoded to avoid C# transpiler type casting issues
assert obtained_tickers_length >= not_inactive_symbols_length * min_ratio, exchange.id + ' ' + 'fetchTickers' + ' must return tickers for all active markets. but returned: ' + str(obtained_tickers_length) + ' tickers, ' + str(not_inactive_symbols_length) + ' active markets'
#
# ensure tickers length is less than markets length
#
all_markets = exchange.markets
all_markets_length = len(list(all_markets.keys()))
assert obtained_tickers_length <= all_markets_length, exchange.id + ' ' + 'fetchTickers' + ' must return <= than all markets, but returned: ' + str(obtained_tickers_length) + ' tickers, ' + str(all_markets_length) + ' markets'