Files
ccxt_with_mt5/ccxt/pro/okx.py
lz_db 0fab423a18 add
2025-11-16 12:31:03 +08:00

2366 lines
104 KiB
Python

# -*- coding: utf-8 -*-
# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code
import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp
import hashlib
from ccxt.base.types import Any, Balances, Bool, Int, Liquidation, Num, Order, OrderBook, OrderSide, OrderType, Position, Str, Strings, Ticker, Tickers, FundingRate, FundingRates, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List
from ccxt.base.errors import ExchangeError
from ccxt.base.errors import AuthenticationError
from ccxt.base.errors import ArgumentsRequired
from ccxt.base.errors import BadRequest
from ccxt.base.errors import InvalidNonce
from ccxt.base.errors import ChecksumError
class okx(ccxt.async_support.okx):
def describe(self) -> Any:
    """
    build the exchange description with the websocket settings layered on top
    of the description inherited from the REST class
    :returns dict: the parent description deep-extended with the 'has', 'urls', 'options' and 'streaming' sections below
    """
    capabilities: dict = {
        'ws': True,
        'watchTicker': True,
        'watchMarkPrice': True,
        'watchMarkPrices': True,
        'watchTickers': True,
        'watchBidsAsks': True,
        'watchOrderBook': True,
        'watchTrades': True,
        'watchTradesForSymbols': True,
        'watchOrderBookForSymbols': True,
        'watchBalance': True,
        'watchLiquidations': 'emulated',
        'watchLiquidationsForSymbols': True,
        'watchMyLiquidations': 'emulated',
        'watchMyLiquidationsForSymbols': True,
        'watchOHLCV': True,
        'watchOHLCVForSymbols': True,
        'watchOrders': True,
        'watchMyTrades': True,
        'watchPositions': True,
        'watchFundingRate': True,
        'watchFundingRates': True,
        'createOrderWs': True,
        'editOrderWs': True,
        'cancelOrderWs': True,
        'cancelOrdersWs': True,
        'cancelAllOrdersWs': True,
    }
    endpoints: dict = {
        'api': {
            'ws': 'wss://ws.okx.com:8443/ws/v5',
        },
        'test': {
            'ws': 'wss://wspap.okx.com:8443/ws/v5',
        },
    }
    orderBookOptions: dict = {
        'checksum': True,
        #
        # bbo-tbt
        # 1. Newly added channel that sends tick-by-tick Level 1 data
        # 2. All API users can subscribe
        # 3. Public depth channel, verification not required
        #
        # books-l2-tbt
        # 1. Only users who're VIP5 and above can subscribe
        # 2. Identity verification required before subscription
        #
        # books50-l2-tbt
        # 1. Only users who're VIP4 and above can subscribe
        # 2. Identity verification required before subscription
        #
        # books
        # 1. All API users can subscribe
        # 2. Public depth channel, verification not required
        #
        # books5
        # 1. All API users can subscribe
        # 2. Public depth channel, verification not required
        # 3. Data feeds will be delivered every 100ms(vs. every 200ms now)
        #
        'depth': 'books',
    }
    methodOptions: dict = {
        'watchOrderBook': orderBookOptions,
        'watchBalance': 'spot',  # margin, futures, swap
        'watchTicker': {
            'channel': 'tickers',  # tickers, sprd-tickers, index-tickers, block-tickers
        },
        'watchTickers': {
            'channel': 'tickers',  # tickers, sprd-tickers, index-tickers, block-tickers
        },
        'watchOrders': {
            'type': 'ANY',  # SPOT, MARGIN, SWAP, FUTURES, OPTION, ANY
        },
        'watchMyTrades': {
            'type': 'ANY',  # SPOT, MARGIN, SWAP, FUTURES, OPTION, ANY
        },
        'createOrderWs': {
            'op': 'batch-orders',  # order, batch-orders
        },
        'editOrderWs': {
            'op': 'amend-order',  # amend-order, batch-amend-orders
        },
        'ws': {
            # 'inflate': True,
        },
    }
    streamingOptions: dict = {
        # okex does not support built-in ws protocol-level ping-pong
        # instead it requires a custom text-based ping-pong
        'ping': self.ping,
        'keepAlive': 18000,
    }
    wsDescription: dict = {
        'has': capabilities,
        'urls': endpoints,
        'options': methodOptions,
        'streaming': streamingOptions,
    }
    return self.deep_extend(super(okx, self).describe(), wsDescription)
def get_url(self, channel: str, access='public'):
    """
    pick the websocket endpoint for a channel
    :param str channel: exchange channel name
    :param str access: 'public', 'business', anything else is treated as private
    :returns str: the websocket base url followed by '/business', '/public' or '/private', plus the broker suffix in sandbox mode
    """
    # for context: https://www.okx.com/help-center/changes-to-v5-api-websocket-subscription-parameter-and-url
    suffix = '?brokerId=9999' if self.options['sandboxMode'] else ''
    baseUrl = self.urls['api']['ws']
    # candle channels and 'orders-algo' are routed to the business endpoint
    # regardless of the requested access level
    if access == 'business' or 'candle' in channel or channel == 'orders-algo':
        endpoint = '/business'
    elif access == 'public':
        endpoint = '/public'
    else:
        endpoint = '/private'
    return baseUrl + endpoint + suffix
async def subscribe_multiple(self, access, channel, symbols: Strings = None, params={}):
    """
    subscribe to one channel for several markets with a single request
    :param str access: access level forwarded to get_url
    :param str channel: exchange channel name
    :param str[] [symbols]: unified symbols, self.symbols is used when omitted
    :param dict [params]: extra fields merged into every subscription argument
    :returns: the value produced by watch_multiple for the subscribed message hashes
    """
    await self.load_markets()
    requested = self.symbols if symbols is None else symbols
    requested = self.market_symbols(requested)
    url = self.get_url(channel, access)
    hashes = []
    subscriptionArgs = []
    for unifiedSymbol in requested:
        baseArg: dict = {
            'channel': channel,
            'instId': self.market_id(unifiedSymbol),
        }
        subscriptionArgs.append(self.extend(baseArg, params))
        # one message hash per symbol: '<channel>::<symbol>'
        hashes.append(channel + '::' + unifiedSymbol)
    payload: dict = {
        'op': 'subscribe',
        'args': subscriptionArgs,
    }
    return await self.watch_multiple(url, hashes, payload, hashes)
async def subscribe(self, access, messageHash, channel, symbol, params={}):
    """
    subscribe to a single channel, optionally scoped to one market
    :param str access: access level forwarded to get_url
    :param str messageHash: hash prefix, ':<marketId>' is appended when a symbol is given
    :param str channel: exchange channel name
    :param str symbol: unified symbol or None for a channel without an instrument
    :param dict [params]: extra fields deep-merged into the subscription argument
    :returns: the value produced by watch for the final message hash
    """
    await self.load_markets()
    url = self.get_url(channel, access)
    subscriptionArg: dict = {
        'channel': channel,
    }
    hashKey = messageHash
    if symbol is not None:
        marketId = self.market(symbol)['id']
        hashKey = hashKey + ':' + marketId
        subscriptionArg['instId'] = marketId
    payload: dict = {
        'op': 'subscribe',
        'args': [
            self.deep_extend(subscriptionArg, params),
        ],
    }
    return await self.watch(url, hashKey, payload, hashKey)
async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
    """
    get the list of most recent trades for a particular symbol
    https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-trades-channel
    https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-all-trades-channel
    :param str symbol: unified symbol of the market to fetch trades for
    :param int [since]: timestamp in ms of the earliest trade to fetch
    :param int [limit]: the maximum amount of trades to fetch
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
    """
    # single-symbol convenience wrapper around the multi-symbol implementation
    singleSymbol = [symbol]
    return await self.watch_trades_for_symbols(singleSymbol, since, limit, params)
async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]:
    """
    get the list of most recent trades for a list of symbols
    https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-trades-channel
    https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-all-trades-channel
    :param str[] symbols: unified symbols of the markets to fetch trades for, must not be empty
    :param int [since]: timestamp in ms of the earliest trade to fetch
    :param int [limit]: the maximum amount of trades to fetch
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.channel]: the channel to subscribe to, trades by default. Can be 'trades' and 'trades-all'
    :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
    :raises ArgumentsRequired: when symbols is empty
    """
    if len(symbols) == 0:
        raise ArgumentsRequired(self.id + ' watchTradesForSymbols() requires a non-empty array of symbols')
    await self.load_markets()
    symbols = self.market_symbols(symbols)
    channel, params = self.handle_option_and_params(params, 'watchTrades', 'channel', 'trades')
    subscriptionArgs = []
    hashes = []
    for unifiedSymbol in symbols:
        hashes.append(channel + ':' + unifiedSymbol)
        subscriptionArgs.append({
            'channel': channel,
            'instId': self.market_id(unifiedSymbol),
        })
    payload: dict = {
        'op': 'subscribe',
        'args': subscriptionArgs,
    }
    # 'trades-all' goes through the business endpoint and is authenticated first
    needsBusiness = (channel == 'trades-all')
    access = 'business' if needsBusiness else 'public'
    if needsBusiness:
        await self.authenticate({'access': access})
    url = self.get_url(channel, access)
    trades = await self.watch_multiple(url, hashes, payload, hashes)
    if self.newUpdates:
        newest = self.safe_value(trades, 0)
        limit = trades.getLimit(self.safe_string(newest, 'symbol'), limit)
    return self.filter_by_since_limit(trades, since, limit, 'timestamp', True)
async def un_watch_trades_for_symbols(self, symbols: List[str], params={}) -> Any:
    """
    unWatches from the trades stream channel for a list of symbols
    :param str[] symbols: unified symbols of the markets to stop watching
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.channel]: the channel to unsubscribe from, trades by default. Can be trades, trades-all
    :returns: the value produced by watch_multiple for the unsubscribe message hashes
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    channel, params = self.handle_option_and_params(params, 'watchTrades', 'channel', 'trades')
    unsubscribeArgs = []
    hashes = []
    for unifiedSymbol in symbols:
        hashes.append('unsubscribe:' + channel + unifiedSymbol)
        unsubscribeArgs.append({
            'channel': channel,
            'instId': self.market_id(unifiedSymbol),
        })
    payload: dict = {
        'op': 'unsubscribe',
        'args': unsubscribeArgs,
    }
    # mirror the subscription path: 'trades-all' uses the authenticated business endpoint
    needsBusiness = (channel == 'trades-all')
    access = 'business' if needsBusiness else 'public'
    if needsBusiness:
        await self.authenticate({'access': access})
    url = self.get_url(channel, access)
    return await self.watch_multiple(url, hashes, payload, hashes)
async def un_watch_trades(self, symbol: str, params={}) -> Any:
    """
    unWatches from the trades stream channel for one symbol
    :param str symbol: unified symbol of the market to stop watching
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns: the result of un_watch_trades_for_symbols for that single symbol
    """
    singleSymbol = [symbol]
    return await self.un_watch_trades_for_symbols(singleSymbol, params)
def handle_trades(self, client: Client, message):
    """
    store incoming public trades in the per-symbol cache and resolve the
    '<channel>:<symbol>' message hash once for every trade in the message
    :param Client client: websocket client the message arrived on
    :param dict message: raw 'trades' or 'trades-all' push message
    """
    #
    #     {
    #         "arg": {channel: "trades", instId: "BTC-USDT"},
    #         "data": [
    #             {
    #                 "instId": "BTC-USDT",
    #                 "tradeId": "216970876",
    #                 "px": "31684.5",
    #                 "sz": "0.00001186",
    #                 "side": "buy",
    #                 "ts": "1626531038288"
    #             }
    #         ]
    #     }
    #     {
    #         "arg": {
    #             "channel": "trades-all",
    #             "instId": "BTC-USDT"
    #         },
    #         "data": [
    #             {
    #                 "instId": "BTC-USDT",
    #                 "tradeId": "130639474",
    #                 "px": "42219.9",
    #                 "sz": "0.12060306",
    #                 "side": "buy",
    #                 "source": "0",
    #                 "ts": "1630048897897"
    #             }
    #         ]
    #     }
    #
    subscription = self.safe_value(message, 'arg', {})
    channel = self.safe_string(subscription, 'channel')
    symbol = self.safe_symbol(self.safe_string(subscription, 'instId'))
    rawTrades = self.safe_value(message, 'data', [])
    cacheSize = self.safe_integer(self.options, 'tradesLimit', 1000)
    for rawTrade in rawTrades:
        parsed = self.parse_trade(rawTrade)
        hashKey = channel + ':' + symbol
        cache = self.safe_value(self.trades, symbol)
        if cache is None:
            # first trade seen for this symbol: create its bounded cache
            cache = ArrayCache(cacheSize)
            self.trades[symbol] = cache
        cache.append(parsed)
        client.resolve(cache, hashKey)
async def watch_funding_rate(self, symbol: str, params={}) -> FundingRate:
    """
    watch the current funding rate
    https://www.okx.com/docs-v5/en/#public-data-websocket-funding-rate-channel
    :param str symbol: unified market symbol
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: a `funding rate structure <https://docs.ccxt.com/#/?id=funding-rate-structure>`
    """
    # load markets before resolving the symbol, as watch_ohlcv does; without
    # this the lookup below depended on the caller having loaded markets already
    await self.load_markets()
    symbol = self.symbol(symbol)
    fundingRates = await self.watch_funding_rates([symbol], params)
    return fundingRates[symbol]
async def watch_funding_rates(self, symbols: List[str], params={}) -> FundingRates:
    """
    watch the funding rate for multiple markets
    https://www.okx.com/docs-v5/en/#public-data-websocket-funding-rate-channel
    :param str[] symbols: list of unified market symbols
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: a dictionary of `funding rates structures <https://docs.ccxt.com/#/?id=funding-rates-structure>`, indexed by market symbols
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols)
    channel = 'funding-rate'
    subscriptionArgs = []
    hashes = []
    for unifiedSymbol in symbols:
        hashes.append(channel + ':' + unifiedSymbol)
        subscriptionArgs.append({
            'channel': channel,
            'instId': self.market_id(unifiedSymbol),
        })
    payload: dict = {
        'op': 'subscribe',
        'args': subscriptionArgs,
    }
    url = self.get_url(channel, 'public')
    update = await self.watch_multiple(url, hashes, payload, hashes)
    if not self.newUpdates:
        return self.filter_by_array(self.fundingRates, 'symbol', symbols)
    # in newUpdates mode only the rate that just arrived is returned, keyed by its symbol
    latest: dict = {}
    latest[self.safe_string(update, 'symbol')] = update
    return latest
