Files
exchange_monitor_sync/utils/helpers.py
lz_db 64c993319a 1
2025-12-05 13:25:26 +08:00

114 lines
3.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any

from loguru import logger
def safe_float(value, default=0.0):
    """Convert *value* to ``float``, returning *default* when that is impossible.

    Args:
        value: Anything ``float()`` accepts (number, numeric string, ...).
            ``None`` is treated as "missing".
        default: Value returned when *value* is ``None`` or cannot be
            converted.

    Returns:
        ``float(value)`` on success, otherwise *default*.
    """
    if value is None:
        return default
    try:
        return float(value)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: integers too large for a double (e.g. 10**400)
        # must not escape a "safe" converter.
        return default
def safe_int(value, default=0):
    """Convert *value* to ``int``, returning *default* when that is impossible.

    A direct ``int()`` conversion is tried first so that large integers and
    integer strings keep full precision. Values that only parse as floats
    (``"3.7"``, ``"1e3"``) fall back to ``int(float(value))``, which
    truncates toward zero.

    Args:
        value: Number or numeric string. ``None`` is treated as "missing".
        default: Value returned when *value* is ``None`` or cannot be
            converted (including NaN and infinities).

    Returns:
        The integer value on success, otherwise *default*.
    """
    if value is None:
        return default
    try:
        # Exact path: avoids the float round-trip that corrupts values
        # above 2**53 (e.g. large numeric identifiers).
        return int(value)
    except (ValueError, TypeError, OverflowError):
        pass
    try:
        return int(float(value))
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int(float('inf')); ValueError: NaN or junk text.
        return default
def safe_str(self, value: Any, default: str = '') -> str:
    """Convert *value* to ``str``, returning *default* when that is impossible.

    Args:
        self: Unused; the body never reads it. It is kept only so existing
            call sites that pass a leading placeholder keep working.
            NOTE(review): looks like a leftover from a method signature --
            confirm with callers before removing.
        value: Object to convert. ``None`` is treated as "missing".
        default: Value returned when *value* is ``None`` or ``str(value)``
            raises.

    Returns:
        ``str(value)`` on success, otherwise *default*.
    """
    if value is None:
        # Honour the caller-supplied default instead of a hard-coded "".
        return default
    try:
        return str(value)
    except Exception as e:
        # A custom __str__ can raise anything; log it and fall back.
        logger.error(f"safe_str error: {e}")
        return default
def convert_timestamp(timestamp, return_type='str', format_str='%Y-%m-%d %H:%M:%S'):
    """Convert a Unix timestamp of unknown precision to a date/time value.

    The precision is guessed from the magnitude of the value: anything above
    the millisecond timestamp of 2100-01-01 is treated as microseconds,
    anything above the second timestamp of 2100-01-01 as milliseconds, and
    everything else as seconds. The resulting ``datetime`` is naive and
    expressed in the system's local timezone (``datetime.fromtimestamp``
    without ``tz``).

    Args:
        timestamp: Timestamp as int, float, or numeric string.
        return_type: ``'datetime'`` for a ``datetime`` object, ``'dict'`` for
            a dict with the keys ``datetime``, ``formatted``, ``precision``,
            ``original_timestamp`` and ``converted_timestamp``; any other
            value (default ``'str'``) returns a formatted string.
        format_str: ``strftime`` format used for the string output and for
            the ``formatted`` entry of the dict.

    Returns:
        The converted value in the requested shape, or ``None`` if the
        input cannot be converted (the failure is logged).
    """
    # Upper bounds (2100-01-01) used to tell the three precisions apart.
    max_seconds = 4102444800
    max_milliseconds = 4102444800000

    # Keep the caller's value so the error log shows what was passed in,
    # not an intermediate rescaled number.
    raw_value = timestamp
    try:
        original_timestamp = float(timestamp)

        if original_timestamp > max_milliseconds:
            seconds = original_timestamp / 1000000
            precision = 'microseconds'
        elif original_timestamp > max_seconds:
            seconds = original_timestamp / 1000
            precision = 'milliseconds'
        else:
            seconds = original_timestamp
            precision = 'seconds'

        dt = datetime.fromtimestamp(seconds)

        if return_type == 'datetime':
            return dt
        if return_type == 'dict':
            return {
                'datetime': dt,
                'formatted': dt.strftime(format_str),
                'precision': precision,
                'original_timestamp': original_timestamp,
                'converted_timestamp': seconds
            }
        # Default: formatted string.
        return dt.strftime(format_str)
    except (ValueError, TypeError, OSError, OverflowError) as e:
        # fromtimestamp() raises OverflowError for values outside the
        # platform's time_t range (including infinities).
        logger.error(f"时间戳转换失败: {raw_value}, 错误: {e}")
        return None
def timestamp(unit='seconds'):
    """Return the current Unix time as an integer in the requested unit.

    Args:
        unit: ``'s'``/``'seconds'``, ``'ms'``/``'milliseconds'`` or
            ``'us'``/``'microseconds'`` (case-insensitive). Any other
            value falls back to seconds.

    Returns:
        int: Current time since the epoch, truncated to the chosen unit.
    """
    multipliers = {
        's': 1, 'seconds': 1,
        'ms': 1000, 'milliseconds': 1000,
        'us': 1000000, 'microseconds': 1000000,
    }
    # Unrecognised units silently use the seconds multiplier.
    factor = multipliers.get(unit.lower(), 1)
    return int(time.time() * factor)
# Short alias
def ts(unit='seconds'):
    """Shorthand for :func:`timestamp`; forwards *unit* unchanged."""
    return timestamp(unit=unit)