Compare commits
7 Commits
8dc0f0dbc3
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
76ac5370d0 | ||
|
|
f472755d84 | ||
|
|
0d42a584b7 | ||
|
|
64c993319a | ||
|
|
efbcb63cec | ||
|
|
878901826c | ||
|
|
a7152f58ea |
10
.env
10
.env
@@ -2,14 +2,14 @@
|
||||
REDIS_HOST=10.0.4.6
|
||||
REDIS_PORT=6379
|
||||
REDIS_PASSWORD=
|
||||
REDIS_DB=1
|
||||
REDIS_DB=0
|
||||
|
||||
# MySQL配置
|
||||
DB_HOST=10.0.4.17
|
||||
DB_PORT=3306
|
||||
DB_USER=root
|
||||
DB_PASSWORD=lz_mysqlLZ
|
||||
DB_DATABASE=lz_app_test
|
||||
DB_DATABASE=lz_app2
|
||||
DB_POOL_SIZE=10
|
||||
DB_MAX_OVERFLOW=20
|
||||
DB_POOL_RECYCLE=3600
|
||||
@@ -19,12 +19,12 @@ SQLALCHEMY_ECHO=false
|
||||
SQLALCHEMY_ECHO_POOL=false
|
||||
|
||||
# 同步配置
|
||||
SYNC_INTERVAL=20
|
||||
SYNC_INTERVAL=15
|
||||
RECENT_DAYS=3
|
||||
CHUNK_SIZE=1000
|
||||
ENABLE_POSITION_SYNC=true
|
||||
ENABLE_ORDER_SYNC=true
|
||||
ENABLE_ACCOUNT_SYNC=true
|
||||
ENABLE_ACCOUNT_SYNC=false
|
||||
|
||||
# 日志配置
|
||||
LOG_LEVEL=INFO
|
||||
@@ -49,7 +49,7 @@ ORDER_REDIS_SCAN_COUNT=1000
|
||||
POSITION_BATCH_SIZE=500
|
||||
|
||||
# 账户同步配置
|
||||
ACCOUNT_SYNC_RECENT_DAYS=3
|
||||
ACCOUNT_SYNC_RECENT_DAYS=0
|
||||
|
||||
# 并发控制
|
||||
MAX_CONCURRENT_ACCOUNTS=50
|
||||
|
||||
6
.gitignore
vendored
Normal file
6
.gitignore
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
test.py
|
||||
__pycache__
|
||||
logs
|
||||
.env
|
||||
*.log
|
||||
.DS_Store
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -17,6 +17,8 @@ REDIS_CONFIG = {
|
||||
SYNC_CONFIG = {
|
||||
'interval': int(os.getenv('SYNC_INTERVAL', 60)), # 同步间隔(秒)
|
||||
'recent_days': int(os.getenv('RECENT_DAYS', 3)), # 同步最近几天数据
|
||||
'recent_days_account': int(os.getenv('ACCOUNT_SYNC_RECENT_DAYS', 3)), # 同步最近几天数据
|
||||
'recent_days_order': int(os.getenv('ORDER_SYNC_RECENT_DAYS', 3)), # 同步最近几天数据
|
||||
'chunk_size': int(os.getenv('CHUNK_SIZE', 1000)), # 批量处理大小
|
||||
'enable_position_sync': os.getenv('ENABLE_POSITION_SYNC', 'true').lower() == 'true',
|
||||
'enable_order_sync': os.getenv('ENABLE_ORDER_SYNC', 'true').lower() == 'true',
|
||||
|
||||
File diff suppressed because one or more lines are too long
Binary file not shown.
Binary file not shown.
@@ -73,4 +73,4 @@ class StrategyKX(Base):
|
||||
up_time: Mapped[Optional[datetime]] = mapped_column(DateTime, default=func.now(), onupdate=func.now(), comment='最后更新时间')
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"StrategyKX(id={self.id!r}, k_id={self.k_id!r}, time={self.time!r})"
|
||||
return f"StrategyKX(id={self.id!r}, k_id={self.k_id!r}, st_id={self.st_id!r}, time={self.time!r})"
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,10 +1,7 @@
|
||||
from .base_sync import BaseSync
|
||||
from loguru import logger
|
||||
from typing import List, Dict, Any, Set
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from sqlalchemy import text, and_
|
||||
from typing import List, Dict
|
||||
from sqlalchemy import text, select, func, and_
|
||||
from models.orm_models import StrategyKX
|
||||
|
||||
class AccountSyncBatch(BaseSync):
|
||||
@@ -15,14 +12,9 @@ class AccountSyncBatch(BaseSync):
|
||||
try:
|
||||
logger.info(f"开始批量同步账户信息,共 {len(accounts)} 个账号")
|
||||
|
||||
# 测试
|
||||
# res = await self.redis_client._get_account_info_from_redis(10140, 5548, 'mt5')
|
||||
# print(res)
|
||||
# return
|
||||
# 收集所有账号的数据
|
||||
all_account_data = await self.redis_client._collect_all_account_data(accounts)
|
||||
|
||||
|
||||
if not all_account_data:
|
||||
logger.info("无账户信息数据需要同步")
|
||||
return
|
||||
@@ -186,7 +178,43 @@ class AccountSyncBatch(BaseSync):
|
||||
|
||||
return existing_records
|
||||
|
||||
async def sync(self):
|
||||
"""兼容旧接口"""
|
||||
accounts = self.get_accounts_from_redis()
|
||||
await self.sync_batch(accounts)
|
||||
async def networth(self, accounts: Dict[str, Dict]):
|
||||
"""计算所有策略的净值"""
|
||||
|
||||
# 从accounts中获取所有策略的st_id
|
||||
st_ids = set()
|
||||
for account in accounts.values():
|
||||
st_ids.add(account['st_id'])
|
||||
|
||||
stmt = select(
|
||||
StrategyKX.st_id,
|
||||
StrategyKX.time,
|
||||
func.sum(StrategyKX.balance).label('balance_sum'),
|
||||
func.sum(StrategyKX.profit).label('profit_sum'),
|
||||
func.sum(StrategyKX.withdrawal).label('withdrawal_sum'),
|
||||
func.sum(StrategyKX.deposit).label('deposit_sum'),
|
||||
func.sum(StrategyKX.other).label('other_sum')
|
||||
).where(
|
||||
StrategyKX.st_id.in_(st_ids)
|
||||
).group_by(
|
||||
StrategyKX.st_id,
|
||||
StrategyKX.time
|
||||
).order_by(
|
||||
StrategyKX.time.asc(),
|
||||
StrategyKX.st_id.asc()
|
||||
)
|
||||
|
||||
results = self.session.execute(stmt).all()
|
||||
|
||||
return [
|
||||
{
|
||||
'st_id': row.st_id,
|
||||
'time': row.time,
|
||||
'balance_sum': float(row.balance_sum) if row.balance_sum else 0.0,
|
||||
'profit_sum': float(row.profit_sum) if row.profit_sum else 0.0,
|
||||
'withdrawal_sum': float(row.withdrawal_sum) if row.withdrawal_sum else 0.0,
|
||||
'deposit_sum': float(row.deposit_sum) if row.deposit_sum else 0.0,
|
||||
'other_sum': float(row.other_sum) if row.other_sum else 0.0,
|
||||
}
|
||||
for row in results
|
||||
]
|
||||
@@ -6,6 +6,7 @@ import time
|
||||
import json
|
||||
from typing import Dict
|
||||
import utils.helpers as helpers
|
||||
import redis
|
||||
|
||||
from utils.redis_client import RedisClient
|
||||
from config.settings import SYNC_CONFIG
|
||||
@@ -15,6 +16,7 @@ from .account_sync import AccountSyncBatch
|
||||
from utils.redis_batch_helper import RedisBatchHelper
|
||||
from config.settings import COMPUTER_NAMES, COMPUTER_NAME_PATTERN
|
||||
from typing import List, Dict, Any, Set, Optional
|
||||
from config.settings import REDIS_CONFIG
|
||||
|
||||
class SyncManager:
|
||||
"""同步管理器(完整批量版本)"""
|
||||
@@ -62,7 +64,8 @@ class SyncManager:
|
||||
async def start(self):
|
||||
"""启动同步服务"""
|
||||
logger.info(f"同步服务启动,间隔 {self.sync_interval} 秒")
|
||||
|
||||
# await self.cp()
|
||||
# return
|
||||
while self.is_running:
|
||||
try:
|
||||
|
||||
@@ -156,3 +159,61 @@ class SyncManager:
|
||||
syncer.db_manager.close()
|
||||
|
||||
logger.info("同步服务停止")
|
||||
|
||||
async def cp(self):
|
||||
"""把Redis 0库的资产数据复制到Redis 1库"""
|
||||
# 创建到Redis 0库的连接
|
||||
redis_0 = redis.Redis(
|
||||
host=REDIS_CONFIG['host'], # 你的Redis主机
|
||||
port=REDIS_CONFIG['port'], # 你的Redis端口
|
||||
db=0, # 数据库0
|
||||
password=None, # 如果有密码
|
||||
decode_responses=True # 自动解码为字符串
|
||||
)
|
||||
|
||||
# 创建到Redis 1库的连接
|
||||
redis_1 = redis.Redis(
|
||||
host=REDIS_CONFIG['host'],
|
||||
port=REDIS_CONFIG['port'],
|
||||
db=1, # 数据库1
|
||||
password=None,
|
||||
decode_responses=True
|
||||
)
|
||||
accounts = self.redis_client.get_accounts_from_redis()
|
||||
print(f"共 {len(accounts)} 个账号")
|
||||
all_kid = accounts.keys()
|
||||
|
||||
keys1 = redis_0.keys("Bybit_fin_*")
|
||||
# print(f"找到 {len(keys1)} 个匹配的键")
|
||||
keys2 = redis_0.keys("Metatrader_fin_*")
|
||||
# print(f"找到 {len(keys2)} 个匹配的键")
|
||||
all_keys = keys1 + keys2
|
||||
print(f"共 {len(all_keys)} 个键")
|
||||
for key in all_keys:
|
||||
key_split = key.split("_")
|
||||
k_id = key_split[-1]
|
||||
if k_id not in all_kid:
|
||||
continue
|
||||
exchange_id = None
|
||||
if key_split[0] == "Bybit":
|
||||
exchange_id = "bybit"
|
||||
elif key_split[0] == "Metatrader":
|
||||
exchange_id = "mt5"
|
||||
# data = redis_0.hget(key)
|
||||
print(f"开始处理键 {key} 的数据")
|
||||
data = redis_0.hgetall(key)
|
||||
for k, v in data.items():
|
||||
# print(f"{k}")
|
||||
data_dict = json.loads(v)
|
||||
data_dict['lz_amount'] = float(data_dict['lz_money'])
|
||||
# del data_dict['lz_money']
|
||||
if data_dict['lz_type'] == 'lz_balance':
|
||||
row_key = k
|
||||
else:
|
||||
row_key = f"fund_{k}"
|
||||
# print(row_key)
|
||||
redis_0.hset(f"{exchange_id}:balance:{data_dict['k_id']}",row_key,json.dumps(data_dict))
|
||||
# print(f"键 {key} 的数据为 {data}")
|
||||
redis_0.close()
|
||||
redis_0.close()
|
||||
print("复制完成")
|
||||
@@ -1,12 +1,9 @@
|
||||
from .base_sync import BaseSync
|
||||
from loguru import logger
|
||||
from typing import List, Dict, Any, Tuple
|
||||
import json
|
||||
import asyncio
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from sqlalchemy import text
|
||||
import redis
|
||||
|
||||
class OrderSyncBatch(BaseSync):
|
||||
"""订单数据批量同步器"""
|
||||
@@ -20,11 +17,11 @@ class OrderSyncBatch(BaseSync):
|
||||
"""批量同步所有账号的订单数据"""
|
||||
try:
|
||||
logger.info(f"开始批量同步订单数据,共 {len(accounts)} 个账号")
|
||||
return
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
# 1. 收集所有账号的订单数据
|
||||
all_orders = await self._collect_all_orders(accounts)
|
||||
all_orders = await self.redis_client._collect_all_orders(accounts)
|
||||
|
||||
if not all_orders:
|
||||
logger.info("无订单数据需要同步")
|
||||
@@ -33,8 +30,12 @@ class OrderSyncBatch(BaseSync):
|
||||
logger.info(f"收集到 {len(all_orders)} 条订单数据")
|
||||
|
||||
# 2. 批量同步到数据库
|
||||
# 使用基本版本
|
||||
success, processed_count = await self._sync_orders_batch_to_db(all_orders)
|
||||
|
||||
# 或者使用增强版本
|
||||
# success, results = await self._sync_orders_batch_to_db_enhanced(all_orders)
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
if success:
|
||||
logger.info(f"订单批量同步完成: 处理 {processed_count} 条订单,耗时 {elapsed:.2f}秒")
|
||||
@@ -44,188 +45,272 @@ class OrderSyncBatch(BaseSync):
|
||||
except Exception as e:
|
||||
logger.error(f"订单批量同步失败: {e}")
|
||||
|
||||
async def _collect_all_orders(self, accounts: Dict[str, Dict]) -> List[Dict]:
|
||||
"""收集所有账号的订单数据"""
|
||||
all_orders = []
|
||||
|
||||
try:
|
||||
# 按交易所分组账号
|
||||
account_groups = self._group_accounts_by_exchange(accounts)
|
||||
|
||||
# 并发收集每个交易所的数据
|
||||
tasks = []
|
||||
for exchange_id, account_list in account_groups.items():
|
||||
task = self._collect_exchange_orders(exchange_id, account_list)
|
||||
tasks.append(task)
|
||||
|
||||
# 等待所有任务完成并合并结果
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
for result in results:
|
||||
if isinstance(result, list):
|
||||
all_orders.extend(result)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"收集订单数据失败: {e}")
|
||||
|
||||
return all_orders
|
||||
|
||||
def _group_accounts_by_exchange(self, accounts: Dict[str, Dict]) -> Dict[str, List[Dict]]:
|
||||
"""按交易所分组账号"""
|
||||
groups = {}
|
||||
for account_id, account_info in accounts.items():
|
||||
exchange_id = account_info.get('exchange_id')
|
||||
if exchange_id:
|
||||
if exchange_id not in groups:
|
||||
groups[exchange_id] = []
|
||||
groups[exchange_id].append(account_info)
|
||||
return groups
|
||||
|
||||
async def _collect_exchange_orders(self, exchange_id: str, account_list: List[Dict]) -> List[Dict]:
|
||||
"""收集某个交易所的订单数据"""
|
||||
orders_list = []
|
||||
|
||||
try:
|
||||
# 并发获取每个账号的数据
|
||||
tasks = []
|
||||
for account_info in account_list:
|
||||
k_id = int(account_info['k_id'])
|
||||
st_id = account_info.get('st_id', 0)
|
||||
task = self._get_recent_orders_from_redis(k_id, st_id, exchange_id)
|
||||
tasks.append(task)
|
||||
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
for result in results:
|
||||
if isinstance(result, list):
|
||||
orders_list.extend(result)
|
||||
|
||||
logger.debug(f"交易所 {exchange_id}: 收集到 {len(orders_list)} 条订单")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"收集交易所 {exchange_id} 订单数据失败: {e}")
|
||||
|
||||
return orders_list
|
||||
|
||||
async def _get_recent_orders_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]:
|
||||
"""从Redis获取最近N天的订单数据"""
|
||||
try:
|
||||
redis_key = f"{exchange_id}:orders:{k_id}"
|
||||
|
||||
# 计算最近N天的日期
|
||||
today = datetime.now()
|
||||
recent_dates = []
|
||||
for i in range(self.recent_days):
|
||||
date = today - timedelta(days=i)
|
||||
date_format = date.strftime('%Y-%m-%d')
|
||||
recent_dates.append(date_format)
|
||||
|
||||
# 使用scan获取所有符合条件的key
|
||||
cursor = 0
|
||||
recent_keys = []
|
||||
|
||||
while True:
|
||||
cursor, keys = self.redis_client.client.hscan(redis_key, cursor, count=1000)
|
||||
|
||||
for key, _ in keys.items():
|
||||
key_str = key.decode('utf-8') if isinstance(key, bytes) else key
|
||||
|
||||
if key_str == 'positions':
|
||||
continue
|
||||
|
||||
# 检查是否以最近N天的日期开头
|
||||
for date_format in recent_dates:
|
||||
if key_str.startswith(date_format + '_'):
|
||||
recent_keys.append(key_str)
|
||||
break
|
||||
|
||||
if cursor == 0:
|
||||
break
|
||||
|
||||
if not recent_keys:
|
||||
return []
|
||||
|
||||
# 批量获取订单数据
|
||||
orders_list = []
|
||||
|
||||
# 分批获取,避免单次hgetall数据量太大
|
||||
chunk_size = 500
|
||||
for i in range(0, len(recent_keys), chunk_size):
|
||||
chunk_keys = recent_keys[i:i + chunk_size]
|
||||
|
||||
# 使用hmget批量获取
|
||||
chunk_values = self.redis_client.client.hmget(redis_key, chunk_keys)
|
||||
|
||||
for key, order_json in zip(chunk_keys, chunk_values):
|
||||
if not order_json:
|
||||
continue
|
||||
|
||||
try:
|
||||
order = json.loads(order_json)
|
||||
|
||||
# 验证时间
|
||||
order_time = order.get('time', 0)
|
||||
if order_time >= int(time.time()) - self.recent_days * 24 * 3600:
|
||||
# 添加账号信息
|
||||
order['k_id'] = k_id
|
||||
order['st_id'] = st_id
|
||||
order['exchange_id'] = exchange_id
|
||||
orders_list.append(order)
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.debug(f"解析订单JSON失败: key={key}, error={e}")
|
||||
continue
|
||||
|
||||
return orders_list
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取Redis订单数据失败: k_id={k_id}, error={e}")
|
||||
return []
|
||||
|
||||
async def _sync_orders_batch_to_db(self, all_orders: List[Dict]) -> Tuple[bool, int]:
|
||||
"""批量同步订单数据到数据库"""
|
||||
try:
|
||||
"""批量同步订单数据到数据库
|
||||
|
||||
Args:
|
||||
all_orders: 订单数据列表
|
||||
|
||||
Returns:
|
||||
Tuple[bool, int]: (是否成功, 处理的订单数量)
|
||||
"""
|
||||
if not all_orders:
|
||||
return True, 0
|
||||
|
||||
# 转换数据
|
||||
converted_orders = []
|
||||
for order in all_orders:
|
||||
try:
|
||||
order_dict = self._convert_order_data(order)
|
||||
session = self.db_manager.get_session()
|
||||
|
||||
# 检查完整性
|
||||
required_fields = ['order_id', 'symbol', 'side', 'time']
|
||||
if not all(order_dict.get(field) for field in required_fields):
|
||||
processed_count = 0
|
||||
errors = []
|
||||
|
||||
try:
|
||||
# 按批次处理
|
||||
for i in range(0, len(all_orders), self.batch_size):
|
||||
batch_orders = all_orders[i:i + self.batch_size]
|
||||
|
||||
try:
|
||||
session.begin()
|
||||
|
||||
# 转换数据并准备批量插入
|
||||
converted_orders = []
|
||||
for raw_order in batch_orders:
|
||||
try:
|
||||
converted = self._convert_order_data(raw_order)
|
||||
|
||||
# 检查必要字段
|
||||
if not all([
|
||||
converted.get('order_id'),
|
||||
converted.get('symbol'),
|
||||
converted.get('k_id'),
|
||||
converted.get('side')
|
||||
]):
|
||||
logger.warning(f"订单缺少必要字段: {raw_order}")
|
||||
continue
|
||||
|
||||
converted_orders.append(order_dict)
|
||||
converted_orders.append(converted)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"转换订单数据失败: {order}, error={e}")
|
||||
logger.error(f"转换订单数据失败: {raw_order}, error={e}")
|
||||
continue
|
||||
|
||||
if not converted_orders:
|
||||
return True, 0
|
||||
session.commit()
|
||||
continue
|
||||
|
||||
# 使用批量工具同步
|
||||
from utils.batch_order_sync import BatchOrderSync
|
||||
batch_tool = BatchOrderSync(self.db_manager, self.batch_size)
|
||||
# 批量插入或更新
|
||||
upsert_sql = text("""
|
||||
INSERT INTO deh_strategy_order_new
|
||||
(st_id, k_id, asset, order_id, symbol, side, price, time,
|
||||
order_qty, last_qty, avg_price, exchange_id)
|
||||
VALUES
|
||||
(:st_id, :k_id, :asset, :order_id, :symbol, :side, :price, :time,
|
||||
:order_qty, :last_qty, :avg_price, :exchange_id)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
price = VALUES(price),
|
||||
time = VALUES(time),
|
||||
order_qty = VALUES(order_qty),
|
||||
last_qty = VALUES(last_qty),
|
||||
avg_price = VALUES(avg_price)
|
||||
""")
|
||||
|
||||
success, processed_count = batch_tool.sync_orders_batch(converted_orders)
|
||||
result = session.execute(upsert_sql, converted_orders)
|
||||
processed_count += len(converted_orders)
|
||||
|
||||
# 计算统计信息
|
||||
batch_size = len(converted_orders)
|
||||
total_affected = result.rowcount
|
||||
updated_count = max(0, total_affected - batch_size)
|
||||
inserted_count = batch_size - updated_count
|
||||
|
||||
logger.debug(f"订单批次 {i//self.batch_size + 1}: "
|
||||
f"处理 {batch_size} 条, "
|
||||
f"插入 {inserted_count} 条, "
|
||||
f"更新 {updated_count} 条")
|
||||
|
||||
session.commit()
|
||||
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
error_msg = f"订单批次 {i//self.batch_size + 1} 处理失败: {str(e)}"
|
||||
logger.error(error_msg, exc_info=True)
|
||||
errors.append(error_msg)
|
||||
# 继续处理下一个批次
|
||||
|
||||
if errors:
|
||||
logger.error(f"订单同步完成但有错误: {len(errors)} 个错误")
|
||||
for error in errors[:5]: # 只打印前5个错误
|
||||
logger.error(f"错误详情: {error}")
|
||||
if len(errors) > 5:
|
||||
logger.error(f"...还有 {len(errors) - 5} 个错误")
|
||||
|
||||
success = len(errors) == 0
|
||||
return success, processed_count
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"批量同步订单到数据库失败: {e}")
|
||||
return False, 0
|
||||
logger.error(f"订单批量同步失败: {e}", exc_info=True)
|
||||
return False, processed_count
|
||||
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
async def _sync_orders_batch_to_db_enhanced(self, all_orders: List[Dict]) -> Tuple[bool, Dict]:
|
||||
"""增强版:批量同步订单数据到数据库(带详细统计)
|
||||
|
||||
Args:
|
||||
all_orders: 订单数据列表
|
||||
|
||||
Returns:
|
||||
Tuple[bool, Dict]: (是否成功, 统计结果)
|
||||
"""
|
||||
if not all_orders:
|
||||
return True, {'total': 0, 'processed': 0, 'inserted': 0, 'updated': 0, 'errors': []}
|
||||
|
||||
session = self.db_manager.get_session()
|
||||
|
||||
results = {
|
||||
'total': len(all_orders),
|
||||
'processed': 0,
|
||||
'inserted': 0,
|
||||
'updated': 0,
|
||||
'errors': [],
|
||||
'invalid_orders': 0
|
||||
}
|
||||
|
||||
try:
|
||||
logger.info(f"开始同步 {results['total']} 条订单数据,批次大小: {self.batch_size}")
|
||||
|
||||
# 按批次处理
|
||||
total_batches = (len(all_orders) + self.batch_size - 1) // self.batch_size
|
||||
|
||||
for batch_idx in range(total_batches):
|
||||
start_idx = batch_idx * self.batch_size
|
||||
end_idx = start_idx + self.batch_size
|
||||
batch_orders = all_orders[start_idx:end_idx]
|
||||
|
||||
logger.debug(f"处理批次 {batch_idx + 1}/{total_batches}: "
|
||||
f"订单 {start_idx + 1}-{min(end_idx, len(all_orders))}")
|
||||
|
||||
try:
|
||||
session.begin()
|
||||
|
||||
# 转换数据
|
||||
converted_orders = []
|
||||
batch_invalid = 0
|
||||
|
||||
for raw_order in batch_orders:
|
||||
try:
|
||||
converted = self._convert_order_data(raw_order)
|
||||
|
||||
# 验证必要字段
|
||||
required_fields = ['order_id', 'symbol', 'k_id', 'side']
|
||||
missing_fields = [field for field in required_fields if not converted.get(field)]
|
||||
|
||||
if missing_fields:
|
||||
logger.warning(f"订单缺少必要字段 {missing_fields}: {raw_order}")
|
||||
batch_invalid += 1
|
||||
continue
|
||||
|
||||
# 验证字段长度(防止数据库错误)
|
||||
order_id = converted.get('order_id', '')
|
||||
if len(order_id) > 765: # 根据表结构限制
|
||||
converted['order_id'] = order_id[:765]
|
||||
logger.warning(f"order_id过长已截断: {order_id}")
|
||||
|
||||
symbol = converted.get('symbol', '')
|
||||
if len(symbol) > 120:
|
||||
converted['symbol'] = symbol[:120]
|
||||
|
||||
side = converted.get('side', '')
|
||||
if len(side) > 120:
|
||||
converted['side'] = side[:120]
|
||||
|
||||
converted_orders.append(converted)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"处理订单失败: {raw_order}, error={e}")
|
||||
batch_invalid += 1
|
||||
continue
|
||||
|
||||
results['invalid_orders'] += batch_invalid
|
||||
|
||||
if not converted_orders:
|
||||
session.commit()
|
||||
continue
|
||||
|
||||
# 批量插入或更新
|
||||
upsert_sql = text("""
|
||||
INSERT INTO deh_strategy_order_new
|
||||
(st_id, k_id, asset, order_id, symbol, side, price, time,
|
||||
order_qty, last_qty, avg_price, exchange_id)
|
||||
VALUES
|
||||
(:st_id, :k_id, :asset, :order_id, :symbol, :side, :price, :time,
|
||||
:order_qty, :last_qty, :avg_price, :exchange_id)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
price = VALUES(price),
|
||||
time = VALUES(time),
|
||||
order_qty = VALUES(order_qty),
|
||||
last_qty = VALUES(last_qty),
|
||||
avg_price = VALUES(avg_price),
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
""")
|
||||
|
||||
result = session.execute(upsert_sql, converted_orders)
|
||||
|
||||
# 统计本批次结果
|
||||
batch_size = len(converted_orders)
|
||||
total_affected = result.rowcount
|
||||
batch_updated = max(0, total_affected - batch_size)
|
||||
batch_inserted = batch_size - batch_updated
|
||||
|
||||
# 累加到总结果
|
||||
results['processed'] += batch_size
|
||||
results['inserted'] += batch_inserted
|
||||
results['updated'] += batch_updated
|
||||
|
||||
logger.info(f"批次 {batch_idx + 1} 完成: "
|
||||
f"有效 {batch_size} 条, "
|
||||
f"无效 {batch_invalid} 条, "
|
||||
f"插入 {batch_inserted} 条, "
|
||||
f"更新 {batch_updated} 条")
|
||||
|
||||
session.commit()
|
||||
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
error_msg = f"批次 {batch_idx + 1} 处理失败: {str(e)}"
|
||||
logger.error(error_msg, exc_info=True)
|
||||
results['errors'].append(error_msg)
|
||||
# 继续处理下一个批次
|
||||
|
||||
# 最终统计
|
||||
success_rate = results['processed'] / results['total'] * 100 if results['total'] > 0 else 0
|
||||
|
||||
logger.info(f"订单同步完成: "
|
||||
f"总数={results['total']}, "
|
||||
f"处理={results['processed']}({success_rate:.1f}%), "
|
||||
f"插入={results['inserted']}, "
|
||||
f"更新={results['updated']}, "
|
||||
f"无效={results['invalid_orders']}, "
|
||||
f"错误={len(results['errors'])}")
|
||||
|
||||
success = len(results['errors']) == 0
|
||||
return success, results
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"订单批量同步失败: {e}", exc_info=True)
|
||||
results['errors'].append(f"同步过程失败: {str(e)}")
|
||||
return False, results
|
||||
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def _convert_order_data(self, data: Dict) -> Dict:
|
||||
"""转换订单数据格式"""
|
||||
try:
|
||||
# 安全转换函数
|
||||
def safe_float(value):
|
||||
if value is None:
|
||||
if value is None or value == '':
|
||||
return None
|
||||
try:
|
||||
return float(value)
|
||||
@@ -233,7 +318,7 @@ class OrderSyncBatch(BaseSync):
|
||||
return None
|
||||
|
||||
def safe_int(value):
|
||||
if value is None:
|
||||
if value is None or value == '':
|
||||
return None
|
||||
try:
|
||||
return int(float(value))
|
||||
@@ -246,9 +331,9 @@ class OrderSyncBatch(BaseSync):
|
||||
return str(value)
|
||||
|
||||
return {
|
||||
'st_id': safe_int(data.get('st_id'), 0),
|
||||
'k_id': safe_int(data.get('k_id'), 0),
|
||||
'asset': 'USDT',
|
||||
'st_id': safe_int(data.get('st_id')) or 0,
|
||||
'k_id': safe_int(data.get('k_id')) or 0,
|
||||
'asset': safe_str(data.get('asset')) or 'USDT',
|
||||
'order_id': safe_str(data.get('order_id')),
|
||||
'symbol': safe_str(data.get('symbol')),
|
||||
'side': safe_str(data.get('side')),
|
||||
@@ -264,7 +349,3 @@ class OrderSyncBatch(BaseSync):
|
||||
logger.error(f"转换订单数据异常: {data}, error={e}")
|
||||
return {}
|
||||
|
||||
async def sync(self):
|
||||
"""兼容旧接口"""
|
||||
accounts = self.get_accounts_from_redis()
|
||||
await self.sync_batch(accounts)
|
||||
@@ -15,7 +15,6 @@ class PositionSyncBatch(BaseSync):
|
||||
|
||||
async def sync_batch(self, accounts: Dict[str, Dict]):
|
||||
"""批量同步所有账号的持仓数据"""
|
||||
return
|
||||
try:
|
||||
logger.info(f"开始批量同步持仓数据,共 {len(accounts)} 个账号")
|
||||
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,3 +1,4 @@
|
||||
from datetime import datetime, timedelta
|
||||
from typing import List, Dict, Optional, Any
|
||||
from loguru import logger
|
||||
|
||||
@@ -29,3 +30,85 @@ def safe_str(self, value: Any, default: str = '') -> str:
|
||||
except Exception as e:
|
||||
logger.error(f"safe_str error: {e}")
|
||||
return ""
|
||||
|
||||
def convert_timestamp(timestamp, return_type='str', format_str='%Y-%m-%d %H:%M:%S'):
|
||||
"""
|
||||
时间戳转换函数,支持多种返回格式
|
||||
|
||||
Args:
|
||||
timestamp: 时间戳(支持整数、浮点数、字符串)
|
||||
return_type: 返回类型 'str'/'datetime'/'dict'
|
||||
format_str: 当return_type='str'时的格式字符串
|
||||
|
||||
Returns:
|
||||
根据return_type返回不同格式的数据
|
||||
"""
|
||||
try:
|
||||
# 转换为浮点数
|
||||
if isinstance(timestamp, str):
|
||||
timestamp = float(timestamp)
|
||||
else:
|
||||
timestamp = float(timestamp)
|
||||
|
||||
# 判断时间戳精度
|
||||
original_timestamp = timestamp
|
||||
|
||||
# 精确判断逻辑
|
||||
if timestamp > 4102444800000: # 2100-01-01 的毫秒级时间戳
|
||||
# 可能是微秒级,转换为秒级
|
||||
timestamp = timestamp / 1000000
|
||||
precision = 'microseconds'
|
||||
elif timestamp > 4102444800: # 2100-01-01 的秒级时间戳
|
||||
# 毫秒级时间戳
|
||||
timestamp = timestamp / 1000
|
||||
precision = 'milliseconds'
|
||||
else:
|
||||
# 秒级时间戳
|
||||
precision = 'seconds'
|
||||
|
||||
# 转换为 datetime 对象
|
||||
dt = datetime.fromtimestamp(timestamp)
|
||||
|
||||
# 根据返回类型返回不同格式
|
||||
if return_type == 'datetime':
|
||||
return dt
|
||||
elif return_type == 'dict':
|
||||
return {
|
||||
'datetime': dt,
|
||||
'formatted': dt.strftime(format_str),
|
||||
'precision': precision,
|
||||
'original_timestamp': original_timestamp,
|
||||
'converted_timestamp': timestamp
|
||||
}
|
||||
else: # 默认返回字符串
|
||||
return dt.strftime(format_str)
|
||||
|
||||
except (ValueError, TypeError, OSError) as e:
|
||||
logger.error(f"时间戳转换失败: {timestamp}, 错误: {e}")
|
||||
return None
|
||||
|
||||
def timestamp(unit='seconds'):
|
||||
"""
|
||||
简化版时间戳获取
|
||||
|
||||
Args:
|
||||
unit: 's'/'ms'/'us' 或 'seconds'/'milliseconds'/'microseconds'
|
||||
|
||||
Returns:
|
||||
int: 时间戳
|
||||
"""
|
||||
current = time.time()
|
||||
|
||||
unit_map = {
|
||||
's': 1, 'seconds': 1,
|
||||
'ms': 1000, 'milliseconds': 1000,
|
||||
'us': 1000000, 'microseconds': 1000000
|
||||
}
|
||||
|
||||
multiplier = unit_map.get(unit.lower(), 1)
|
||||
return int(current * multiplier)
|
||||
|
||||
# 别名函数
|
||||
def ts(unit='seconds'):
|
||||
"""timestamp的别名"""
|
||||
return timestamp(unit)
|
||||
@@ -2,8 +2,9 @@ import asyncio
|
||||
import redis
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from loguru import logger
|
||||
from config.settings import REDIS_CONFIG, COMPUTER_NAMES, COMPUTER_NAME_PATTERN
|
||||
from config.settings import REDIS_CONFIG, COMPUTER_NAMES, COMPUTER_NAME_PATTERN,SYNC_CONFIG
|
||||
import utils.helpers as helpers
|
||||
from typing import List, Dict, Any, Set, Tuple, Optional
|
||||
from datetime import datetime, timedelta
|
||||
@@ -360,9 +361,10 @@ class RedisClient:
|
||||
for account_info in account_list:
|
||||
k_id = int(account_info['k_id'])
|
||||
st_id = account_info.get('st_id', 0)
|
||||
add_time = account_info.get('add_time', 0)
|
||||
|
||||
# 从Redis获取账户信息数据
|
||||
account_data = await self._get_account_info_from_redis(k_id, st_id, exchange_id)
|
||||
account_data = await self._get_account_info_from_redis(k_id, st_id, exchange_id, add_time)
|
||||
account_data_list.extend(account_data)
|
||||
|
||||
logger.debug(f"交易所 {exchange_id}: 收集到 {len(account_data_list)} 条账户信息")
|
||||
@@ -372,7 +374,7 @@ class RedisClient:
|
||||
|
||||
return account_data_list
|
||||
|
||||
async def _get_account_info_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]:
|
||||
async def _get_account_info_from_redis(self, k_id: int, st_id: int, exchange_id: str, add_time: int) -> List[Dict]:
|
||||
"""从Redis获取账户信息数据(批量优化版本)"""
|
||||
try:
|
||||
redis_key = f"{exchange_id}:balance:{k_id}"
|
||||
@@ -382,8 +384,7 @@ class RedisClient:
|
||||
return []
|
||||
|
||||
# 按天统计数据
|
||||
from config.settings import SYNC_CONFIG
|
||||
recent_days = SYNC_CONFIG['recent_days']
|
||||
recent_days = SYNC_CONFIG['recent_days_account']
|
||||
|
||||
today = datetime.now()
|
||||
date_stats = {}
|
||||
@@ -394,12 +395,16 @@ class RedisClient:
|
||||
fund_data = json.loads(fund_json)
|
||||
date_str = fund_data.get('lz_time', '')
|
||||
lz_type = fund_data.get('lz_type', '')
|
||||
add_time_str = helpers.convert_timestamp(add_time,format_str='%Y-%m-%d')
|
||||
if date_str < add_time_str:
|
||||
continue
|
||||
|
||||
if not date_str or lz_type not in ['lz_balance', 'deposit', 'withdrawal']:
|
||||
continue
|
||||
|
||||
# 只处理最近N天的数据
|
||||
date_obj = datetime.strptime(date_str, '%Y-%m-%d')
|
||||
if recent_days != 0:
|
||||
if (today - date_obj).days > recent_days:
|
||||
continue
|
||||
|
||||
@@ -410,8 +415,11 @@ class RedisClient:
|
||||
'withdrawal': 0.0,
|
||||
'has_balance': False
|
||||
}
|
||||
|
||||
if fund_data.get('lz_amount'):
|
||||
lz_amount = float(fund_data.get('lz_amount', 0))
|
||||
else:
|
||||
lz_amount = float(fund_data.get('lz_money', 0))
|
||||
|
||||
|
||||
if lz_type == 'lz_balance':
|
||||
date_stats[date_str]['balance'] = lz_amount
|
||||
@@ -495,6 +503,133 @@ class RedisClient:
|
||||
return prev_balance_map
|
||||
|
||||
|
||||
async def _collect_all_orders(self, accounts: Dict[str, Dict]) -> List[Dict]:
|
||||
"""收集所有账号的订单数据"""
|
||||
all_orders = []
|
||||
|
||||
try:
|
||||
# 按交易所分组账号
|
||||
account_groups = self._group_accounts_by_exchange(accounts)
|
||||
|
||||
# 并发收集每个交易所的数据
|
||||
tasks = []
|
||||
for exchange_id, account_list in account_groups.items():
|
||||
task = self._collect_exchange_orders(exchange_id, account_list)
|
||||
tasks.append(task)
|
||||
|
||||
# 等待所有任务完成并合并结果
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
for result in results:
|
||||
if isinstance(result, list):
|
||||
all_orders.extend(result)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"收集订单数据失败: {e}")
|
||||
|
||||
return all_orders
|
||||
|
||||
async def _collect_exchange_orders(self, exchange_id: str, account_list: List[Dict]) -> List[Dict]:
|
||||
"""收集某个交易所的订单数据"""
|
||||
orders_list = []
|
||||
|
||||
try:
|
||||
# 并发获取每个账号的数据
|
||||
tasks = []
|
||||
for account_info in account_list:
|
||||
k_id = int(account_info['k_id'])
|
||||
st_id = account_info.get('st_id', 0)
|
||||
task = self._get_recent_orders_from_redis(k_id, st_id, exchange_id)
|
||||
tasks.append(task)
|
||||
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
for result in results:
|
||||
if isinstance(result, list):
|
||||
orders_list.extend(result)
|
||||
|
||||
logger.debug(f"交易所 {exchange_id}: 收集到 {len(orders_list)} 条订单")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"收集交易所 {exchange_id} 订单数据失败: {e}")
|
||||
|
||||
return orders_list
|
||||
|
||||
async def _get_recent_orders_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]:
|
||||
"""从Redis获取最近N天的订单数据"""
|
||||
try:
|
||||
redis_key = f"{exchange_id}:orders:{k_id}"
|
||||
recent_days = SYNC_CONFIG['recent_days_order']
|
||||
# 计算最近N天的日期
|
||||
today = datetime.now()
|
||||
recent_dates = []
|
||||
for i in range(recent_days):
|
||||
date = today - timedelta(days=i)
|
||||
date_format = date.strftime('%Y-%m-%d')
|
||||
recent_dates.append(date_format)
|
||||
|
||||
# 使用scan获取所有符合条件的key
|
||||
cursor = 0
|
||||
recent_keys = []
|
||||
|
||||
while True:
|
||||
cursor, keys = self.client.hscan(redis_key, cursor, count=1000)
|
||||
|
||||
for key, _ in keys.items():
|
||||
key_str = key.decode('utf-8') if isinstance(key, bytes) else key
|
||||
|
||||
if key_str == 'positions':
|
||||
continue
|
||||
|
||||
# 检查是否以最近N天的日期开头
|
||||
for date_format in recent_dates:
|
||||
if key_str.startswith(date_format + '_'):
|
||||
recent_keys.append(key_str)
|
||||
break
|
||||
|
||||
if cursor == 0:
|
||||
break
|
||||
|
||||
if not recent_keys:
|
||||
return []
|
||||
|
||||
# 批量获取订单数据
|
||||
orders_list = []
|
||||
|
||||
# 分批获取,避免单次hgetall数据量太大
|
||||
chunk_size = 500
|
||||
for i in range(0, len(recent_keys), chunk_size):
|
||||
chunk_keys = recent_keys[i:i + chunk_size]
|
||||
|
||||
# 使用hmget批量获取
|
||||
chunk_values = self.client.hmget(redis_key, chunk_keys)
|
||||
|
||||
for key, order_json in zip(chunk_keys, chunk_values):
|
||||
if not order_json:
|
||||
continue
|
||||
|
||||
try:
|
||||
order = json.loads(order_json)
|
||||
|
||||
# 验证时间
|
||||
order_time = order.get('time', 0)
|
||||
if order_time >= int(time.time()) - recent_days * 24 * 3600:
|
||||
# 添加账号信息
|
||||
order['k_id'] = k_id
|
||||
order['st_id'] = st_id
|
||||
order['exchange_id'] = exchange_id
|
||||
orders_list.append(order)
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.debug(f"解析订单JSON失败: key={key}, error={e}")
|
||||
continue
|
||||
|
||||
return orders_list
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取Redis订单数据失败: k_id={k_id}, error={e}")
|
||||
return []
|
||||
|
||||
|
||||
def close(self):
|
||||
"""关闭连接池"""
|
||||
|
||||
204
数据库.sql
Normal file
204
数据库.sql
Normal file
@@ -0,0 +1,204 @@
|
||||
/*
|
||||
Navicat Premium Data Transfer
|
||||
|
||||
Source Server : lz_mysql_host
|
||||
Source Server Type : MySQL
|
||||
Source Server Version : 50740
|
||||
Source Host : localhost:3306
|
||||
Source Schema : lz_app_test
|
||||
|
||||
Target Server Type : MySQL
|
||||
Target Server Version : 50740
|
||||
File Encoding : 65001
|
||||
|
||||
Date: 04/12/2025 21:57:38
|
||||
*/
|
||||
|
||||
SET NAMES utf8mb4;
|
||||
SET FOREIGN_KEY_CHECKS = 0;
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for deh_strategy_asset_new
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS `deh_strategy_asset_new`;
|
||||
CREATE TABLE `deh_strategy_asset_new` (
|
||||
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
|
||||
`st_id` int(11) NOT NULL COMMENT '策略id',
|
||||
`asset` varchar(32) NOT NULL COMMENT '资产名称',
|
||||
`win_rate` decimal(11,8) NOT NULL DEFAULT '0.00000000' COMMENT '胜率',
|
||||
`aror` decimal(12,8) NOT NULL DEFAULT '0.00000000' COMMENT '年化收益',
|
||||
`networth` decimal(12,8) NOT NULL DEFAULT '1.00000000' COMMENT '净值',
|
||||
`max_down` decimal(12,8) NOT NULL DEFAULT '0.00000000' COMMENT '最大回撤',
|
||||
`profit` decimal(20,8) NOT NULL DEFAULT '0.00000000' COMMENT '总收益',
|
||||
`profit_rate` decimal(12,8) NOT NULL DEFAULT '0.00000000' COMMENT '总收益率',
|
||||
`current_num` decimal(20,8) NOT NULL DEFAULT '0.00000000' COMMENT '资产数量',
|
||||
`usd_num` decimal(20,8) NOT NULL DEFAULT '0.00000000' COMMENT '价值美元数',
|
||||
`deposit` decimal(20,8) NOT NULL DEFAULT '0.00000000' COMMENT '总充值',
|
||||
`withdrawal` decimal(20,8) NOT NULL DEFAULT '0.00000000' COMMENT '总提现',
|
||||
`other` decimal(20,8) NOT NULL DEFAULT '0.00000000' COMMENT '其他总变动',
|
||||
`time` int(11) NOT NULL COMMENT 'unix时间戳',
|
||||
PRIMARY KEY (`id`) USING BTREE,
|
||||
KEY `st_id` (`st_id`) USING BTREE,
|
||||
KEY `asset` (`asset`) USING BTREE
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='策略资产表';
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for deh_strategy_kx_new
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS `deh_strategy_kx_new`;
|
||||
CREATE TABLE `deh_strategy_kx_new` (
|
||||
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
|
||||
`st_id` int(11) DEFAULT NULL COMMENT '策略id',
|
||||
`k_id` int(11) DEFAULT '0' COMMENT '对应strategy_key 的ID',
|
||||
`asset` varchar(32) DEFAULT NULL COMMENT '资产名',
|
||||
`balance` decimal(20,8) DEFAULT '0.00000000' COMMENT '当日账户金额',
|
||||
`withdrawal` decimal(20,8) DEFAULT '0.00000000' COMMENT '当日提现',
|
||||
`deposit` decimal(20,8) DEFAULT '0.00000000' COMMENT '当日充值',
|
||||
`other` decimal(20,8) DEFAULT '0.00000000' COMMENT '当日其他',
|
||||
`profit` decimal(20,8) DEFAULT '0.00000000' COMMENT '当日利润',
|
||||
`time` int(11) DEFAULT NULL COMMENT '时间',
|
||||
`up_time` datetime DEFAULT NULL COMMENT '最后更新时间',
|
||||
PRIMARY KEY (`id`) USING BTREE,
|
||||
KEY `kid_asset_time` (`k_id`,`asset`,`time`) USING BTREE,
|
||||
KEY `st_id_asset_time` (`st_id`,`asset`,`time`) USING BTREE
|
||||
) ENGINE=InnoDB AUTO_INCREMENT=259 DEFAULT CHARSET=utf8;
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for deh_strategy_networth
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS `deh_strategy_networth`;
|
||||
CREATE TABLE `deh_strategy_networth` (
|
||||
`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
|
||||
`st_id` int(11) DEFAULT '0' COMMENT '策略ID对应deh_strategy表的ID',
|
||||
`asset` varchar(32) CHARACTER SET utf8 DEFAULT NULL COMMENT '资产名',
|
||||
`share` decimal(20,8) DEFAULT '0.00000000' COMMENT '份额-净值算法',
|
||||
`net_worth` decimal(13,8) DEFAULT '0.00000000' COMMENT '净值-净值算法',
|
||||
`total_profit` decimal(20,8) DEFAULT NULL COMMENT '累计利润',
|
||||
`time` int(10) DEFAULT '0' COMMENT '日期',
|
||||
`up_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间',
|
||||
PRIMARY KEY (`id`),
|
||||
KEY `net_worth_index` (`st_id`,`asset`,`time`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for deh_strategy_order_new
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS `deh_strategy_order_new`;
|
||||
CREATE TABLE `deh_strategy_order_new` (
|
||||
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
|
||||
`st_id` int(11) DEFAULT NULL COMMENT '策略id',
|
||||
`k_id` int(11) DEFAULT '0' COMMENT '对应strategy_key 的ID',
|
||||
`asset` varchar(32) DEFAULT NULL COMMENT '资产名称',
|
||||
`order_id` varchar(765) DEFAULT NULL COMMENT '订单id',
|
||||
`symbol` varchar(120) DEFAULT NULL COMMENT '交易对',
|
||||
`side` varchar(120) DEFAULT NULL COMMENT '订单方向',
|
||||
`price` float DEFAULT NULL COMMENT '订单价格',
|
||||
`time` int(11) DEFAULT NULL COMMENT '订单时间',
|
||||
`order_qty` float DEFAULT NULL COMMENT '订单挂单数量',
|
||||
`last_qty` float DEFAULT NULL COMMENT '订单成交数量',
|
||||
`avg_price` float DEFAULT NULL COMMENT '订单成交均价',
|
||||
`exchange_id` int(11) DEFAULT NULL COMMENT '交易所id',
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `order_index` (`order_id`,`symbol`,`k_id`,`side`) USING BTREE
|
||||
) ENGINE=InnoDB AUTO_INCREMENT=1269 DEFAULT CHARSET=utf8 COMMENT='订单表';
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for deh_strategy_position_new
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS `deh_strategy_position_new`;
|
||||
CREATE TABLE `deh_strategy_position_new` (
|
||||
`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
|
||||
`st_id` int(11) NOT NULL COMMENT '策略id',
|
||||
`k_id` int(11) DEFAULT '0' COMMENT '对应strategy_key 的ID',
|
||||
`asset` varchar(32) DEFAULT '' COMMENT '使用资产名称,如BTC或USDT',
|
||||
`symbol` varchar(50) NOT NULL COMMENT '交易对',
|
||||
`price` float DEFAULT NULL COMMENT '持仓均价',
|
||||
`side` varchar(10) NOT NULL COMMENT '方向',
|
||||
`sum` float NOT NULL COMMENT '仓位(张数)',
|
||||
`asset_num` decimal(20,8) DEFAULT '0.00000000' COMMENT '资产数量',
|
||||
`asset_profit` decimal(20,8) DEFAULT NULL COMMENT '利润数量',
|
||||
`leverage` int(11) DEFAULT '0' COMMENT '杠杆倍数',
|
||||
`uptime` int(11) NOT NULL COMMENT '更新时间',
|
||||
`profit_price` decimal(20,8) DEFAULT '0.00000000' COMMENT '止盈价格',
|
||||
`stop_price` decimal(20,8) DEFAULT '0.00000000' COMMENT '止损价格',
|
||||
`liquidation_price` decimal(20,8) DEFAULT '0.00000000' COMMENT '强平价格',
|
||||
PRIMARY KEY (`id`) USING BTREE,
|
||||
UNIQUE KEY `st_id_asset_sum` (`k_id`,`st_id`,`symbol`,`side`) USING BTREE,
|
||||
KEY `k_id` (`k_id`) USING BTREE
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='持仓表';
|
||||
|
||||
-- ----------------------------
|
||||
-- Event structure for auto_del
|
||||
-- ----------------------------
|
||||
DROP EVENT IF EXISTS `auto_del`;
|
||||
delimiter ;;
|
||||
CREATE EVENT `auto_del`
|
||||
ON SCHEDULE
|
||||
EVERY '1' MINUTE STARTS '2021-11-01 00:00:00'
|
||||
DO BEGIN
|
||||
|
||||
|
||||
DELETE FROM deh_strategy_kx_new WHERE CONCAT(k_id,asset,time) IN (
|
||||
SELECT p_key FROM (
|
||||
SELECT CONCAT(k_id,asset,time) AS p_key FROM deh_strategy_kx_new GROUP BY CONCAT(k_id,asset,time) HAVING count(CONCAT(k_id,asset,time)) > 1)
|
||||
AS tmp
|
||||
) AND id NOT IN (
|
||||
SELECT id FROM (
|
||||
SELECT min(id) AS id FROM deh_strategy_kx_new GROUP BY CONCAT(k_id,asset,time) HAVING count(CONCAT(k_id,asset,time)) > 1)
|
||||
AS tmp1
|
||||
);
|
||||
|
||||
|
||||
DELETE FROM deh_strategy_networth WHERE CONCAT(st_id,asset,time) IN (
|
||||
SELECT p_key FROM (
|
||||
SELECT CONCAT(st_id,asset,time) AS p_key FROM deh_strategy_networth GROUP BY CONCAT(st_id,asset,time) HAVING count(CONCAT(st_id,asset,time)) > 1)
|
||||
AS tmp
|
||||
) AND id NOT IN (
|
||||
SELECT id FROM (
|
||||
SELECT min(id) AS id FROM deh_strategy_networth GROUP BY CONCAT(st_id,asset,time) HAVING count(CONCAT(st_id,asset,time)) > 1)
|
||||
AS tmp1
|
||||
);
|
||||
|
||||
|
||||
|
||||
DELETE FROM deh_strategy_asset_new WHERE CONCAT(st_id,asset) IN (
|
||||
SELECT p_key FROM (
|
||||
SELECT CONCAT(st_id,asset) AS p_key FROM deh_strategy_asset_new GROUP BY CONCAT(st_id,asset) HAVING count(CONCAT(st_id,asset)) > 1)
|
||||
AS tmp
|
||||
) AND id NOT IN (
|
||||
SELECT id FROM (
|
||||
SELECT min(id) AS id FROM deh_strategy_asset_new GROUP BY CONCAT(st_id,asset) HAVING count(CONCAT(st_id,asset)) > 1)
|
||||
AS tmp1
|
||||
);
|
||||
|
||||
|
||||
END
|
||||
;;
|
||||
delimiter ;
|
||||
|
||||
-- ----------------------------
|
||||
-- Event structure for auto_del_order
|
||||
-- ----------------------------
|
||||
DROP EVENT IF EXISTS `auto_del_order`;
|
||||
delimiter ;;
|
||||
CREATE EVENT `auto_del_order`
|
||||
ON SCHEDULE
|
||||
EVERY '1' HOUR STARTS '2021-11-01 00:00:00'
|
||||
DO BEGIN
|
||||
-- DELETE FROM `deh_strategy_order_new` WHERE `time` < UNIX_TIMESTAMP(NOW() - INTERVAL 3 DAY);
|
||||
DELETE FROM `deh_strategy_order_new` WHERE `time` < UNIX_TIMESTAMP(NOW() - INTERVAL 3 DAY) AND st_id IN (SELECT st_id FROM `deh_strategy` WHERE is_ia=1);
|
||||
DELETE FROM `deh_strategy_order_new` WHERE `time` < UNIX_TIMESTAMP(NOW() - INTERVAL 30 DAY) AND st_id IN (SELECT st_id FROM `deh_strategy` WHERE is_ia=0);
|
||||
DELETE from `deh_strategy_order_new` where order_id in (
|
||||
select order_id from (
|
||||
SELECT order_id FROM `deh_strategy_order_new` GROUP BY order_id HAVING count(order_id) > 1)
|
||||
as tmp
|
||||
) and id not in (
|
||||
select id from (
|
||||
SELECT min(id) as id FROM `deh_strategy_order_new` GROUP BY order_id HAVING count(order_id) > 1)
|
||||
as tmp1
|
||||
);
|
||||
END
|
||||
;;
|
||||
delimiter ;
|
||||
|
||||
SET FOREIGN_KEY_CHECKS = 1;
|
||||
Reference in New Issue
Block a user