def handle_funding_rate(self, client: Client, message):
    """
    parse every funding-rate entry of a push message, store it in
    self.fundingRates and resolve the 'funding-rate:<symbol>' message hash
    :param Client client: websocket client the message arrived on
    :param dict message: raw funding-rate push message
    """
    #
    #    "data":[
    #        {
    #           "fundingRate":"0.0001875391284828",
    #           "fundingTime":"1700726400000",
    #           "instId":"BTC-USD-SWAP",
    #           "instType":"SWAP",
    #           "method": "next_period",
    #           "maxFundingRate":"0.00375",
    #           "minFundingRate":"-0.00375",
    #           "nextFundingRate":"0.0002608059239328",
    #           "nextFundingTime":"1700755200000",
    #           "premium": "0.0001233824646391",
    #           "settFundingRate":"0.0001699799259033",
    #           "settState":"settled",
    #           "ts":"1700724675402"
    #        }
    #    ]
    #
    for rawRate in self.safe_list(message, 'data', []):
        parsed = self.parse_funding_rate(rawRate)
        rateSymbol = parsed['symbol']
        self.fundingRates[rateSymbol] = parsed
        client.resolve(parsed, 'funding-rate' + ':' + parsed['symbol'])
async def watch_ticker(self, symbol: str, params={}) -> Ticker:
    """
    watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market
    https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel
    :param str symbol: unified symbol of the market to fetch the ticker for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.channel]: the channel to subscribe to, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers
    :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
    """
    # markets must be available before self.market() is asked to resolve the symbol
    await self.load_markets()
    channel, params = self.handle_option_and_params(params, 'watchTicker', 'channel', 'tickers')
    # build a new dict instead of assigning into params: params may still be
    # the caller's dict or the shared `params={}` default, and writing the key
    # in place would leak the channel into later calls
    params = self.extend(params, {'channel': channel})
    market = self.market(symbol)
    symbol = market['symbol']
    ticker = await self.watch_tickers([symbol], params)
    return self.safe_value(ticker, symbol)
async def un_watch_ticker(self, symbol: str, params={}) -> Any:
    """
    unWatches a price ticker for a specific market
    https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel
    :param str symbol: unified symbol of the market to stop watching
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.channel]: the channel to unsubscribe from, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers
    :returns: the result of un_watch_tickers for that single symbol
    """
    singleSymbol = [symbol]
    return await self.un_watch_tickers(singleSymbol, params)
async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers:
    """
    watches price tickers, a statistical calculation with the information calculated over the past 24 hours, for a list of markets
    https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel
    :param str[] [symbols]: unified symbols of the markets to fetch the ticker for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.channel]: the channel to subscribe to, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers
    :returns dict: a dictionary of `ticker structures <https://docs.ccxt.com/#/?id=ticker-structure>`
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    channel, params = self.handle_option_and_params(params, 'watchTickers', 'channel', 'tickers')
    update = await self.subscribe_multiple('public', channel, symbols, params)
    if not self.newUpdates:
        # snapshot mode: return everything cached for the requested symbols
        return self.filter_by_array(self.tickers, 'symbol', symbols)
    return update
async def watch_mark_price(self, symbol: str, params={}) -> Ticker:
    """
    watches a mark price
    https://www.okx.com/docs-v5/en/#public-data-websocket-mark-price-channel
    :param str symbol: unified symbol of the market to fetch the mark price for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.channel]: the channel to subscribe to, mark-price by default
    :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
    """
    # markets must be available before self.market() is asked to resolve the symbol
    await self.load_markets()
    channel, params = self.handle_option_and_params(params, 'watchMarkPrice', 'channel', 'mark-price')
    # build a new dict instead of assigning into params: params may still be
    # the caller's dict or the shared `params={}` default, and writing the key
    # in place would leak the channel into later calls
    params = self.extend(params, {'channel': channel})
    market = self.market(symbol)
    symbol = market['symbol']
    ticker = await self.watch_mark_prices([symbol], params)
    return ticker[symbol]
async def watch_mark_prices(self, symbols: Strings = None, params={}) -> Tickers:
    """
    watches mark prices for a list of markets
    https://www.okx.com/docs-v5/en/#public-data-websocket-mark-price-channel
    :param str[] [symbols]: unified symbols of the markets to fetch the mark price for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.channel]: the channel to subscribe to, mark-price by default
    :returns dict: a dictionary of `ticker structures <https://docs.ccxt.com/#/?id=ticker-structure>`
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    channel, params = self.handle_option_and_params(params, 'watchMarkPrices', 'channel', 'mark-price')
    update = await self.subscribe_multiple('public', channel, symbols, params)
    if not self.newUpdates:
        # snapshot mode: return everything cached in self.tickers for the requested symbols
        return self.filter_by_array(self.tickers, 'symbol', symbols)
    return update
async def un_watch_tickers(self, symbols: Strings = None, params={}) -> Any:
    """
    unWatches price tickers for a list of markets
    https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel
    :param str[] [symbols]: unified symbols of the markets to stop watching
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.channel]: the channel to unsubscribe from, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers
    :returns: the value produced by watch_multiple for the unsubscribe message hashes
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    channel, params = self.handle_option_and_params(params, 'watchTickers', 'channel', 'tickers')
    unsubscribeArgs = []
    hashes = []
    for unifiedSymbol in symbols:
        hashes.append('unsubscribe:ticker:' + unifiedSymbol)
        unsubscribeArgs.append({
            'channel': channel,
            'instId': self.market_id(unifiedSymbol),
        })
    payload: dict = {
        'op': 'unsubscribe',
        'args': unsubscribeArgs,
    }
    url = self.get_url(channel, 'public')
    return await self.watch_multiple(url, hashes, payload, hashes)
def handle_ticker(self, client: Client, message):
    """
    update the bid/ask cache, parse the tickers of a push message into
    self.tickers and resolve the '<channel>::<symbol>' message hash
    :param Client client: websocket client the message arrived on
    :param dict message: raw tickers push message
    """
    #
    #     {
    #         "arg": {channel: "tickers", instId: "BTC-USDT"},
    #         "data": [
    #             {
    #                 "instType": "SPOT",
    #                 "instId": "BTC-USDT",
    #                 "last": "31500.1",
    #                 "lastSz": "0.00001754",
    #                 "askPx": "31500.1",
    #                 "askSz": "0.00998144",
    #                 "bidPx": "31500",
    #                 "bidSz": "3.05652439",
    #                 "open24h": "31697",
    #                 "high24h": "32248",
    #                 "low24h": "31165.6",
    #                 "sodUtc0": "31385.5",
    #                 "sodUtc8": "32134.9",
    #                 "volCcy24h": "503403597.38138519",
    #                 "vol24h": "15937.10781721",
    #                 "ts": "1626526618762"
    #             }
    #         ]
    #     }
    #
    # the same payload also feeds the watch_bids_asks subscribers
    self.handle_bid_ask(client, message)
    subscription = self.safe_value(message, 'arg', {})
    market = self.safe_market(self.safe_string(subscription, 'instId'), None, '-')
    symbol = market['symbol']
    channel = self.safe_string(subscription, 'channel')
    rawTickers = self.safe_value(message, 'data', [])
    updated: dict = {}
    for rawTicker in rawTickers:
        parsed = self.parse_ticker(rawTicker)
        self.tickers[symbol] = parsed
        updated[symbol] = parsed
    client.resolve(updated, channel + '::' + symbol)
async def watch_bids_asks(self, symbols: Strings = None, params={}) -> Tickers:
    """
    watches best bid & ask for symbols
    https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel
    :param str[] symbols: unified symbols of the markets to fetch the best bid and ask for
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: a dictionary of `ticker structures <https://docs.ccxt.com/#/?id=ticker-structure>`
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    channel, params = self.handle_option_and_params(params, 'watchBidsAsks', 'channel', 'tickers')
    url = self.get_url(channel, 'public')
    hashes = []
    subscriptionArgs = []
    for unifiedSymbol in symbols:
        baseArg: dict = {
            'channel': channel,
            'instId': self.market_id(unifiedSymbol),
        }
        subscriptionArgs.append(self.extend(baseArg, params))
        hashes.append('bidask::' + unifiedSymbol)
    payload: dict = {
        'op': 'subscribe',
        'args': subscriptionArgs,
    }
    update = await self.watch_multiple(url, hashes, payload, hashes)
    if not self.newUpdates:
        return self.filter_by_array(self.bidsasks, 'symbol', symbols)
    # in newUpdates mode only the entry that just arrived is returned, keyed by its symbol
    latest: dict = {}
    latest[update['symbol']] = update
    return latest
def handle_bid_ask(self, client: Client, message):
    """
    parse the first ticker of a push message into a bid/ask entry, store it
    in self.bidsasks and resolve the 'bidask::<symbol>' message hash
    :param Client client: websocket client the message arrived on
    :param dict message: raw tickers push message
    """
    #
    #     {
    #         "arg": {channel: "tickers", instId: "BTC-USDT"},
    #         "data": [
    #             {
    #                 "instType": "SPOT",
    #                 "instId": "BTC-USDT",
    #                 "last": "31500.1",
    #                 "lastSz": "0.00001754",
    #                 "askPx": "31500.1",
    #                 "askSz": "0.00998144",
    #                 "bidPx": "31500",
    #                 "bidSz": "3.05652439",
    #                 "open24h": "31697",
    #                 "high24h": "32248",
    #                 "low24h": "31165.6",
    #                 "sodUtc0": "31385.5",
    #                 "sodUtc8": "32134.9",
    #                 "volCcy24h": "503403597.38138519",
    #                 "vol24h": "15937.10781721",
    #                 "ts": "1626526618762"
    #             }
    #         ]
    #     }
    #
    entries = self.safe_list(message, 'data', [])
    firstEntry = self.safe_dict(entries, 0, {})
    bidAsk = self.parse_ws_bid_ask(firstEntry)
    bidAskSymbol = bidAsk['symbol']
    self.bidsasks[bidAskSymbol] = bidAsk
    client.resolve(bidAsk, 'bidask::' + bidAskSymbol)
def parse_ws_bid_ask(self, ticker, market=None):
    """
    convert a raw ticker entry into a ticker structure carrying only the best bid and ask
    :param dict ticker: raw ticker entry with instId, ts, askPx, askSz, bidPx and bidSz
    :param dict [market]: market used when the entry's instId cannot be resolved
    :returns dict: the result of safe_ticker for the bid/ask fields
    """
    market = self.safe_market(self.safe_string(ticker, 'instId'), market)
    timestamp = self.safe_integer(ticker, 'ts')
    bidAskFields = {
        'symbol': self.safe_string(market, 'symbol'),
        'timestamp': timestamp,
        'datetime': self.iso8601(timestamp),
        'ask': self.safe_string(ticker, 'askPx'),
        'askVolume': self.safe_string(ticker, 'askSz'),
        'bid': self.safe_string(ticker, 'bidPx'),
        'bidVolume': self.safe_string(ticker, 'bidSz'),
        'info': ticker,
    }
    return self.safe_ticker(bidAskFields, market)
