from .base_sync import BaseSync from loguru import logger from typing import List, Dict, Any, Tuple import asyncio import time from sqlalchemy import text class OrderSyncBatch(BaseSync): """订单数据批量同步器""" def __init__(self): super().__init__() self.batch_size = 1000 # 每批处理数量 self.recent_days = 3 # 同步最近几天的数据 async def sync_batch(self, accounts: Dict[str, Dict]): """批量同步所有账号的订单数据""" try: logger.info(f"开始批量同步订单数据,共 {len(accounts)} 个账号") start_time = time.time() # 1. 收集所有账号的订单数据 all_orders = await self.redis_client._collect_all_orders(accounts) if not all_orders: logger.info("无订单数据需要同步") return logger.info(f"收集到 {len(all_orders)} 条订单数据") # 2. 批量同步到数据库 # 使用基本版本 success, processed_count = await self._sync_orders_batch_to_db(all_orders) # 或者使用增强版本 # success, results = await self._sync_orders_batch_to_db_enhanced(all_orders) elapsed = time.time() - start_time if success: logger.info(f"订单批量同步完成: 处理 {processed_count} 条订单,耗时 {elapsed:.2f}秒") else: logger.error("订单批量同步失败") except Exception as e: logger.error(f"订单批量同步失败: {e}") async def _sync_orders_batch_to_db(self, all_orders: List[Dict]) -> Tuple[bool, int]: """批量同步订单数据到数据库 Args: all_orders: 订单数据列表 Returns: Tuple[bool, int]: (是否成功, 处理的订单数量) """ if not all_orders: return True, 0 session = self.db_manager.get_session() processed_count = 0 errors = [] try: # 按批次处理 for i in range(0, len(all_orders), self.batch_size): batch_orders = all_orders[i:i + self.batch_size] try: session.begin() # 转换数据并准备批量插入 converted_orders = [] for raw_order in batch_orders: try: converted = self._convert_order_data(raw_order) # 检查必要字段 if not all([ converted.get('order_id'), converted.get('symbol'), converted.get('k_id'), converted.get('side') ]): logger.warning(f"订单缺少必要字段: {raw_order}") continue converted_orders.append(converted) except Exception as e: logger.error(f"转换订单数据失败: {raw_order}, error={e}") continue if not converted_orders: session.commit() continue # 批量插入或更新 upsert_sql = text(""" INSERT INTO deh_strategy_order_new (st_id, k_id, asset, order_id, symbol, side, price, time, order_qty, last_qty, avg_price, exchange_id) VALUES (:st_id, :k_id, :asset, :order_id, :symbol, :side, :price, :time, :order_qty, :last_qty, :avg_price, :exchange_id) ON DUPLICATE KEY UPDATE price = VALUES(price), time = VALUES(time), order_qty = VALUES(order_qty), last_qty = VALUES(last_qty), avg_price = VALUES(avg_price) """) result = session.execute(upsert_sql, converted_orders) processed_count += len(converted_orders) # 计算统计信息 batch_size = len(converted_orders) total_affected = result.rowcount updated_count = max(0, total_affected - batch_size) inserted_count = batch_size - updated_count logger.debug(f"订单批次 {i//self.batch_size + 1}: " f"处理 {batch_size} 条, " f"插入 {inserted_count} 条, " f"更新 {updated_count} 条") session.commit() except Exception as e: session.rollback() error_msg = f"订单批次 {i//self.batch_size + 1} 处理失败: {str(e)}" logger.error(error_msg, exc_info=True) errors.append(error_msg) # 继续处理下一个批次 if errors: logger.error(f"订单同步完成但有错误: {len(errors)} 个错误") for error in errors[:5]: # 只打印前5个错误 logger.error(f"错误详情: {error}") if len(errors) > 5: logger.error(f"...还有 {len(errors) - 5} 个错误") success = len(errors) == 0 return success, processed_count except Exception as e: logger.error(f"订单批量同步失败: {e}", exc_info=True) return False, processed_count finally: session.close() async def _sync_orders_batch_to_db_enhanced(self, all_orders: List[Dict]) -> Tuple[bool, Dict]: """增强版:批量同步订单数据到数据库(带详细统计) Args: all_orders: 订单数据列表 Returns: Tuple[bool, Dict]: (是否成功, 统计结果) """ if not all_orders: return True, {'total': 0, 'processed': 0, 'inserted': 0, 'updated': 0, 'errors': []} session = self.db_manager.get_session() results = { 'total': len(all_orders), 'processed': 0, 'inserted': 0, 'updated': 0, 'errors': [], 'invalid_orders': 0 } try: logger.info(f"开始同步 {results['total']} 条订单数据,批次大小: {self.batch_size}") # 按批次处理 total_batches = (len(all_orders) + self.batch_size - 1) // self.batch_size for batch_idx in range(total_batches): start_idx = batch_idx * self.batch_size end_idx = start_idx + self.batch_size batch_orders = all_orders[start_idx:end_idx] logger.debug(f"处理批次 {batch_idx + 1}/{total_batches}: " f"订单 {start_idx + 1}-{min(end_idx, len(all_orders))}") try: session.begin() # 转换数据 converted_orders = [] batch_invalid = 0 for raw_order in batch_orders: try: converted = self._convert_order_data(raw_order) # 验证必要字段 required_fields = ['order_id', 'symbol', 'k_id', 'side'] missing_fields = [field for field in required_fields if not converted.get(field)] if missing_fields: logger.warning(f"订单缺少必要字段 {missing_fields}: {raw_order}") batch_invalid += 1 continue # 验证字段长度(防止数据库错误) order_id = converted.get('order_id', '') if len(order_id) > 765: # 根据表结构限制 converted['order_id'] = order_id[:765] logger.warning(f"order_id过长已截断: {order_id}") symbol = converted.get('symbol', '') if len(symbol) > 120: converted['symbol'] = symbol[:120] side = converted.get('side', '') if len(side) > 120: converted['side'] = side[:120] converted_orders.append(converted) except Exception as e: logger.error(f"处理订单失败: {raw_order}, error={e}") batch_invalid += 1 continue results['invalid_orders'] += batch_invalid if not converted_orders: session.commit() continue # 批量插入或更新 upsert_sql = text(""" INSERT INTO deh_strategy_order_new (st_id, k_id, asset, order_id, symbol, side, price, time, order_qty, last_qty, avg_price, exchange_id) VALUES (:st_id, :k_id, :asset, :order_id, :symbol, :side, :price, :time, :order_qty, :last_qty, :avg_price, :exchange_id) ON DUPLICATE KEY UPDATE price = VALUES(price), time = VALUES(time), order_qty = VALUES(order_qty), last_qty = VALUES(last_qty), avg_price = VALUES(avg_price), updated_at = CURRENT_TIMESTAMP """) result = session.execute(upsert_sql, converted_orders) # 统计本批次结果 batch_size = len(converted_orders) total_affected = result.rowcount batch_updated = max(0, total_affected - batch_size) batch_inserted = batch_size - batch_updated # 累加到总结果 results['processed'] += batch_size results['inserted'] += batch_inserted results['updated'] += batch_updated logger.info(f"批次 {batch_idx + 1} 完成: " f"有效 {batch_size} 条, " f"无效 {batch_invalid} 条, " f"插入 {batch_inserted} 条, " f"更新 {batch_updated} 条") session.commit() except Exception as e: session.rollback() error_msg = f"批次 {batch_idx + 1} 处理失败: {str(e)}" logger.error(error_msg, exc_info=True) results['errors'].append(error_msg) # 继续处理下一个批次 # 最终统计 success_rate = results['processed'] / results['total'] * 100 if results['total'] > 0 else 0 logger.info(f"订单同步完成: " f"总数={results['total']}, " f"处理={results['processed']}({success_rate:.1f}%), " f"插入={results['inserted']}, " f"更新={results['updated']}, " f"无效={results['invalid_orders']}, " f"错误={len(results['errors'])}") success = len(results['errors']) == 0 return success, results except Exception as e: logger.error(f"订单批量同步失败: {e}", exc_info=True) results['errors'].append(f"同步过程失败: {str(e)}") return False, results finally: session.close() def _convert_order_data(self, data: Dict) -> Dict: """转换订单数据格式""" try: # 安全转换函数 def safe_float(value): if value is None or value == '': return None try: return float(value) except (ValueError, TypeError): return None def safe_int(value): if value is None or value == '': return None try: return int(float(value)) except (ValueError, TypeError): return None def safe_str(value): if value is None: return '' return str(value) return { 'st_id': safe_int(data.get('st_id')) or 0, 'k_id': safe_int(data.get('k_id')) or 0, 'asset': safe_str(data.get('asset')) or 'USDT', 'order_id': safe_str(data.get('order_id')), 'symbol': safe_str(data.get('symbol')), 'side': safe_str(data.get('side')), 'price': safe_float(data.get('price')), 'time': safe_int(data.get('time')), 'order_qty': safe_float(data.get('order_qty')), 'last_qty': safe_float(data.get('last_qty')), 'avg_price': safe_float(data.get('avg_price')), 'exchange_id': None # 忽略该字段 } except Exception as e: logger.error(f"转换订单数据异常: {data}, error={e}") return {}