from .base_sync import BaseSync from loguru import logger from typing import List, Dict, Any, Set import json import time from datetime import datetime, timedelta from sqlalchemy import text, and_ from models.orm_models import StrategyKX class AccountSyncBatch(BaseSync): """账户信息批量同步器""" async def sync_batch(self, accounts: Dict[str, Dict]): """批量同步所有账号的账户信息""" try: logger.info(f"开始批量同步账户信息,共 {len(accounts)} 个账号") # 收集所有账号的数据 all_account_data = await self._collect_all_account_data(accounts) if not all_account_data: logger.info("无账户信息数据需要同步") return # 批量同步到数据库 success = await self._sync_account_info_batch_to_db(all_account_data) if success: logger.info(f"账户信息批量同步完成: 处理 {len(all_account_data)} 条记录") else: logger.error("账户信息批量同步失败") except Exception as e: logger.error(f"账户信息批量同步失败: {e}") async def _collect_all_account_data(self, accounts: Dict[str, Dict]) -> List[Dict]: """收集所有账号的账户信息数据""" all_account_data = [] try: # 按交易所分组账号 account_groups = self._group_accounts_by_exchange(accounts) # 并发收集每个交易所的数据 tasks = [] for exchange_id, account_list in account_groups.items(): task = self._collect_exchange_account_data(exchange_id, account_list) tasks.append(task) # 等待所有任务完成并合并结果 results = await asyncio.gather(*tasks, return_exceptions=True) for result in results: if isinstance(result, list): all_account_data.extend(result) logger.info(f"收集到 {len(all_account_data)} 条账户信息记录") except Exception as e: logger.error(f"收集账户信息数据失败: {e}") return all_account_data def _group_accounts_by_exchange(self, accounts: Dict[str, Dict]) -> Dict[str, List[Dict]]: """按交易所分组账号""" groups = {} for account_id, account_info in accounts.items(): exchange_id = account_info.get('exchange_id') if exchange_id: if exchange_id not in groups: groups[exchange_id] = [] groups[exchange_id].append(account_info) return groups async def _collect_exchange_account_data(self, exchange_id: str, account_list: List[Dict]) -> List[Dict]: """收集某个交易所的账户信息数据""" account_data_list = [] try: for account_info in account_list: k_id = int(account_info['k_id']) st_id = account_info.get('st_id', 0) # 从Redis获取账户信息数据 account_data = await self._get_account_info_from_redis(k_id, st_id, exchange_id) account_data_list.extend(account_data) logger.debug(f"交易所 {exchange_id}: 收集到 {len(account_data_list)} 条账户信息") except Exception as e: logger.error(f"收集交易所 {exchange_id} 账户信息失败: {e}") return account_data_list async def _get_account_info_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]: """从Redis获取账户信息数据(批量优化版本)""" try: redis_key = f"{exchange_id}:balance:{k_id}" redis_funds = self.redis_client.client.hgetall(redis_key) if not redis_funds: return [] # 按天统计数据 from config.settings import SYNC_CONFIG recent_days = SYNC_CONFIG['recent_days'] today = datetime.now() date_stats = {} # 收集所有日期的数据 for fund_key, fund_json in redis_funds.items(): try: fund_data = json.loads(fund_json) date_str = fund_data.get('lz_time', '') lz_type = fund_data.get('lz_type', '') if not date_str or lz_type not in ['lz_balance', 'deposit', 'withdrawal']: continue # 只处理最近N天的数据 date_obj = datetime.strptime(date_str, '%Y-%m-%d') if (today - date_obj).days > recent_days: continue if date_str not in date_stats: date_stats[date_str] = { 'balance': 0.0, 'deposit': 0.0, 'withdrawal': 0.0, 'has_balance': False } lz_amount = float(fund_data.get('lz_amount', 0)) if lz_type == 'lz_balance': date_stats[date_str]['balance'] = lz_amount date_stats[date_str]['has_balance'] = True elif lz_type == 'deposit': date_stats[date_str]['deposit'] += lz_amount elif lz_type == 'withdrawal': date_stats[date_str]['withdrawal'] += lz_amount except (json.JSONDecodeError, ValueError) as e: logger.debug(f"解析Redis数据失败: {fund_key}, error={e}") continue # 转换为账户信息数据 account_data_list = [] sorted_dates = sorted(date_stats.keys()) # 获取前一天余额用于计算利润 prev_balance_map = self._get_previous_balances(redis_funds, sorted_dates) for date_str in sorted_dates: stats = date_stats[date_str] # 如果没有余额数据但有充提数据,仍然处理 if not stats['has_balance'] and stats['deposit'] == 0 and stats['withdrawal'] == 0: continue balance = stats['balance'] deposit = stats['deposit'] withdrawal = stats['withdrawal'] # 计算利润 prev_balance = prev_balance_map.get(date_str, 0.0) profit = balance - deposit - withdrawal - prev_balance # 转换时间戳 date_obj = datetime.strptime(date_str, '%Y-%m-%d') time_timestamp = int(date_obj.timestamp()) account_data = { 'st_id': st_id, 'k_id': k_id, 'balance': balance, 'withdrawal': withdrawal, 'deposit': deposit, 'other': 0.0, # 暂时为0 'profit': profit, 'time': time_timestamp } account_data_list.append(account_data) return account_data_list except Exception as e: logger.error(f"获取Redis账户信息失败: k_id={k_id}, error={e}") return [] def _get_previous_balances(self, redis_funds: Dict, sorted_dates: List[str]) -> Dict[str, float]: """获取前一天的余额""" prev_balance_map = {} prev_date = None for date_str in sorted_dates: # 查找前一天的余额 if prev_date: for fund_key, fund_json in redis_funds.items(): try: fund_data = json.loads(fund_json) if (fund_data.get('lz_time') == prev_date and fund_data.get('lz_type') == 'lz_balance'): prev_balance_map[date_str] = float(fund_data.get('lz_amount', 0)) break except: continue else: prev_balance_map[date_str] = 0.0 prev_date = date_str return prev_balance_map async def _sync_account_info_batch_to_db(self, account_data_list: List[Dict]) -> bool: """批量同步账户信息到数据库(最高效版本)""" session = self.db_manager.get_session() try: if not account_data_list: return True with session.begin(): # 方法1:使用原生SQL批量插入/更新(性能最好) success = self._batch_upsert_account_info(session, account_data_list) if not success: # 方法2:回退到ORM批量操作 success = self._batch_orm_upsert_account_info(session, account_data_list) return success except Exception as e: logger.error(f"批量同步账户信息到数据库失败: {e}") return False finally: session.close() def _batch_upsert_account_info(self, session, account_data_list: List[Dict]) -> bool: """使用原生SQL批量插入/更新账户信息""" try: # 准备批量数据 values_list = [] for data in account_data_list: values = ( f"({data['st_id']}, {data['k_id']}, 'USDT', " f"{data['balance']}, {data['withdrawal']}, {data['deposit']}, " f"{data['other']}, {data['profit']}, {data['time']})" ) values_list.append(values) if not values_list: return True values_str = ", ".join(values_list) # 使用INSERT ... ON DUPLICATE KEY UPDATE sql = f""" INSERT INTO deh_strategy_kx_new (st_id, k_id, asset, balance, withdrawal, deposit, other, profit, time) VALUES {values_str} ON DUPLICATE KEY UPDATE balance = VALUES(balance), withdrawal = VALUES(withdrawal), deposit = VALUES(deposit), other = VALUES(other), profit = VALUES(profit), up_time = NOW() """ session.execute(text(sql)) logger.info(f"原生SQL批量更新账户信息: {len(account_data_list)} 条记录") return True except Exception as e: logger.error(f"原生SQL批量更新账户信息失败: {e}") return False def _batch_orm_upsert_account_info(self, session, account_data_list: List[Dict]) -> bool: """使用ORM批量插入/更新账户信息""" try: # 分组数据以提高效率 account_data_by_key = {} for data in account_data_list: key = (data['k_id'], data['st_id'], data['time']) account_data_by_key[key] = data # 批量查询现有记录 existing_records = self._batch_query_existing_records(session, list(account_data_by_key.keys())) # 批量更新或插入 to_update = [] to_insert = [] for key, data in account_data_by_key.items(): if key in existing_records: # 更新 record = existing_records[key] record.balance = data['balance'] record.withdrawal = data['withdrawal'] record.deposit = data['deposit'] record.other = data['other'] record.profit = data['profit'] else: # 插入 to_insert.append(StrategyKX(**data)) # 批量插入新记录 if to_insert: session.add_all(to_insert) logger.info(f"ORM批量更新账户信息: 更新 {len(existing_records)} 条,插入 {len(to_insert)} 条") return True except Exception as e: logger.error(f"ORM批量更新账户信息失败: {e}") return False def _batch_query_existing_records(self, session, keys: List[tuple]) -> Dict[tuple, StrategyKX]: """批量查询现有记录""" existing_records = {} try: if not keys: return existing_records # 构建查询条件 conditions = [] for k_id, st_id, time_val in keys: conditions.append(f"(k_id = {k_id} AND st_id = {st_id} AND time = {time_val})") if conditions: conditions_str = " OR ".join(conditions) sql = f""" SELECT * FROM deh_strategy_kx_new WHERE {conditions_str} """ results = session.execute(text(sql)).fetchall() for row in results: key = (row.k_id, row.st_id, row.time) existing_records[key] = StrategyKX( id=row.id, st_id=row.st_id, k_id=row.k_id, asset=row.asset, balance=row.balance, withdrawal=row.withdrawal, deposit=row.deposit, other=row.other, profit=row.profit, time=row.time ) except Exception as e: logger.error(f"批量查询现有记录失败: {e}") return existing_records async def sync(self): """兼容旧接口""" accounts = self.get_accounts_from_redis() await self.sync_batch(accounts)