async def watch_liquidations_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Liquidation]:
    """
    watch the public liquidations of a trading pair
    https://www.okx.com/docs-v5/en/#public-data-websocket-liquidation-orders-channel
    :param str[] symbols: unified symbols to watch, None subscribes to the shared 'liquidations' hash
    :param int [since]: the earliest time in ms to fetch liquidations for
    :param int [limit]: the maximum number of liquidation structures to retrieve
    :param dict [params]: exchange specific parameters for the okx api endpoint
    :returns dict: an array of `liquidation structures <https://github.com/ccxt/ccxt/wiki/Manual#liquidation-structure>`
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, True, True)
    messageHash = 'liquidations'
    if symbols is not None:
        messageHashes = [messageHash + '::' + symbol for symbol in symbols]
    else:
        messageHashes = [messageHash]
    market = self.get_market_from_symbols(symbols)
    # the method name is the key used to look up per-method options; it was
    # previously spelled 'watchliquidationsForSymbols', which does not match
    # the camelCase name the rest of the file uses for this method
    marketType, params = self.handle_market_type_and_params('watchLiquidationsForSymbols', market, params)
    channel = 'liquidation-orders'
    # translate the unified market type into the instType sent to the exchange;
    # 'spot' is mapped to SWAP (presumably because the channel has no spot instType)
    if marketType == 'spot':
        marketType = 'SWAP'
    elif marketType == 'future':
        marketType = 'futures'
    request = {
        'op': 'subscribe',
        'args': [
            {
                'channel': channel,
                'instType': marketType.upper(),
            },
        ],
    }
    url = self.get_url(channel, 'public')
    newLiquidations = await self.watch_multiple(url, messageHashes, request, messageHashes)
    if self.newUpdates:
        return newLiquidations
    return self.filter_by_symbols_since_limit(self.liquidations, symbols, since, limit, True)
def handle_liquidation(self, client: Client, message):
    """
    cache every public liquidation of a push message per symbol and resolve
    both the 'liquidations' and the 'liquidations::<symbol>' message hashes
    :param Client client: websocket client the message arrived on
    :param dict message: raw liquidation-orders push message
    """
    #
    #    {
    #        "arg": {
    #            "channel": "liquidation-orders",
    #            "instType": "SWAP"
    #        },
    #        "data": [
    #            {
    #                "details": [
    #                    {
    #                        "bkLoss": "0",
    #                        "bkPx": "0.007831",
    #                        "ccy": "",
    #                        "posSide": "short",
    #                        "side": "buy",
    #                        "sz": "13",
    #                        "ts": "1692266434010"
    #                    }
    #                ],
    #                "instFamily": "IOST-USDT",
    #                "instId": "IOST-USDT-SWAP",
    #                "instType": "SWAP",
    #                "uly": "IOST-USDT"
    #            }
    #        ]
    #    }
    #
    for rawEntry in self.safe_list(message, 'data', []):
        parsed = self.parse_ws_liquidation(rawEntry)
        symbol = self.safe_string(parsed, 'symbol')
        cache = self.safe_value(self.liquidations, symbol)
        if cache is None:
            cacheSize = self.safe_integer(self.options, 'liquidationsLimit', 1000)
            cache = ArrayCache(cacheSize)
        cache.append(parsed)
        self.liquidations[symbol] = cache
        justArrived = [parsed]
        client.resolve(justArrived, 'liquidations')
        client.resolve([parsed], 'liquidations::' + symbol)
async def watch_my_liquidations_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Liquidation]:
    """
    watch the private liquidations of a trading pair
    https://www.okx.com/docs-v5/en/#trading-account-websocket-balance-and-position-channel
    :param str[] symbols: unified symbols to watch, None subscribes to the shared 'myLiquidations' hash
    :param int [since]: the earliest time in ms to fetch liquidations for
    :param int [limit]: the maximum number of liquidation structures to retrieve
    :param dict [params]: exchange specific parameters for the okx api endpoint
    :param bool [params.stop]: when truthy, authenticate against the business endpoint instead of the private one
    :param bool [params.trigger]: alias of params.stop
    :returns dict: an array of `liquidation structures <https://github.com/ccxt/ccxt/wiki/Manual#liquidation-structure>`
    """
    await self.load_markets()
    triggerRequested = self.safe_value_2(params, 'stop', 'trigger', False)
    params = self.omit(params, ['stop', 'trigger'])
    authAccess = 'business' if triggerRequested else 'private'
    await self.authenticate({'access': authAccess})
    symbols = self.market_symbols(symbols, None, True, True)
    hashPrefix = 'myLiquidations'
    hashes = []
    if symbols is None:
        hashes.append(hashPrefix)
    else:
        for unifiedSymbol in symbols:
            hashes.append(hashPrefix + '::' + unifiedSymbol)
    channel = 'balance_and_position'
    payload: dict = {
        'op': 'subscribe',
        'args': [
            {
                'channel': channel,
            },
        ],
    }
    url = self.get_url(channel, 'private')
    update = await self.watch_multiple(url, hashes, self.deep_extend(payload, params), hashes)
    if not self.newUpdates:
        return self.filter_by_symbols_since_limit(self.liquidations, symbols, since, limit, True)
    return update
def handle_my_liquidation(self, client: Client, message):
    """
    cache the liquidation events of a balance_and_position push message per
    symbol and resolve the 'myLiquidations' and 'myLiquidations::<symbol>'
    message hashes; entries with any other eventType are ignored
    :param Client client: websocket client the message arrived on
    :param dict message: raw balance_and_position push message
    """
    #
    #    {
    #        "arg": {
    #            "channel": "balance_and_position",
    #            "uid": "77982378738415879"
    #        },
    #        "data": [{
    #            "pTime": "1597026383085",
    #            "eventType": "snapshot",
    #            "balData": [{
    #                "ccy": "BTC",
    #                "cashBal": "1",
    #                "uTime": "1597026383085"
    #            }],
    #            "posData": [{
    #                "posId": "1111111111",
    #                "tradeId": "2",
    #                "instId": "BTC-USD-191018",
    #                "instType": "FUTURES",
    #                "mgnMode": "cross",
    #                "posSide": "long",
    #                "pos": "10",
    #                "ccy": "BTC",
    #                "posCcy": "",
    #                "avgPx": "3320",
    #                "uTIme": "1597026383085"
    #            }],
    #            "trades": [{
    #                "instId": "BTC-USD-191018",
    #                "tradeId": "2",
    #            }]
    #        }]
    #    }
    #
    rawLiquidations = self.safe_list(message, 'data', [])
    for rawLiquidation in rawLiquidations:
        eventType = self.safe_string(rawLiquidation, 'eventType')
        if eventType != 'liquidation':
            # skip only this entry; returning here would also drop any
            # liquidation entries that follow it in the same message
            continue
        liquidation = self.parse_ws_my_liquidation(rawLiquidation)
        symbol = self.safe_string(liquidation, 'symbol')
        liquidations = self.safe_value(self.liquidations, symbol)
        if liquidations is None:
            limit = self.safe_integer(self.options, 'myLiquidationsLimit', 1000)
            liquidations = ArrayCache(limit)
        liquidations.append(liquidation)
        self.liquidations[symbol] = liquidations
        client.resolve([liquidation], 'myLiquidations')
        client.resolve([liquidation], 'myLiquidations::' + symbol)
def parse_ws_my_liquidation(self, liquidation, market=None):
    """
    convert one balance_and_position entry into a liquidation structure,
    using the first element of its posData list
    :param dict liquidation: raw entry of the balance_and_position 'data' list
    :param dict [market]: market used when the entry's instId cannot be resolved
    :returns dict: the result of safe_liquidation for the parsed fields
    """
    #
    #    {
    #        "pTime": "1597026383085",
    #        "eventType": "snapshot",
    #        "balData": [{
    #            "ccy": "BTC",
    #            "cashBal": "1",
    #            "uTime": "1597026383085"
    #        }],
    #        "posData": [{
    #            "posId": "1111111111",
    #            "tradeId": "2",
    #            "instId": "BTC-USD-191018",
    #            "instType": "FUTURES",
    #            "mgnMode": "cross",
    #            "posSide": "long",
    #            "pos": "10",
    #            "ccy": "BTC",
    #            "posCcy": "",
    #            "avgPx": "3320",
    #            "uTIme": "1597026383085"
    #        }],
    #        "trades": [{
    #            "instId": "BTC-USD-191018",
    #            "tradeId": "2",
    #        }]
    #    }
    #
    posData = self.safe_list(liquidation, 'posData', [])
    firstPosData = self.safe_dict(posData, 0, {})
    marketId = self.safe_string(firstPosData, 'instId')
    market = self.safe_market(marketId, market)
    # accept the conventional 'uTime' key as well as the 'uTIme' spelling shown
    # in the sample above, so the timestamp is found under either
    timestamp = self.safe_integer_2(firstPosData, 'uTime', 'uTIme')
    return self.safe_liquidation({
        'info': liquidation,
        'symbol': self.safe_symbol(marketId, market),
        'contracts': self.safe_number(firstPosData, 'pos'),
        'contractSize': self.safe_number(market, 'contractSize'),
        # avgPx is a field of the position entry, not of the top-level object
        'price': self.safe_number(firstPosData, 'avgPx'),
        'baseValue': None,
        'quoteValue': None,
        'timestamp': timestamp,
        'datetime': self.iso8601(timestamp),
    })
def parse_ws_liquidation(self, liquidation, market=None):
    """
    convert one public liquidation-orders entry into a liquidation structure,
    using the first element of its details list
    :param dict liquidation: raw entry of the liquidation-orders 'data' list
    :param dict [market]: market used when the entry's instId cannot be resolved
    :returns dict: the result of safe_liquidation for the parsed fields
    """
    #
    # public liquidation
    #    {
    #        "details": [
    #            {
    #                "bkLoss": "0",
    #                "bkPx": "0.007831",
    #                "ccy": "",
    #                "posSide": "short",
    #                "side": "buy",
    #                "sz": "13",
    #                "ts": "1692266434010"
    #            }
    #        ],
    #        "instFamily": "IOST-USDT",
    #        "instId": "IOST-USDT-SWAP",
    #        "instType": "SWAP",
    #        "uly": "IOST-USDT"
    #    }
    #
    firstDetail = self.safe_dict(self.safe_list(liquidation, 'details', []), 0, {})
    marketId = self.safe_string(liquidation, 'instId')
    market = self.safe_market(marketId, market)
    timestamp = self.safe_integer(firstDetail, 'ts')
    parsedFields = {
        'info': liquidation,
        'symbol': self.safe_symbol(marketId, market),
        'contracts': self.safe_number(firstDetail, 'sz'),
        'contractSize': self.safe_number(market, 'contractSize'),
        'price': self.safe_number(firstDetail, 'bkPx'),
        'side': self.safe_string(firstDetail, 'side'),
        'baseValue': None,
        'quoteValue': None,
        'timestamp': timestamp,
        'datetime': self.iso8601(timestamp),
    }
    return self.safe_liquidation(parsedFields)
async def watch_ohlcv(self, symbol: str, timeframe: str = '1m', since: Int = None, limit: Int = None, params={}) -> List[list]:
    """
    watches historical candlestick data containing the open, high, low, and close price, and the volume of a market
    :param str symbol: unified symbol of the market to fetch OHLCV data for
    :param str timeframe: the length of time each candle represents
    :param int [since]: timestamp in ms of the earliest candle to fetch
    :param int [limit]: the maximum amount of candles to fetch
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns int[][]: A list of candles ordered, open, high, low, close, volume
    """
    await self.load_markets()
    symbol = self.symbol(symbol)
    # the channel name is 'candle' followed by the exchange interval; an
    # unknown timeframe is passed through unchanged
    candleChannel = 'candle' + self.safe_string(self.timeframes, timeframe, timeframe)
    candles = await self.subscribe('public', candleChannel, candleChannel, symbol, params)
    if self.newUpdates:
        limit = candles.getLimit(symbol, limit)
    return self.filter_by_since_limit(candles, since, limit, 0, True)
async def un_watch_ohlcv(self, symbol: str, timeframe: str = '1m', params={}) -> Any:
    """
    unWatches the candlestick stream of a single market and timeframe
    :param str symbol: unified symbol of the market to stop watching
    :param str timeframe: the length of time each candle represents
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns: the result of un_watch_ohlcv_for_symbols for that single pair
    """
    await self.load_markets()
    singlePair = [symbol, timeframe]
    return await self.un_watch_ohlcv_for_symbols([singlePair], params)
async def watch_ohlcv_for_symbols(self, symbolsAndTimeframes: List[List[str]], since: Int = None, limit: Int = None, params={}):
    """
    watches candlestick data containing the open, high, low, and close price, and the volume for several markets at once
    :param str[][] symbolsAndTimeframes: array of arrays containing unified symbols and timeframes to watch OHLCV data for, example [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]
    :param int [since]: timestamp in ms of the earliest candle to return
    :param int [limit]: the maximum amount of candles to return
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns: the result of create_ohlcv_object(symbol, timeframe, candles) for the market that produced the update
    """
    # the argument must be a non-empty list whose first element is itself a list
    if not (len(symbolsAndTimeframes) != 0 and isinstance(symbolsAndTimeframes[0], list)):
        raise ArgumentsRequired(self.id + " watchOHLCVForSymbols() requires a an array of symbols and timeframes, like [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]")
    await self.load_markets()
    subscriptionArgs = []
    hashes = []
    for pair in symbolsAndTimeframes:
        pairSymbol = pair[0]
        pairTimeframe = pair[1]
        instId = self.market_id(pairSymbol)
        candleChannel = 'candle' + self.safe_string(self.timeframes, pairTimeframe, pairTimeframe)
        subscriptionArgs.append({
            'channel': candleChannel,
            'instId': instId,
        })
        # handle_ohlcv resolves hashes of the form 'multi:<channel>:<symbol>'
        hashes.append('multi:' + candleChannel + ':' + pairSymbol)
    payload: dict = {
        'op': 'subscribe',
        'args': subscriptionArgs,
    }
    endpoint = self.get_url('candle', 'public')
    # the resolved value is a [symbol, timeframe, candles] triple
    symbol, timeframe, candles = await self.watch_multiple(endpoint, hashes, payload, hashes)
    if self.newUpdates:
        limit = candles.getLimit(symbol, limit)
    trimmed = self.filter_by_since_limit(candles, since, limit, 0, True)
    return self.create_ohlcv_object(symbol, timeframe, trimmed)
async def un_watch_ohlcv_for_symbols(self, symbolsAndTimeframes: List[List[str]], params={}) -> Any:
    """
    unWatches candlestick data containing the open, high, low, and close price, and the volume of several markets
    :param str[][] symbolsAndTimeframes: array of arrays containing unified symbols and timeframes to stop watching, example [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns: whatever watch_multiple resolves for the 'unsubscribe:multi:<channel>:<symbol>' message hashes
    :raises ArgumentsRequired: if symbolsAndTimeframes is empty or its first element is not a list
    """
    symbolsLength = len(symbolsAndTimeframes)
    if symbolsLength == 0 or not isinstance(symbolsAndTimeframes[0], list):
        # the message names self method(the original text pointed at watchOHLCVForSymbols)
        raise ArgumentsRequired(self.id + " unWatchOHLCVForSymbols() requires an array of symbols and timeframes, like [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]")
    await self.load_markets()
    topics = []
    messageHashes = []
    for symbolAndTimeframe in symbolsAndTimeframes:
        sym = symbolAndTimeframe[0]
        tf = symbolAndTimeframe[1]
        marketId = self.market_id(sym)
        # a timeframe missing from self.timeframes is used verbatim
        interval = self.safe_string(self.timeframes, tf, tf)
        channel = 'candle' + interval
        topic: dict = {
            'channel': channel,
            'instId': marketId,
        }
        topics.append(topic)
        messageHashes.append('unsubscribe:multi:' + channel + ':' + sym)
    request: dict = {
        'op': 'unsubscribe',
        'args': topics,
    }
    url = self.get_url('candle', 'public')
    return await self.watch_multiple(url, messageHashes, request, messageHashes)
def handle_ohlcv(self, client: Client, message):
    """
    stores every candle of a candlestick channel message in the per-symbol, per-timeframe cache and resolves the waiting futures
    :param Client client: the websocket client the message arrived on
    :param dict message: the raw channel message, see the sample below
    """
    #
    #     {
    #         "arg": {channel: "candle1m", instId: "BTC-USDT"},
    #         "data": [
    #             [
    #                 "1626690720000",
    #                 "31334",
    #                 "31334",
    #                 "31334",
    #                 "31334",
    #                 "0.0077",
    #                 "241.2718"
    #             ]
    #         ]
    #     }
    #
    arg = self.safe_value(message, 'arg', {})
    channel = self.safe_string(arg, 'channel')
    candles = self.safe_value(message, 'data', [])
    market = self.safe_market(self.safe_string(arg, 'instId'))
    symbol = market['symbol']
    # use a reverse lookup in a static map instead
    timeframe = self.find_timeframe(channel.replace('candle', ''))
    for rawCandle in candles:
        parsed = self.parse_ohlcv(rawCandle, market)
        self.ohlcvs[symbol] = self.safe_value(self.ohlcvs, symbol, {})
        cache = self.safe_value(self.ohlcvs[symbol], timeframe)
        if cache is None:
            # first candle for self symbol/timeframe: create the cache lazily
            cache = ArrayCacheByTimestamp(self.safe_integer(self.options, 'OHLCVLimit', 1000))
            self.ohlcvs[symbol][timeframe] = cache
        cache.append(parsed)
        # single-symbol subscribers are keyed by the exchange market id
        client.resolve(cache, channel + ':' + market['id'])
        # for multiOHLCV we need special object, to other "multi"
        # methods, because OHLCV response item does not contain symbol
        # or timeframe, thus otherwise it would be unrecognizable
        client.resolve([symbol, timeframe, cache], 'multi:' + channel + ':' + symbol)
async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
    """
    https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-channel
    watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data
    :param str symbol: unified symbol of the market to watch the order book for
    :param int [limit]: the maximum amount of order book entries to return
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.depth]: okx order book depth, can be books, books5, books-l2-tbt, books50-l2-tbt, bbo-tbt
    :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
    """
    #
    # bbo-tbt
    # 1. Newly added channel that sends tick-by-tick Level 1 data
    # 2. All API users can subscribe
    # 3. Public depth channel, verification not required
    #
    # books-l2-tbt
    # 1. Only users who're VIP5 and above can subscribe
    # 2. Identity verification required before subscription
    #
    # books50-l2-tbt
    # 1. Only users who're VIP4 and above can subscribe
    # 2. Identity verification required before subscription
    #
    # books
    # 1. All API users can subscribe
    # 2. Public depth channel, verification not required
    #
    # books5
    # 1. All API users can subscribe
    # 2. Public depth channel, verification not required
    # 3. Data feeds will be delivered every 100ms(vs. every 200ms now)
    #
    # the single-symbol call is the multi-symbol call with a one-element list
    singleSymbolList = [symbol]
    return await self.watch_order_book_for_symbols(singleSymbolList, limit, params)
async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook:
    """
    https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-channel
    watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data
    :param str[] symbols: unified array of symbols
    :param int [limit]: 1,5, 400, 50(l2-tbt, vip4+) or 40000(vip5+) the maximum amount of order book entries to return
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.depth]: okx order book depth, can be books, books5, books-l2-tbt, books50-l2-tbt, bbo-tbt
    :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
    :raises AuthenticationError: if an l2-tbt depth is selected without credentials
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols)
    depth, params = self.handle_option_and_params(params, 'watchOrderBook', 'depth', 'books')
    # a recognised limit overrides the configured depth; any other limit leaves it untouched
    if limit is not None:
        if limit == 1:
            depth = 'bbo-tbt'
        elif 1 < limit <= 5:
            depth = 'books5'
        elif limit == 50:
            depth = 'books50-l2-tbt'  # Make sure you have VIP4 and above
        elif limit == 400:
            depth = 'books'
    if depth in ('books-l2-tbt', 'books50-l2-tbt'):
        # the tick-by-tick level 2 channels require a login on the public endpoint
        if not self.check_required_credentials(False):
            raise AuthenticationError(self.id + ' watchOrderBook/watchOrderBookForSymbols requires authentication for self depth. Add credentials or change the depth option to books or books5')
        await self.authenticate({'access': 'public'})
    hashes = [depth + ':' + unifiedSymbol for unifiedSymbol in symbols]
    subscriptionArgs = [
        {
            'channel': depth,
            'instId': self.market_id(unifiedSymbol),
        }
        for unifiedSymbol in symbols
    ]
    payload: dict = {
        'op': 'subscribe',
        'args': subscriptionArgs,
    }
    endpoint = self.get_url(depth, 'public')
    book = await self.watch_multiple(endpoint, hashes, payload, hashes)
    return book.limit()
