2025-12-04 19:44:22 +08:00
|
|
|
|
import asyncio
|
2025-12-02 22:05:54 +08:00
|
|
|
|
import redis
|
2025-12-04 19:44:22 +08:00
|
|
|
|
import json
|
|
|
|
|
|
import re
|
2025-12-04 22:03:02 +08:00
|
|
|
|
import time
|
2025-12-02 22:05:54 +08:00
|
|
|
|
from loguru import logger
|
2025-12-04 22:03:02 +08:00
|
|
|
|
from config.settings import REDIS_CONFIG, COMPUTER_NAMES, COMPUTER_NAME_PATTERN,SYNC_CONFIG
|
2025-12-04 19:44:22 +08:00
|
|
|
|
import utils.helpers as helpers
|
|
|
|
|
|
from typing import List, Dict, Any, Set, Tuple, Optional
|
|
|
|
|
|
from datetime import datetime, timedelta
|
2025-12-02 22:05:54 +08:00
|
|
|
|
|
|
|
|
|
|
class RedisClient:
    """Singleton gateway for account, position, balance and order data kept in Redis.

    ``__new__`` always hands back the same instance, and ``__init__`` only
    performs its work the first time it completes successfully.  The Redis
    connection itself is created lazily on first access to :attr:`client`.

    NOTE(review): the ``async`` methods below call the synchronous ``redis``
    client directly, so ``asyncio.gather`` does not overlap the network I/O;
    confirm whether an executor or ``redis.asyncio`` is wanted.
    """

    _instance = None

    # Suffix of the per-computer hash holding the strategy API configuration.
    _STRATEGY_KEY_SUFFIX = "_strategy_api"

    # Fund record types that take part in the daily account statistics.
    _FUND_TYPES = ('lz_balance', 'deposit', 'withdrawal')

    # Raw exchange name (lower-cased) -> canonical exchange id.
    _EXCHANGE_ALIASES = {
        'metatrader': 'mt5',
        'binance_spot_test': 'binance',
        'binance_spot': 'binance',
        'binance': 'binance',
        'gate_spot': 'gate',
        'okex': 'okx',
        'okx': 'okx',
        'bybit': 'bybit',
        'bybit_spot': 'bybit',
        'bybit_test': 'bybit',
        'huobi': 'huobi',
        'huobi_spot': 'huobi',
        'gate': 'gate',
        'gateio': 'gate',
        'kucoin': 'kucoin',
        'kucoin_spot': 'kucoin',
        'mexc': 'mexc',
        'mexc_spot': 'mexc',
        'bitget': 'bitget',
        'bitget_spot': 'bitget',
    }
    _CANONICAL_EXCHANGES = frozenset(_EXCHANGE_ALIASES.values())

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        self._pool = None
        self._client = None
        self.computer_names = self._get_computer_names()
        self.computer_name_pattern = re.compile(COMPUTER_NAME_PATTERN)
        # Flip the flag last: if anything above raises, the next
        # ``RedisClient()`` call retries instead of reusing a half-built object.
        self._initialized = True

    @property
    def client(self):
        """Return the Redis client, creating the connection pool on first use."""
        if self._client is None:
            self._init_connection_pool()
        return self._client

    def _init_connection_pool(self):
        """Create the connection pool and client, and verify them with ``PING``.

        Raises:
            Exception: whatever ``redis`` raised while connecting; it is logged
                and re-raised.  Nothing is cached in that case, so the next
                access to :attr:`client` tries again.
        """
        pool = None
        try:
            pool = redis.ConnectionPool(
                host=REDIS_CONFIG['host'],
                port=REDIS_CONFIG['port'],
                db=REDIS_CONFIG['db'],
                decode_responses=REDIS_CONFIG['decode_responses'],
                max_connections=REDIS_CONFIG['max_connections'],
                socket_keepalive=True,
            )
            client = redis.Redis(connection_pool=pool)
            # Connectivity check before the client is published on ``self``.
            client.ping()
        except Exception as e:
            logger.error(f"Redis连接失败: {e}")
            if pool is not None:
                pool.disconnect()
            raise
        self._pool = pool
        self._client = client
        logger.info("Redis连接池初始化成功")

    def get_accounts_from_redis(self) -> Dict[str, Dict]:
        """Load the account configuration of every configured computer name.

        Falls back to key discovery when the configured names yield nothing.

        Returns:
            Mapping of account id to the parsed account dict; ``{}`` on error.
        """
        try:
            accounts_dict = {}

            # Strategy 1: the explicitly configured computer names.
            for computer_name in self.computer_names:
                accounts_dict.update(self._get_accounts_by_computer_name(computer_name))

            # Strategy 2 (fallback): scan Redis for matching keys.
            if not accounts_dict:
                logger.warning("配置的计算机名未找到数据,尝试自动发现...")
                accounts_dict = self._discover_all_accounts()

            logger.info(f"从 {len(self.computer_names)} 个计算机名获取到 {len(accounts_dict)} 个账号")
            return accounts_dict
        except Exception as e:
            logger.error(f"获取账户信息失败: {e}")
            return {}

    @staticmethod
    def _split_computer_names(raw: str) -> List[str]:
        """Split a comma separated name list, dropping blanks and padding."""
        return [part.strip() for part in raw.split(',') if part.strip()]

    def _get_computer_names(self) -> List[str]:
        """Return the computer names configured in ``COMPUTER_NAMES``."""
        names = self._split_computer_names(COMPUTER_NAMES)
        if ',' in COMPUTER_NAMES:
            logger.info(f"使用配置的计算机名列表: {names}")
        return names

    def _get_accounts_by_computer_name(self, computer_name: str) -> Dict[str, Dict]:
        """Read and parse the accounts stored for one computer name.

        The hash ``<computer_name>_strategy_api`` maps an exchange name to a
        JSON object of ``account_id -> account JSON string``.

        Returns:
            Mapping of account id to parsed account (with ``computer_name``
            added).  Errors are logged and whatever was parsed so far is returned.
        """
        accounts_dict = {}

        try:
            redis_key = f"{computer_name}{self._STRATEGY_KEY_SUFFIX}"
            result = self.client.hgetall(redis_key)
            if not result:
                logger.debug(f"未找到 {redis_key} 的策略API配置")
                return {}

            for exchange_name, accounts_json in result.items():
                # Field names arrive as bytes when decode_responses is off.
                if isinstance(exchange_name, bytes):
                    exchange_name = exchange_name.decode('utf-8')
                try:
                    accounts = json.loads(accounts_json)
                    if not accounts:
                        continue

                    exchange_id = self.format_exchange_id(exchange_name)

                    for account_id, account_info in accounts.items():
                        parsed_account = self.parse_account(exchange_id, account_id, account_info)
                        if parsed_account:
                            # Remember which computer the account came from.
                            parsed_account['computer_name'] = computer_name
                            accounts_dict[account_id] = parsed_account

                except json.JSONDecodeError as e:
                    logger.error(f"解析交易所 {exchange_name} 的JSON数据失败: {e}")
                    continue
                except Exception as e:
                    logger.error(f"处理交易所 {exchange_name} 数据异常: {e}")
                    continue

            logger.info(f"从 {redis_key} 解析到 {len(accounts_dict)} 个账号")

        except Exception as e:
            logger.error(f"获取计算机名 {computer_name} 的账号失败: {e}")

        return accounts_dict

    @classmethod
    def _computer_name_from_key(cls, redis_key: str) -> str:
        """Strip the trailing strategy suffix (and only that) from a Redis key."""
        suffix = cls._STRATEGY_KEY_SUFFIX
        if redis_key.endswith(suffix):
            return redis_key[:-len(suffix)]
        return redis_key

    def _discover_all_accounts(self) -> Dict[str, Dict]:
        """Find every ``*_strategy_api`` key via SCAN and load its accounts.

        Keys whose computer-name part does not match
        ``self.computer_name_pattern`` are skipped with a warning.
        """
        accounts_dict = {}

        try:
            pattern = f"*{self._STRATEGY_KEY_SUFFIX}"
            # A dict keeps first-seen order while dropping the duplicates
            # that SCAN is allowed to return.
            discovered_keys = {}
            cursor = 0

            while True:
                cursor, keys = self.client.scan(cursor, match=pattern, count=100)
                for key in keys:
                    key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                    discovered_keys[key_str] = None
                if cursor == 0:
                    break

            logger.info(f"自动发现 {len(discovered_keys)} 个策略API key")

            for key_str in discovered_keys:
                computer_name = self._computer_name_from_key(key_str)

                if self.computer_name_pattern.match(computer_name):
                    accounts_dict.update(self._get_accounts_by_computer_name(computer_name))
                else:
                    logger.warning(f"跳过不符合格式的计算机名: {computer_name}")

            logger.info(f"自动发现共获取到 {len(accounts_dict)} 个账号")

        except Exception as e:
            logger.error(f"自动发现账号失败: {e}")

        return accounts_dict

    def format_exchange_id(self, key: str) -> str:
        """Normalise a raw exchange name to its canonical exchange id.

        The name is lower-cased and stripped, then looked up in the alias
        table; unknown names are returned as-is (and logged at debug level).
        """
        key = key.lower().strip()
        normalized_key = self._EXCHANGE_ALIASES.get(key, key)

        # Report names the alias table knows nothing about.
        if normalized_key == key and key not in self._CANONICAL_EXCHANGES:
            logger.debug(f"未映射的交易所名称: {key}")

        return normalized_key

    def parse_account(self, exchange_id: str, account_id: str, account_info: str) -> Optional[Dict]:
        """Parse one account's JSON string into the internal account dict.

        Returns:
            Dict with ``exchange_id``, ``k_id``, ``st_id``, ``add_time`` and
            ``api_key``; ``None`` when the payload cannot be processed.
        """
        try:
            source_account_info = json.loads(account_info)
            return {
                'exchange_id': exchange_id,
                'k_id': account_id,
                'st_id': helpers.safe_int(source_account_info.get('st_id'), 0),
                'add_time': helpers.safe_int(source_account_info.get('add_time'), 0),
                'api_key': source_account_info.get('api_key', ''),
            }
        except json.JSONDecodeError as e:
            logger.error(f"解析账号 {account_id} JSON数据失败: {e}, 原始数据: {account_info[:100]}...")
            return None
        except Exception as e:
            logger.error(f"处理账号 {account_id} 数据异常: {e}")
            return None

    def _group_accounts_by_exchange(self, accounts: Dict[str, Dict]) -> Dict[str, List[Dict]]:
        """Group account dicts by ``exchange_id``; accounts without one are dropped."""
        groups = {}
        for account_info in accounts.values():
            exchange_id = account_info.get('exchange_id')
            if exchange_id:
                groups.setdefault(exchange_id, []).append(account_info)
        return groups

    async def _gather_lists(self, coroutines: List, context: str) -> List[Dict]:
        """Await ``coroutines`` together and concatenate their list results.

        A coroutine that failed is logged (prefixed with ``context``) and
        skipped instead of being dropped silently.
        """
        if not coroutines:
            return []
        merged = []
        results = await asyncio.gather(*coroutines, return_exceptions=True)
        for result in results:
            if isinstance(result, list):
                merged.extend(result)
            elif isinstance(result, BaseException):
                logger.error(f"{context}子任务异常: {result!r}")
        return merged

    def _iter_account_ids(self, exchange_id: str, account_list: List[Dict]):
        """Yield ``(k_id, account_info)`` pairs, skipping accounts with a bad ``k_id``."""
        for account_info in account_list:
            try:
                k_id = int(account_info['k_id'])
            except (KeyError, TypeError, ValueError) as e:
                logger.error(f"交易所 {exchange_id} 的账号k_id无效,已跳过: {e!r}")
                continue
            yield k_id, account_info

    async def _collect_all_positions(self, accounts: Dict[str, Dict]) -> List[Dict]:
        """Collect the positions of every account, grouped per exchange."""
        all_positions = []

        try:
            account_groups = self._group_accounts_by_exchange(accounts)
            tasks = [
                self._collect_exchange_positions(exchange_id, account_list)
                for exchange_id, account_list in account_groups.items()
            ]
            all_positions = await self._gather_lists(tasks, "收集持仓数据")
        except Exception as e:
            logger.error(f"收集持仓数据失败: {e}")

        return all_positions

    async def _collect_exchange_positions(self, exchange_id: str, account_list: List[Dict]) -> List[Dict]:
        """Collect the positions of all accounts of one exchange."""
        positions_list = []

        try:
            tasks = [
                self._get_positions_from_redis(k_id, account_info.get('st_id', 0), exchange_id)
                for k_id, account_info in self._iter_account_ids(exchange_id, account_list)
            ]
            positions_list = await self._gather_lists(tasks, f"收集交易所 {exchange_id} 持仓数据")
        except Exception as e:
            logger.error(f"收集交易所 {exchange_id} 持仓数据失败: {e}")

        return positions_list

    async def _get_positions_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]:
        """Read the ``positions`` field of ``<exchange_id>:positions:<k_id>``.

        Each position is tagged with ``k_id``, ``st_id`` and ``exchange_id``.
        Returns ``[]`` when the field is missing or on any error.
        """
        try:
            redis_key = f"{exchange_id}:positions:{k_id}"
            redis_data = self.client.hget(redis_key, 'positions')

            if not redis_data:
                return []

            positions = json.loads(redis_data)

            for position in positions:
                position['k_id'] = k_id
                position['st_id'] = st_id
                position['exchange_id'] = exchange_id

            return positions

        except Exception as e:
            logger.error(f"获取Redis持仓数据失败: k_id={k_id}, error={e}")
            return []

    async def _collect_all_account_data(self, accounts: Dict[str, Dict]) -> List[Dict]:
        """Collect the daily account statistics of every account."""
        all_account_data = []

        try:
            account_groups = self._group_accounts_by_exchange(accounts)
            tasks = [
                self._collect_exchange_account_data(exchange_id, account_list)
                for exchange_id, account_list in account_groups.items()
            ]
            all_account_data = await self._gather_lists(tasks, "收集账户信息数据")
            logger.info(f"收集到 {len(all_account_data)} 条账户信息记录")
        except Exception as e:
            logger.error(f"收集账户信息数据失败: {e}")

        return all_account_data

    async def _collect_exchange_account_data(self, exchange_id: str, account_list: List[Dict]) -> List[Dict]:
        """Collect the daily account statistics of all accounts of one exchange."""
        account_data_list = []

        try:
            tasks = [
                self._get_account_info_from_redis(
                    k_id,
                    account_info.get('st_id', 0),
                    exchange_id,
                    account_info.get('add_time', 0),
                )
                for k_id, account_info in self._iter_account_ids(exchange_id, account_list)
            ]
            account_data_list = await self._gather_lists(tasks, f"收集交易所 {exchange_id} 账户信息")
            logger.debug(f"交易所 {exchange_id}: 收集到 {len(account_data_list)} 条账户信息")
        except Exception as e:
            logger.error(f"收集交易所 {exchange_id} 账户信息失败: {e}")

        return account_data_list

    @staticmethod
    def _fund_amount(fund_data: Dict) -> float:
        """Return a fund record's amount: ``lz_amount`` if truthy, else ``lz_money``, else 0.

        Raises:
            ValueError: when the stored value cannot be converted to ``float``.
        """
        raw = fund_data.get('lz_amount') or fund_data.get('lz_money') or 0
        try:
            return float(raw)
        except TypeError as e:
            # Normalise so callers only have to handle ValueError.
            raise ValueError(f"non-numeric fund amount: {raw!r}") from e

    @staticmethod
    def _aggregate_daily_funds(redis_funds: Dict, add_time_str: str, recent_days: int,
                               today: datetime) -> Dict[str, Dict]:
        """Fold raw fund records into per-day balance/deposit/withdrawal totals.

        Args:
            redis_funds: hash content, field -> JSON fund record.
            add_time_str: ``YYYY-MM-DD``; records dated before it are ignored.
            recent_days: keep only records at most this many days older than
                ``today``; ``0`` disables the window.
            today: reference point of the window.

        Returns:
            ``{date: {'balance', 'deposit', 'withdrawal', 'has_balance'}}``.
            Malformed records are logged at debug level and skipped.
        """
        date_stats = {}
        for fund_key, fund_json in redis_funds.items():
            try:
                fund_data = json.loads(fund_json)
                if not isinstance(fund_data, dict):
                    raise ValueError("fund entry is not a JSON object")

                date_str = fund_data.get('lz_time', '')
                lz_type = fund_data.get('lz_type', '')
                # Validate before comparing so a null date skips one record
                # instead of failing the whole account.
                if not isinstance(date_str, str) or not date_str:
                    continue
                if lz_type not in RedisClient._FUND_TYPES:
                    continue
                if date_str < add_time_str:
                    continue

                date_obj = datetime.strptime(date_str, '%Y-%m-%d')
                if recent_days != 0 and (today - date_obj).days > recent_days:
                    continue

                lz_amount = RedisClient._fund_amount(fund_data)
            except (json.JSONDecodeError, ValueError) as e:
                logger.debug(f"解析Redis数据失败: {fund_key}, error={e}")
                continue

            stats = date_stats.setdefault(date_str, {
                'balance': 0.0,
                'deposit': 0.0,
                'withdrawal': 0.0,
                'has_balance': False,
            })
            if lz_type == 'lz_balance':
                stats['balance'] = lz_amount
                stats['has_balance'] = True
            else:
                # 'deposit' and 'withdrawal' accumulate over the day.
                stats[lz_type] += lz_amount
        return date_stats

    async def _get_account_info_from_redis(self, k_id: int, st_id: int, exchange_id: str, add_time: int) -> List[Dict]:
        """Build the per-day account rows from ``<exchange_id>:balance:<k_id>``.

        Args:
            k_id: account id used in the Redis key.
            st_id: strategy id copied into every row.
            exchange_id: canonical exchange id used in the Redis key.
            add_time: timestamp passed to ``helpers.convert_timestamp``;
                records dated before the resulting day are ignored.

        Returns:
            One dict per day, ascending by date, with ``st_id``, ``k_id``,
            ``balance``, ``withdrawal``, ``deposit``, ``other``, ``profit`` and
            ``time`` (timestamp of the day's local midnight).  ``[]`` when
            there is no data or on any error.
        """
        try:
            redis_key = f"{exchange_id}:balance:{k_id}"
            redis_funds = self.client.hgetall(redis_key)

            if not redis_funds:
                return []

            recent_days = SYNC_CONFIG['recent_days_account']
            # Loop-invariant, so computed once rather than per record.
            add_time_str = helpers.convert_timestamp(add_time, format_str='%Y-%m-%d')

            date_stats = self._aggregate_daily_funds(redis_funds, add_time_str, recent_days, datetime.now())
            sorted_dates = sorted(date_stats)

            # NOTE(review): the earliest day inside the window always gets a
            # previous balance of 0.0, so its profit equals
            # balance - deposit - withdrawal; confirm this is intended.
            prev_balance_map = self._get_previous_balances(redis_funds, sorted_dates)

            account_data_list = []
            for date_str in sorted_dates:
                stats = date_stats[date_str]

                # Skip days that carry neither a balance nor any transfer.
                if not stats['has_balance'] and stats['deposit'] == 0 and stats['withdrawal'] == 0:
                    continue

                balance = stats['balance']
                deposit = stats['deposit']
                withdrawal = stats['withdrawal']

                # NOTE(review): withdrawal is subtracted here; verify the sign
                # convention of stored withdrawal amounts.
                prev_balance = prev_balance_map.get(date_str, 0.0)
                profit = balance - deposit - withdrawal - prev_balance

                time_timestamp = int(datetime.strptime(date_str, '%Y-%m-%d').timestamp())

                account_data_list.append({
                    'st_id': st_id,
                    'k_id': k_id,
                    'balance': balance,
                    'withdrawal': withdrawal,
                    'deposit': deposit,
                    'other': 0.0,  # placeholder, always 0 for now
                    'profit': profit,
                    'time': time_timestamp,
                })

            return account_data_list

        except Exception as e:
            logger.error(f"获取Redis账户信息失败: k_id={k_id}, error={e}")
            return []

    def _get_previous_balances(self, redis_funds: Dict, sorted_dates: List[str]) -> Dict[str, float]:
        """Map each date to the balance recorded on the preceding date in ``sorted_dates``.

        "Preceding" means the previous element of ``sorted_dates``, not the
        previous calendar day.  The first date, and any date whose
        predecessor has no ``lz_balance`` record, maps to ``0.0``.
        """
        # Parse every record once; the first balance seen for a day wins.
        balance_by_date = {}
        for fund_json in redis_funds.values():
            try:
                fund_data = json.loads(fund_json)
                if fund_data.get('lz_type') != 'lz_balance':
                    continue
                day = fund_data.get('lz_time')
                if not isinstance(day, str) or day in balance_by_date:
                    continue
                balance_by_date[day] = self._fund_amount(fund_data)
            except (AttributeError, TypeError, ValueError):
                # Malformed records are already reported by the aggregation pass.
                continue

        prev_balance_map = {}
        prev_date = None
        for date_str in sorted_dates:
            prev_balance_map[date_str] = balance_by_date.get(prev_date, 0.0) if prev_date else 0.0
            prev_date = date_str

        return prev_balance_map

    async def _collect_all_orders(self, accounts: Dict[str, Dict]) -> List[Dict]:
        """Collect the recent orders of every account, grouped per exchange."""
        all_orders = []

        try:
            account_groups = self._group_accounts_by_exchange(accounts)
            tasks = [
                self._collect_exchange_orders(exchange_id, account_list)
                for exchange_id, account_list in account_groups.items()
            ]
            all_orders = await self._gather_lists(tasks, "收集订单数据")
        except Exception as e:
            logger.error(f"收集订单数据失败: {e}")

        return all_orders

    async def _collect_exchange_orders(self, exchange_id: str, account_list: List[Dict]) -> List[Dict]:
        """Collect the recent orders of all accounts of one exchange."""
        orders_list = []

        try:
            tasks = [
                self._get_recent_orders_from_redis(k_id, account_info.get('st_id', 0), exchange_id)
                for k_id, account_info in self._iter_account_ids(exchange_id, account_list)
            ]
            orders_list = await self._gather_lists(tasks, f"收集交易所 {exchange_id} 订单数据")
            logger.debug(f"交易所 {exchange_id}: 收集到 {len(orders_list)} 条订单")
        except Exception as e:
            logger.error(f"收集交易所 {exchange_id} 订单数据失败: {e}")

        return orders_list

    @staticmethod
    def _recent_date_prefixes(today: datetime, recent_days: int) -> Tuple[str, ...]:
        """Return ``'YYYY-MM-DD_'`` prefixes for ``today`` and the ``recent_days - 1`` days before it."""
        return tuple(
            (today - timedelta(days=offset)).strftime('%Y-%m-%d') + '_'
            for offset in range(recent_days)
        )

    async def _get_recent_orders_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]:
        """Read the orders of the last N days from ``<exchange_id>:orders:<k_id>``.

        N is ``SYNC_CONFIG['recent_days_order']``.  Hash fields are selected by
        their ``YYYY-MM-DD_`` prefix, then each order's own ``time`` is checked
        against the cutoff.  Every returned order is tagged with ``k_id``,
        ``st_id`` and ``exchange_id``.  Returns ``[]`` on any error.
        """
        try:
            redis_key = f"{exchange_id}:orders:{k_id}"
            recent_days = SYNC_CONFIG['recent_days_order']

            date_prefixes = self._recent_date_prefixes(datetime.now(), recent_days)
            if not date_prefixes:
                # No day is in range, so no field could match.
                return []

            # HSCAN may return a field more than once; the dict removes the
            # duplicates while keeping first-seen order.
            recent_keys = {}
            cursor = 0
            while True:
                cursor, fields = self.client.hscan(redis_key, cursor, count=1000)
                for field in fields:
                    field_str = field.decode('utf-8') if isinstance(field, bytes) else field
                    if field_str.startswith(date_prefixes):
                        recent_keys[field_str] = None
                if cursor == 0:
                    break

            if not recent_keys:
                return []

            cutoff = int(time.time()) - recent_days * 24 * 3600
            field_names = list(recent_keys)
            orders_list = []

            # Fetch in chunks so a single reply never gets too large.
            chunk_size = 500
            for start in range(0, len(field_names), chunk_size):
                chunk_keys = field_names[start:start + chunk_size]
                chunk_values = self.client.hmget(redis_key, chunk_keys)

                for key, order_json in zip(chunk_keys, chunk_values):
                    if not order_json:
                        continue

                    try:
                        order = json.loads(order_json)
                    except json.JSONDecodeError as e:
                        logger.debug(f"解析订单JSON失败: key={key}, error={e}")
                        continue

                    try:
                        order_time = float(order.get('time') or 0)
                    except (AttributeError, TypeError, ValueError) as e:
                        # One malformed order must not discard the whole account.
                        logger.debug(f"订单时间字段无效: key={key}, error={e}")
                        continue

                    if order_time >= cutoff:
                        order['k_id'] = k_id
                        order['st_id'] = st_id
                        order['exchange_id'] = exchange_id
                        orders_list.append(order)

            return orders_list

        except Exception as e:
            logger.error(f"获取Redis订单数据失败: k_id={k_id}, error={e}")
            return []

    def close(self):
        """Disconnect the connection pool; a later :attr:`client` access reconnects."""
        if self._pool:
            self._pool.disconnect()
            # Drop the references so the lazy property builds a fresh pool.
            self._pool = None
            self._client = None
            logger.info("Redis连接池已关闭")
|