Files
exchange_monitor_sync/utils/helpers.py

114 lines
3.2 KiB
Python
Raw Normal View History

2025-12-05 13:25:26 +08:00
from datetime import datetime, timedelta
2025-12-04 15:40:19 +08:00
from typing import List, Dict, Optional, Any
from loguru import logger
def safe_float(value, default=0.0):
"""安全转换为float处理None和空值"""
if value is None:
return default
try:
return float(value)
except (ValueError, TypeError):
return default
def safe_int(value, default=0):
"""安全转换为int"""
if value is None:
return default
try:
return int(float(value))
except (ValueError, TypeError):
return default
def safe_str(self, value: Any, default: str = '') -> str:
"""安全转换为str"""
if value is None:
return ""
try:
return str(value)
except Exception as e:
logger.error(f"safe_str error: {e}")
return ""
2025-12-05 13:25:26 +08:00
def convert_timestamp(timestamp, return_type='str', format_str='%Y-%m-%d %H:%M:%S'):
"""
时间戳转换函数支持多种返回格式
Args:
timestamp: 时间戳支持整数浮点数字符串
return_type: 返回类型 'str'/'datetime'/'dict'
format_str: 当return_type='str'时的格式字符串
Returns:
根据return_type返回不同格式的数据
"""
try:
# 转换为浮点数
if isinstance(timestamp, str):
timestamp = float(timestamp)
else:
timestamp = float(timestamp)
# 判断时间戳精度
original_timestamp = timestamp
# 精确判断逻辑
if timestamp > 4102444800000: # 2100-01-01 的毫秒级时间戳
# 可能是微秒级,转换为秒级
timestamp = timestamp / 1000000
precision = 'microseconds'
elif timestamp > 4102444800: # 2100-01-01 的秒级时间戳
# 毫秒级时间戳
timestamp = timestamp / 1000
precision = 'milliseconds'
else:
# 秒级时间戳
precision = 'seconds'
# 转换为 datetime 对象
dt = datetime.fromtimestamp(timestamp)
# 根据返回类型返回不同格式
if return_type == 'datetime':
return dt
elif return_type == 'dict':
return {
'datetime': dt,
'formatted': dt.strftime(format_str),
'precision': precision,
'original_timestamp': original_timestamp,
'converted_timestamp': timestamp
}
else: # 默认返回字符串
return dt.strftime(format_str)
except (ValueError, TypeError, OSError) as e:
logger.error(f"时间戳转换失败: {timestamp}, 错误: {e}")
return None
def timestamp(unit='seconds'):
"""
简化版时间戳获取
Args:
unit: 's'/'ms'/'us' 'seconds'/'milliseconds'/'microseconds'
Returns:
int: 时间戳
"""
current = time.time()
unit_map = {
's': 1, 'seconds': 1,
'ms': 1000, 'milliseconds': 1000,
'us': 1000000, 'microseconds': 1000000
}
multiplier = unit_map.get(unit.lower(), 1)
return int(current * multiplier)
# 别名函数
def ts(unit='seconds'):
"""timestamp的别名"""
return timestamp(unit)