async def un_watch_order_book_for_symbols(self, symbols: List[str], params={}) -> Any:
    """
    https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-channel
    unWatches information on open orders with bid(buy) and ask(sell) prices, volumes and other data
    :param str[] symbols: unified array of symbols
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param int [params.limit]: the limit that was used when subscribing, used to pick the same depth channel
    :param str [params.depth]: okx order book depth, can be books, books5, books-l2-tbt, books50-l2-tbt, bbo-tbt
    :returns: whatever watch_multiple resolves for the 'unsubscribe:orderbook:<symbol>' message hashes
    """
    await self.load_markets()
    symbols = self.market_symbols(symbols, None, False)
    depth = None
    depth, params = self.handle_option_and_params(params, 'watchOrderBook', 'depth', 'books')
    limit = self.safe_integer(params, 'limit')
    # same limit -> channel mapping that watch_order_book_for_symbols applies
    if limit is not None:
        if limit == 1:
            depth = 'bbo-tbt'
        elif limit > 1 and limit <= 5:
            depth = 'books5'
        elif limit == 50:
            depth = 'books50-l2-tbt'  # Make sure you have VIP4 and above
        elif limit == 400:
            depth = 'books'
    topics = []
    messageHashes = []
    # the never-read list of '<depth>:<symbol>' hashes that used to be built here was dropped
    for symbol in symbols:
        messageHashes.append('unsubscribe:orderbook:' + symbol)
        marketId = self.market_id(symbol)
        topic: dict = {
            'channel': depth,
            'instId': marketId,
        }
        topics.append(topic)
    request: dict = {
        'op': 'unsubscribe',
        'args': topics,
    }
    url = self.get_url(depth, 'public')
    return await self.watch_multiple(url, messageHashes, request, messageHashes)
async def un_watch_order_book(self, symbol: str, params={}) -> Any:
    """
    https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-channel
    unWatches information on open orders with bid(buy) and ask(sell) prices, volumes and other data
    :param str symbol: unified symbol of the market to stop watching
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param int [params.limit]: the maximum amount of order book entries to return
    :param str [params.depth]: okx order book depth, can be books, books5, books-l2-tbt, books50-l2-tbt, bbo-tbt
    :returns: whatever un_watch_order_book_for_symbols returns for the one-element symbol list
    """
    singleSymbolList = [symbol]
    return await self.un_watch_order_book_for_symbols(singleSymbolList, params)
def handle_delta(self, bookside, delta):
    """
    applies one raw price level to a side of the order book
    :param bookside: the bids or asks container, must expose store(price, amount)
    :param list delta: a raw level, see the sample below; only the first two entries are read
    """
    #
    #     [
    #         "31685",  # price
    #         "0.78069158",  # amount
    #         "0",  # liquidated orders
    #         "17"  # orders
    #     ]
    #
    bookside.store(self.safe_float(delta, 0), self.safe_float(delta, 1))
def handle_deltas(self, bookside, deltas):
    """
    applies every raw price level in deltas to bookside, in order, via handle_delta
    :param bookside: the bids or asks container
    :param list deltas: a list of raw price levels
    """
    for delta in deltas:
        self.handle_delta(bookside, delta)
