Compare commits


7 Commits

Author  SHA1        Message  Date
lz_db   76ac5370d0  1        2025-12-06 02:10:57 +08:00
lz_db   f472755d84  1        2025-12-06 01:49:38 +08:00
lz_db   0d42a584b7  1        2025-12-06 01:46:05 +08:00
lz_db   64c993319a  1        2025-12-05 13:25:26 +08:00
lz_db   efbcb63cec  1        2025-12-04 22:39:43 +08:00
lz_db   878901826c  xm       2025-12-04 22:03:02 +08:00
lz_db   a7152f58ea  1        2025-12-04 22:02:55 +08:00
28 changed files with 809 additions and 3601 deletions

.env (10 changed lines)

@@ -2,14 +2,14 @@
REDIS_HOST=10.0.4.6
REDIS_PORT=6379
REDIS_PASSWORD=
-REDIS_DB=1
+REDIS_DB=0
# MySQL configuration
DB_HOST=10.0.4.17
DB_PORT=3306
DB_USER=root
DB_PASSWORD=lz_mysqlLZ
-DB_DATABASE=lz_app_test
+DB_DATABASE=lz_app2
DB_POOL_SIZE=10
DB_MAX_OVERFLOW=20
DB_POOL_RECYCLE=3600
@@ -19,12 +19,12 @@ SQLALCHEMY_ECHO=false
SQLALCHEMY_ECHO_POOL=false
# Sync configuration
-SYNC_INTERVAL=20
+SYNC_INTERVAL=15
RECENT_DAYS=3
CHUNK_SIZE=1000
ENABLE_POSITION_SYNC=true
ENABLE_ORDER_SYNC=true
-ENABLE_ACCOUNT_SYNC=true
+ENABLE_ACCOUNT_SYNC=false
# Logging configuration
LOG_LEVEL=INFO
@@ -49,7 +49,7 @@ ORDER_REDIS_SCAN_COUNT=1000
POSITION_BATCH_SIZE=500
# Account sync configuration
-ACCOUNT_SYNC_RECENT_DAYS=3
+ACCOUNT_SYNC_RECENT_DAYS=0
# Concurrency control
MAX_CONCURRENT_ACCOUNTS=50
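
These keys are read with os.getenv when config/settings.py builds its config dicts (see the SYNC_CONFIG hunk below). A minimal sketch of how the file is presumably loaded, assuming python-dotenv, which this diff does not itself show:

import os
from dotenv import load_dotenv  # assumption: the project uses python-dotenv

load_dotenv()  # populate os.environ from .env before settings are imported
sync_interval = int(os.getenv('SYNC_INTERVAL', 60))  # 15 after this change
enable_account_sync = os.getenv('ENABLE_ACCOUNT_SYNC', 'true').lower() == 'true'  # now False

Note that ACCOUNT_SYNC_RECENT_DAYS=0 does not mean "zero days": the redis_client change further down treats 0 as "no day window at all".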

.gitignore (new file, 6 lines)

@@ -0,0 +1,6 @@
test.py
__pycache__
logs
.env
*.log
.DS_Store


@@ -17,6 +17,8 @@ REDIS_CONFIG = {
SYNC_CONFIG = {
    'interval': int(os.getenv('SYNC_INTERVAL', 60)),  # sync interval (seconds)
    'recent_days': int(os.getenv('RECENT_DAYS', 3)),  # sync the last N days of data
+   'recent_days_account': int(os.getenv('ACCOUNT_SYNC_RECENT_DAYS', 3)),  # account-sync window (days)
+   'recent_days_order': int(os.getenv('ORDER_SYNC_RECENT_DAYS', 3)),  # order-sync window (days)
    'chunk_size': int(os.getenv('CHUNK_SIZE', 1000)),  # batch size
    'enable_position_sync': os.getenv('ENABLE_POSITION_SYNC', 'true').lower() == 'true',
    'enable_order_sync': os.getenv('ENABLE_ORDER_SYNC', 'true').lower() == 'true',
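
The two new keys are consumed by redis_client below (recent_days_account for balances, recent_days_order for orders). A quick check of the fallback behavior, with hypothetical values:

import os

os.environ['ORDER_SYNC_RECENT_DAYS'] = '7'
assert int(os.getenv('ORDER_SYNC_RECENT_DAYS', 3)) == 7   # environment wins
os.environ.pop('ACCOUNT_SYNC_RECENT_DAYS', None)
assert int(os.getenv('ACCOUNT_SYNC_RECENT_DAYS', 3)) == 3  # default used when unset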

File diff suppressed because one or more lines are too long


@@ -73,4 +73,4 @@ class StrategyKX(Base):
    up_time: Mapped[Optional[datetime]] = mapped_column(DateTime, default=func.now(), onupdate=func.now(), comment='last updated time')

    def __repr__(self) -> str:
-        return f"StrategyKX(id={self.id!r}, k_id={self.k_id!r}, time={self.time!r})"
+        return f"StrategyKX(id={self.id!r}, k_id={self.k_id!r}, st_id={self.st_id!r}, time={self.time!r})"


@@ -1,10 +1,7 @@
from .base_sync import BaseSync
from loguru import logger
-from typing import List, Dict, Any, Set
-import json
-import time
-from datetime import datetime, timedelta
-from sqlalchemy import text, and_
+from typing import List, Dict
+from sqlalchemy import text, select, func, and_
+from models.orm_models import StrategyKX

class AccountSyncBatch(BaseSync):
@@ -15,14 +12,9 @@ class AccountSyncBatch(BaseSync):
        try:
            logger.info(f"Starting batch account-info sync, {len(accounts)} accounts")
-            # test
-            # res = await self.redis_client._get_account_info_from_redis(10140, 5548, 'mt5')
-            # print(res)
-            # return
            # collect the data for every account
            all_account_data = await self.redis_client._collect_all_account_data(accounts)
            if not all_account_data:
                logger.info("No account-info data to sync")
                return
@@ -185,8 +177,44 @@ class AccountSyncBatch(BaseSync):
            logger.error(f"Batch lookup of existing records failed: {e}")
        return existing_records

    async def sync(self):
        """Backward-compatible entry point"""
        accounts = self.get_accounts_from_redis()
        await self.sync_batch(accounts)

    async def networth(self, accounts: Dict[str, Dict]):
        """Compute the net worth of every strategy"""
        # gather every strategy's st_id from accounts
        st_ids = set()
        for account in accounts.values():
            st_ids.add(account['st_id'])
        stmt = select(
            StrategyKX.st_id,
            StrategyKX.time,
            func.sum(StrategyKX.balance).label('balance_sum'),
            func.sum(StrategyKX.profit).label('profit_sum'),
            func.sum(StrategyKX.withdrawal).label('withdrawal_sum'),
            func.sum(StrategyKX.deposit).label('deposit_sum'),
            func.sum(StrategyKX.other).label('other_sum')
        ).where(
            StrategyKX.st_id.in_(st_ids)
        ).group_by(
            StrategyKX.st_id,
            StrategyKX.time
        ).order_by(
            StrategyKX.time.asc(),
            StrategyKX.st_id.asc()
        )
        results = self.session.execute(stmt).all()
        return [
            {
                'st_id': row.st_id,
                'time': row.time,
                'balance_sum': float(row.balance_sum) if row.balance_sum else 0.0,
                'profit_sum': float(row.profit_sum) if row.profit_sum else 0.0,
                'withdrawal_sum': float(row.withdrawal_sum) if row.withdrawal_sum else 0.0,
                'deposit_sum': float(row.deposit_sum) if row.deposit_sum else 0.0,
                'other_sum': float(row.other_sum) if row.other_sum else 0.0,
            }
            for row in results
        ]
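
networth() returns one row per (st_id, time) with balances, profit, and cash flows summed across that strategy's accounts. A hedged sketch of how a caller might fold those rows into a cumulative net-worth curve; illustrative only, since the production algorithm (the share/net-worth method behind deh_strategy_networth) is not part of this diff:

def networth_series(rows, st_id):
    """Build a naive net-worth series for one strategy from networth() rows."""
    series = []
    prev_balance = None
    net_worth = 1.0
    for row in (r for r in rows if r['st_id'] == st_id):  # rows are already time-ascending
        if prev_balance:  # skip the first point and zero balances
            # strip deposits/withdrawals so only trading P&L moves the curve
            adjusted = row['balance_sum'] - row['deposit_sum'] + row['withdrawal_sum']
            net_worth *= adjusted / prev_balance
        series.append({'time': row['time'], 'net_worth': net_worth})
        prev_balance = row['balance_sum']
    return series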


@@ -6,6 +6,7 @@ import time
import json
from typing import Dict
import utils.helpers as helpers
+import redis
from utils.redis_client import RedisClient
from config.settings import SYNC_CONFIG
@@ -15,6 +16,7 @@ from .account_sync import AccountSyncBatch
from utils.redis_batch_helper import RedisBatchHelper
from config.settings import COMPUTER_NAMES, COMPUTER_NAME_PATTERN
from typing import List, Dict, Any, Set, Optional
+from config.settings import REDIS_CONFIG

class SyncManager:
    """Sync manager (full batch version)"""
@@ -62,7 +64,8 @@ class SyncManager:
    async def start(self):
        """Start the sync service"""
        logger.info(f"Sync service started, interval {self.sync_interval}s")
        # await self.cp()
        # return

        while self.is_running:
            try:
@@ -155,4 +158,62 @@ class SyncManager:
            if hasattr(syncer, 'db_manager'):
                syncer.db_manager.close()
        logger.info("Sync service stopped")

    async def cp(self):
        """Copy the asset data in Redis DB 0 over to Redis DB 1"""
        # connection to Redis DB 0
        redis_0 = redis.Redis(
            host=REDIS_CONFIG['host'],  # your Redis host
            port=REDIS_CONFIG['port'],  # your Redis port
            db=0,  # database 0
            password=None,  # set if required
            decode_responses=True  # decode replies to str automatically
        )
        # connection to Redis DB 1
        redis_1 = redis.Redis(
            host=REDIS_CONFIG['host'],
            port=REDIS_CONFIG['port'],
            db=1,  # database 1
            password=None,
            decode_responses=True
        )
        accounts = self.redis_client.get_accounts_from_redis()
        print(f"{len(accounts)} accounts")
        all_kid = accounts.keys()
        keys1 = redis_0.keys("Bybit_fin_*")
        # print(f"found {len(keys1)} matching keys")
        keys2 = redis_0.keys("Metatrader_fin_*")
        # print(f"found {len(keys2)} matching keys")
        all_keys = keys1 + keys2
        print(f"{len(all_keys)} keys")
        for key in all_keys:
            key_split = key.split("_")
            k_id = key_split[-1]
            if k_id not in all_kid:
                continue
            exchange_id = None
            if key_split[0] == "Bybit":
                exchange_id = "bybit"
            elif key_split[0] == "Metatrader":
                exchange_id = "mt5"
            # data = redis_0.hget(key)
            print(f"processing key {key}")
            data = redis_0.hgetall(key)
            for k, v in data.items():
                # print(f"{k}")
                data_dict = json.loads(v)
                data_dict['lz_amount'] = float(data_dict['lz_money'])
                # del data_dict['lz_money']
                if data_dict['lz_type'] == 'lz_balance':
                    row_key = k
                else:
                    row_key = f"fund_{k}"
                # print(row_key)
                redis_0.hset(f"{exchange_id}:balance:{data_dict['k_id']}", row_key, json.dumps(data_dict))
                # print(f"data for key {key}: {data}")
        redis_0.close()
        redis_1.close()
        print("copy done")


@@ -1,12 +1,9 @@
from .base_sync import BaseSync
from loguru import logger
from typing import List, Dict, Any, Tuple
import json
import asyncio
import time
from datetime import datetime, timedelta
from sqlalchemy import text
import redis

class OrderSyncBatch(BaseSync):
    """Batch order-data syncer"""
@@ -20,11 +17,11 @@ class OrderSyncBatch(BaseSync):
        """Batch-sync order data for all accounts"""
        try:
            logger.info(f"Starting batch order sync, {len(accounts)} accounts")
-            return
            start_time = time.time()

            # 1. collect order data for every account
-            all_orders = await self._collect_all_orders(accounts)
+            all_orders = await self.redis_client._collect_all_orders(accounts)

            if not all_orders:
                logger.info("No order data to sync")
@@ -33,8 +30,12 @@ class OrderSyncBatch(BaseSync):
            logger.info(f"Collected {len(all_orders)} orders")

            # 2. batch-sync to the database
            # basic version
            success, processed_count = await self._sync_orders_batch_to_db(all_orders)
            # or the enhanced version:
            # success, results = await self._sync_orders_batch_to_db_enhanced(all_orders)

            elapsed = time.time() - start_time
            if success:
                logger.info(f"Order batch sync finished: {processed_count} orders in {elapsed:.2f}s")
@@ -43,189 +44,273 @@
        except Exception as e:
            logger.error(f"Order batch sync failed: {e}")

    async def _collect_all_orders(self, accounts: Dict[str, Dict]) -> List[Dict]:
        """Collect order data for all accounts"""
        all_orders = []
        try:
            # group accounts by exchange
            account_groups = self._group_accounts_by_exchange(accounts)
            # collect each exchange concurrently
            tasks = []
            for exchange_id, account_list in account_groups.items():
                task = self._collect_exchange_orders(exchange_id, account_list)
                tasks.append(task)
            # wait for every task and merge the results
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for result in results:
                if isinstance(result, list):
                    all_orders.extend(result)
        except Exception as e:
            logger.error(f"Collecting order data failed: {e}")
        return all_orders

    def _group_accounts_by_exchange(self, accounts: Dict[str, Dict]) -> Dict[str, List[Dict]]:
        """Group accounts by exchange"""
        groups = {}
        for account_id, account_info in accounts.items():
            exchange_id = account_info.get('exchange_id')
            if exchange_id:
                if exchange_id not in groups:
                    groups[exchange_id] = []
                groups[exchange_id].append(account_info)
        return groups

    async def _collect_exchange_orders(self, exchange_id: str, account_list: List[Dict]) -> List[Dict]:
        """Collect the order data for one exchange"""
        orders_list = []
        try:
            # fetch each account concurrently
            tasks = []
            for account_info in account_list:
                k_id = int(account_info['k_id'])
                st_id = account_info.get('st_id', 0)
                task = self._get_recent_orders_from_redis(k_id, st_id, exchange_id)
                tasks.append(task)
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for result in results:
                if isinstance(result, list):
                    orders_list.extend(result)
            logger.debug(f"Exchange {exchange_id}: collected {len(orders_list)} orders")
        except Exception as e:
            logger.error(f"Collecting orders for exchange {exchange_id} failed: {e}")
        return orders_list

    async def _get_recent_orders_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]:
        """Fetch the last N days of orders from Redis"""
        try:
            redis_key = f"{exchange_id}:orders:{k_id}"
            # build the list of recent dates
            today = datetime.now()
            recent_dates = []
            for i in range(self.recent_days):
                date = today - timedelta(days=i)
                date_format = date.strftime('%Y-%m-%d')
                recent_dates.append(date_format)
            # scan for all matching hash fields
            cursor = 0
            recent_keys = []
            while True:
                cursor, keys = self.redis_client.client.hscan(redis_key, cursor, count=1000)
                for key, _ in keys.items():
                    key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                    if key_str == 'positions':
                        continue
                    # keep fields that start with one of the recent dates
                    for date_format in recent_dates:
                        if key_str.startswith(date_format + '_'):
                            recent_keys.append(key_str)
                            break
                if cursor == 0:
                    break
            if not recent_keys:
                return []
            # bulk-fetch the order payloads
            orders_list = []
            # read in chunks so a single hgetall doesn't get too large
            chunk_size = 500
            for i in range(0, len(recent_keys), chunk_size):
                chunk_keys = recent_keys[i:i + chunk_size]
                # hmget fetches a whole chunk at once
                chunk_values = self.redis_client.client.hmget(redis_key, chunk_keys)
                for key, order_json in zip(chunk_keys, chunk_values):
                    if not order_json:
                        continue
                    try:
                        order = json.loads(order_json)
                        # validate the timestamp
                        order_time = order.get('time', 0)
                        if order_time >= int(time.time()) - self.recent_days * 24 * 3600:
                            # attach account info
                            order['k_id'] = k_id
                            order['st_id'] = st_id
                            order['exchange_id'] = exchange_id
                            orders_list.append(order)
                    except json.JSONDecodeError as e:
                        logger.debug(f"Failed to parse order JSON: key={key}, error={e}")
                        continue
            return orders_list
        except Exception as e:
            logger.error(f"Failed to fetch orders from Redis: k_id={k_id}, error={e}")
            return []
    async def _sync_orders_batch_to_db(self, all_orders: List[Dict]) -> Tuple[bool, int]:
-        """Batch-sync order data to the database"""
+        """Batch-sync order data to the database
+
+        Args:
+            all_orders: list of order dicts
+        Returns:
+            Tuple[bool, int]: (success, number of orders processed)
+        """
        if not all_orders:
            return True, 0
        session = self.db_manager.get_session()
        processed_count = 0
        errors = []
        try:
-            if not all_orders:
-                return True, 0
-            # convert the data
-            converted_orders = []
-            for order in all_orders:
            # process in batches
            for i in range(0, len(all_orders), self.batch_size):
                batch_orders = all_orders[i:i + self.batch_size]
                try:
-                    order_dict = self._convert_order_data(order)
                    session.begin()
-                    # completeness check
-                    required_fields = ['order_id', 'symbol', 'side', 'time']
-                    if not all(order_dict.get(field) for field in required_fields):
                    # convert the batch and prepare the bulk upsert
                    converted_orders = []
                    for raw_order in batch_orders:
                        try:
                            converted = self._convert_order_data(raw_order)
                            # check required fields
                            if not all([
                                converted.get('order_id'),
                                converted.get('symbol'),
                                converted.get('k_id'),
                                converted.get('side')
                            ]):
                                logger.warning(f"Order missing required fields: {raw_order}")
                                continue
                            converted_orders.append(converted)
                        except Exception as e:
                            logger.error(f"Converting order data failed: {raw_order}, error={e}")
                            continue
                    if not converted_orders:
                        session.commit()
                        continue
-                    converted_orders.append(order_dict)
                    # bulk insert-or-update
                    upsert_sql = text("""
                        INSERT INTO deh_strategy_order_new
                        (st_id, k_id, asset, order_id, symbol, side, price, time,
                        order_qty, last_qty, avg_price, exchange_id)
                        VALUES
                        (:st_id, :k_id, :asset, :order_id, :symbol, :side, :price, :time,
                        :order_qty, :last_qty, :avg_price, :exchange_id)
                        ON DUPLICATE KEY UPDATE
                        price = VALUES(price),
                        time = VALUES(time),
                        order_qty = VALUES(order_qty),
                        last_qty = VALUES(last_qty),
                        avg_price = VALUES(avg_price)
                    """)
                    result = session.execute(upsert_sql, converted_orders)
                    processed_count += len(converted_orders)
                    # batch statistics
                    batch_size = len(converted_orders)
                    total_affected = result.rowcount
                    updated_count = max(0, total_affected - batch_size)
                    inserted_count = batch_size - updated_count
                    logger.debug(f"Order batch {i//self.batch_size + 1}: "
                                 f"processed {batch_size}, "
                                 f"inserted {inserted_count}, "
                                 f"updated {updated_count}")
                    session.commit()
                except Exception as e:
-                    logger.error(f"Converting order data failed: {order}, error={e}")
-                    continue
                    session.rollback()
                    error_msg = f"Order batch {i//self.batch_size + 1} failed: {str(e)}"
                    logger.error(error_msg, exc_info=True)
                    errors.append(error_msg)
                    # continue with the next batch
-            if not converted_orders:
-                return True, 0
-            # sync via the batch tool
-            from utils.batch_order_sync import BatchOrderSync
-            batch_tool = BatchOrderSync(self.db_manager, self.batch_size)
-            success, processed_count = batch_tool.sync_orders_batch(converted_orders)
            if errors:
                logger.error(f"Order sync finished with {len(errors)} errors")
                for error in errors[:5]:  # log only the first 5
                    logger.error(f"Error detail: {error}")
                if len(errors) > 5:
                    logger.error(f"...plus {len(errors) - 5} more")
            success = len(errors) == 0
            return success, processed_count
        except Exception as e:
-            logger.error(f"Batch-syncing orders to the database failed: {e}")
-            return False, 0
            logger.error(f"Order batch sync failed: {e}", exc_info=True)
            return False, processed_count
        finally:
            session.close()

    async def _sync_orders_batch_to_db_enhanced(self, all_orders: List[Dict]) -> Tuple[bool, Dict]:
        """Enhanced: batch-sync order data to the database, with detailed statistics

        Args:
            all_orders: list of order dicts
        Returns:
            Tuple[bool, Dict]: (success, statistics)
        """
        if not all_orders:
            return True, {'total': 0, 'processed': 0, 'inserted': 0, 'updated': 0, 'errors': []}
        session = self.db_manager.get_session()
        results = {
            'total': len(all_orders),
            'processed': 0,
            'inserted': 0,
            'updated': 0,
            'errors': [],
            'invalid_orders': 0
        }
        try:
            logger.info(f"Syncing {results['total']} orders, batch size: {self.batch_size}")
            # process in batches
            total_batches = (len(all_orders) + self.batch_size - 1) // self.batch_size
            for batch_idx in range(total_batches):
                start_idx = batch_idx * self.batch_size
                end_idx = start_idx + self.batch_size
                batch_orders = all_orders[start_idx:end_idx]
                logger.debug(f"Batch {batch_idx + 1}/{total_batches}: "
                             f"orders {start_idx + 1}-{min(end_idx, len(all_orders))}")
                try:
                    session.begin()
                    # convert the data
                    converted_orders = []
                    batch_invalid = 0
                    for raw_order in batch_orders:
                        try:
                            converted = self._convert_order_data(raw_order)
                            # validate required fields
                            required_fields = ['order_id', 'symbol', 'k_id', 'side']
                            missing_fields = [field for field in required_fields if not converted.get(field)]
                            if missing_fields:
                                logger.warning(f"Order missing required fields {missing_fields}: {raw_order}")
                                batch_invalid += 1
                                continue
                            # enforce column lengths (avoids database errors)
                            order_id = converted.get('order_id', '')
                            if len(order_id) > 765:  # limit from the table schema
                                converted['order_id'] = order_id[:765]
                                logger.warning(f"order_id too long, truncated: {order_id}")
                            symbol = converted.get('symbol', '')
                            if len(symbol) > 120:
                                converted['symbol'] = symbol[:120]
                            side = converted.get('side', '')
                            if len(side) > 120:
                                converted['side'] = side[:120]
                            converted_orders.append(converted)
                        except Exception as e:
                            logger.error(f"Processing order failed: {raw_order}, error={e}")
                            batch_invalid += 1
                            continue
                    results['invalid_orders'] += batch_invalid
                    if not converted_orders:
                        session.commit()
                        continue
                    # bulk insert-or-update
                    upsert_sql = text("""
                        INSERT INTO deh_strategy_order_new
                        (st_id, k_id, asset, order_id, symbol, side, price, time,
                        order_qty, last_qty, avg_price, exchange_id)
                        VALUES
                        (:st_id, :k_id, :asset, :order_id, :symbol, :side, :price, :time,
                        :order_qty, :last_qty, :avg_price, :exchange_id)
                        ON DUPLICATE KEY UPDATE
                        price = VALUES(price),
                        time = VALUES(time),
                        order_qty = VALUES(order_qty),
                        last_qty = VALUES(last_qty),
                        avg_price = VALUES(avg_price),
                        updated_at = CURRENT_TIMESTAMP  -- note: assumes an updated_at column, which the 数据库.sql schema below does not define
                    """)
                    result = session.execute(upsert_sql, converted_orders)
                    # tally this batch
                    batch_size = len(converted_orders)
                    total_affected = result.rowcount
                    batch_updated = max(0, total_affected - batch_size)
                    batch_inserted = batch_size - batch_updated
                    # accumulate into the totals
                    results['processed'] += batch_size
                    results['inserted'] += batch_inserted
                    results['updated'] += batch_updated
                    logger.info(f"Batch {batch_idx + 1} done: "
                                f"valid {batch_size}, "
                                f"invalid {batch_invalid}, "
                                f"inserted {batch_inserted}, "
                                f"updated {batch_updated}")
                    session.commit()
                except Exception as e:
                    session.rollback()
                    error_msg = f"Batch {batch_idx + 1} failed: {str(e)}"
                    logger.error(error_msg, exc_info=True)
                    results['errors'].append(error_msg)
                    # continue with the next batch
            # final statistics
            success_rate = results['processed'] / results['total'] * 100 if results['total'] > 0 else 0
            logger.info(f"Order sync finished: "
                        f"total={results['total']}, "
                        f"processed={results['processed']}({success_rate:.1f}%), "
                        f"inserted={results['inserted']}, "
                        f"updated={results['updated']}, "
                        f"invalid={results['invalid_orders']}, "
                        f"errors={len(results['errors'])}")
            success = len(results['errors']) == 0
            return success, results
        except Exception as e:
            logger.error(f"Order batch sync failed: {e}", exc_info=True)
            results['errors'].append(f"Sync process failed: {str(e)}")
            return False, results
        finally:
            session.close()

    def _convert_order_data(self, data: Dict) -> Dict:
        """Convert a raw order dict into row format"""
        try:
            # safe conversion helpers
            def safe_float(value):
-                if value is None:
+                if value is None or value == '':
                    return None
                try:
                    return float(value)
@@ -233,7 +318,7 @@ class OrderSyncBatch(BaseSync):
                    return None

            def safe_int(value):
-                if value is None:
+                if value is None or value == '':
                    return None
                try:
                    return int(float(value))
@@ -246,9 +331,9 @@ class OrderSyncBatch(BaseSync):
                return str(value)

            return {
-                'st_id': safe_int(data.get('st_id'), 0),
-                'k_id': safe_int(data.get('k_id'), 0),
-                'asset': 'USDT',
+                'st_id': safe_int(data.get('st_id')) or 0,
+                'k_id': safe_int(data.get('k_id')) or 0,
+                'asset': safe_str(data.get('asset')) or 'USDT',
                'order_id': safe_str(data.get('order_id')),
                'symbol': safe_str(data.get('symbol')),
                'side': safe_str(data.get('side')),
@@ -263,8 +348,4 @@ class OrderSyncBatch(BaseSync):
        except Exception as e:
            logger.error(f"Converting order data failed: {data}, error={e}")
            return {}

-    async def sync(self):
-        """Backward-compatible entry point"""
-        accounts = self.get_accounts_from_redis()
-        await self.sync_batch(accounts)
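
The inserted/updated split in both sync methods relies on MySQL's rowcount convention for INSERT ... ON DUPLICATE KEY UPDATE: a fresh insert counts as 1 affected row, an updated duplicate as 2, and a duplicate whose values are unchanged as 0 (which this estimate ignores). A worked example of the arithmetic:

def split_affected(batch_size: int, total_affected: int):
    """Estimate (inserted, updated) from rowcount, assuming no unchanged duplicates."""
    updated = max(0, total_affected - batch_size)  # each update contributes one extra count
    inserted = batch_size - updated
    return inserted, updated

# 10 parameter sets, rowcount 14  ->  6 inserts (6 x 1) + 4 updates (4 x 2)
assert split_affected(10, 14) == (6, 4)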


@@ -15,7 +15,6 @@ class PositionSyncBatch(BaseSync):
    async def sync_batch(self, accounts: Dict[str, Dict]):
        """Batch-sync position data for all accounts"""
-        return
        try:
            logger.info(f"Starting batch position sync, {len(accounts)} accounts")


@@ -1,3 +1,4 @@
+from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any
from loguru import logger
@@ -29,3 +30,85 @@ def safe_str(self, value: Any, default: str = '') -> str:
    except Exception as e:
        logger.error(f"safe_str error: {e}")
        return ""

def convert_timestamp(timestamp, return_type='str', format_str='%Y-%m-%d %H:%M:%S'):
    """
    Convert a timestamp, with several return formats.

    Args:
        timestamp: the timestamp (int, float, or str)
        return_type: 'str' / 'datetime' / 'dict'
        format_str: format string used when return_type='str'
    Returns:
        data in the shape selected by return_type
    """
    try:
        # coerce to float (handles str, int, and float input alike)
        timestamp = float(timestamp)
        # detect the timestamp precision by magnitude
        original_timestamp = timestamp
        if timestamp > 4102444800000:  # 2100-01-01 as a millisecond timestamp
            # probably microseconds; convert to seconds
            timestamp = timestamp / 1000000
            precision = 'microseconds'
        elif timestamp > 4102444800:  # 2100-01-01 as a second timestamp
            # milliseconds
            timestamp = timestamp / 1000
            precision = 'milliseconds'
        else:
            # seconds
            precision = 'seconds'
        # build the datetime
        dt = datetime.fromtimestamp(timestamp)
        # return in the requested shape
        if return_type == 'datetime':
            return dt
        elif return_type == 'dict':
            return {
                'datetime': dt,
                'formatted': dt.strftime(format_str),
                'precision': precision,
                'original_timestamp': original_timestamp,
                'converted_timestamp': timestamp
            }
        else:  # default: formatted string
            return dt.strftime(format_str)
    except (ValueError, TypeError, OSError) as e:
        logger.error(f"Timestamp conversion failed: {timestamp}, error: {e}")
        return None

def timestamp(unit='seconds'):
    """
    Simple current-timestamp helper.

    Args:
        unit: 's'/'ms'/'us' or 'seconds'/'milliseconds'/'microseconds'
    Returns:
        int: the timestamp
    """
    current = time.time()
    unit_map = {
        's': 1, 'seconds': 1,
        'ms': 1000, 'milliseconds': 1000,
        'us': 1000000, 'microseconds': 1000000
    }
    multiplier = unit_map.get(unit.lower(), 1)
    return int(current * multiplier)

# alias
def ts(unit='seconds'):
    """Alias for timestamp()"""
    return timestamp(unit)
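
A quick usage sketch of the two helpers (outputs are illustrative and local-time dependent):

# magnitude decides the precision: seconds, then milliseconds, then microseconds
convert_timestamp(1733300000)                       # '2024-12-04 ...' string (seconds)
convert_timestamp('1733300000123')                  # same instant, detected as milliseconds
info = convert_timestamp(1733300000123456, return_type='dict')
info['precision']                                   # 'microseconds'

timestamp('ms')  # current time as an int in milliseconds
ts()             # alias, seconds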


@@ -2,8 +2,9 @@ import asyncio
import redis
import json
import re
+import time
from loguru import logger
-from config.settings import REDIS_CONFIG, COMPUTER_NAMES, COMPUTER_NAME_PATTERN
+from config.settings import REDIS_CONFIG, COMPUTER_NAMES, COMPUTER_NAME_PATTERN, SYNC_CONFIG
import utils.helpers as helpers
from typing import List, Dict, Any, Set, Tuple, Optional
from datetime import datetime, timedelta
@@ -360,9 +361,10 @@ class RedisClient:
            for account_info in account_list:
                k_id = int(account_info['k_id'])
                st_id = account_info.get('st_id', 0)
+                add_time = account_info.get('add_time', 0)

                # fetch this account's info from Redis
-                account_data = await self._get_account_info_from_redis(k_id, st_id, exchange_id)
+                account_data = await self._get_account_info_from_redis(k_id, st_id, exchange_id, add_time)
                account_data_list.extend(account_data)

            logger.debug(f"Exchange {exchange_id}: collected {len(account_data_list)} account-info rows")
@@ -372,7 +374,7 @@ class RedisClient:
        return account_data_list

-    async def _get_account_info_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]:
+    async def _get_account_info_from_redis(self, k_id: int, st_id: int, exchange_id: str, add_time: int) -> List[Dict]:
        """Fetch account-info data from Redis (batch-optimized version)"""
        try:
            redis_key = f"{exchange_id}:balance:{k_id}"
@@ -382,8 +384,7 @@ class RedisClient:
                return []

            # aggregate the data per day
-            from config.settings import SYNC_CONFIG
-            recent_days = SYNC_CONFIG['recent_days']
+            recent_days = SYNC_CONFIG['recent_days_account']
            today = datetime.now()
            date_stats = {}
@@ -394,14 +395,18 @@ class RedisClient:
                fund_data = json.loads(fund_json)
                date_str = fund_data.get('lz_time', '')
                lz_type = fund_data.get('lz_type', '')
+                add_time_str = helpers.convert_timestamp(add_time, format_str='%Y-%m-%d')  # day the account was added
+                if date_str < add_time_str:
+                    continue

                if not date_str or lz_type not in ['lz_balance', 'deposit', 'withdrawal']:
                    continue

                # only keep the last N days of data
                date_obj = datetime.strptime(date_str, '%Y-%m-%d')
-                if (today - date_obj).days > recent_days:
-                    continue
+                if recent_days != 0:  # 0 disables the day window entirely
+                    if (today - date_obj).days > recent_days:
+                        continue

                if date_str not in date_stats:
                    date_stats[date_str] = {
@@ -410,8 +415,11 @@ class RedisClient:
                        'withdrawal': 0.0,
                        'has_balance': False
                    }

-                lz_amount = float(fund_data.get('lz_amount', 0))
+                if fund_data.get('lz_amount'):
+                    lz_amount = float(fund_data.get('lz_amount', 0))
+                else:
+                    lz_amount = float(fund_data.get('lz_money', 0))

                if lz_type == 'lz_balance':
                    date_stats[date_str]['balance'] = lz_amount
@@ -494,7 +502,134 @@ class RedisClient:
        return prev_balance_map

    async def _collect_all_orders(self, accounts: Dict[str, Dict]) -> List[Dict]:
        """Collect order data for all accounts"""
        all_orders = []
        try:
            # group accounts by exchange
            account_groups = self._group_accounts_by_exchange(accounts)
            # collect each exchange concurrently
            tasks = []
            for exchange_id, account_list in account_groups.items():
                task = self._collect_exchange_orders(exchange_id, account_list)
                tasks.append(task)
            # wait for every task and merge the results
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for result in results:
                if isinstance(result, list):
                    all_orders.extend(result)
        except Exception as e:
            logger.error(f"Collecting order data failed: {e}")
        return all_orders

    async def _collect_exchange_orders(self, exchange_id: str, account_list: List[Dict]) -> List[Dict]:
        """Collect the order data for one exchange"""
        orders_list = []
        try:
            # fetch each account concurrently
            tasks = []
            for account_info in account_list:
                k_id = int(account_info['k_id'])
                st_id = account_info.get('st_id', 0)
                task = self._get_recent_orders_from_redis(k_id, st_id, exchange_id)
                tasks.append(task)
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for result in results:
                if isinstance(result, list):
                    orders_list.extend(result)
            logger.debug(f"Exchange {exchange_id}: collected {len(orders_list)} orders")
        except Exception as e:
            logger.error(f"Collecting orders for exchange {exchange_id} failed: {e}")
        return orders_list

    async def _get_recent_orders_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]:
        """Fetch the last N days of orders from Redis"""
        try:
            redis_key = f"{exchange_id}:orders:{k_id}"
            recent_days = SYNC_CONFIG['recent_days_order']
            # build the list of recent dates
            today = datetime.now()
            recent_dates = []
            for i in range(recent_days):
                date = today - timedelta(days=i)
                date_format = date.strftime('%Y-%m-%d')
                recent_dates.append(date_format)
            # scan for all matching hash fields
            cursor = 0
            recent_keys = []
            while True:
                cursor, keys = self.client.hscan(redis_key, cursor, count=1000)
                for key, _ in keys.items():
                    key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                    if key_str == 'positions':
                        continue
                    # keep fields that start with one of the recent dates
                    for date_format in recent_dates:
                        if key_str.startswith(date_format + '_'):
                            recent_keys.append(key_str)
                            break
                if cursor == 0:
                    break
            if not recent_keys:
                return []
            # bulk-fetch the order payloads
            orders_list = []
            # read in chunks so a single call doesn't get too large
            chunk_size = 500
            for i in range(0, len(recent_keys), chunk_size):
                chunk_keys = recent_keys[i:i + chunk_size]
                # hmget fetches a whole chunk at once
                chunk_values = self.client.hmget(redis_key, chunk_keys)
                for key, order_json in zip(chunk_keys, chunk_values):
                    if not order_json:
                        continue
                    try:
                        order = json.loads(order_json)
                        # validate the timestamp
                        order_time = order.get('time', 0)
                        if order_time >= int(time.time()) - recent_days * 24 * 3600:
                            # attach account info
                            order['k_id'] = k_id
                            order['st_id'] = st_id
                            order['exchange_id'] = exchange_id
                            orders_list.append(order)
                    except json.JSONDecodeError as e:
                        logger.debug(f"Failed to parse order JSON: key={key}, error={e}")
                        continue
            return orders_list
        except Exception as e:
            logger.error(f"Failed to fetch orders from Redis: k_id={k_id}, error={e}")
            return []

    def close(self):
        """Close the connection pool"""

数据库.sql (new file, 204 lines)

@@ -0,0 +1,204 @@
/*
 Navicat Premium Data Transfer

 Source Server         : lz_mysql_host
 Source Server Type    : MySQL
 Source Server Version : 50740
 Source Host           : localhost:3306
 Source Schema         : lz_app_test

 Target Server Type    : MySQL
 Target Server Version : 50740
 File Encoding         : 65001

 Date: 04/12/2025 21:57:38
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for deh_strategy_asset_new
-- ----------------------------
DROP TABLE IF EXISTS `deh_strategy_asset_new`;
CREATE TABLE `deh_strategy_asset_new` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'auto-increment id',
  `st_id` int(11) NOT NULL COMMENT 'strategy id',
  `asset` varchar(32) NOT NULL COMMENT 'asset name',
  `win_rate` decimal(11,8) NOT NULL DEFAULT '0.00000000' COMMENT 'win rate',
  `aror` decimal(12,8) NOT NULL DEFAULT '0.00000000' COMMENT 'annualized return',
  `networth` decimal(12,8) NOT NULL DEFAULT '1.00000000' COMMENT 'net worth',
  `max_down` decimal(12,8) NOT NULL DEFAULT '0.00000000' COMMENT 'max drawdown',
  `profit` decimal(20,8) NOT NULL DEFAULT '0.00000000' COMMENT 'total profit',
  `profit_rate` decimal(12,8) NOT NULL DEFAULT '0.00000000' COMMENT 'total profit rate',
  `current_num` decimal(20,8) NOT NULL DEFAULT '0.00000000' COMMENT 'asset quantity',
  `usd_num` decimal(20,8) NOT NULL DEFAULT '0.00000000' COMMENT 'value in USD',
  `deposit` decimal(20,8) NOT NULL DEFAULT '0.00000000' COMMENT 'total deposits',
  `withdrawal` decimal(20,8) NOT NULL DEFAULT '0.00000000' COMMENT 'total withdrawals',
  `other` decimal(20,8) NOT NULL DEFAULT '0.00000000' COMMENT 'other total changes',
  `time` int(11) NOT NULL COMMENT 'unix timestamp',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `st_id` (`st_id`) USING BTREE,
  KEY `asset` (`asset`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='strategy asset table';

-- ----------------------------
-- Table structure for deh_strategy_kx_new
-- ----------------------------
DROP TABLE IF EXISTS `deh_strategy_kx_new`;
CREATE TABLE `deh_strategy_kx_new` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'auto-increment id',
  `st_id` int(11) DEFAULT NULL COMMENT 'strategy id',
  `k_id` int(11) DEFAULT '0' COMMENT 'ID of the corresponding strategy_key',
  `asset` varchar(32) DEFAULT NULL COMMENT 'asset name',
  `balance` decimal(20,8) DEFAULT '0.00000000' COMMENT 'account balance for the day',
  `withdrawal` decimal(20,8) DEFAULT '0.00000000' COMMENT 'withdrawals for the day',
  `deposit` decimal(20,8) DEFAULT '0.00000000' COMMENT 'deposits for the day',
  `other` decimal(20,8) DEFAULT '0.00000000' COMMENT 'other changes for the day',
  `profit` decimal(20,8) DEFAULT '0.00000000' COMMENT 'profit for the day',
  `time` int(11) DEFAULT NULL COMMENT 'time',
  `up_time` datetime DEFAULT NULL COMMENT 'last updated time',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `kid_asset_time` (`k_id`,`asset`,`time`) USING BTREE,
  KEY `st_id_asset_time` (`st_id`,`asset`,`time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=259 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for deh_strategy_networth
-- ----------------------------
DROP TABLE IF EXISTS `deh_strategy_networth`;
CREATE TABLE `deh_strategy_networth` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `st_id` int(11) DEFAULT '0' COMMENT 'strategy ID, references deh_strategy.id',
  `asset` varchar(32) CHARACTER SET utf8 DEFAULT NULL COMMENT 'asset name',
  `share` decimal(20,8) DEFAULT '0.00000000' COMMENT 'shares (net-worth method)',
  `net_worth` decimal(13,8) DEFAULT '0.00000000' COMMENT 'net worth (net-worth method)',
  `total_profit` decimal(20,8) DEFAULT NULL COMMENT 'cumulative profit',
  `time` int(10) DEFAULT '0' COMMENT 'date',
  `up_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'last updated time',
  PRIMARY KEY (`id`),
  KEY `net_worth_index` (`st_id`,`asset`,`time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Table structure for deh_strategy_order_new
-- ----------------------------
DROP TABLE IF EXISTS `deh_strategy_order_new`;
CREATE TABLE `deh_strategy_order_new` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`st_id` int(11) DEFAULT NULL COMMENT '策略id',
`k_id` int(11) DEFAULT '0' COMMENT '对应strategy_key 的ID',
`asset` varchar(32) DEFAULT NULL COMMENT '资产名称',
`order_id` varchar(765) DEFAULT NULL COMMENT '订单id',
`symbol` varchar(120) DEFAULT NULL COMMENT '交易对',
`side` varchar(120) DEFAULT NULL COMMENT '订单方向',
`price` float DEFAULT NULL COMMENT '订单价格',
`time` int(11) DEFAULT NULL COMMENT '订单时间',
`order_qty` float DEFAULT NULL COMMENT '订单挂单数量',
`last_qty` float DEFAULT NULL COMMENT '订单成交数量',
`avg_price` float DEFAULT NULL COMMENT '订单成交均价',
`exchange_id` int(11) DEFAULT NULL COMMENT '交易所id',
PRIMARY KEY (`id`),
UNIQUE KEY `order_index` (`order_id`,`symbol`,`k_id`,`side`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1269 DEFAULT CHARSET=utf8 COMMENT='订单表';
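
The UNIQUE KEY order_index (order_id, symbol, k_id, side) is what makes the ON DUPLICATE KEY UPDATE statements in order_sync idempotent: re-syncing an order from Redis hits the unique key and updates the row in place instead of inserting a duplicate. A minimal round trip (placeholder DSN and values; note that the Python side fills exchange_id with strings like 'bybit' while this column is declared int(11), so one of the two presumably needs adjusting):

from sqlalchemy import create_engine, text

engine = create_engine('mysql+pymysql://user:pass@localhost/lz_app2')  # placeholder
row = {'st_id': 1, 'k_id': 10140, 'asset': 'USDT', 'order_id': 'abc123',
       'symbol': 'BTCUSDT', 'side': 'Buy', 'price': 97000.0, 'time': 1733300000,
       'order_qty': 0.5, 'last_qty': 0.5, 'avg_price': 97001.5, 'exchange_id': 1}
upsert = text("""
    INSERT INTO deh_strategy_order_new
        (st_id, k_id, asset, order_id, symbol, side, price, time,
         order_qty, last_qty, avg_price, exchange_id)
    VALUES (:st_id, :k_id, :asset, :order_id, :symbol, :side, :price, :time,
            :order_qty, :last_qty, :avg_price, :exchange_id)
    ON DUPLICATE KEY UPDATE price = VALUES(price), time = VALUES(time)
""")
with engine.begin() as conn:
    conn.execute(upsert, row)  # inserts the row
    conn.execute(upsert, row)  # hits order_index, updates in place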
-- ----------------------------
-- Table structure for deh_strategy_position_new
-- ----------------------------
DROP TABLE IF EXISTS `deh_strategy_position_new`;
CREATE TABLE `deh_strategy_position_new` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'auto-increment id',
  `st_id` int(11) NOT NULL COMMENT 'strategy id',
  `k_id` int(11) DEFAULT '0' COMMENT 'ID of the corresponding strategy_key',
  `asset` varchar(32) DEFAULT '' COMMENT 'asset name used, e.g. BTC or USDT',
  `symbol` varchar(50) NOT NULL COMMENT 'trading pair',
  `price` float DEFAULT NULL COMMENT 'average entry price',
  `side` varchar(10) NOT NULL COMMENT 'side',
  `sum` float NOT NULL COMMENT 'position size (contracts)',
  `asset_num` decimal(20,8) DEFAULT '0.00000000' COMMENT 'asset quantity',
  `asset_profit` decimal(20,8) DEFAULT NULL COMMENT 'profit amount',
  `leverage` int(11) DEFAULT '0' COMMENT 'leverage',
  `uptime` int(11) NOT NULL COMMENT 'update time',
  `profit_price` decimal(20,8) DEFAULT '0.00000000' COMMENT 'take-profit price',
  `stop_price` decimal(20,8) DEFAULT '0.00000000' COMMENT 'stop-loss price',
  `liquidation_price` decimal(20,8) DEFAULT '0.00000000' COMMENT 'liquidation price',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `st_id_asset_sum` (`k_id`,`st_id`,`symbol`,`side`) USING BTREE,
  KEY `k_id` (`k_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='position table';
-- ----------------------------
-- Event structure for auto_del
-- ----------------------------
DROP EVENT IF EXISTS `auto_del`;
delimiter ;;
CREATE EVENT `auto_del`
ON SCHEDULE
EVERY '1' MINUTE STARTS '2021-11-01 00:00:00'
DO BEGIN
DELETE FROM deh_strategy_kx_new WHERE CONCAT(k_id,asset,time) IN (
SELECT p_key FROM (
SELECT CONCAT(k_id,asset,time) AS p_key FROM deh_strategy_kx_new GROUP BY CONCAT(k_id,asset,time) HAVING count(CONCAT(k_id,asset,time)) > 1)
AS tmp
) AND id NOT IN (
SELECT id FROM (
SELECT min(id) AS id FROM deh_strategy_kx_new GROUP BY CONCAT(k_id,asset,time) HAVING count(CONCAT(k_id,asset,time)) > 1)
AS tmp1
);
DELETE FROM deh_strategy_networth WHERE CONCAT(st_id,asset,time) IN (
SELECT p_key FROM (
SELECT CONCAT(st_id,asset,time) AS p_key FROM deh_strategy_networth GROUP BY CONCAT(st_id,asset,time) HAVING count(CONCAT(st_id,asset,time)) > 1)
AS tmp
) AND id NOT IN (
SELECT id FROM (
SELECT min(id) AS id FROM deh_strategy_networth GROUP BY CONCAT(st_id,asset,time) HAVING count(CONCAT(st_id,asset,time)) > 1)
AS tmp1
);
DELETE FROM deh_strategy_asset_new WHERE CONCAT(st_id,asset) IN (
SELECT p_key FROM (
SELECT CONCAT(st_id,asset) AS p_key FROM deh_strategy_asset_new GROUP BY CONCAT(st_id,asset) HAVING count(CONCAT(st_id,asset)) > 1)
AS tmp
) AND id NOT IN (
SELECT id FROM (
SELECT min(id) AS id FROM deh_strategy_asset_new GROUP BY CONCAT(st_id,asset) HAVING count(CONCAT(st_id,asset)) > 1)
AS tmp1
);
END
;;
delimiter ;
-- ----------------------------
-- Event structure for auto_del_order
-- ----------------------------
DROP EVENT IF EXISTS `auto_del_order`;
delimiter ;;
CREATE EVENT `auto_del_order`
ON SCHEDULE
EVERY '1' HOUR STARTS '2021-11-01 00:00:00'
DO BEGIN
-- DELETE FROM `deh_strategy_order_new` WHERE `time` < UNIX_TIMESTAMP(NOW() - INTERVAL 3 DAY);
DELETE FROM `deh_strategy_order_new` WHERE `time` < UNIX_TIMESTAMP(NOW() - INTERVAL 3 DAY) AND st_id IN (SELECT st_id FROM `deh_strategy` WHERE is_ia=1);
DELETE FROM `deh_strategy_order_new` WHERE `time` < UNIX_TIMESTAMP(NOW() - INTERVAL 30 DAY) AND st_id IN (SELECT st_id FROM `deh_strategy` WHERE is_ia=0);
DELETE from `deh_strategy_order_new` where order_id in (
select order_id from (
SELECT order_id FROM `deh_strategy_order_new` GROUP BY order_id HAVING count(order_id) > 1)
as tmp
) and id not in (
select id from (
SELECT min(id) as id FROM `deh_strategy_order_new` GROUP BY order_id HAVING count(order_id) > 1)
as tmp1
);
END
;;
delimiter ;
SET FOREIGN_KEY_CHECKS = 1;