2025-12-02 22:05:54 +08:00
|
|
|
|
from .base_sync import BaseSync
|
|
|
|
|
|
from loguru import logger
|
2025-12-04 22:03:02 +08:00
|
|
|
|
from typing import List, Dict
|
2025-12-05 13:25:26 +08:00
|
|
|
|
from sqlalchemy import text, select, func, and_
|
2025-12-03 23:53:07 +08:00
|
|
|
|
from models.orm_models import StrategyKX
|
2025-12-02 22:05:54 +08:00
|
|
|
|
|
2025-12-03 23:53:07 +08:00
|
|
|
|
class AccountSyncBatch(BaseSync):
|
|
|
|
|
|
"""账户信息批量同步器"""
|
2025-12-02 22:05:54 +08:00
|
|
|
|
|
2025-12-03 23:53:07 +08:00
|
|
|
|
async def sync_batch(self, accounts: Dict[str, Dict]):
|
|
|
|
|
|
"""批量同步所有账号的账户信息"""
|
2025-12-02 22:05:54 +08:00
|
|
|
|
try:
|
2025-12-03 23:53:07 +08:00
|
|
|
|
logger.info(f"开始批量同步账户信息,共 {len(accounts)} 个账号")
|
2025-12-04 19:44:22 +08:00
|
|
|
|
|
2025-12-03 23:53:07 +08:00
|
|
|
|
# 收集所有账号的数据
|
2025-12-04 19:44:22 +08:00
|
|
|
|
all_account_data = await self.redis_client._collect_all_account_data(accounts)
|
|
|
|
|
|
|
2025-12-03 23:53:07 +08:00
|
|
|
|
if not all_account_data:
|
|
|
|
|
|
logger.info("无账户信息数据需要同步")
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
# 批量同步到数据库
|
|
|
|
|
|
success = await self._sync_account_info_batch_to_db(all_account_data)
|
|
|
|
|
|
|
|
|
|
|
|
if success:
|
|
|
|
|
|
logger.info(f"账户信息批量同步完成: 处理 {len(all_account_data)} 条记录")
|
|
|
|
|
|
else:
|
|
|
|
|
|
logger.error("账户信息批量同步失败")
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"账户信息批量同步失败: {e}")
|
|
|
|
|
|
|
2025-12-04 19:44:22 +08:00
|
|
|
|
|
|
|
|
|
|
|
2025-12-03 23:53:07 +08:00
|
|
|
|
async def _sync_account_info_batch_to_db(self, account_data_list: List[Dict]) -> bool:
|
|
|
|
|
|
"""批量同步账户信息到数据库(最高效版本)"""
|
2025-12-02 22:05:54 +08:00
|
|
|
|
session = self.db_manager.get_session()
|
|
|
|
|
|
try:
|
2025-12-03 23:53:07 +08:00
|
|
|
|
if not account_data_list:
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
2025-12-02 22:05:54 +08:00
|
|
|
|
with session.begin():
|
2025-12-03 23:53:07 +08:00
|
|
|
|
# 方法1:使用原生SQL批量插入/更新(性能最好)
|
|
|
|
|
|
success = self._batch_upsert_account_info(session, account_data_list)
|
|
|
|
|
|
|
|
|
|
|
|
if not success:
|
|
|
|
|
|
# 方法2:回退到ORM批量操作
|
|
|
|
|
|
success = self._batch_orm_upsert_account_info(session, account_data_list)
|
2025-12-02 22:05:54 +08:00
|
|
|
|
|
2025-12-03 23:53:07 +08:00
|
|
|
|
return success
|
2025-12-02 22:05:54 +08:00
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
2025-12-03 23:53:07 +08:00
|
|
|
|
logger.error(f"批量同步账户信息到数据库失败: {e}")
|
2025-12-02 22:05:54 +08:00
|
|
|
|
return False
|
|
|
|
|
|
finally:
|
2025-12-03 23:53:07 +08:00
|
|
|
|
session.close()
|
|
|
|
|
|
|
|
|
|
|
|
def _batch_upsert_account_info(self, session, account_data_list: List[Dict]) -> bool:
|
|
|
|
|
|
"""使用原生SQL批量插入/更新账户信息"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 准备批量数据
|
|
|
|
|
|
values_list = []
|
|
|
|
|
|
for data in account_data_list:
|
|
|
|
|
|
values = (
|
|
|
|
|
|
f"({data['st_id']}, {data['k_id']}, 'USDT', "
|
|
|
|
|
|
f"{data['balance']}, {data['withdrawal']}, {data['deposit']}, "
|
|
|
|
|
|
f"{data['other']}, {data['profit']}, {data['time']})"
|
|
|
|
|
|
)
|
|
|
|
|
|
values_list.append(values)
|
|
|
|
|
|
|
|
|
|
|
|
if not values_list:
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
values_str = ", ".join(values_list)
|
|
|
|
|
|
|
|
|
|
|
|
# 使用INSERT ... ON DUPLICATE KEY UPDATE
|
|
|
|
|
|
sql = f"""
|
|
|
|
|
|
INSERT INTO deh_strategy_kx_new
|
|
|
|
|
|
(st_id, k_id, asset, balance, withdrawal, deposit, other, profit, time)
|
|
|
|
|
|
VALUES {values_str}
|
|
|
|
|
|
ON DUPLICATE KEY UPDATE
|
|
|
|
|
|
balance = VALUES(balance),
|
|
|
|
|
|
withdrawal = VALUES(withdrawal),
|
|
|
|
|
|
deposit = VALUES(deposit),
|
|
|
|
|
|
other = VALUES(other),
|
|
|
|
|
|
profit = VALUES(profit),
|
|
|
|
|
|
up_time = NOW()
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
session.execute(text(sql))
|
|
|
|
|
|
|
|
|
|
|
|
logger.info(f"原生SQL批量更新账户信息: {len(account_data_list)} 条记录")
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"原生SQL批量更新账户信息失败: {e}")
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
def _batch_orm_upsert_account_info(self, session, account_data_list: List[Dict]) -> bool:
|
|
|
|
|
|
"""使用ORM批量插入/更新账户信息"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 分组数据以提高效率
|
|
|
|
|
|
account_data_by_key = {}
|
|
|
|
|
|
for data in account_data_list:
|
|
|
|
|
|
key = (data['k_id'], data['st_id'], data['time'])
|
|
|
|
|
|
account_data_by_key[key] = data
|
|
|
|
|
|
|
|
|
|
|
|
# 批量查询现有记录
|
|
|
|
|
|
existing_records = self._batch_query_existing_records(session, list(account_data_by_key.keys()))
|
|
|
|
|
|
|
|
|
|
|
|
# 批量更新或插入
|
|
|
|
|
|
to_update = []
|
|
|
|
|
|
to_insert = []
|
|
|
|
|
|
|
|
|
|
|
|
for key, data in account_data_by_key.items():
|
|
|
|
|
|
if key in existing_records:
|
|
|
|
|
|
# 更新
|
|
|
|
|
|
record = existing_records[key]
|
|
|
|
|
|
record.balance = data['balance']
|
|
|
|
|
|
record.withdrawal = data['withdrawal']
|
|
|
|
|
|
record.deposit = data['deposit']
|
|
|
|
|
|
record.other = data['other']
|
|
|
|
|
|
record.profit = data['profit']
|
|
|
|
|
|
else:
|
|
|
|
|
|
# 插入
|
|
|
|
|
|
to_insert.append(StrategyKX(**data))
|
|
|
|
|
|
|
|
|
|
|
|
# 批量插入新记录
|
|
|
|
|
|
if to_insert:
|
|
|
|
|
|
session.add_all(to_insert)
|
|
|
|
|
|
|
|
|
|
|
|
logger.info(f"ORM批量更新账户信息: 更新 {len(existing_records)} 条,插入 {len(to_insert)} 条")
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"ORM批量更新账户信息失败: {e}")
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
def _batch_query_existing_records(self, session, keys: List[tuple]) -> Dict[tuple, StrategyKX]:
|
|
|
|
|
|
"""批量查询现有记录"""
|
|
|
|
|
|
existing_records = {}
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
if not keys:
|
|
|
|
|
|
return existing_records
|
|
|
|
|
|
|
|
|
|
|
|
# 构建查询条件
|
|
|
|
|
|
conditions = []
|
|
|
|
|
|
for k_id, st_id, time_val in keys:
|
|
|
|
|
|
conditions.append(f"(k_id = {k_id} AND st_id = {st_id} AND time = {time_val})")
|
|
|
|
|
|
|
|
|
|
|
|
if conditions:
|
|
|
|
|
|
conditions_str = " OR ".join(conditions)
|
|
|
|
|
|
sql = f"""
|
|
|
|
|
|
SELECT * FROM deh_strategy_kx_new
|
|
|
|
|
|
WHERE {conditions_str}
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
results = session.execute(text(sql)).fetchall()
|
|
|
|
|
|
|
|
|
|
|
|
for row in results:
|
|
|
|
|
|
key = (row.k_id, row.st_id, row.time)
|
|
|
|
|
|
existing_records[key] = StrategyKX(
|
|
|
|
|
|
id=row.id,
|
|
|
|
|
|
st_id=row.st_id,
|
|
|
|
|
|
k_id=row.k_id,
|
|
|
|
|
|
asset=row.asset,
|
|
|
|
|
|
balance=row.balance,
|
|
|
|
|
|
withdrawal=row.withdrawal,
|
|
|
|
|
|
deposit=row.deposit,
|
|
|
|
|
|
other=row.other,
|
|
|
|
|
|
profit=row.profit,
|
|
|
|
|
|
time=row.time
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"批量查询现有记录失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
return existing_records
|
2025-12-05 13:25:26 +08:00
|
|
|
|
|
|
|
|
|
|
async def networth(self, accounts: Dict[str, Dict]):
|
|
|
|
|
|
"""计算所有策略的净值"""
|
|
|
|
|
|
|
|
|
|
|
|
# 从accounts中获取所有策略的st_id
|
|
|
|
|
|
st_ids = set()
|
|
|
|
|
|
for account in accounts.values():
|
|
|
|
|
|
st_ids.add(account['st_id'])
|
|
|
|
|
|
|
|
|
|
|
|
stmt = select(
|
|
|
|
|
|
StrategyKX.st_id,
|
|
|
|
|
|
StrategyKX.time,
|
|
|
|
|
|
func.sum(StrategyKX.balance).label('balance_sum'),
|
|
|
|
|
|
func.sum(StrategyKX.profit).label('profit_sum'),
|
|
|
|
|
|
func.sum(StrategyKX.withdrawal).label('withdrawal_sum'),
|
|
|
|
|
|
func.sum(StrategyKX.deposit).label('deposit_sum'),
|
|
|
|
|
|
func.sum(StrategyKX.other).label('other_sum')
|
|
|
|
|
|
).where(
|
|
|
|
|
|
StrategyKX.st_id.in_(st_ids)
|
|
|
|
|
|
).group_by(
|
|
|
|
|
|
StrategyKX.st_id,
|
|
|
|
|
|
StrategyKX.time
|
|
|
|
|
|
).order_by(
|
|
|
|
|
|
StrategyKX.time.asc(),
|
|
|
|
|
|
StrategyKX.st_id.asc()
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
results = self.session.execute(stmt).all()
|
|
|
|
|
|
|
|
|
|
|
|
return [
|
|
|
|
|
|
{
|
|
|
|
|
|
'st_id': row.st_id,
|
|
|
|
|
|
'time': row.time,
|
|
|
|
|
|
'balance_sum': float(row.balance_sum) if row.balance_sum else 0.0,
|
|
|
|
|
|
'profit_sum': float(row.profit_sum) if row.profit_sum else 0.0,
|
|
|
|
|
|
'withdrawal_sum': float(row.withdrawal_sum) if row.withdrawal_sum else 0.0,
|
|
|
|
|
|
'deposit_sum': float(row.deposit_sum) if row.deposit_sum else 0.0,
|
|
|
|
|
|
'other_sum': float(row.other_sum) if row.other_sum else 0.0,
|
|
|
|
|
|
}
|
|
|
|
|
|
for row in results
|
|
|
|
|
|
]
|