def handle_order_book_message(self, client: Client, message, orderbook, messageHash, market=None):
    """
    merges one snapshot/update item into orderbook, optionally validates sequence and checksum, and stamps the book
    :param Client client: the websocket client, used to drop the subscription and reject the future on validation failure
    :param dict message: one element of the channel message's data list, see the sample below
    :param orderbook: the locally maintained order book that is mutated in place
    :param str messageHash: the hash the waiting future and the subscription are stored under
    :param dict [market]: market used to resolve the symbol when the item carries no instId
    :returns: the same orderbook object, with nonce, timestamp and datetime updated
    """
    #
    #     {
    #         "asks": [
    #             ['31738.3', '0.05973179', "0", "3"],
    #             ['31738.5', '0.11035404', "0", "2"],
    #             ['31739.6', '0.01', "0", "1"],
    #         ],
    #         "bids": [
    #             ['31738.2', '0.67557666', "0", "9"],
    #             ['31738', '0.02466947', "0", "2"],
    #             ['31736.3', '0.01705046', "0", "2"],
    #         ],
    #         "instId": "BTC-USDT",
    #         "ts": "1626537446491"
    #         "checksum": -855196043,
    #         "prevSeqId": 123456,
    #         "seqId": 123457
    #     }
    #
    asks = self.safe_value(message, 'asks', [])
    bids = self.safe_value(message, 'bids', [])
    storedAsks = orderbook['asks']
    storedBids = orderbook['bids']
    # the deltas are applied first, the checksum below is computed over the resulting book
    self.handle_deltas(storedAsks, asks)
    self.handle_deltas(storedBids, bids)
    marketId = self.safe_string(message, 'instId')
    symbol = self.safe_symbol(marketId, market)
    checksum = self.handle_option('watchOrderBook', 'checksum', True)
    seqId = self.safe_integer(message, 'seqId')
    if checksum:
        prevSeqId = self.safe_integer(message, 'prevSeqId')
        # nonce still holds the seqId of the previously applied item at self point
        nonce = orderbook['nonce']
        asksLength = len(storedAsks)
        bidsLength = len(storedBids)
        # checksum input: bid price, bid size, ask price, ask size of the first 25 levels, interleaved per level
        payloadArray = []
        for i in range(0, 25):
            if i < bidsLength:
                payloadArray.append(self.number_to_string(storedBids[i][0]))
                payloadArray.append(self.number_to_string(storedBids[i][1]))
            if i < asksLength:
                payloadArray.append(self.number_to_string(storedAsks[i][0]))
                payloadArray.append(self.number_to_string(storedAsks[i][1]))
        payload = ':'.join(payloadArray)
        responseChecksum = self.safe_integer(message, 'checksum')
        # NOTE(review): the True flag presumably requests a signed crc32, matching the negative sample checksum above - confirm
        localChecksum = self.crc32(payload, True)
        error = None
        if prevSeqId != -1 and nonce != prevSeqId:
            error = InvalidNonce(self.id + ' watchOrderBook received invalid nonce')
        # a checksum mismatch takes precedence over a sequence error
        if responseChecksum != localChecksum:
            error = ChecksumError(self.id + ' ' + self.orderbook_checksum_message(symbol))
        if error is not None:
            # pop() instead of del: a previous failing item of the same message may already
            # have removed these entries, and a KeyError here would mask the real error
            client.subscriptions.pop(messageHash, None)
            if symbol is not None:
                self.orderbooks.pop(symbol, None)
            client.reject(error, messageHash)
    timestamp = self.safe_integer(message, 'ts')
    orderbook['nonce'] = seqId
    orderbook['timestamp'] = timestamp
    orderbook['datetime'] = self.iso8601(timestamp)
    return orderbook
def handle_order_book(self, client: Client, message):
    """
    routes an order book channel message: snapshots create a new book, updates are merged into the stored book, books5/bbo-tbt messages replace the stored book
    :param Client client: the websocket client the message arrived on
    :param dict message: the raw channel message, see the samples below
    :returns dict: the unmodified message
    """
    #
    # snapshot
    #
    #     {
    #         "arg": {channel: 'books-l2-tbt', instId: "BTC-USDT"},
    #         "action": "snapshot",
    #         "data": [
    #             {
    #                 "asks": [
    #                     ['31685', '0.78069158', "0", "17"],
    #                     ['31685.1', '0.0001', "0", "1"],
    #                     ['31685.6', '0.04543165', "0", "1"],
    #                 ],
    #                 "bids": [
    #                     ['31684.9', '0.01', "0", "1"],
    #                     ['31682.9', '0.0001', "0", "1"],
    #                     ['31680.7', '0.01', "0", "1"],
    #                 ],
    #                 "ts": "1626532416403",
    #                 "checksum": -1023440116
    #             }
    #         ]
    #     }
    #
    # update
    #
    #     {
    #         "arg": {channel: 'books-l2-tbt', instId: "BTC-USDT"},
    #         "action": "update",
    #         "data": [
    #             {
    #                 "asks": [
    #                     ['31657.7', '0', "0", "0"],
    #                     ['31659.7', '0.01', "0", "1"],
    #                     ['31987.3', '0.01', "0", "1"]
    #                 ],
    #                 "bids": [
    #                     ['31642.9', '0.50296385', "0", "4"],
    #                     ['31639.9', '0', "0", "0"],
    #                     ['31638.7', '0.01', "0", "1"],
    #                 ],
    #                 "ts": "1626535709008",
    #                 "checksum": 830931827
    #             }
    #         ]
    #     }
    #
    # books5
    #
    #     {
    #         "arg": {channel: "books5", instId: "BTC-USDT"},
    #         "data": [
    #             {
    #                 "asks": [
    #                     ['31738.3', '0.05973179', "0", "3"],
    #                     ['31738.5', '0.11035404', "0", "2"],
    #                     ['31739.6', '0.01', "0", "1"],
    #                 ],
    #                 "bids": [
    #                     ['31738.2', '0.67557666', "0", "9"],
    #                     ['31738', '0.02466947', "0", "2"],
    #                     ['31736.3', '0.01705046', "0", "2"],
    #                 ],
    #                 "instId": "BTC-USDT",
    #                 "ts": "1626537446491"
    #             }
    #         ]
    #     }
    #
    # bbo-tbt
    #
    #     {
    #         "arg":{
    #             "channel":"bbo-tbt",
    #             "instId":"BTC-USDT"
    #         },
    #         "data":[
    #             {
    #                 "asks":[["36232.2","1.8826134","0","17"]],
    #                 "bids":[["36232.1","0.00572212","0","2"]],
    #                 "ts":"1651826598363"
    #             }
    #         ]
    #     }
    #
    arg = self.safe_dict(message, 'arg', {})
    channel = self.safe_string(arg, 'channel')
    action = self.safe_string(message, 'action')
    items = self.safe_list(message, 'data', [])
    market = self.safe_market(self.safe_string(arg, 'instId'))
    symbol = market['symbol']
    # book depth kept locally for each channel
    depthByChannel: dict = {
        'bbo-tbt': 1,
        'books': 400,
        'books5': 5,
        'books-l2-tbt': 400,
        'books50-l2-tbt': 50,
    }
    limit = self.safe_integer(depthByChannel, channel)
    messageHash = channel + ':' + symbol
    if action == 'snapshot':
        for item in items:
            # every snapshot item starts a fresh book that replaces the stored one
            book = self.order_book({}, limit)
            self.orderbooks[symbol] = book
            book['symbol'] = symbol
            self.handle_order_book_message(client, item, book, messageHash)
            client.resolve(book, messageHash)
    elif action == 'update':
        # updates for a symbol without a stored book are ignored
        if symbol in self.orderbooks:
            book = self.orderbooks[symbol]
            for item in items:
                self.handle_order_book_message(client, item, book, messageHash, market)
                client.resolve(book, messageHash)
    elif channel in ('books5', 'bbo-tbt'):
        # these channels carry no action field; each item is parsed and the book is reset to it
        if symbol not in self.orderbooks:
            self.orderbooks[symbol] = self.order_book({}, limit)
        book = self.orderbooks[symbol]
        for item in items:
            ts = self.safe_integer(item, 'ts')
            book.reset(self.parse_order_book(item, symbol, ts, 'bids', 'asks', 0, 1))
            client.resolve(book, messageHash)
    return message
async def authenticate(self, params={}):
    """
    sends a login request on the websocket connection selected by params['access'], unless one was already sent, and waits for the shared 'authenticated' future
    :param dict [params]: extra parameters; only 'access' is read
    :param str [params.access]: passed to get_url('users', access), defaults to 'private'; callers in self file also pass 'public' and 'business'
    :returns: the result of the connection's reusable 'authenticated' future
    """
    self.check_required_credentials()
    access = self.safe_string(params, 'access', 'private')
    url = self.get_url('users', access)
    messageHash = 'authenticated'
    client = self.client(url)
    future = client.reusableFuture(messageHash)
    authenticated = self.safe_value(client.subscriptions, messageHash)
    # presumably watch() registers messageHash in client.subscriptions, so a login is sent once per connection - verify
    if authenticated is None:
        timestamp = str(self.seconds())
        method = 'GET'
        path = '/users/self/verify'
        # signature: base64(HMAC-SHA256(secret, timestamp + method + path))
        auth = timestamp + method + path
        signature = self.hmac(self.encode(auth), self.encode(self.secret), hashlib.sha256, 'base64')
        operation = 'login'
        request: dict = {
            'op': operation,
            'args': [
                {
                    'apiKey': self.apiKey,
                    'passphrase': self.password,
                    'timestamp': timestamp,
                    'sign': signature,
                },
            ],
        }
        # no caller-supplied params are forwarded in the login request; the former
        # "if 'access' in params" branch ran after 'access' had been omitted and could never execute
        # the return value of watch() is not awaited here, the shared future is awaited below instead
        self.watch(url, messageHash, request, messageHash)
    return await future
async def watch_balance(self, params={}) -> Balances:
    """
    watch balance and get the amount of funds available for trading or funds locked in orders
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :returns dict: a `balance structure <https://docs.ccxt.com/#/?id=balance-structure>`
    """
    await self.load_markets()
    # the account channel is private, so a login must complete first
    await self.authenticate()
    channel = 'account'
    return await self.subscribe('private', channel, channel, None, params)
def handle_balance_and_position(self, client: Client, message):
    """
    forwards the message unchanged to handle_my_liquidation
    :param Client client: the websocket client the message arrived on
    :param dict message: the raw channel message
    """
    self.handle_my_liquidation(client, message)
def handle_balance(self, client: Client, message):
    """
    parses an account channel message, merges it into the cached 'spot' balance and resolves the channel future
    :param Client client: the websocket client the message arrived on
    :param dict message: the raw channel message, see the sample below
    """
    #
    #     {
    #         "arg": {channel: "account"},
    #         "data": [
    #             {
    #                 "adjEq": '',
    #                 "details": [
    #                     {
    #                         "availBal": '',
    #                         "availEq": "8.21009913",
    #                         "cashBal": "8.21009913",
    #                         "ccy": "USDT",
    #                         "coinUsdPrice": "0.99994",
    #                         "crossLiab": '',
    #                         "disEq": "8.2096065240522",
    #                         "eq": "8.21009913",
    #                         "eqUsd": "8.2096065240522",
    #                         "frozenBal": "0",
    #                         "interest": '',
    #                         "isoEq": "0",
    #                         "isoLiab": '',
    #                         "liab": '',
    #                         "maxLoan": '',
    #                         "mgnRatio": '',
    #                         "notionalLever": "0",
    #                         "ordFrozen": "0",
    #                         "twap": "0",
    #                         "uTime": "1621927314996",
    #                         "upl": "0"
    #                     },
    #                 ],
    #                 "imr": '',
    #                 "isoEq": "0",
    #                 "mgnRatio": '',
    #                 "mmr": '',
    #                 "notionalUsd": '',
    #                 "ordFroz": '',
    #                 "totalEq": "22.1930992296832",
    #                 "uTime": "1626692120916"
    #             }
    #         ]
    #     }
    #
    channel = self.safe_string(self.safe_value(message, 'arg', {}), 'channel')
    # every update is stored under the 'spot' key of the balance cache
    accountType = 'spot'
    parsed = self.parseTradingBalance(message)
    previous = self.safe_value(self.balance, accountType, {})
    # new values are layered over what was cached before
    merged = self.deep_extend(previous, parsed)
    self.balance[accountType] = self.safe_balance(merged)
    client.resolve(self.balance[accountType], channel)
def order_to_trade(self, order, market=None):
    """
    builds a trade structure from a parsed order, taking the fill details from the raw order stored in order['info']
    :param dict order: a parsed order whose 'info' holds the raw exchange order
    :param dict [market]: market passed through to safe_trade
    :returns dict: the result of safe_trade
    """
    raw = self.safe_value(order, 'info', {})
    fillTime = self.safe_integer(raw, 'fillTime')
    # an execType of 'T' marks the fill as taker, anything else is reported as maker
    liquidity = 'taker' if self.safe_string(raw, 'execType', '') == 'T' else 'maker'
    fee = {
        'cost': self.safe_number(raw, 'fillFee'),
        'currency': self.safe_currency_code(self.safe_string(raw, 'fillFeeCcy')),
    }
    trade = {
        'info': raw,
        'timestamp': fillTime,
        'datetime': self.iso8601(fillTime),
        'symbol': self.safe_string(order, 'symbol'),
        'id': self.safe_string(raw, 'tradeId'),
        'order': self.safe_string(order, 'id'),
        'type': self.safe_string(order, 'type'),
        'takerOrMaker': liquidity,
        'side': self.safe_string(order, 'side'),
        'price': self.safe_number(raw, 'fillPx'),
        'amount': self.safe_number(raw, 'fillSz'),
        'cost': self.safe_number(order, 'cost'),
        'fee': fee,
    }
    return self.safe_trade(trade, market)
