From 64c993319a6af1c740a871a62b984d63ea947a89 Mon Sep 17 00:00:00 2001 From: lz_db Date: Fri, 5 Dec 2025 13:25:26 +0800 Subject: [PATCH] 1 --- .env | 4 +-- config/settings.py | 2 ++ models/orm_models.py | 2 +- sync/account_sync.py | 48 +++++++++++++++++++++---- sync/manager.py | 65 +++++++++++++++++++++++++++++++-- sync/order_sync.py | 1 + utils/helpers.py | 83 +++++++++++++++++++++++++++++++++++++++++++ utils/redis_client.py | 17 +++++---- 8 files changed, 205 insertions(+), 17 deletions(-) diff --git a/.env b/.env index 0eaa0d9..ea58281 100644 --- a/.env +++ b/.env @@ -19,7 +19,7 @@ SQLALCHEMY_ECHO=false SQLALCHEMY_ECHO_POOL=false # 同步配置 -SYNC_INTERVAL=20 +SYNC_INTERVAL=2 RECENT_DAYS=3 CHUNK_SIZE=1000 ENABLE_POSITION_SYNC=true @@ -49,7 +49,7 @@ ORDER_REDIS_SCAN_COUNT=1000 POSITION_BATCH_SIZE=500 # 账户同步配置 -ACCOUNT_SYNC_RECENT_DAYS=3 +ACCOUNT_SYNC_RECENT_DAYS=0 # 并发控制 MAX_CONCURRENT_ACCOUNTS=50 diff --git a/config/settings.py b/config/settings.py index 11c10e8..c8afece 100644 --- a/config/settings.py +++ b/config/settings.py @@ -17,6 +17,8 @@ REDIS_CONFIG = { SYNC_CONFIG = { 'interval': int(os.getenv('SYNC_INTERVAL', 60)), # 同步间隔(秒) 'recent_days': int(os.getenv('RECENT_DAYS', 3)), # 同步最近几天数据 + 'recent_days_account': int(os.getenv('ACCOUNT_SYNC_RECENT_DAYS', 3)), # 同步最近几天数据 + 'recent_days_order': int(os.getenv('ORDER_SYNC_RECENT_DAYS', 3)), # 同步最近几天数据 'chunk_size': int(os.getenv('CHUNK_SIZE', 1000)), # 批量处理大小 'enable_position_sync': os.getenv('ENABLE_POSITION_SYNC', 'true').lower() == 'true', 'enable_order_sync': os.getenv('ENABLE_ORDER_SYNC', 'true').lower() == 'true', diff --git a/models/orm_models.py b/models/orm_models.py index b06f67b..bf3ca4a 100644 --- a/models/orm_models.py +++ b/models/orm_models.py @@ -73,4 +73,4 @@ class StrategyKX(Base): up_time: Mapped[Optional[datetime]] = mapped_column(DateTime, default=func.now(), onupdate=func.now(), comment='最后更新时间') def __repr__(self) -> str: - return f"StrategyKX(id={self.id!r}, k_id={self.k_id!r}, time={self.time!r})" \ No newline at end of file + return f"StrategyKX(id={self.id!r}, k_id={self.k_id!r}, st_id={self.st_id!r}, time={self.time!r})" \ No newline at end of file diff --git a/sync/account_sync.py b/sync/account_sync.py index dea404f..acd2d7d 100644 --- a/sync/account_sync.py +++ b/sync/account_sync.py @@ -1,7 +1,7 @@ from .base_sync import BaseSync from loguru import logger from typing import List, Dict -from sqlalchemy import text +from sqlalchemy import text, select, func, and_ from models.orm_models import StrategyKX class AccountSyncBatch(BaseSync): @@ -12,14 +12,9 @@ class AccountSyncBatch(BaseSync): try: logger.info(f"开始批量同步账户信息,共 {len(accounts)} 个账号") - # 测试 - # res = await self.redis_client._get_account_info_from_redis(10140, 5548, 'mt5') - # print(res) - # return # 收集所有账号的数据 all_account_data = await self.redis_client._collect_all_account_data(accounts) - if not all_account_data: logger.info("无账户信息数据需要同步") return @@ -182,3 +177,44 @@ class AccountSyncBatch(BaseSync): logger.error(f"批量查询现有记录失败: {e}") return existing_records + + async def networth(self, accounts: Dict[str, Dict]): + """计算所有策略的净值""" + + # 从accounts中获取所有策略的st_id + st_ids = set() + for account in accounts.values(): + st_ids.add(account['st_id']) + + stmt = select( + StrategyKX.st_id, + StrategyKX.time, + func.sum(StrategyKX.balance).label('balance_sum'), + func.sum(StrategyKX.profit).label('profit_sum'), + func.sum(StrategyKX.withdrawal).label('withdrawal_sum'), + func.sum(StrategyKX.deposit).label('deposit_sum'), + func.sum(StrategyKX.other).label('other_sum') + ).where( + StrategyKX.st_id.in_(st_ids) + ).group_by( + StrategyKX.st_id, + StrategyKX.time + ).order_by( + StrategyKX.time.asc(), + StrategyKX.st_id.asc() + ) + + results = self.session.execute(stmt).all() + + return [ + { + 'st_id': row.st_id, + 'time': row.time, + 'balance_sum': float(row.balance_sum) if row.balance_sum else 0.0, + 'profit_sum': float(row.profit_sum) if row.profit_sum else 0.0, + 'withdrawal_sum': float(row.withdrawal_sum) if row.withdrawal_sum else 0.0, + 'deposit_sum': float(row.deposit_sum) if row.deposit_sum else 0.0, + 'other_sum': float(row.other_sum) if row.other_sum else 0.0, + } + for row in results + ] \ No newline at end of file diff --git a/sync/manager.py b/sync/manager.py index d0b5b73..ecc50de 100644 --- a/sync/manager.py +++ b/sync/manager.py @@ -6,6 +6,7 @@ import time import json from typing import Dict import utils.helpers as helpers +import redis from utils.redis_client import RedisClient from config.settings import SYNC_CONFIG @@ -15,6 +16,7 @@ from .account_sync import AccountSyncBatch from utils.redis_batch_helper import RedisBatchHelper from config.settings import COMPUTER_NAMES, COMPUTER_NAME_PATTERN from typing import List, Dict, Any, Set, Optional +from config.settings import REDIS_CONFIG class SyncManager: """同步管理器(完整批量版本)""" @@ -62,7 +64,8 @@ class SyncManager: async def start(self): """启动同步服务""" logger.info(f"同步服务启动,间隔 {self.sync_interval} 秒") - + # await self.cp() + # return while self.is_running: try: @@ -155,4 +158,62 @@ class SyncManager: if hasattr(syncer, 'db_manager'): syncer.db_manager.close() - logger.info("同步服务停止") \ No newline at end of file + logger.info("同步服务停止") + + async def cp(self): + """把Redis 0库的资产数据复制到Redis 1库""" + # 创建到Redis 0库的连接 + redis_0 = redis.Redis( + host=REDIS_CONFIG['host'], # 你的Redis主机 + port=REDIS_CONFIG['port'], # 你的Redis端口 + db=0, # 数据库0 + password=None, # 如果有密码 + decode_responses=True # 自动解码为字符串 + ) + + # 创建到Redis 1库的连接 + redis_1 = redis.Redis( + host=REDIS_CONFIG['host'], + port=REDIS_CONFIG['port'], + db=1, # 数据库1 + password=None, + decode_responses=True + ) + accounts = self.redis_client.get_accounts_from_redis() + print(f"共 {len(accounts)} 个账号") + all_kid = accounts.keys() + + keys1 = redis_0.keys("Bybit_fin_*") + # print(f"找到 {len(keys1)} 个匹配的键") + keys2 = redis_0.keys("Metatrader_fin_*") + # print(f"找到 {len(keys2)} 个匹配的键") + all_keys = keys1 + keys2 + print(f"共 {len(all_keys)} 个键") + for key in all_keys: + key_split = key.split("_") + k_id = key_split[-1] + if k_id not in all_kid: + continue + exchange_id = None + if key_split[0] == "Bybit": + exchange_id = "bybit" + elif key_split[0] == "Metatrader": + exchange_id = "mt5" + # data = redis_0.hget(key) + print(f"开始处理键 {key} 的数据") + data = redis_0.hgetall(key) + for k, v in data.items(): + # print(f"{k}") + data_dict = json.loads(v) + data_dict['lz_amount'] = float(data_dict['lz_money']) + del data_dict['lz_money'] + if data_dict['lz_type'] == 'lz_balance': + row_key = k + else: + row_key = f"fund_{k}" + # print(row_key) + redis_1.hset(f"{exchange_id}:balance:{data_dict['k_id']}",row_key,json.dumps(data_dict)) + # print(f"键 {key} 的数据为 {data}") + redis_1.close() + redis_0.close() + print("复制完成") \ No newline at end of file diff --git a/sync/order_sync.py b/sync/order_sync.py index d5c9e2c..2ca278c 100644 --- a/sync/order_sync.py +++ b/sync/order_sync.py @@ -15,6 +15,7 @@ class OrderSyncBatch(BaseSync): async def sync_batch(self, accounts: Dict[str, Dict]): """批量同步所有账号的订单数据""" + return try: logger.info(f"开始批量同步订单数据,共 {len(accounts)} 个账号") diff --git a/utils/helpers.py b/utils/helpers.py index 631f336..b80abca 100644 --- a/utils/helpers.py +++ b/utils/helpers.py @@ -1,3 +1,4 @@ +from datetime import datetime, timedelta from typing import List, Dict, Optional, Any from loguru import logger @@ -29,3 +30,85 @@ def safe_str(self, value: Any, default: str = '') -> str: except Exception as e: logger.error(f"safe_str error: {e}") return "" + +def convert_timestamp(timestamp, return_type='str', format_str='%Y-%m-%d %H:%M:%S'): + """ + 时间戳转换函数,支持多种返回格式 + + Args: + timestamp: 时间戳(支持整数、浮点数、字符串) + return_type: 返回类型 'str'/'datetime'/'dict' + format_str: 当return_type='str'时的格式字符串 + + Returns: + 根据return_type返回不同格式的数据 + """ + try: + # 转换为浮点数 + if isinstance(timestamp, str): + timestamp = float(timestamp) + else: + timestamp = float(timestamp) + + # 判断时间戳精度 + original_timestamp = timestamp + + # 精确判断逻辑 + if timestamp > 4102444800000: # 2100-01-01 的毫秒级时间戳 + # 可能是微秒级,转换为秒级 + timestamp = timestamp / 1000000 + precision = 'microseconds' + elif timestamp > 4102444800: # 2100-01-01 的秒级时间戳 + # 毫秒级时间戳 + timestamp = timestamp / 1000 + precision = 'milliseconds' + else: + # 秒级时间戳 + precision = 'seconds' + + # 转换为 datetime 对象 + dt = datetime.fromtimestamp(timestamp) + + # 根据返回类型返回不同格式 + if return_type == 'datetime': + return dt + elif return_type == 'dict': + return { + 'datetime': dt, + 'formatted': dt.strftime(format_str), + 'precision': precision, + 'original_timestamp': original_timestamp, + 'converted_timestamp': timestamp + } + else: # 默认返回字符串 + return dt.strftime(format_str) + + except (ValueError, TypeError, OSError) as e: + logger.error(f"时间戳转换失败: {timestamp}, 错误: {e}") + return None + +def timestamp(unit='seconds'): + """ + 简化版时间戳获取 + + Args: + unit: 's'/'ms'/'us' 或 'seconds'/'milliseconds'/'microseconds' + + Returns: + int: 时间戳 + """ + current = time.time() + + unit_map = { + 's': 1, 'seconds': 1, + 'ms': 1000, 'milliseconds': 1000, + 'us': 1000000, 'microseconds': 1000000 + } + + multiplier = unit_map.get(unit.lower(), 1) + return int(current * multiplier) + +# 别名函数 +def ts(unit='seconds'): + """timestamp的别名""" + return timestamp(unit) \ No newline at end of file diff --git a/utils/redis_client.py b/utils/redis_client.py index bd8f0fc..cb50cf5 100644 --- a/utils/redis_client.py +++ b/utils/redis_client.py @@ -361,9 +361,10 @@ class RedisClient: for account_info in account_list: k_id = int(account_info['k_id']) st_id = account_info.get('st_id', 0) + add_time = account_info.get('add_time', 0) # 从Redis获取账户信息数据 - account_data = await self._get_account_info_from_redis(k_id, st_id, exchange_id) + account_data = await self._get_account_info_from_redis(k_id, st_id, exchange_id, add_time) account_data_list.extend(account_data) logger.debug(f"交易所 {exchange_id}: 收集到 {len(account_data_list)} 条账户信息") @@ -373,7 +374,7 @@ class RedisClient: return account_data_list - async def _get_account_info_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]: + async def _get_account_info_from_redis(self, k_id: int, st_id: int, exchange_id: str, add_time: int) -> List[Dict]: """从Redis获取账户信息数据(批量优化版本)""" try: redis_key = f"{exchange_id}:balance:{k_id}" @@ -383,7 +384,7 @@ class RedisClient: return [] # 按天统计数据 - recent_days = SYNC_CONFIG['recent_days'] + recent_days = SYNC_CONFIG['recent_days_account'] today = datetime.now() date_stats = {} @@ -394,14 +395,18 @@ class RedisClient: fund_data = json.loads(fund_json) date_str = fund_data.get('lz_time', '') lz_type = fund_data.get('lz_type', '') + add_time_str = helpers.convert_timestamp(add_time,format_str='%Y-%m-%d') + if date_str < add_time_str: + continue if not date_str or lz_type not in ['lz_balance', 'deposit', 'withdrawal']: continue # 只处理最近N天的数据 date_obj = datetime.strptime(date_str, '%Y-%m-%d') - if (today - date_obj).days > recent_days: - continue + if recent_days != 0: + if (today - date_obj).days > recent_days: + continue if date_str not in date_stats: date_stats[date_str] = { @@ -554,7 +559,7 @@ class RedisClient: """从Redis获取最近N天的订单数据""" try: redis_key = f"{exchange_id}:orders:{k_id}" - recent_days = SYNC_CONFIG['recent_days'] + recent_days = SYNC_CONFIG['recent_days_order'] # 计算最近N天的日期 today = datetime.now() recent_dates = []