"""Scratch script for placing Bybit demo-trading orders through ccxt.pro.

Covers a limit order on the perpetual contract, a market order on an option
contract, and a websocket watcher that places a reduce-only closing limit
order once an order update reports an average fill price.
"""
import asyncio
import os
from typing import List, Optional

import ccxt.pro as ccxt
from ccxt.base.types import (
    Any,
    Balances,
    Bool,
    Int,
    Liquidation,
    Num,
    Order,
    OrderBook,
    OrderSide,
    OrderType,
    Position,
    Str,
    Strings,
    Ticker,
    Tickers,
    Trade,
)

# NOTE(security): the fallback literals below are credentials committed to
# source control. They are kept only so the script behaves as before when the
# environment variables are unset; rotate them and supply real values through
# BYBIT_API_KEY / BYBIT_API_SECRET instead.
CONFIG: dict = {
    'apiKey': os.environ.get('BYBIT_API_KEY', 'rFdbMs40biKImtwGnQ'),
    'secret': os.environ.get(
        'BYBIT_API_SECRET', 'JOuEiGT1uCX9l2qbn9uZuZrVFraAZAA59mqY'
    ),
    # 'verbose': True,  # enable verbose logging
    'enableRateLimit': True,
    'options': {
        # Required so market data is loaded completely; without it the
        # option markets are incomplete.
        'loadAllOptions': True,
    },
}


class Test:
    """Thin wrapper around a ccxt.pro Bybit client in demo-trading mode."""

    def __init__(self):
        """Create the exchange client and select the default symbol."""
        self.exchange = ccxt.bybit(CONFIG)
        self.exchange.enable_demo_trading(True)  # simulated (demo) trading
        # Hedge (two-way) position mode can be enabled with:
        # await self.exchange.set_position_mode(True)
        self.symbol = 'BTC/USDT:USDT'  # BTC perpetual contract
        # Latest order book snapshot; populated by orderbook().
        self.book: Optional[OrderBook] = None

    def get_position_idx(self, side: str, reduce_only: bool = False) -> int:
        """Return the hedge-mode ``positionIdx`` for an order.

        Args:
            side: Order side. ``'buy'`` is treated as a buy; any other value
                is treated as a sell.
            reduce_only: True when the order closes an existing position.

        Returns:
            1 for a buy that opens or a sell that closes (closing a long),
            2 for a sell that opens or a buy that closes (closing a short).
        """
        if side == 'buy':
            # A reduce-only buy closes a short position in hedge mode.
            return 2 if reduce_only else 1
        # A reduce-only sell closes a long position in hedge mode.
        return 1 if reduce_only else 2

    async def open_limit(
        self,
        price: float = 0,
        side: str = 'buy',
        reduce_only: bool = False,
    ) -> None:
        """Place a 0.001-size GTC limit order on ``self.symbol`` and print it.

        Args:
            price: Limit price.
            side: ``'buy'`` or ``'sell'``.
            reduce_only: True to only reduce an existing position.
        """
        position_idx = self.get_position_idx(side, reduce_only)
        order = await self.exchange.create_order(
            self.symbol,
            'limit',
            side,
            0.001,  # order amount
            price,  # limit price
            {
                # GTC: good till cancelled, IOC: immediate or cancel,
                # FOK: fill or kill.
                'timeInForce': 'GTC',
                'positionIdx': position_idx,  # 0 would mean one-way mode
                'reduceOnly': reduce_only,
            },
        )
        print("开仓订单:", order)

    async def open_option_order(self) -> None:
        """Reload markets, switch to an option symbol and market-buy 0.01.

        Side effect: ``self.symbol`` is replaced by the option symbol.
        Any exception raised while placing the order is printed, not raised.
        """
        # True asks ccxt to reload the market list instead of using a cache.
        await self.exchange.load_markets(True)
        # Another symbol tried previously: BTC/USDT:USDT-251125-92500-C
        self.symbol = 'BTC/USDT:USDT-251120-92500-C'
        try:
            side = 'buy'
            reduce_only = False
            # NOTE(review): the request payloads recorded below were sent
            # with positionIdx 0; confirm the hedge-mode index computed here
            # is accepted for option orders.
            position_idx = self.get_position_idx(side, reduce_only)
            # Create the market order.
            order = await self.exchange.create_order(
                self.symbol,
                'market',
                side,
                0.01,  # order amount
                None,  # market orders need no price
                {
                    # GTC: good till cancelled, IOC: immediate or cancel,
                    # FOK: fill or kill.
                    'timeInForce': 'GTC',
                    'positionIdx': position_idx,  # 0 would mean one-way mode
                    'reduceOnly': reduce_only,
                    # 'orderLinkId': '1763518435252a_option'
                },
            )
            # Request payloads recorded for reference:
            #   map[category:option orderLinkId:1763518435252_option
            #       orderType:Market positionIdx:0 qty:0.01 side:Sell
            #       symbol:BTC-20NOV25-92500-C-USDT]
            #   {'symbol': 'BTC-20NOV25-92500-C', 'side': 'Buy',
            #    'orderType': 'Market', 'orderLinkId': 'fdc3a5bee004b6a9',
            #    'category': 'option', 'qty': '0.01', 'positionIdx': 0}
            print("开仓订单:", order)
        except Exception as e:
            print(f"错误: {e}")

    async def open_and_limit_close(self) -> None:
        """Wait five seconds, then market-buy 0.001 of ``self.symbol``.

        Any exception raised while placing the order is printed, not raised.
        """
        await asyncio.sleep(5)  # pause for 5 seconds
        try:
            side = 'buy'
            reduce_only = False
            position_idx = self.get_position_idx(side, reduce_only)
            # Create the market order.
            order = await self.exchange.create_order(
                self.symbol,
                'market',
                side,
                0.001,  # order amount
                None,  # market orders need no price
                {
                    'timeInForce': 'FOK',  # FOK may not apply to market orders
                    'positionIdx': position_idx,  # 0 would mean one-way mode
                    'reduceOnly': reduce_only,
                },
            )
            print("开仓订单:", order)
        except Exception as e:
            print(f"错误: {e}")

    async def close_connection(self) -> None:
        """Close the exchange client."""
        await self.exchange.close()

    async def orderbook(self) -> None:
        """Await one order book update (limit=1) and store it in ``self.book``."""
        self.book = await self.exchange.watch_order_book(self.symbol, limit=1)

    async def res_order(self) -> None:
        """Watch order updates forever and place one closing limit order.

        The first update that carries an average price triggers a
        reduce-only sell limit order 20 above that price. Later updates are
        only printed. This coroutine never returns on its own.
        """
        close_pending = True
        while True:
            orders = await self.exchange.watch_orders(self.symbol, limit=1)
            for item in orders:
                if not close_pending:
                    break
                average = item.get('average')
                if average is None:
                    # No average price on this update; adding 20 to None
                    # would raise TypeError, so wait for a later update.
                    continue
                print("开始挂平仓单")
                # Drop the raw exchange payload so the printed order is short.
                item.pop('info', None)
                await self.open_limit(average + 20, 'sell', True)
                close_pending = False
            print(orders)


async def main():
    """Run the selected scenario(s) and always close the exchange client."""
    strategy = Test()
    try:
        await asyncio.gather(
            strategy.open_option_order(),
            # strategy.orderbook(),
            # strategy.res_order(),
            # strategy.open_and_limit_close(),
            # strategy.monitor_orders()
        )
        # await strategy.open_and_limit_close()
    finally:
        await strategy.close_connection()


if __name__ == "__main__":
    asyncio.run(main())