async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
    """
    watches information on multiple trades made by the user
    https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-order-channel
    :param str [symbol]: unified market symbol of the market trades were made in
    :param int [since]: the earliest time in ms to return trades for
    :param int [limit]: the maximum number of trade structures to retrieve
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param bool [params.trigger]: True if watching trigger or conditional trades
    :param str [params.type]: 'spot', 'swap', 'future', 'option', 'ANY', 'SPOT', 'MARGIN', 'SWAP', 'FUTURES' or 'OPTION'
    :param str [params.marginMode]: 'cross' or 'isolated', for automatically setting the type to spot margin
    :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=trade-structure>`
    """
    # By default, receive order updates from any instrument type
    marketType, params = self.handle_option_and_params(params, 'watchMyTrades', 'type', 'ANY')
    isTrigger = self.safe_bool_2(params, 'trigger', 'stop', False)
    params = self.omit(params, ['trigger', 'stop'])
    await self.load_markets()
    # trigger orders are served by the business endpoint, regular orders by the private one
    await self.authenticate({'access': 'business' if isTrigger else 'private'})
    channel = 'orders-algo' if isTrigger else 'orders'
    messageHash = channel + '::myTrades'
    if symbol is not None:
        # a concrete symbol overrides the configured type with the market's own type
        market = self.market(symbol)
        symbol = market['symbol']
        marketType = market['type']
        messageHash = messageHash + '::' + symbol
    # the unified type 'future' is sent to the exchange as 'FUTURES'
    instType = ('futures' if marketType == 'future' else marketType).upper()
    marginMode, params = self.handle_margin_mode_and_params('watchMyTrades', params)
    if instType == 'SPOT' and marginMode is not None:
        instType = 'MARGIN'
    subscription: dict = {
        'instType': instType,
    }
    trades = await self.subscribe('private', messageHash, channel, None, self.extend(subscription, params))
    if self.newUpdates:
        limit = trades.getLimit(symbol, limit)
    return self.filter_by_symbol_since_limit(trades, symbol, since, limit, True)
async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]:
    """
    https://www.okx.com/docs-v5/en/#trading-account-websocket-positions-channel
    watch all open positions
    :param str[]|None symbols: list of unified market symbols
    :param int [since]: the earliest time in ms to return positions for
    :param int [limit]: the maximum number of positions to return
    :param dict params: extra parameters specific to the exchange API endpoint
    :returns dict[]: a list of `position structure <https://docs.ccxt.com/en/latest/manual.html#position-structure>`
    """
    await self.load_markets()
    await self.authenticate(params)
    symbols = self.market_symbols(symbols)
    channel = 'positions'
    if symbols is not None:
        # one subscription per requested symbol
        perSymbolArgs: dict = {
            'instType': 'ANY',
        }
        latest = await self.subscribe_multiple('private', channel, symbols, self.extend(perSymbolArgs, params))
    else:
        # no symbols given: a single subscription covering every instrument type
        allArgs: dict = {
            'channel': 'positions',
            'instType': 'ANY',
        }
        allRequest: dict = {
            'op': 'subscribe',
            'args': [self.extend(allArgs, params)],
        }
        endpoint = self.get_url(channel, 'private')
        latest = await self.watch(endpoint, channel, allRequest, channel)
    if self.newUpdates:
        return latest
    return self.filter_by_symbols_since_limit(self.positions, symbols, since, limit, True)
def handle_positions(self, client, message):
    """
    parses a positions channel message, stores the positions in the cache and resolves the waiting future with the new positions
    :param client: the websocket client the message arrived on
    :param dict message: the raw channel message, see the sample below
    """
    #
    #    {
    #        arg: {
    #            channel: 'positions',
    #            instType: 'ANY',
    #            instId: 'XRP-USDT-SWAP',
    #            uid: '464737184507959869'
    #        },
    #        data: [{
    #            adl: '1',
    #            availPos: '',
    #            avgPx: '0.52668',
    #            baseBal: '',
    #            baseBorrowed: '',
    #            baseInterest: '',
    #            bizRefId: '',
    #            bizRefType: '',
    #            cTime: '1693151444408',
    #            ccy: 'USDT',
    #            closeOrderAlgo: [],
    #            deltaBS: '',
    #            deltaPA: '',
    #            gammaBS: '',
    #            gammaPA: '',
    #            idxPx: '0.52683',
    #            imr: '17.564000000000004',
    #            instId: 'XRP-USDT-SWAP',
    #            instType: 'SWAP',
    #            interest: '',
    #            last: '0.52691',
    #            lever: '3',
    #            liab: '',
    #            liabCcy: '',
    #            liqPx: '0.3287514731020614',
    #            margin: '',
    #            markPx: '0.52692',
    #            mgnMode: 'cross',
    #            mgnRatio: '69.00363001456147',
    #            mmr: '0.26346',
    #            notionalUsd: '52.68620388000001',
    #            optVal: '',
    #            pTime: '1693151906023',
    #            pendingCloseOrdLiabVal: '',
    #            pos: '1',
    #            posCcy: '',
    #            posId: '616057041198907393',
    #            posSide: 'net',
    #            quoteBal: '',
    #            quoteBorrowed: '',
    #            quoteInterest: '',
    #            spotInUseAmt: '',
    #            spotInUseCcy: '',
    #            thetaBS: '',
    #            thetaPA: '',
    #            tradeId: '138745402',
    #            uTime: '1693151444408',
    #            upl: '0.0240000000000018',
    #            uplLastPx: '0.0229999999999952',
    #            uplRatio: '0.0013670539986328',
    #            uplRatioLastPx: '0.001310093415356',
    #            usdPx: '',
    #            vegaBS: '',
    #            vegaPA: ''
    #        }]
    #    }
    #
    arg = self.safe_value(message, 'arg', {})
    market = self.safe_market(self.safe_string(arg, 'instId'), None, '-')
    symbol = market['symbol']
    channel = self.safe_string(arg, 'channel', '')
    rawPositions = self.safe_value(message, 'data', [])
    if self.positions is None:
        self.positions = ArrayCacheBySymbolBySide()
    cache = self.positions
    fresh = []
    for rawPosition in rawPositions:
        parsed = self.parse_position(rawPosition)
        if parsed['contracts'] == 0:
            # a position with zero contracts is recorded for both sides:
            # the parsed entry becomes 'long' and a clone is stored as 'short'
            parsed['side'] = 'long'
            shortSide = self.clone(parsed)
            shortSide['side'] = 'short'
            cache.append(shortSide)
            fresh.append(shortSide)
        fresh.append(parsed)
        cache.append(parsed)
    # messages that resolve to a symbol go to the per-symbol hash, others to the channel-wide one
    messageHash = channel if symbol is None else channel + '::' + symbol
    client.resolve(fresh, messageHash)
async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
    """
    watches information on multiple orders made by the user
    https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-order-channel
    :param str [symbol]: unified market symbol of the market the orders were made in
    :param int [since]: the earliest time in ms to return orders for
    :param int [limit]: the maximum number of order structures to retrieve
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param bool [params.trigger]: True if watching trigger or conditional orders
    :param str [params.type]: 'spot', 'swap', 'future', 'option', 'ANY', 'SPOT', 'MARGIN', 'SWAP', 'FUTURES' or 'OPTION'
    :param str [params.marginMode]: 'cross' or 'isolated', for automatically setting the type to spot margin
    :returns dict[]: a list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
    """
    # By default, receive order updates from any instrument type
    marketType, params = self.handle_option_and_params(params, 'watchOrders', 'type', 'ANY')
    isTrigger = self.safe_value_2(params, 'stop', 'trigger', False)
    params = self.omit(params, ['stop', 'trigger'])
    await self.load_markets()
    # trigger orders are served by the business endpoint, regular orders by the private one
    await self.authenticate({'access': 'business' if isTrigger else 'private'})
    if symbol is not None:
        # a concrete symbol overrides the configured type with the market's own type
        market = self.market(symbol)
        symbol = market['symbol']
        marketType = market['type']
    # the unified type 'future' is sent to the exchange as 'FUTURES'
    instType = ('futures' if marketType == 'future' else marketType).upper()
    marginMode, params = self.handle_margin_mode_and_params('watchOrders', params)
    if instType == 'SPOT' and marginMode is not None:
        instType = 'MARGIN'
    subscription: dict = {
        'instType': instType,
    }
    channel = 'orders-algo' if isTrigger else 'orders'
    orders = await self.subscribe('private', channel, channel, symbol, self.extend(subscription, params))
    if self.newUpdates:
        limit = orders.getLimit(symbol, limit)
    return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True)
def handle_orders(self, client: Client, message, subscription=None):
    """
    feeds an orders channel message to handle_my_trades, then stores the parsed orders and resolves the channel-wide and per-market futures
    :param Client client: the websocket client the message arrived on
    :param dict message: the raw channel message, see the sample below
    :param subscription: not used by self handler
    """
    #
    #     {
    #         "arg":{
    #             "channel":"orders",
    #             "instType":"SPOT"
    #         },
    #         "data":[
    #             {
    #                 "accFillSz":"0",
    #                 "amendResult":"",
    #                 "avgPx":"",
    #                 "cTime":"1634548275191",
    #                 "category":"normal",
    #                 "ccy":"",
    #                 "clOrdId":"e847386590ce4dBC330547db94a08ba0",
    #                 "code":"0",
    #                 "execType":"",
    #                 "fee":"0",
    #                 "feeCcy":"USDT",
    #                 "fillFee":"0",
    #                 "fillFeeCcy":"",
    #                 "fillNotionalUsd":"",
    #                 "fillPx":"",
    #                 "fillSz":"0",
    #                 "fillTime":"",
    #                 "instId":"ETH-USDT",
    #                 "instType":"SPOT",
    #                 "lever":"",
    #                 "msg":"",
    #                 "notionalUsd":"451.4516256",
    #                 "ordId":"370257534141235201",
    #                 "ordType":"limit",
    #                 "pnl":"0",
    #                 "posSide":"",
    #                 "px":"60000",
    #                 "rebate":"0",
    #                 "rebateCcy":"ETH",
    #                 "reqId":"",
    #                 "side":"sell",
    #                 "slOrdPx":"",
    #                 "slTriggerPx":"",
    #                 "state":"live",
    #                 "sz":"0.007526",
    #                 "tag":"",
    #                 "tdMode":"cash",
    #                 "tgtCcy":"",
    #                 "tpOrdPx":"",
    #                 "tpTriggerPx":"",
    #                 "tradeId":"",
    #                 "uTime":"1634548275191"
    #             }
    #         ]
    #     }
    #
    # the same message also carries the fills that watch_my_trades subscribers wait for
    self.handle_my_trades(client, message)
    channel = self.safe_string(self.safe_value(message, 'arg', {}), 'channel')
    rawOrders = self.safe_value(message, 'data', [])
    if len(rawOrders) == 0:
        return
    cacheSize = self.safe_integer(self.options, 'ordersLimit', 1000)
    if self.orders is None:
        # both caches are created together on the first non-empty message
        self.orders = ArrayCacheBySymbolById(cacheSize)
        self.triggerOrders = ArrayCacheBySymbolById(cacheSize)
    # trigger(algo) orders are kept apart from regular orders
    cache = self.triggerOrders if (channel == 'orders-algo') else self.orders
    touchedMarketIds = []
    for order in self.parse_orders(rawOrders):
        cache.append(order)
        touchedMarketIds.append(self.market(order['symbol'])['id'])
    client.resolve(cache, channel)
    for marketId in touchedMarketIds:
        client.resolve(cache, channel + ':' + marketId)
