This commit is contained in:
lz_db
2025-11-22 16:08:27 +08:00
parent da459da0f3
commit 8646036ca5
5 changed files with 407 additions and 0 deletions

207
test/mytest.py Normal file
View File

@@ -0,0 +1,207 @@
import asyncio
import ccxt.pro as ccxt
from typing import List
from ccxt.base.types import Any, Balances, Bool, Int, Liquidation, Num, Order, OrderBook, OrderSide, OrderType, Position, Str, Strings, Ticker, Tickers, Trade
CONFIG: dict = {
'apiKey': 'rFdbMs40biKImtwGnQ',
'secret': 'JOuEiGT1uCX9l2qbn9uZuZrVFraAZAA59mqY',
# 'verbose': True, # 启用详细日志
'enableRateLimit': True,
'options': {
'loadAllOptions': True, #要添加这行,让市场数据完全加载,否则期权拿不到完整的市场数据
},
}
class Test:
def __init__(self):
self.exchange = ccxt.bybit(CONFIG)
self.exchange.enable_demo_trading(True) # 模拟交易
# await self.exchange.set_position_mode(True) # 设置双向持仓
self.symbol = 'BTC/USDT:USDT' # BTC永续合约
self.book = List[Order]
def get_position_idx(self,side:str, reduce_only:bool=False) -> int:
position_idx = 0
if side == 'buy':
position_idx = 1
if reduce_only:
position_idx = 2 # 双向持仓平空
else:
if reduce_only:
position_idx = 1 # 双向持仓平多
else:
position_idx = 2
return position_idx
async def open_limit(self,price:float=0,side:str='buy',reduce_only:bool=False):
# 创建限价单
# side = 'buy'
# reduce_only = False
position_idx = self.get_position_idx(side,reduce_only)
order = await self.exchange.create_order(
self.symbol ,
'limit',
side,
0.001, # 更合理的数量
price, # 市价单不需要价格
{
'timeInForce': 'GTC', # GTC:一直有效至取消 IOC:立即成交或取消 FOK:完全成交或取消
'positionIdx': position_idx, # 0: 单向模式
'reduceOnly': reduce_only,
}
)
print("开仓订单:", order)
async def open_option_order(self):
# await self.exchange.set_position_mode(True) # 设置双向持仓
# print(self.exchange.markets)
res = await self.exchange.load_markets(True)
# return
# btc_keys = [key for key in res.keys() if key.startswith('BTC/USDT:USDT')]
# print(btc_keys)
# return
self.symbol = 'BTC/USDT:USDT-251120-92500-C' # BTC/USDT:USDT-251125-92500-C
# aa = res[self.symbol]
# market = self.exchange.markets[self.symbol]
# market = self.markets[self.symbol]
# all_market = self.exchange.market()
# exchange.markets['ETH/BTC']
# market = self.exchange.market(self.symbol)
# print(market)
try:
side = 'buy'
reduce_only = False
position_idx = self.get_position_idx(side,reduce_only)
# 创建市价单
order = await self.exchange.create_order(
self.symbol,
'market',
side,
0.01, # 更合理的数量
None, # 市价单不需要价格
{
'timeInForce': 'GTC', # GTC:一直有效至取消 IOC:立即成交或取消 FOK:完全成交或取消
'positionIdx': position_idx, # 0: 单向模式
'reduceOnly': reduce_only,
# 'orderLinkId': '1763518435252a_option'
}
)
# map[
# category:option
# orderLinkId:1763518435252_option
# orderType:Market
# positionIdx:0
# qty:0.01
# side:Sell
# symbol:BTC-20NOV25-92500-C-USDT
# ]
# {
# 'symbol': 'BTC-20NOV25-92500-C',
# 'side': 'Buy',
# 'orderType': 'Market',
# 'orderLinkId': 'fdc3a5bee004b6a9',
# 'category': 'option',
# 'qty': '0.01',
# 'positionIdx': 0
# }
print("开仓订单:", order)
except Exception as e:
print(f"错误: {e}")
async def open_and_limit_close(self):
# 暂停5秒
await asyncio.sleep(5)
# 使用正确的合约符号
# res = await self.exchange.load_markets()
# aa = res[symbol]
try:
# if body["side"] == "Buy":
# body["positionIdx"] = 1
# else:
# body["positionIdx"] = 2
# if body["reduceOnly"] == True:
# if body["positionIdx"] == 1:
# body["positionIdx"] = 2 # 双向持仓平空
# else:
# body["positionIdx"] = 1 # 双向持仓平多
side = 'buy'
reduce_only = False
position_idx = self.get_position_idx(side,reduce_only)
if side == 'buy':
position_idx = 1
if reduce_only:
position_idx = 2
else:
if reduce_only:
position_idx = 1
else:
position_idx = 2
# 创建市价单
order = await self.exchange.create_order(
self.symbol,
'market',
side,
0.001, # 更合理的数量
None, # 市价单不需要价格
{
'timeInForce': 'FOK', # 对于市价单FOK可能不适用
'positionIdx': position_idx, # 0: 单向模式
'reduceOnly': reduce_only,
}
)
print("开仓订单:", order)
except Exception as e:
print(f"错误: {e}")
async def close_connection(self):
await self.exchange.close()
async def orderbook(self):
# async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
self.book = await self.exchange.watch_order_book(self.symbol,limit=1)
# print(self.book)
async def res_order(self):
run = True
while True:
# async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
res = await self.exchange.watch_orders(self.symbol,limit=1)
for item in res:
# print(item)
if run:
print("开始挂平仓单")
del item['info']
# print(item)
await self.open_limit(item['average'] + 20,'sell',True)
run = False
print(res)
async def main():
strategy = Test()
try:
await asyncio.gather(
strategy.open_option_order(),
# strategy.orderbook(),
# strategy.res_order(),
# strategy.open_and_limit_close(),
# strategy.monitor_orders()
)
# await strategy.open_and_limit_close()
finally:
await strategy.close_connection()
if __name__ == "__main__":
asyncio.run(main())

