Files
exchange_monitor_sync/sync/account_sync.py
lz_db 8dc0f0dbc3 1
2025-12-04 19:44:22 +08:00

192 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from .base_sync import BaseSync
from loguru import logger
from typing import List, Dict, Any, Set
import json
import time
from datetime import datetime, timedelta
from sqlalchemy import text, and_
from models.orm_models import StrategyKX
class AccountSyncBatch(BaseSync):
"""账户信息批量同步器"""
async def sync_batch(self, accounts: Dict[str, Dict]):
"""批量同步所有账号的账户信息"""
try:
logger.info(f"开始批量同步账户信息,共 {len(accounts)} 个账号")
# 测试
# res = await self.redis_client._get_account_info_from_redis(10140, 5548, 'mt5')
# print(res)
# return
# 收集所有账号的数据
all_account_data = await self.redis_client._collect_all_account_data(accounts)
if not all_account_data:
logger.info("无账户信息数据需要同步")
return
# 批量同步到数据库
success = await self._sync_account_info_batch_to_db(all_account_data)
if success:
logger.info(f"账户信息批量同步完成: 处理 {len(all_account_data)} 条记录")
else:
logger.error("账户信息批量同步失败")
except Exception as e:
logger.error(f"账户信息批量同步失败: {e}")
async def _sync_account_info_batch_to_db(self, account_data_list: List[Dict]) -> bool:
"""批量同步账户信息到数据库(最高效版本)"""
session = self.db_manager.get_session()
try:
if not account_data_list:
return True
with session.begin():
# 方法1使用原生SQL批量插入/更新(性能最好)
success = self._batch_upsert_account_info(session, account_data_list)
if not success:
# 方法2回退到ORM批量操作
success = self._batch_orm_upsert_account_info(session, account_data_list)
return success
except Exception as e:
logger.error(f"批量同步账户信息到数据库失败: {e}")
return False
finally:
session.close()
def _batch_upsert_account_info(self, session, account_data_list: List[Dict]) -> bool:
"""使用原生SQL批量插入/更新账户信息"""
try:
# 准备批量数据
values_list = []
for data in account_data_list:
values = (
f"({data['st_id']}, {data['k_id']}, 'USDT', "
f"{data['balance']}, {data['withdrawal']}, {data['deposit']}, "
f"{data['other']}, {data['profit']}, {data['time']})"
)
values_list.append(values)
if not values_list:
return True
values_str = ", ".join(values_list)
# 使用INSERT ... ON DUPLICATE KEY UPDATE
sql = f"""
INSERT INTO deh_strategy_kx_new
(st_id, k_id, asset, balance, withdrawal, deposit, other, profit, time)
VALUES {values_str}
ON DUPLICATE KEY UPDATE
balance = VALUES(balance),
withdrawal = VALUES(withdrawal),
deposit = VALUES(deposit),
other = VALUES(other),
profit = VALUES(profit),
up_time = NOW()
"""
session.execute(text(sql))
logger.info(f"原生SQL批量更新账户信息: {len(account_data_list)} 条记录")
return True
except Exception as e:
logger.error(f"原生SQL批量更新账户信息失败: {e}")
return False
def _batch_orm_upsert_account_info(self, session, account_data_list: List[Dict]) -> bool:
"""使用ORM批量插入/更新账户信息"""
try:
# 分组数据以提高效率
account_data_by_key = {}
for data in account_data_list:
key = (data['k_id'], data['st_id'], data['time'])
account_data_by_key[key] = data
# 批量查询现有记录
existing_records = self._batch_query_existing_records(session, list(account_data_by_key.keys()))
# 批量更新或插入
to_update = []
to_insert = []
for key, data in account_data_by_key.items():
if key in existing_records:
# 更新
record = existing_records[key]
record.balance = data['balance']
record.withdrawal = data['withdrawal']
record.deposit = data['deposit']
record.other = data['other']
record.profit = data['profit']
else:
# 插入
to_insert.append(StrategyKX(**data))
# 批量插入新记录
if to_insert:
session.add_all(to_insert)
logger.info(f"ORM批量更新账户信息: 更新 {len(existing_records)} 条,插入 {len(to_insert)}")
return True
except Exception as e:
logger.error(f"ORM批量更新账户信息失败: {e}")
return False
def _batch_query_existing_records(self, session, keys: List[tuple]) -> Dict[tuple, StrategyKX]:
"""批量查询现有记录"""
existing_records = {}
try:
if not keys:
return existing_records
# 构建查询条件
conditions = []
for k_id, st_id, time_val in keys:
conditions.append(f"(k_id = {k_id} AND st_id = {st_id} AND time = {time_val})")
if conditions:
conditions_str = " OR ".join(conditions)
sql = f"""
SELECT * FROM deh_strategy_kx_new
WHERE {conditions_str}
"""
results = session.execute(text(sql)).fetchall()
for row in results:
key = (row.k_id, row.st_id, row.time)
existing_records[key] = StrategyKX(
id=row.id,
st_id=row.st_id,
k_id=row.k_id,
asset=row.asset,
balance=row.balance,
withdrawal=row.withdrawal,
deposit=row.deposit,
other=row.other,
profit=row.profit,
time=row.time
)
except Exception as e:
logger.error(f"批量查询现有记录失败: {e}")
return existing_records
async def sync(self):
"""兼容旧接口"""
accounts = self.get_accounts_from_redis()
await self.sync_batch(accounts)