def handle_my_trades(self, client: Client, message):
    """
    turns the orders of an orders channel message that carry a tradeId into trades, caches them and resolves the myTrades futures
    :param Client client: the websocket client the message arrived on
    :param dict message: the raw channel message, see the sample below
    """
    #
    #     {
    #         "arg":{
    #             "channel":"orders",
    #             "instType":"SPOT"
    #         },
    #         "data":[
    #             {
    #                 "accFillSz":"0",
    #                 "amendResult":"",
    #                 "avgPx":"",
    #                 "cTime":"1634548275191",
    #                 "category":"normal",
    #                 "ccy":"",
    #                 "clOrdId":"e847386590ce4dBC330547db94a08ba0",
    #                 "code":"0",
    #                 "execType":"",
    #                 "fee":"0",
    #                 "feeCcy":"USDT",
    #                 "fillFee":"0",
    #                 "fillFeeCcy":"",
    #                 "fillNotionalUsd":"",
    #                 "fillPx":"",
    #                 "fillSz":"0",
    #                 "fillTime":"",
    #                 "instId":"ETH-USDT",
    #                 "instType":"SPOT",
    #                 "lever":"",
    #                 "msg":"",
    #                 "notionalUsd":"451.4516256",
    #                 "ordId":"370257534141235201",
    #                 "ordType":"limit",
    #                 "pnl":"0",
    #                 "posSide":"",
    #                 "px":"60000",
    #                 "rebate":"0",
    #                 "rebateCcy":"ETH",
    #                 "reqId":"",
    #                 "side":"sell",
    #                 "slOrdPx":"",
    #                 "slTriggerPx":"",
    #                 "state":"live",
    #                 "sz":"0.007526",
    #                 "tag":"",
    #                 "tdMode":"cash",
    #                 "tgtCcy":"",
    #                 "tpOrdPx":"",
    #                 "tpTriggerPx":"",
    #                 "tradeId":"",
    #                 "uTime":"1634548275191"
    #             }
    #         ]
    #     }
    #
    channel = self.safe_string(self.safe_value(message, 'arg', {}), 'channel')
    rawOrders = self.safe_value(message, 'data', [])
    # filter orders with no last trade id
    filledOrders = [
        self.parse_order(rawOrder)
        for rawOrder in rawOrders
        if len(self.safe_string(rawOrder, 'tradeId', '')) > 0
    ]
    if len(filledOrders) == 0:
        return
    if self.myTrades is None:
        self.myTrades = ArrayCacheBySymbolById(self.safe_integer(self.options, 'tradesLimit', 1000))
    cache = self.myTrades
    # dict used as an insertion-ordered set of the symbols touched by self message
    touchedSymbols: dict = {}
    for filledOrder in filledOrders:
        trade = self.order_to_trade(filledOrder)
        cache.append(trade)
        touchedSymbols[trade['symbol']] = True
    messageHash = channel + '::myTrades'
    client.resolve(self.myTrades, messageHash)
    for tradeSymbol in list(touchedSymbols.keys()):
        client.resolve(self.myTrades, messageHash + '::' + tradeSymbol)
def request_id(self):
    """
    builds a request id string: the current millisecond timestamp followed by the value of rand_number(4)
    :returns str: the concatenation of both parts
    """
    timestampPart = str(self.milliseconds())
    return timestampPart + str(self.rand_number(4))
async def create_order_ws(self, symbol: str, type: OrderType, side: OrderSide, amount: float, price: Num = None, params={}) -> Order:
    """
    https://www.okx.com/docs-v5/en/#websocket-api-trade-place-order
    create a trade order
    :param str symbol: unified symbol of the market to create an order in
    :param str type: 'market' or 'limit'
    :param str side: 'buy' or 'sell'
    :param float amount: how much of currency you want to trade in units of base currency
    :param float|None [price]: the price at which the order is to be fulfilled, in units of the quote currency, ignored in market orders
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param boolean params['test']: test order, default False
    :param str [params.op]: 'order' or 'batch-orders', defaults to 'batch-orders'
    :returns dict: an `order structure <https://docs.ccxt.com/#/?id=order-structure>`
    :raises BadRequest: if the request describes an algo order or op is not 'order' or 'batch-orders'
    """
    await self.load_markets()
    await self.authenticate()
    url = self.get_url('private', 'private')
    # the request id doubles as the message hash that handle_place_orders resolves
    messageHash = self.request_id()
    op = None
    op, params = self.handle_option_and_params(params, 'createOrderWs', 'op', 'batch-orders')
    args = self.create_order_request(symbol, type, side, amount, price, params)
    ordType = self.safe_string(args, 'ordType')
    if (ordType == 'trigger') or (ordType == 'conditional') or (type == 'oco') or (type == 'move_order_stop') or (type == 'iceberg') or (type == 'twap'):
        # the message no longer talks about the op option, which is unrelated to self check
        raise BadRequest(self.id + ' createOrderWs() does not support algo trading(trigger, conditional, oco, move_order_stop, iceberg and twap orders)')
    if (op != 'order') and (op != 'batch-orders'):
        # the message lists exactly the values accepted by the condition above
        raise BadRequest(self.id + ' createOrderWs() self.options["createOrderWs"]["op"] must be either order or batch-orders')
    request: dict = {
        'id': messageHash,
        'op': op,
        'args': [args],
    }
    return await self.watch(url, messageHash, request, messageHash)
def handle_place_orders(self, client: Client, message):
    """
    resolve the pending request that matches a trade response
    :param client: websocket client the response arrived on
    :param dict message: the response payload
    """
    #
    #  batch-orders/order/cancel-order
    #    {
    #        "id": "1689281055",
    #        "op": "batch-orders",
    #        "code": "0",
    #        "msg": '',
    #        "data": [{
    #            "tag": "e847386590ce4dBC",
    #            "ordId": "599823446566084608",
    #            "clOrdId": "e847386590ce4dBCb939511604f394b0",
    #            "sCode": "0",
    #            "sMsg": "Order successfully placed."
    #        },
    #        ...
    #        ]
    #    }
    #
    requestId = self.safe_string(message, 'id')
    # only the entries whose sCode is '0' are kept, partial failures are dropped
    succeeded = self.filter_by(self.safe_value(message, 'data', []), 'sCode', '0')
    if self.is_empty(succeeded):
        # nothing succeeded: pass the whole response to the error handler
        self.handle_errors(1, '', client.url, self.safe_string(message, 'op'), {}, self.json(message), message, {}, {})
    parsed = self.parse_orders(succeeded, None, None, None)
    # only the first parsed order is delivered, an empty dict when there is none
    client.resolve(self.safe_dict(parsed, 0, {}), requestId)
async def edit_order_ws(self, id: str, symbol: str, type: OrderType, side: OrderSide, amount: Num = None, price: Num = None, params={}) -> Order:
    """
    edit a trade order over the private websocket
    https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-amend-order
    https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-amend-multiple-orders
    :param str id: order id
    :param str symbol: unified symbol of the market to create an order in
    :param str type: 'market' or 'limit'
    :param str side: 'buy' or 'sell'
    :param float amount: how much of the currency you want to trade in units of the base currency
    :param float|None [price]: the price at which the order is to be fulfilled, in units of the quote currency, ignored in market orders
    :param dict [params]: extra parameters specific to the exchange API endpoint, forwarded to edit_order_request
    :param str [params.op]: websocket operation to send, default is 'amend-order'
    :param str [params.expTime]: also copied onto the request itself when present
    :returns dict: an `order structure <https://docs.ccxt.com/#/?id=order-structure>`
    """
    await self.load_markets()
    await self.authenticate()
    url = self.get_url('private', 'private')
    messageHash = self.request_id()
    op, params = self.handle_option_and_params(params, 'editOrderWs', 'op', 'amend-order')
    args = self.edit_order_request(id, symbol, type, side, amount, price, params)
    request: dict = {
        'id': messageHash,
        'op': op,
        'args': [args],
    }
    # order parameters are already handed to edit_order_request and belong inside args,
    # copying all of them next to id/op/args produced requests like
    #     {"id": "...", "op": "amend-order", "args": [...], "postOnly": true}
    # which the exchange answers with 60012 "Illegal request"
    # NOTE(review): expTime is kept at request level because the OKX docs list it beside id/op/args - confirm
    expTime = self.safe_value(params, 'expTime')
    if expTime is not None:
        request['expTime'] = expTime
    return await self.watch(url, messageHash, request, messageHash)
async def cancel_order_ws(self, id: str, symbol: Str = None, params={}) -> Order:
    """
    cancel a single order over the private websocket
    https://okx-docs.github.io/apidocs/websocket_api/en/#cancel-order-trade
    :param str id: order id, ignored when a client order id is supplied in params
    :param str symbol: unified market symbol, required
    :param dict [params]: extra parameters specific to the exchange API endpoint
    :param str [params.clOrdId]: client order id, clientOrderId is accepted as well
    :returns dict: an `order structure <https://docs.ccxt.com/#/?id=order-structure>`
    :raises BadRequest: when symbol is None
    """
    if symbol is None:
        raise BadRequest(self.id + ' cancelOrderWs() requires a symbol argument')
    await self.load_markets()
    await self.authenticate()
    url = self.get_url('private', 'private')
    messageHash = self.request_id()
    clientOrderId = self.safe_string_2(params, 'clOrdId', 'clientOrderId')
    extra = self.omit(params, ['clientOrderId', 'clOrdId'])
    arg: dict = {
        'instId': self.market_id(symbol),
    }
    # the client order id takes precedence over the exchange order id
    arg.update({'ordId': id} if clientOrderId is None else {'clOrdId': clientOrderId})
    request: dict = {
        'id': messageHash,
        'op': 'cancel-order',
        'args': [self.extend(arg, extra)],
    }
    return await self.watch(url, messageHash, request, messageHash)
async def cancel_orders_ws(self, ids: List[str], symbol: Str = None, params={}):
    """
    cancel multiple orders over the private websocket
    https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-mass-cancel-order
    :param str[] ids: order ids, at most 20 per call
    :param str symbol: unified market symbol, required
    :param dict [params]: extra parameters, deep-merged into the request
    :returns: whatever the websocket response handler resolves for this request id
    :raises BadRequest: when more than 20 ids are given or when symbol is None
    """
    if len(ids) > 20:
        raise BadRequest(self.id + ' cancelOrdersWs() accepts up to 20 ids at a time')
    if symbol is None:
        raise BadRequest(self.id + ' cancelOrdersWs() requires a symbol argument')
    await self.load_markets()
    await self.authenticate()
    url = self.get_url('private', 'private')
    messageHash = self.request_id()
    # every order in the batch belongs to the same market, so the id is looked up once
    marketId = self.market_id(symbol)
    args = [{'instId': marketId, 'ordId': orderId} for orderId in ids]
    request: dict = {
        'id': messageHash,
        'op': 'batch-cancel-orders',
        'args': args,
    }
    # NOTE(review): params are merged next to id/op/args, not into each entry of args - confirm this is intended
    return await self.watch(url, messageHash, self.deep_extend(request, params), messageHash)
async def cancel_all_orders_ws(self, symbol: Str = None, params={}) -> List[Order]:
    """
    cancel all open orders of a type. Only applicable to Option in Portfolio Margin mode, and MMP privilege is required.
    https://docs.okx.com/websockets/#message-cancelAll
    :param str symbol: unified symbol of an option market, required
    :param dict [params]: extra parameters specific to the exchange API endpoint, they override the default instType and instFamily
    :returns dict[]: the data list of the mass-cancel response
    :raises BadRequest: when symbol is None or when the market is not an option
    """
    if symbol is None:
        raise BadRequest(self.id + ' cancelAllOrdersWs() requires a symbol argument')
    await self.load_markets()
    await self.authenticate()
    market = self.market(symbol)
    isOption = market['type'] == 'option'
    if not isOption:
        raise BadRequest(self.id + ' cancelAllOrdersWs is only applicable to Option in Portfolio Margin mode, and MMP privilege is required.')
    url = self.get_url('private', 'private')
    messageHash = self.request_id()
    # NOTE(review): instFamily is filled with the market id of the given symbol - confirm that is the value the exchange expects
    massCancelArg = self.extend({
        'instType': 'OPTION',
        'instFamily': market['id'],
    }, params)
    request: dict = {
        'id': messageHash,
        'op': 'mass-cancel',
        'args': [massCancelArg],
    }
    return await self.watch(url, messageHash, request, messageHash)
def handle_cancel_all_orders(self, client: Client, message):
    """
    resolve the pending mass-cancel request with the data list of the response
    :param client: websocket client the response arrived on
    :param dict message: the mass-cancel response
    """
    #
    #    {
    #        "id": "1512",
    #        "op": "mass-cancel",
    #        "data": [
    #            {
    #                "result": True
    #            }
    #        ],
    #        "code": "0",
    #        "msg": ""
    #    }
    #
    requestId = self.safe_string(message, 'id')
    # the data list is delivered unparsed, an empty list when it is missing
    client.resolve(self.safe_value(message, 'data', []), requestId)
def handle_subscription_status(self, client: Client, message):
    """
    handle a subscription acknowledgement
    :param client: websocket client the acknowledgement arrived on, not used
    :param dict message: the acknowledgement payload
    :returns dict: the message, unchanged
    """
    #
    #     {event: 'subscribe', arg: {channel: "tickers", instId: "BTC-USDT"}}
    #
    # the acknowledgement is not stored anywhere, it is only handed back to the caller
    acknowledgement = message
    return acknowledgement