77
test/mytest2.py Normal file
View File

@@ -0,0 +1,77 @@
import ccxt
# 使用您的配置
CONFIG = {
'apiKey': 'rFdbMs40biKImtwGnQ',
'secret': 'JOuEiGT1uCX9l2qbn9uZuZrVFraAZAA59mqY',
'verbose': False,
'enableRateLimit': True,
'enableDemoTrading': True,
# 'defaultType': 'swap', # 'swap', 'future', 'option', 'spot'
# 'options': {
# 'defaultType': 'swap', # 使用合约交易
# 'fetchMarkets': {
# 'types': ['linear'],
# },
# 'defaultSubType': 'linear', # 'linear', 'inverse'
# },
}
def place_market_order(symbol, side, amount):
"""
在Bybit下市价单
Args:
symbol: 交易对,例如 'BTC/USDT'
side: 方向 'buy''sell'
amount: 数量
"""
try:
# 创建交易所实例
exchange = ccxt.bybit({
'apiKey': CONFIG['apiKey'],
'secret': CONFIG['secret'],
'sandbox': False, # 使用实盘,如果是测试网改为 True
'verbose': CONFIG['verbose'],
'enableRateLimit': CONFIG['enableRateLimit'],
})
exchange.enable_demo_trading(True) # 模拟交易
exchange.set_position_mode(True) # 设置双向持仓
# 下市价单
order = exchange.create_order(
symbol=symbol,
type='market',
side=side,
amount=amount,
params={
'timeInForce': 'FOK', # 对于市价单FOK可能不适用
'positionIdx': 1 # 0: 单向模式
} # 额外参数
)
print("订单创建成功!")
print(f"订单ID: {order['id']}")
print(f"状态: {order['status']}")
print(f"数量: {order['amount']}")
print(f"成交数量: {order['filled']}")
return order
except Exception as e:
print(f"下单失败: {e}")
return None
# 使用示例
if __name__ == "__main__":
# 示例:市价买入 0.001 BTC
symbol = 'BTC/USDT:USDT'
side = 'buy' # 'buy' 或 'sell'
amount = 0.001
order_result = place_market_order(symbol, side, amount)
if order_result:
print("\n订单详情:")
print(order_result)
# for key, value in order_result.items():
# print(f"{key}: {value}")

