Compare commits
5 Commits
878901826c
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
76ac5370d0 | ||
|
|
f472755d84 | ||
|
|
0d42a584b7 | ||
|
|
64c993319a | ||
|
|
efbcb63cec |
10
.env
10
.env
@@ -2,14 +2,14 @@
|
|||||||
REDIS_HOST=10.0.4.6
|
REDIS_HOST=10.0.4.6
|
||||||
REDIS_PORT=6379
|
REDIS_PORT=6379
|
||||||
REDIS_PASSWORD=
|
REDIS_PASSWORD=
|
||||||
REDIS_DB=1
|
REDIS_DB=0
|
||||||
|
|
||||||
# MySQL配置
|
# MySQL配置
|
||||||
DB_HOST=10.0.4.17
|
DB_HOST=10.0.4.17
|
||||||
DB_PORT=3306
|
DB_PORT=3306
|
||||||
DB_USER=root
|
DB_USER=root
|
||||||
DB_PASSWORD=lz_mysqlLZ
|
DB_PASSWORD=lz_mysqlLZ
|
||||||
DB_DATABASE=lz_app_test
|
DB_DATABASE=lz_app2
|
||||||
DB_POOL_SIZE=10
|
DB_POOL_SIZE=10
|
||||||
DB_MAX_OVERFLOW=20
|
DB_MAX_OVERFLOW=20
|
||||||
DB_POOL_RECYCLE=3600
|
DB_POOL_RECYCLE=3600
|
||||||
@@ -19,12 +19,12 @@ SQLALCHEMY_ECHO=false
|
|||||||
SQLALCHEMY_ECHO_POOL=false
|
SQLALCHEMY_ECHO_POOL=false
|
||||||
|
|
||||||
# 同步配置
|
# 同步配置
|
||||||
SYNC_INTERVAL=20
|
SYNC_INTERVAL=15
|
||||||
RECENT_DAYS=3
|
RECENT_DAYS=3
|
||||||
CHUNK_SIZE=1000
|
CHUNK_SIZE=1000
|
||||||
ENABLE_POSITION_SYNC=true
|
ENABLE_POSITION_SYNC=true
|
||||||
ENABLE_ORDER_SYNC=true
|
ENABLE_ORDER_SYNC=true
|
||||||
ENABLE_ACCOUNT_SYNC=true
|
ENABLE_ACCOUNT_SYNC=false
|
||||||
|
|
||||||
# 日志配置
|
# 日志配置
|
||||||
LOG_LEVEL=INFO
|
LOG_LEVEL=INFO
|
||||||
@@ -49,7 +49,7 @@ ORDER_REDIS_SCAN_COUNT=1000
|
|||||||
POSITION_BATCH_SIZE=500
|
POSITION_BATCH_SIZE=500
|
||||||
|
|
||||||
# 账户同步配置
|
# 账户同步配置
|
||||||
ACCOUNT_SYNC_RECENT_DAYS=3
|
ACCOUNT_SYNC_RECENT_DAYS=0
|
||||||
|
|
||||||
# 并发控制
|
# 并发控制
|
||||||
MAX_CONCURRENT_ACCOUNTS=50
|
MAX_CONCURRENT_ACCOUNTS=50
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -17,6 +17,8 @@ REDIS_CONFIG = {
|
|||||||
SYNC_CONFIG = {
|
SYNC_CONFIG = {
|
||||||
'interval': int(os.getenv('SYNC_INTERVAL', 60)), # 同步间隔(秒)
|
'interval': int(os.getenv('SYNC_INTERVAL', 60)), # 同步间隔(秒)
|
||||||
'recent_days': int(os.getenv('RECENT_DAYS', 3)), # 同步最近几天数据
|
'recent_days': int(os.getenv('RECENT_DAYS', 3)), # 同步最近几天数据
|
||||||
|
'recent_days_account': int(os.getenv('ACCOUNT_SYNC_RECENT_DAYS', 3)), # 同步最近几天数据
|
||||||
|
'recent_days_order': int(os.getenv('ORDER_SYNC_RECENT_DAYS', 3)), # 同步最近几天数据
|
||||||
'chunk_size': int(os.getenv('CHUNK_SIZE', 1000)), # 批量处理大小
|
'chunk_size': int(os.getenv('CHUNK_SIZE', 1000)), # 批量处理大小
|
||||||
'enable_position_sync': os.getenv('ENABLE_POSITION_SYNC', 'true').lower() == 'true',
|
'enable_position_sync': os.getenv('ENABLE_POSITION_SYNC', 'true').lower() == 'true',
|
||||||
'enable_order_sync': os.getenv('ENABLE_ORDER_SYNC', 'true').lower() == 'true',
|
'enable_order_sync': os.getenv('ENABLE_ORDER_SYNC', 'true').lower() == 'true',
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
@@ -73,4 +73,4 @@ class StrategyKX(Base):
|
|||||||
up_time: Mapped[Optional[datetime]] = mapped_column(DateTime, default=func.now(), onupdate=func.now(), comment='最后更新时间')
|
up_time: Mapped[Optional[datetime]] = mapped_column(DateTime, default=func.now(), onupdate=func.now(), comment='最后更新时间')
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
return f"StrategyKX(id={self.id!r}, k_id={self.k_id!r}, time={self.time!r})"
|
return f"StrategyKX(id={self.id!r}, k_id={self.k_id!r}, st_id={self.st_id!r}, time={self.time!r})"
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,7 +1,7 @@
|
|||||||
from .base_sync import BaseSync
|
from .base_sync import BaseSync
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from typing import List, Dict
|
from typing import List, Dict
|
||||||
from sqlalchemy import text
|
from sqlalchemy import text, select, func, and_
|
||||||
from models.orm_models import StrategyKX
|
from models.orm_models import StrategyKX
|
||||||
|
|
||||||
class AccountSyncBatch(BaseSync):
|
class AccountSyncBatch(BaseSync):
|
||||||
@@ -12,14 +12,9 @@ class AccountSyncBatch(BaseSync):
|
|||||||
try:
|
try:
|
||||||
logger.info(f"开始批量同步账户信息,共 {len(accounts)} 个账号")
|
logger.info(f"开始批量同步账户信息,共 {len(accounts)} 个账号")
|
||||||
|
|
||||||
# 测试
|
|
||||||
# res = await self.redis_client._get_account_info_from_redis(10140, 5548, 'mt5')
|
|
||||||
# print(res)
|
|
||||||
# return
|
|
||||||
# 收集所有账号的数据
|
# 收集所有账号的数据
|
||||||
all_account_data = await self.redis_client._collect_all_account_data(accounts)
|
all_account_data = await self.redis_client._collect_all_account_data(accounts)
|
||||||
|
|
||||||
|
|
||||||
if not all_account_data:
|
if not all_account_data:
|
||||||
logger.info("无账户信息数据需要同步")
|
logger.info("无账户信息数据需要同步")
|
||||||
return
|
return
|
||||||
@@ -182,3 +177,44 @@ class AccountSyncBatch(BaseSync):
|
|||||||
logger.error(f"批量查询现有记录失败: {e}")
|
logger.error(f"批量查询现有记录失败: {e}")
|
||||||
|
|
||||||
return existing_records
|
return existing_records
|
||||||
|
|
||||||
|
async def networth(self, accounts: Dict[str, Dict]):
|
||||||
|
"""计算所有策略的净值"""
|
||||||
|
|
||||||
|
# 从accounts中获取所有策略的st_id
|
||||||
|
st_ids = set()
|
||||||
|
for account in accounts.values():
|
||||||
|
st_ids.add(account['st_id'])
|
||||||
|
|
||||||
|
stmt = select(
|
||||||
|
StrategyKX.st_id,
|
||||||
|
StrategyKX.time,
|
||||||
|
func.sum(StrategyKX.balance).label('balance_sum'),
|
||||||
|
func.sum(StrategyKX.profit).label('profit_sum'),
|
||||||
|
func.sum(StrategyKX.withdrawal).label('withdrawal_sum'),
|
||||||
|
func.sum(StrategyKX.deposit).label('deposit_sum'),
|
||||||
|
func.sum(StrategyKX.other).label('other_sum')
|
||||||
|
).where(
|
||||||
|
StrategyKX.st_id.in_(st_ids)
|
||||||
|
).group_by(
|
||||||
|
StrategyKX.st_id,
|
||||||
|
StrategyKX.time
|
||||||
|
).order_by(
|
||||||
|
StrategyKX.time.asc(),
|
||||||
|
StrategyKX.st_id.asc()
|
||||||
|
)
|
||||||
|
|
||||||
|
results = self.session.execute(stmt).all()
|
||||||
|
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
'st_id': row.st_id,
|
||||||
|
'time': row.time,
|
||||||
|
'balance_sum': float(row.balance_sum) if row.balance_sum else 0.0,
|
||||||
|
'profit_sum': float(row.profit_sum) if row.profit_sum else 0.0,
|
||||||
|
'withdrawal_sum': float(row.withdrawal_sum) if row.withdrawal_sum else 0.0,
|
||||||
|
'deposit_sum': float(row.deposit_sum) if row.deposit_sum else 0.0,
|
||||||
|
'other_sum': float(row.other_sum) if row.other_sum else 0.0,
|
||||||
|
}
|
||||||
|
for row in results
|
||||||
|
]
|
||||||
@@ -6,6 +6,7 @@ import time
|
|||||||
import json
|
import json
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
import utils.helpers as helpers
|
import utils.helpers as helpers
|
||||||
|
import redis
|
||||||
|
|
||||||
from utils.redis_client import RedisClient
|
from utils.redis_client import RedisClient
|
||||||
from config.settings import SYNC_CONFIG
|
from config.settings import SYNC_CONFIG
|
||||||
@@ -15,6 +16,7 @@ from .account_sync import AccountSyncBatch
|
|||||||
from utils.redis_batch_helper import RedisBatchHelper
|
from utils.redis_batch_helper import RedisBatchHelper
|
||||||
from config.settings import COMPUTER_NAMES, COMPUTER_NAME_PATTERN
|
from config.settings import COMPUTER_NAMES, COMPUTER_NAME_PATTERN
|
||||||
from typing import List, Dict, Any, Set, Optional
|
from typing import List, Dict, Any, Set, Optional
|
||||||
|
from config.settings import REDIS_CONFIG
|
||||||
|
|
||||||
class SyncManager:
|
class SyncManager:
|
||||||
"""同步管理器(完整批量版本)"""
|
"""同步管理器(完整批量版本)"""
|
||||||
@@ -62,7 +64,8 @@ class SyncManager:
|
|||||||
async def start(self):
|
async def start(self):
|
||||||
"""启动同步服务"""
|
"""启动同步服务"""
|
||||||
logger.info(f"同步服务启动,间隔 {self.sync_interval} 秒")
|
logger.info(f"同步服务启动,间隔 {self.sync_interval} 秒")
|
||||||
|
# await self.cp()
|
||||||
|
# return
|
||||||
while self.is_running:
|
while self.is_running:
|
||||||
try:
|
try:
|
||||||
|
|
||||||
@@ -156,3 +159,61 @@ class SyncManager:
|
|||||||
syncer.db_manager.close()
|
syncer.db_manager.close()
|
||||||
|
|
||||||
logger.info("同步服务停止")
|
logger.info("同步服务停止")
|
||||||
|
|
||||||
|
async def cp(self):
|
||||||
|
"""把Redis 0库的资产数据复制到Redis 1库"""
|
||||||
|
# 创建到Redis 0库的连接
|
||||||
|
redis_0 = redis.Redis(
|
||||||
|
host=REDIS_CONFIG['host'], # 你的Redis主机
|
||||||
|
port=REDIS_CONFIG['port'], # 你的Redis端口
|
||||||
|
db=0, # 数据库0
|
||||||
|
password=None, # 如果有密码
|
||||||
|
decode_responses=True # 自动解码为字符串
|
||||||
|
)
|
||||||
|
|
||||||
|
# 创建到Redis 1库的连接
|
||||||
|
redis_1 = redis.Redis(
|
||||||
|
host=REDIS_CONFIG['host'],
|
||||||
|
port=REDIS_CONFIG['port'],
|
||||||
|
db=1, # 数据库1
|
||||||
|
password=None,
|
||||||
|
decode_responses=True
|
||||||
|
)
|
||||||
|
accounts = self.redis_client.get_accounts_from_redis()
|
||||||
|
print(f"共 {len(accounts)} 个账号")
|
||||||
|
all_kid = accounts.keys()
|
||||||
|
|
||||||
|
keys1 = redis_0.keys("Bybit_fin_*")
|
||||||
|
# print(f"找到 {len(keys1)} 个匹配的键")
|
||||||
|
keys2 = redis_0.keys("Metatrader_fin_*")
|
||||||
|
# print(f"找到 {len(keys2)} 个匹配的键")
|
||||||
|
all_keys = keys1 + keys2
|
||||||
|
print(f"共 {len(all_keys)} 个键")
|
||||||
|
for key in all_keys:
|
||||||
|
key_split = key.split("_")
|
||||||
|
k_id = key_split[-1]
|
||||||
|
if k_id not in all_kid:
|
||||||
|
continue
|
||||||
|
exchange_id = None
|
||||||
|
if key_split[0] == "Bybit":
|
||||||
|
exchange_id = "bybit"
|
||||||
|
elif key_split[0] == "Metatrader":
|
||||||
|
exchange_id = "mt5"
|
||||||
|
# data = redis_0.hget(key)
|
||||||
|
print(f"开始处理键 {key} 的数据")
|
||||||
|
data = redis_0.hgetall(key)
|
||||||
|
for k, v in data.items():
|
||||||
|
# print(f"{k}")
|
||||||
|
data_dict = json.loads(v)
|
||||||
|
data_dict['lz_amount'] = float(data_dict['lz_money'])
|
||||||
|
# del data_dict['lz_money']
|
||||||
|
if data_dict['lz_type'] == 'lz_balance':
|
||||||
|
row_key = k
|
||||||
|
else:
|
||||||
|
row_key = f"fund_{k}"
|
||||||
|
# print(row_key)
|
||||||
|
redis_0.hset(f"{exchange_id}:balance:{data_dict['k_id']}",row_key,json.dumps(data_dict))
|
||||||
|
# print(f"键 {key} 的数据为 {data}")
|
||||||
|
redis_0.close()
|
||||||
|
redis_0.close()
|
||||||
|
print("复制完成")
|
||||||
@@ -15,7 +15,6 @@ class PositionSyncBatch(BaseSync):
|
|||||||
|
|
||||||
async def sync_batch(self, accounts: Dict[str, Dict]):
|
async def sync_batch(self, accounts: Dict[str, Dict]):
|
||||||
"""批量同步所有账号的持仓数据"""
|
"""批量同步所有账号的持仓数据"""
|
||||||
return
|
|
||||||
try:
|
try:
|
||||||
logger.info(f"开始批量同步持仓数据,共 {len(accounts)} 个账号")
|
logger.info(f"开始批量同步持仓数据,共 {len(accounts)} 个账号")
|
||||||
|
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,3 +1,4 @@
|
|||||||
|
from datetime import datetime, timedelta
|
||||||
from typing import List, Dict, Optional, Any
|
from typing import List, Dict, Optional, Any
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
@@ -29,3 +30,85 @@ def safe_str(self, value: Any, default: str = '') -> str:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"safe_str error: {e}")
|
logger.error(f"safe_str error: {e}")
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
def convert_timestamp(timestamp, return_type='str', format_str='%Y-%m-%d %H:%M:%S'):
|
||||||
|
"""
|
||||||
|
时间戳转换函数,支持多种返回格式
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timestamp: 时间戳(支持整数、浮点数、字符串)
|
||||||
|
return_type: 返回类型 'str'/'datetime'/'dict'
|
||||||
|
format_str: 当return_type='str'时的格式字符串
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
根据return_type返回不同格式的数据
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# 转换为浮点数
|
||||||
|
if isinstance(timestamp, str):
|
||||||
|
timestamp = float(timestamp)
|
||||||
|
else:
|
||||||
|
timestamp = float(timestamp)
|
||||||
|
|
||||||
|
# 判断时间戳精度
|
||||||
|
original_timestamp = timestamp
|
||||||
|
|
||||||
|
# 精确判断逻辑
|
||||||
|
if timestamp > 4102444800000: # 2100-01-01 的毫秒级时间戳
|
||||||
|
# 可能是微秒级,转换为秒级
|
||||||
|
timestamp = timestamp / 1000000
|
||||||
|
precision = 'microseconds'
|
||||||
|
elif timestamp > 4102444800: # 2100-01-01 的秒级时间戳
|
||||||
|
# 毫秒级时间戳
|
||||||
|
timestamp = timestamp / 1000
|
||||||
|
precision = 'milliseconds'
|
||||||
|
else:
|
||||||
|
# 秒级时间戳
|
||||||
|
precision = 'seconds'
|
||||||
|
|
||||||
|
# 转换为 datetime 对象
|
||||||
|
dt = datetime.fromtimestamp(timestamp)
|
||||||
|
|
||||||
|
# 根据返回类型返回不同格式
|
||||||
|
if return_type == 'datetime':
|
||||||
|
return dt
|
||||||
|
elif return_type == 'dict':
|
||||||
|
return {
|
||||||
|
'datetime': dt,
|
||||||
|
'formatted': dt.strftime(format_str),
|
||||||
|
'precision': precision,
|
||||||
|
'original_timestamp': original_timestamp,
|
||||||
|
'converted_timestamp': timestamp
|
||||||
|
}
|
||||||
|
else: # 默认返回字符串
|
||||||
|
return dt.strftime(format_str)
|
||||||
|
|
||||||
|
except (ValueError, TypeError, OSError) as e:
|
||||||
|
logger.error(f"时间戳转换失败: {timestamp}, 错误: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def timestamp(unit='seconds'):
|
||||||
|
"""
|
||||||
|
简化版时间戳获取
|
||||||
|
|
||||||
|
Args:
|
||||||
|
unit: 's'/'ms'/'us' 或 'seconds'/'milliseconds'/'microseconds'
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: 时间戳
|
||||||
|
"""
|
||||||
|
current = time.time()
|
||||||
|
|
||||||
|
unit_map = {
|
||||||
|
's': 1, 'seconds': 1,
|
||||||
|
'ms': 1000, 'milliseconds': 1000,
|
||||||
|
'us': 1000000, 'microseconds': 1000000
|
||||||
|
}
|
||||||
|
|
||||||
|
multiplier = unit_map.get(unit.lower(), 1)
|
||||||
|
return int(current * multiplier)
|
||||||
|
|
||||||
|
# 别名函数
|
||||||
|
def ts(unit='seconds'):
|
||||||
|
"""timestamp的别名"""
|
||||||
|
return timestamp(unit)
|
||||||
@@ -361,9 +361,10 @@ class RedisClient:
|
|||||||
for account_info in account_list:
|
for account_info in account_list:
|
||||||
k_id = int(account_info['k_id'])
|
k_id = int(account_info['k_id'])
|
||||||
st_id = account_info.get('st_id', 0)
|
st_id = account_info.get('st_id', 0)
|
||||||
|
add_time = account_info.get('add_time', 0)
|
||||||
|
|
||||||
# 从Redis获取账户信息数据
|
# 从Redis获取账户信息数据
|
||||||
account_data = await self._get_account_info_from_redis(k_id, st_id, exchange_id)
|
account_data = await self._get_account_info_from_redis(k_id, st_id, exchange_id, add_time)
|
||||||
account_data_list.extend(account_data)
|
account_data_list.extend(account_data)
|
||||||
|
|
||||||
logger.debug(f"交易所 {exchange_id}: 收集到 {len(account_data_list)} 条账户信息")
|
logger.debug(f"交易所 {exchange_id}: 收集到 {len(account_data_list)} 条账户信息")
|
||||||
@@ -373,7 +374,7 @@ class RedisClient:
|
|||||||
|
|
||||||
return account_data_list
|
return account_data_list
|
||||||
|
|
||||||
async def _get_account_info_from_redis(self, k_id: int, st_id: int, exchange_id: str) -> List[Dict]:
|
async def _get_account_info_from_redis(self, k_id: int, st_id: int, exchange_id: str, add_time: int) -> List[Dict]:
|
||||||
"""从Redis获取账户信息数据(批量优化版本)"""
|
"""从Redis获取账户信息数据(批量优化版本)"""
|
||||||
try:
|
try:
|
||||||
redis_key = f"{exchange_id}:balance:{k_id}"
|
redis_key = f"{exchange_id}:balance:{k_id}"
|
||||||
@@ -383,7 +384,7 @@ class RedisClient:
|
|||||||
return []
|
return []
|
||||||
|
|
||||||
# 按天统计数据
|
# 按天统计数据
|
||||||
recent_days = SYNC_CONFIG['recent_days']
|
recent_days = SYNC_CONFIG['recent_days_account']
|
||||||
|
|
||||||
today = datetime.now()
|
today = datetime.now()
|
||||||
date_stats = {}
|
date_stats = {}
|
||||||
@@ -394,12 +395,16 @@ class RedisClient:
|
|||||||
fund_data = json.loads(fund_json)
|
fund_data = json.loads(fund_json)
|
||||||
date_str = fund_data.get('lz_time', '')
|
date_str = fund_data.get('lz_time', '')
|
||||||
lz_type = fund_data.get('lz_type', '')
|
lz_type = fund_data.get('lz_type', '')
|
||||||
|
add_time_str = helpers.convert_timestamp(add_time,format_str='%Y-%m-%d')
|
||||||
|
if date_str < add_time_str:
|
||||||
|
continue
|
||||||
|
|
||||||
if not date_str or lz_type not in ['lz_balance', 'deposit', 'withdrawal']:
|
if not date_str or lz_type not in ['lz_balance', 'deposit', 'withdrawal']:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# 只处理最近N天的数据
|
# 只处理最近N天的数据
|
||||||
date_obj = datetime.strptime(date_str, '%Y-%m-%d')
|
date_obj = datetime.strptime(date_str, '%Y-%m-%d')
|
||||||
|
if recent_days != 0:
|
||||||
if (today - date_obj).days > recent_days:
|
if (today - date_obj).days > recent_days:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@@ -554,7 +559,7 @@ class RedisClient:
|
|||||||
"""从Redis获取最近N天的订单数据"""
|
"""从Redis获取最近N天的订单数据"""
|
||||||
try:
|
try:
|
||||||
redis_key = f"{exchange_id}:orders:{k_id}"
|
redis_key = f"{exchange_id}:orders:{k_id}"
|
||||||
recent_days = SYNC_CONFIG['recent_days']
|
recent_days = SYNC_CONFIG['recent_days_order']
|
||||||
# 计算最近N天的日期
|
# 计算最近N天的日期
|
||||||
today = datetime.now()
|
today = datetime.now()
|
||||||
recent_dates = []
|
recent_dates = []
|
||||||
|
|||||||
Reference in New Issue
Block a user