def handle_authenticate(self, client: Client, message):
    """
    mark the connection as authenticated after a login acknowledgement
    :param client: websocket client the acknowledgement arrived on, its futures are looked up by the 'authenticated' key
    :param dict message: the login acknowledgement, not inspected
    """
    #
    #     {event: "login", success: True}
    #
    future = self.safe_value(client.futures, 'authenticated')
    # a login acknowledgement can arrive when no 'authenticated' future is registered on this client,
    # in that case there is nothing to resolve and the message is ignored instead of raising AttributeError
    if future is not None:
        future.resolve(True)
def ping(self, client: Client):
    """
    build the keep-alive payload for the connection
    :param client: websocket client the payload is meant for, not used
    :returns str: the literal text 'ping'
    """
    # OKX does not support the built-in WebSocket protocol-level ping-pong,
    # it requires a custom text-based ping-pong mechanism instead
    heartbeat = 'ping'
    return heartbeat
def handle_pong(self, client: Client, message):
    """
    record the time of the latest pong on the client
    :param client: websocket client whose lastPong is set to self.milliseconds()
    :param message: the pong payload
    :returns: the message, unchanged
    """
    receivedAt = self.milliseconds()
    client.lastPong = receivedAt
    return message
def handle_error_message(self, client: Client, message) -> Bool:
    """
    detect an error in an incoming message and reject the future(s) waiting for it
    :param client: websocket client the message arrived on
    :param message: the incoming message
    :returns bool: False when an error was found and rejected, True when the message can be processed further
    """
    #
    #     {event: 'error', msg: "Illegal request: {"op":"subscribe","args":["spot/ticker:BTC-USDT"]}", code: "60012"}
    #     {event: 'error', msg: "channel:ticker,instId:BTC-USDT doesn't exist", code: "60018"}
    #     {"event":"error","msg":"Illegal request: {\\"id\\":\\"17321173472466905\\",\\"op\\":\\"amend-order\\",\\"args\\":[{\\"instId\\":\\"ETH-USDC\\",\\"ordId\\":\\"2000345622407479296\\",\\"newSz\\":\\"0.050857\\",\\"newPx\\":\\"2949.4\\",\\"postOnly\\":true}],\\"postOnly\\":true}","code":"60012","connId":"0808af6c"}
    #
    errorCode = self.safe_string(message, 'code')
    try:
        if errorCode and errorCode != '0':
            feedback = self.id + ' ' + self.json(message)
            if errorCode != '1':
                self.throw_exactly_matched_exception(self.exceptions['exact'], errorCode, feedback)
            messageString = self.safe_value(message, 'msg')
            if messageString is not None:
                self.throw_broadly_matched_exception(self.exceptions['broad'], messageString, feedback)
            else:
                # without a top-level msg the details are carried by the sCode / sMsg of each data item
                for item in self.safe_list(message, 'data', []):
                    itemCode = self.safe_string(item, 'sCode')
                    if itemCode is not None:
                        self.throw_exactly_matched_exception(self.exceptions['exact'], itemCode, feedback)
                    # sMsg lives on the data item, not on the enclosing message
                    itemMessage = self.safe_value(item, 'sMsg')
                    if itemMessage is not None:
                        self.throw_broadly_matched_exception(self.exceptions['broad'], itemMessage, feedback)
            # nothing matched a known exception
            raise ExchangeError(feedback)
    except Exception as e:
        # if the message contains an id, it means it is a response to a request
        # so we only reject that promise, instead of deleting all futures, destroying the authentication future
        requestId = self.safe_string(message, 'id')
        if requestId is None:
            # try to parse it from the stringified json inside msg
            prefix = 'Illegal request: '
            msg = self.safe_string(message, 'msg')
            if msg is not None and msg.startswith(prefix + '{'):
                # drop the leading prefix only, the same text inside the json body must survive
                parsedJson = self.parse_json(msg[len(prefix):])
                requestId = self.safe_string(parsedJson, 'id')
        if requestId is not None:
            client.reject(e, requestId)
        else:
            client.reject(e)
        return False
    return True
def handle_message(self, client: Client, message):
    """
    route an incoming websocket message to its handler
    :param client: websocket client the message arrived on
    :param message: the text 'pong' or a decoded message
    """
    #
    #     {event: 'subscribe', arg: {channel: "tickers", instId: "BTC-USDT"}}
    #     {event: 'login', msg: '', code: "0"}
    #
    #     {
    #         "arg": {channel: "tickers", instId: "BTC-USDT"},
    #         "data": [
    #             {
    #                 "instType": "SPOT",
    #                 "instId": "BTC-USDT",
    #                 "last": "31500.1",
    #                 "lastSz": "0.00001754",
    #                 "askPx": "31500.1",
    #                 "askSz": "0.00998144",
    #                 "bidPx": "31500",
    #                 "bidSz": "3.05652439",
    #                 "open24h": "31697",
    #                 "high24h": "32248",
    #                 "low24h": "31165.6",
    #                 "sodUtc0": "31385.5",
    #                 "sodUtc8": "32134.9",
    #                 "volCcy24h": "503403597.38138519",
    #                 "vol24h": "15937.10781721",
    #                 "ts": "1626526618762"
    #             }
    #         ]
    #     }
    #
    #     {event: 'error', msg: "Illegal request: {"op":"subscribe","args":["spot/ticker:BTC-USDT"]}", code: "60012"}
    #     {event: 'error', msg: "channel:ticker,instId:BTC-USDT doesn't exist", code: "60018"}
    #     {event: 'error', msg: "Invalid OK_ACCESS_KEY", code: "60005"}
    #     {
    #         "event": "error",
    #         "msg": "Illegal request: {"op":"login","args":["de89b035-b233-44b2-9a13-0ccdd00bda0e","7KUcc8YzQhnxBE3K","1626691289","H57N99mBt5NvW8U19FITrPdOxycAERFMaapQWRqLaSE="]}",
    #         "code": "60012"
    #     }
    #
    if message == 'pong':
        # the plain-text keep-alive reply carries no code, event or channel to inspect
        self.handle_pong(client, message)
        return
    if not self.handle_error_message(client, message):
        return
    event = self.safe_string_2(message, 'event', 'op')
    if event is not None:
        # acknowledgements and responses to trade requests are keyed by event / op
        eventHandlers: dict = {
            'login': self.handle_authenticate,
            'subscribe': self.handle_subscription_status,
            'unsubscribe': self.handle_unsubscription,
            'order': self.handle_place_orders,
            'batch-orders': self.handle_place_orders,
            'amend-order': self.handle_place_orders,
            'batch-amend-orders': self.handle_place_orders,
            'cancel-order': self.handle_place_orders,
            # op sent by cancel_orders_ws, without this entry its response was never resolved
            'batch-cancel-orders': self.handle_place_orders,
            'mass-cancel': self.handle_cancel_all_orders,
        }
        handler = self.safe_value(eventHandlers, event)
        if handler is not None:
            handler(client, message)
        return
    # everything else is a data push identified by its channel
    arg = self.safe_value(message, 'arg', {})
    channel = self.safe_string(arg, 'channel')
    channelHandlers: dict = {
        'bbo-tbt': self.handle_order_book,  # newly added channel that sends tick-by-tick Level 1 data, all API users can subscribe, public depth channel, verification not required
        'books': self.handle_order_book,  # all API users can subscribe, public depth channel, verification not required
        'books5': self.handle_order_book,  # all API users can subscribe, public depth channel, verification not required, data feeds will be delivered every 100ms(vs. every 200ms now)
        'books50-l2-tbt': self.handle_order_book,  # only users who're VIP4 and above can subscribe, identity verification required before subscription
        'books-l2-tbt': self.handle_order_book,  # only users who're VIP5 and above can subscribe, identity verification required before subscription
        'tickers': self.handle_ticker,
        'mark-price': self.handle_ticker,
        'positions': self.handle_positions,
        'index-tickers': self.handle_ticker,
        'sprd-tickers': self.handle_ticker,
        'block-tickers': self.handle_ticker,
        'trades': self.handle_trades,
        'trades-all': self.handle_trades,
        'account': self.handle_balance,
        'funding-rate': self.handle_funding_rate,
        'orders': self.handle_orders,
        'orders-algo': self.handle_orders,
        'liquidation-orders': self.handle_liquidation,
        'balance_and_position': self.handle_balance_and_position,
    }
    handler = self.safe_value(channelHandlers, channel)
    if handler is not None:
        handler(client, message)
    elif channel is not None and channel.startswith('candle'):
        # candle channels embed the timeframe in their name, so they are matched by prefix
        # a message without a channel is ignored instead of raising AttributeError on None
        self.handle_ohlcv(client, message)
def handle_un_subscription_trades(self, client: Client, symbol: str, channel: str):
    """
    clean up after unsubscribing from a trades channel
    :param client: websocket client the unsubscription happened on
    :param str symbol: unified symbol whose cached trades are removed
    :param str channel: name of the unsubscribed channel
    """
    subscriptionHash = channel + ':' + symbol
    self.clean_unsubscription(client, subscriptionHash, 'unsubscribe:' + subscriptionHash)
    # forget the cached trades of this symbol, if there are any
    self.trades.pop(symbol, None)
def handle_unsubscription_order_book(self, client: Client, symbol: str, channel: str):
    """
    clean up after unsubscribing from an order book channel
    :param client: websocket client the unsubscription happened on
    :param str symbol: unified symbol whose cached order book is removed
    :param str channel: name of the unsubscribed channel
    """
    subscriptionHash = channel + ':' + symbol
    # the unsubscribe hash is keyed by 'orderbook', not by the channel name
    unsubscribeHash = 'unsubscribe:orderbook:' + symbol
    self.clean_unsubscription(client, subscriptionHash, unsubscribeHash)
    self.orderbooks.pop(symbol, None)
def handle_unsubscription_ohlcv(self, client: Client, symbol: str, channel: str):
    """
    clean up after unsubscribing from a candle channel
    :param client: websocket client the unsubscription happened on
    :param str symbol: unified symbol whose cached candles are removed
    :param str channel: name of the unsubscribed channel, 'candle' followed by the exchange timeframe
    """
    exchangeTimeframe = channel.replace('candle', '')
    timeframe = self.find_timeframe(exchangeTimeframe)
    subMessageHash = 'multi:' + channel + ':' + symbol
    messageHash = 'unsubscribe:' + subMessageHash
    self.clean_unsubscription(client, subMessageHash, messageHash)
    # the symbol may have no candle cache at all, indexing self.ohlcvs[symbol] directly would raise KeyError
    cachedBySymbol = self.ohlcvs.get(symbol)
    if cachedBySymbol is not None and timeframe in cachedBySymbol:
        del cachedBySymbol[timeframe]
def handle_unsubscription_ticker(self, client: Client, symbol: str, channel):
    """
    clean up after unsubscribing from a ticker channel
    :param client: websocket client the unsubscription happened on
    :param str symbol: unified symbol whose cached ticker is removed
    :param str channel: name of the unsubscribed channel
    """
    # ticker subscriptions separate channel and symbol with a double colon
    subscriptionHash = channel + '::' + symbol
    unsubscribeHash = 'unsubscribe:ticker:' + symbol
    self.clean_unsubscription(client, subscriptionHash, unsubscribeHash)
    self.tickers.pop(symbol, None)
def handle_unsubscription(self, client: Client, message):
    """
    route an unsubscribe acknowledgement to the clean-up routine of its channel family
    :param client: websocket client the acknowledgement arrived on
    :param dict message: the unsubscribe acknowledgement
    """
    #
    #     {
    #         "event": "unsubscribe",
    #         "arg": {
    #             "channel": "tickers",
    #             "instId": "LTC-USD-200327"
    #         },
    #         "connId": "a4d3ae55"
    #     }
    #
    # arg might be an array or list
    subscription = self.safe_dict(message, 'arg', {})
    channel = self.safe_string(subscription, 'channel', '')
    symbol = self.safe_symbol(self.safe_string(subscription, 'instId'))
    # channels that match none of the families below need no clean-up
    if channel in ('trades', 'trades-all'):
        self.handle_un_subscription_trades(client, symbol, channel)
    elif channel.startswith(('bbo', 'book')):
        self.handle_unsubscription_order_book(client, symbol, channel)
    elif 'tickers' in channel:
        self.handle_unsubscription_ticker(client, symbol, channel)
    elif channel.startswith('candle'):
        self.handle_unsubscription_ohlcv(client, symbol, channel)