from datetime import datetime, timedelta from typing import List, Dict, Optional, Any from loguru import logger def safe_float(value, default=0.0): """安全转换为float,处理None和空值""" if value is None: return default try: return float(value) except (ValueError, TypeError): return default def safe_int(value, default=0): """安全转换为int""" if value is None: return default try: return int(float(value)) except (ValueError, TypeError): return default def safe_str(self, value: Any, default: str = '') -> str: """安全转换为str""" if value is None: return "" try: return str(value) except Exception as e: logger.error(f"safe_str error: {e}") return "" def convert_timestamp(timestamp, return_type='str', format_str='%Y-%m-%d %H:%M:%S'): """ 时间戳转换函数,支持多种返回格式 Args: timestamp: 时间戳(支持整数、浮点数、字符串) return_type: 返回类型 'str'/'datetime'/'dict' format_str: 当return_type='str'时的格式字符串 Returns: 根据return_type返回不同格式的数据 """ try: # 转换为浮点数 if isinstance(timestamp, str): timestamp = float(timestamp) else: timestamp = float(timestamp) # 判断时间戳精度 original_timestamp = timestamp # 精确判断逻辑 if timestamp > 4102444800000: # 2100-01-01 的毫秒级时间戳 # 可能是微秒级,转换为秒级 timestamp = timestamp / 1000000 precision = 'microseconds' elif timestamp > 4102444800: # 2100-01-01 的秒级时间戳 # 毫秒级时间戳 timestamp = timestamp / 1000 precision = 'milliseconds' else: # 秒级时间戳 precision = 'seconds' # 转换为 datetime 对象 dt = datetime.fromtimestamp(timestamp) # 根据返回类型返回不同格式 if return_type == 'datetime': return dt elif return_type == 'dict': return { 'datetime': dt, 'formatted': dt.strftime(format_str), 'precision': precision, 'original_timestamp': original_timestamp, 'converted_timestamp': timestamp } else: # 默认返回字符串 return dt.strftime(format_str) except (ValueError, TypeError, OSError) as e: logger.error(f"时间戳转换失败: {timestamp}, 错误: {e}") return None def timestamp(unit='seconds'): """ 简化版时间戳获取 Args: unit: 's'/'ms'/'us' 或 'seconds'/'milliseconds'/'microseconds' Returns: int: 时间戳 """ current = time.time() unit_map = { 's': 1, 'seconds': 1, 'ms': 1000, 'milliseconds': 1000, 'us': 1000000, 'microseconds': 1000000 } multiplier = unit_map.get(unit.lower(), 1) return int(current * multiplier) # 别名函数 def ts(unit='seconds'): """timestamp的别名""" return timestamp(unit)