119
test/test3.py Normal file
View File

@@ -0,0 +1,119 @@
import ccxt
# 使用您的配置
CONFIG = {
'apiKey': 'rFdbMs40biKImtwGnQ',
'secret': 'JOuEiGT1uCX9l2qbn9uZuZrVFraAZAA59mqY',
'verbose': False,
'enableRateLimit': True,
}
def place_contract_market_order(symbol, side, amount, position_side=None, leverage=10):
"""
在Bybit合约市场下市价单
Args:
symbol: 交易对,例如 'BTC/USDT:USDT'
side: 方向 'buy''sell'
amount: 合约数量
position_side: 持仓方向 'long''short'None为自动判断
leverage: 杠杆倍数
"""
try:
# 创建交易所实例
exchange = ccxt.bybit({
'apiKey': CONFIG['apiKey'],
'secret': CONFIG['secret'],
'sandbox': False, # 使用实盘,测试网改为 True
'verbose': CONFIG['verbose'],
'enableRateLimit': CONFIG['enableRateLimit'],
'options': {
'defaultType': 'swap', # 使用合约交易
}
})
exchange.enable_demo_trading(True) # 模拟交易
# 设置杠杆
exchange.set_leverage(leverage, symbol)
# 下市价单
params = {}
if position_side:
params['positionSide'] = position_side
order = exchange.create_order(
symbol=symbol,
type='market',
side=side,
amount=amount,
params=params
)
print("合约市价单创建成功!")
print(f"订单ID: {order['id']}")
print(f"交易对: {order['symbol']}")
print(f"方向: {order['side']}")
print(f"数量: {order['amount']}")
print(f"状态: {order['status']}")
return order
except Exception as e:
print(f"合约下单失败: {e}")
return None
def place_contract_market_order_with_cost(symbol, side, cost, leverage=10):
"""
按金额下合约市价单
Args:
symbol: 交易对
side: 方向
cost: 投入金额(USDT)
leverage: 杠杆倍数
"""
try:
exchange = ccxt.bybit({
'apiKey': CONFIG['apiKey'],
'secret': CONFIG['secret'],
'sandbox': False,
'options': {'defaultType': 'swap'}
})
# 获取当前价格
ticker = exchange.fetch_ticker(symbol)
current_price = ticker['last']
# 计算合约数量
amount = cost * leverage / current_price
return place_contract_market_order(symbol, side, amount, leverage=leverage)
except Exception as e:
print(f"按金额下单失败: {e}")
return None
# 使用示例
if __name__ == "__main__":
# 示例1直接按数量下单
print("=== 按数量下单 ===")
symbol = 'BTC/USDT:USDT' # 永续合约
side = 'buy' # 开多单
amount = 0.01 # 合约数量
order_result = place_contract_market_order(symbol, side, amount, leverage=20)
# 示例2按金额下单
print("\n=== 按金额下单 ===")
cost = 100 # 投入100 USDT
order_result2 = place_contract_market_order_with_cost(symbol, 'sell', cost, leverage=10)
# 示例3指定持仓方向
print("\n=== 指定持仓方向下单 ===")
order_result3 = place_contract_market_order(
symbol='ETH/USDT:USDT',
side='buy',
amount=0.1,
position_side='long', # 明确开多仓
leverage=15
)

View File

@@ -119,6 +119,9 @@ async def websocket_quick_test():
while True: while True:
# print("111111111") # print("111111111")
res = await exchange.watch_ticker(symbol='BTCUSD') res = await exchange.watch_ticker(symbol='BTCUSD')
logger.error("aaa")
logger.warning("bbb")
logger.debug("ccc")
print("===========================收到信息") print("===========================收到信息")
print(res) print(res)
# for order in res: # for order in res:

View File

@@ -440,6 +440,7 @@ async def main():
print("1. 完整测试 (包含测试订单)") print("1. 完整测试 (包含测试订单)")
print("2. 实时监控模式") print("2. 实时监控模式")
print("3. 仅测试连接") print("3. 仅测试连接")
logger.error(f"测试执行错误: ")
choice = input("请输入选择 (1-3, 默认1): ").strip() choice = input("请输入选择 (1-3, 默认1): ").strip()