Files
assetx/webapp-back/fundmarket/snapshot.go
default 2ee4553b71 init: 初始化 AssetX 项目仓库
包含 webapp(Next.js 用户端)、webapp-back(Go 后端)、
antdesign(管理后台)、landingpage(营销落地页)、
数据库 SQL 和配置文件。
2026-03-27 11:26:43 +00:00

489 lines
16 KiB
Go

package fundmarket
import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/big"
	"strings"
	"time"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/accounts/abi"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
	db "github.com/gothinkster/golang-gin-realworld-example-app/common"
	"github.com/gothinkster/golang-gin-realworld-example-app/config"
	"github.com/gothinkster/golang-gin-realworld-example-app/models"
	"gorm.io/gorm"
)
const (
	// ytAssetFactoryAddressBSC is the YTAssetFactory contract address used for
	// BSC Testnet (chain ID 97).
	ytAssetFactoryAddressBSC = "0x37B2CD7D94ba1400a6FEB34804a32EfD555bbfc8"
	// ytAssetFactoryAddressArb is the YTAssetFactory contract address used for
	// Arbitrum Sepolia (chain ID 421614).
	// NOTE(review): this is the same address as the BSC Testnet one — confirm
	// that the factory really is deployed at an identical address on both chains.
	ytAssetFactoryAddressArb = "0x37B2CD7D94ba1400a6FEB34804a32EfD555bbfc8"
	// fastSnapshotInterval is the period of the price / TVL / APY snapshot,
	// which is cheap: a single RPC call per asset.
	fastSnapshotInterval = 5 * time.Minute
	// volumeSnapshotInterval is the period of the 24h volume snapshot. It is
	// based on FilterLogs, which is heavier, so it runs less often.
	volumeSnapshotInterval = 30 * time.Minute
)
// buySellEventABIJSON is the ABI fragment describing the Buy and Sell events
// emitted by the YT token contract; calc24hVolume uses it to obtain the event
// topics and to decode the non-indexed log data.
//
//	Buy(address indexed user, uint256 usdcAmount, uint256 ytAmount)
//	Sell(address indexed user, uint256 ytAmount, uint256 usdcAmount)
//
// Note that the two events list their amounts in opposite order: the USDC
// amount is the first non-indexed field of Buy but the second one of Sell.
const buySellEventABIJSON = `[
{
"anonymous": false,
"inputs": [
{"indexed": true, "name": "user", "type": "address"},
{"indexed": false, "name": "usdcAmount", "type": "uint256"},
{"indexed": false, "name": "ytAmount", "type": "uint256"}
],
"name": "Buy",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{"indexed": true, "name": "user", "type": "address"},
{"indexed": false, "name": "ytAmount", "type": "uint256"},
{"indexed": false, "name": "usdcAmount", "type": "uint256"}
],
"name": "Sell",
"type": "event"
}
]`
// blockTimeByChain reports the approximate number of seconds per block for
// chainID. The value is only used to estimate how many blocks span a time
// window when scanning for volume. Chains that are not listed fall back to
// the BSC Testnet figure.
func blockTimeByChain(chainID int) float64 {
	const (
		bscTestnetSecondsPerBlock = 3.0  // BSC Testnet (97), ~3s/block; also the fallback
		arbSepoliaSecondsPerBlock = 0.25 // Arbitrum Sepolia (421614), ~0.25s/block
	)
	if chainID == 421614 {
		return arbSepoliaSecondsPerBlock
	}
	return bscTestnetSecondsPerBlock
}
// getVaultInfoABIJSON is the ABI fragment for the view function
// YTAssetFactory.getVaultInfo(address _vault).
//
// It returns nine values, in this order: exists (bool), totalAssets,
// idleAssets, managedAssets, totalSupply, hardCap, usdcPrice (8 decimals),
// ytPrice (30 decimals) and nextRedemptionTime (all uint256).
const getVaultInfoABIJSON = `[{
"inputs":[{"internalType":"address","name":"_vault","type":"address"}],
"name":"getVaultInfo",
"outputs":[
{"internalType":"bool","name":"exists","type":"bool"},
{"internalType":"uint256","name":"totalAssets","type":"uint256"},
{"internalType":"uint256","name":"idleAssets","type":"uint256"},
{"internalType":"uint256","name":"managedAssets","type":"uint256"},
{"internalType":"uint256","name":"totalSupply","type":"uint256"},
{"internalType":"uint256","name":"hardCap","type":"uint256"},
{"internalType":"uint256","name":"usdcPrice","type":"uint256"},
{"internalType":"uint256","name":"ytPrice","type":"uint256"},
{"internalType":"uint256","name":"nextRedemptionTime","type":"uint256"}
],
"stateMutability":"view",
"type":"function"
}]`
// StartPriceSnapshot runs the two periodic snapshot jobs and never returns:
//   - the fast job (every fastSnapshotInterval) records price / TVL / APY via
//     getVaultInfo;
//   - the volume job (every volumeSnapshotInterval) records the 24h USDC
//     volume via FilterLogs.
//
// Each job is executed once immediately and then on every tick of its own
// ticker. The fast job gets a goroutine of its own; the volume job occupies
// the calling goroutine, so invoke this as:
//
//	go fundmarket.StartPriceSnapshot(cfg)
func StartPriceSnapshot(cfg *config.Config) {
	log.Println("=== Price Snapshot Service: fast=5min, volume=30min ===")

	// runEvery executes job right away and then once per tick, forever.
	runEvery := func(interval time.Duration, job func(*config.Config)) {
		job(cfg)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			<-ticker.C
			job(cfg)
		}
	}

	// Started in the background so that both loops begin concurrently.
	go runEvery(fastSnapshotInterval, runFastSnapshot)
	// Blocks here, keeping StartPriceSnapshot alive.
	runEvery(volumeSnapshotInterval, runVolumeSnapshot)
}
// loadActiveAssets fetches every asset that is flagged active and has a
// non-empty contract address. label is only used as the log prefix.
//
// The boolean is false (and the slice nil) when the query fails or when no
// asset matches; both situations are logged here, so callers can simply stop.
func loadActiveAssets(database *gorm.DB, label string) ([]models.Asset, bool) {
	const activeWithContract = "is_active = ? AND contract_address IS NOT NULL AND contract_address != ''"

	var found []models.Asset
	res := database.Where(activeWithContract, true).Find(&found)
	switch {
	case res.Error != nil:
		log.Printf("[%s] DB query error: %v", label, res.Error)
		return nil, false
	case len(found) == 0:
		log.Printf("[%s] No active assets with contract_address set, skipping", label)
		return nil, false
	}
	return found, true
}
// ── Fast snapshot (price / TVL / APY) ────────────────────────────────────────

// runFastSnapshot performs one pass of the fast snapshot: for every active
// asset it dials the RPC endpoint of the asset's chain and delegates to
// snapshotAssetFast. Problems with a single asset (unsupported chain, dial
// failure, snapshot error) are logged and do not stop the remaining assets.
func runFastSnapshot(cfg *config.Config) {
	begin := time.Now()
	log.Printf("[FastSnapshot] Starting at %s", begin.Format("2006-01-02 15:04:05"))

	database := db.GetDB()
	assets, ok := loadActiveAssets(database, "FastSnapshot")
	if !ok {
		return
	}

	factoryABI, err := abi.JSON(strings.NewReader(getVaultInfoABIJSON))
	if err != nil {
		log.Printf("[FastSnapshot] Parse factory ABI error: %v", err)
		return
	}

	// One snapshot date is shared by every asset handled in this pass.
	today := time.Now().Truncate(24 * time.Hour)

	// snapshotOne handles a single asset; the RPC client it opens is closed
	// when it returns.
	snapshotOne := func(a models.Asset) {
		rpcURL, factoryHex := getSnapshotClientForChain(a.ChainID, cfg)
		if rpcURL == "" {
			log.Printf("[FastSnapshot] %s: unsupported chain_id=%d, skipping", a.AssetCode, a.ChainID)
			return
		}

		client, dialErr := ethclient.Dial(rpcURL)
		if dialErr != nil {
			log.Printf("[FastSnapshot] %s: RPC connect error: %v", a.AssetCode, dialErr)
			return
		}
		defer client.Close()

		snapErr := snapshotAssetFast(client, database, a, factoryABI, common.HexToAddress(factoryHex), today)
		if snapErr != nil {
			log.Printf("[FastSnapshot] %s error: %v", a.AssetCode, snapErr)
		}
	}

	for _, a := range assets {
		snapshotOne(a)
	}

	log.Printf("[FastSnapshot] Done in %v", time.Since(begin))
}
// snapshotAssetFast reads the on-chain state of one asset's vault through
// YTAssetFactory.getVaultInfo and persists it:
//   - today's asset performance row is created or updated (every column
//     except volume_24h_usd, which belongs to the volume task);
//   - pool_cap_usd in the assets table is refreshed when the computed cap is
//     positive (a failure here is only logged);
//   - one new row is appended to apy_snapshots for chart history.
//
// client must be connected to the chain the asset lives on, factoryABI must
// contain getVaultInfo, factoryAddr is the factory contract on that chain and
// today is the snapshot date shared by the whole pass.
//
// An error is returned when the contract call fails or times out, when the
// returned data does not have the expected shape, when the vault is not
// registered in the factory, or when a database read or write fails.
func snapshotAssetFast(
	client *ethclient.Client,
	database *gorm.DB,
	asset models.Asset,
	factoryABI abi.ABI,
	factoryAddr common.Address,
	today time.Time,
) error {
	// Upper bound for the single eth_call, so that an unresponsive RPC node
	// cannot stall the whole snapshot loop.
	const rpcCallTimeout = 15 * time.Second

	vaultAddr := common.HexToAddress(asset.ContractAddress)
	callData, err := factoryABI.Pack("getVaultInfo", vaultAddr)
	if err != nil {
		return fmt.Errorf("pack getVaultInfo: %w", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), rpcCallTimeout)
	result, err := client.CallContract(ctx, ethereum.CallMsg{
		To:   &factoryAddr,
		Data: callData,
	}, nil)
	cancel()
	if err != nil {
		return fmt.Errorf("getVaultInfo call: %w", err)
	}

	decoded, err := factoryABI.Unpack("getVaultInfo", result)
	if err != nil {
		return fmt.Errorf("unpack getVaultInfo: %w", err)
	}
	if len(decoded) < 9 {
		return fmt.Errorf("getVaultInfo returned %d values, expected 9", len(decoded))
	}

	// Checked type assertions: a mismatch between the ABI and the decoded
	// values becomes an error instead of a panic in the background goroutine.
	exists, ok := decoded[0].(bool)
	if !ok {
		return fmt.Errorf("getVaultInfo output 0: expected bool, got %T", decoded[0])
	}
	if !exists {
		return fmt.Errorf("vault %s not registered in factory", asset.ContractAddress)
	}

	var decodeErr error
	// scaled converts the uint256 output at idx to a float with the given
	// number of decimals, remembering the first output that is not a *big.Int.
	scaled := func(idx int, decimals int64) float64 {
		n, isInt := decoded[idx].(*big.Int)
		if !isInt || n == nil {
			if decodeErr == nil {
				decodeErr = fmt.Errorf("getVaultInfo output %d: expected *big.Int, got %T", idx, decoded[idx])
			}
			return 0
		}
		return bigIntToFloat(n, decimals)
	}
	info := vaultInfo{
		exists:      exists,
		totalAssets: scaled(1, 18),
		totalSupply: scaled(4, 18),
		hardCap:     scaled(5, 18),
		usdcPrice:   scaled(6, 8),
		ytPrice:     scaled(7, 30),
	}
	if decodeErr != nil {
		return decodeErr
	}

	poolCapPercent := 0.0
	if info.hardCap > 0 {
		poolCapPercent = info.totalSupply / info.hardCap * 100
	}
	apy := calcAPY(database, asset.ID, info.ytPrice, today)
	log.Printf("[FastSnapshot] %s: ytPrice=%.8f USDC, supply=%.2f, TVL=$%.2f, APY=%.2f%%",
		asset.AssetCode, info.ytPrice, info.totalSupply, info.totalAssets, apy)

	// Upsert today's row — do NOT touch volume_24h_usd (managed by volume task).
	todayDate := today.Format("2006-01-02")
	var perf models.AssetPerformance
	err = database.Where("asset_id = ? AND snapshot_date = ?", asset.ID, todayDate).First(&perf).Error
	switch {
	case errors.Is(err, gorm.ErrRecordNotFound):
		// Row doesn't exist yet — create it. volume_24h_usd stays 0 until the
		// volume task runs.
		perf = models.AssetPerformance{
			AssetID:             asset.ID,
			CurrentAPY:          apy,
			TVLUSD:              info.totalAssets,
			CirculatingSupply:   info.totalSupply,
			PoolCapacityPercent: poolCapPercent,
			YTPrice:             info.ytPrice,
			Volume24hUSD:        0,
			SnapshotDate:        today,
		}
		if err := database.Create(&perf).Error; err != nil {
			return fmt.Errorf("create performance row: %w", err)
		}
	case err != nil:
		// Any other failure (connection loss, timeout, ...) says nothing about
		// whether the row exists, so inserting here could duplicate it.
		return fmt.Errorf("load performance row for %s: %w", todayDate, err)
	default:
		if err := database.Model(&perf).Updates(map[string]interface{}{
			"current_apy":           apy,
			"tvlusd":                info.totalAssets,
			"circulating_supply":    info.totalSupply,
			"pool_capacity_percent": poolCapPercent,
			"yt_price":              info.ytPrice,
			// volume_24h_usd intentionally omitted
		}).Error; err != nil {
			return fmt.Errorf("update performance row: %w", err)
		}
	}

	// Sync pool_cap_usd in assets table. Best effort: a failure is logged and
	// does not abort the snapshot.
	if capUSD := info.hardCap * info.ytPrice; capUSD > 0 {
		if err := database.Model(&models.Asset{}).Where("id = ?", asset.ID).
			Update("pool_cap_usd", capUSD).Error; err != nil {
			log.Printf("[FastSnapshot] %s: update pool_cap_usd error: %v", asset.AssetCode, err)
		}
	}

	// Always insert a new record into apy_snapshots for chart history (one per
	// fast snapshot run).
	snap := models.APYSnapshot{
		AssetID:         asset.ID,
		ChainID:         asset.ChainID,
		ContractAddress: asset.ContractAddress,
		APYValue:        apy,
		Price:           info.ytPrice,
		TotalAssets:     info.totalAssets,
		TotalSupply:     info.totalSupply,
		SnapshotTime:    time.Now(),
	}
	if err := database.Create(&snap).Error; err != nil {
		return fmt.Errorf("insert apy snapshot: %w", err)
	}
	return nil
}
// ── Volume snapshot (24h FilterLogs) ─────────────────────────────────────────

// runVolumeSnapshot performs one pass of the volume snapshot: for every active
// asset it computes the 24h volume with calc24hVolume and writes it into the
// volume_24h_usd column of today's performance row. It never creates that row;
// when the row is missing the asset is skipped until the next cycle. Per-asset
// problems are logged and do not stop the remaining assets.
func runVolumeSnapshot(cfg *config.Config) {
	begin := time.Now()
	log.Printf("[VolumeSnapshot] Starting at %s", begin.Format("2006-01-02 15:04:05"))

	database := db.GetDB()
	assets, ok := loadActiveAssets(database, "VolumeSnapshot")
	if !ok {
		return
	}

	dateKey := time.Now().Truncate(24 * time.Hour).Format("2006-01-02")

	for _, a := range assets {
		rpcURL, _ := getSnapshotClientForChain(a.ChainID, cfg)
		if rpcURL == "" {
			log.Printf("[VolumeSnapshot] %s: unsupported chain_id=%d, skipping", a.AssetCode, a.ChainID)
			continue
		}

		client, err := ethclient.Dial(rpcURL)
		if err != nil {
			log.Printf("[VolumeSnapshot] %s: RPC connect error: %v", a.AssetCode, err)
			continue
		}
		vol := calc24hVolume(client, a, a.ChainID)
		client.Close()

		// Update only volume_24h_usd in today's performance row.
		update := database.Model(&models.AssetPerformance{}).
			Where("asset_id = ? AND snapshot_date = ?", a.ID, dateKey).
			Update("volume_24h_usd", vol)

		switch {
		case update.Error != nil:
			log.Printf("[VolumeSnapshot] %s: DB update error: %v", a.AssetCode, update.Error)
		case update.RowsAffected == 0:
			// Fast snapshot hasn't produced today's row yet; the volume will be
			// written on a later cycle.
			log.Printf("[VolumeSnapshot] %s: no row for today yet, will retry next cycle", a.AssetCode)
		default:
			log.Printf("[VolumeSnapshot] %s: volume=$%.2f updated", a.AssetCode, vol)
		}
	}

	log.Printf("[VolumeSnapshot] Done in %v", time.Since(begin))
}
// ── Shared helpers ────────────────────────────────────────────────────────────

// getSnapshotClientForChain maps chainID to the RPC URL taken from cfg and the
// YTAssetFactory address for that chain. Supported chains are BSC Testnet (97)
// and Arbitrum Sepolia (421614); any other chain ID yields two empty strings.
func getSnapshotClientForChain(chainID int, cfg *config.Config) (rpcURL, factoryAddr string) {
	if chainID == 97 {
		return cfg.BSCTestnetRPC, ytAssetFactoryAddressBSC
	}
	if chainID == 421614 {
		return cfg.ArbSepoliaRPC, ytAssetFactoryAddressArb
	}
	return "", ""
}
// vaultInfo is the subset of YTAssetFactory.getVaultInfo() outputs that the
// snapshot code uses, already converted from fixed-point integers to floats.
type vaultInfo struct {
	// exists reports whether the factory knows the vault.
	exists bool
	// totalAssets is in USDC (18 decimals on BSC).
	totalAssets float64
	// totalSupply is in YT tokens (18 decimals).
	totalSupply float64
	// hardCap is in YT tokens (18 decimals).
	hardCap float64
	// usdcPrice has 8 decimals on chain.
	usdcPrice float64
	// ytPrice has 30 decimals on chain.
	ytPrice float64
}
// calcAPY derives an annualized yield in percent from the price change since
// the previous day's snapshot: the one-day relative change of the YT price is
// multiplied by 365 (simple, non-compounding annualization) and by 100.
//
// It returns 0 when yesterday's row cannot be loaded or when its stored price
// is not positive.
func calcAPY(database *gorm.DB, assetID uint, currentPrice float64, today time.Time) float64 {
	prevDate := today.AddDate(0, 0, -1).Format("2006-01-02")

	var prevRow models.AssetPerformance
	lookup := database.Where("asset_id = ? AND snapshot_date = ?", assetID, prevDate).First(&prevRow)
	if lookup.Error != nil || prevRow.YTPrice <= 0 {
		return 0
	}

	change := (currentPrice - prevRow.YTPrice) / prevRow.YTPrice
	return change * 365 * 100
}
// maxFilterBlockRange is the number of blocks covered by a single FilterLogs
// call in calc24hVolume. Most public RPC nodes cap the range at 10,000 blocks;
// 9000 stays safely below that limit.
const maxFilterBlockRange = int64(9000)

// chunkTimeout is the timeout applied to each individual FilterLogs attempt
// (every chunk, and every retry of a chunk, gets a fresh one).
const chunkTimeout = 15 * time.Second
// calc24hVolume adds up the USDC amounts of all Buy and Sell events emitted by
// the asset's contract over approximately the last 24 hours.
//
// The block window is an estimate: 24h divided by the chain's approximate block
// time, widened by 20% to absorb block-time variance. The window is scanned in
// chunks of maxFilterBlockRange blocks to stay within RPC node limits. Every
// FilterLogs attempt has its own chunkTimeout and a failing chunk is retried
// once; if it still fails the chunk is skipped, so its events are missing from
// the total. Logs whose data cannot be decoded are ignored.
//
// Returns 0 when the current block number cannot be fetched or the event ABI
// cannot be parsed.
func calc24hVolume(client *ethclient.Client, asset models.Asset, chainID int) float64 {
	headCtx, cancelHead := context.WithTimeout(context.Background(), 10*time.Second)
	head, err := client.BlockNumber(headCtx)
	cancelHead()
	if err != nil {
		log.Printf("[Volume] %s: get block number error: %v", asset.AssetCode, err)
		return 0
	}

	// Estimated window: 24h worth of blocks plus a 20% buffer.
	lastBlock := int64(head)
	window := int64(float64(86400) / blockTimeByChain(chainID) * 1.2)
	firstBlock := lastBlock - window
	if firstBlock < 0 {
		firstBlock = 0
	}

	eventABI, err := abi.JSON(strings.NewReader(buySellEventABIJSON))
	if err != nil {
		log.Printf("[Volume] %s: parse event ABI error: %v", asset.AssetCode, err)
		return 0
	}
	buyEvent, sellEvent := eventABI.Events["Buy"], eventABI.Events["Sell"]
	buyTopic, sellTopic := buyEvent.ID, sellEvent.ID

	contractAddr := common.HexToAddress(asset.ContractAddress)
	const usdcDecimals = int64(18)

	log.Printf("[Volume] %s: scanning blocks %d-%d (%d chunks), contract=%s",
		asset.AssetCode, firstBlock, lastBlock,
		(lastBlock-firstBlock)/maxFilterBlockRange+1,
		contractAddr.Hex())

	var (
		volume   float64
		logCount int
	)

	for lo := firstBlock; lo <= lastBlock; lo += maxFilterBlockRange {
		hi := lo + maxFilterBlockRange - 1
		if hi > lastBlock {
			hi = lastBlock
		}

		query := ethereum.FilterQuery{
			FromBlock: big.NewInt(lo),
			ToBlock:   big.NewInt(hi),
			Addresses: []common.Address{contractAddr},
			Topics:    [][]common.Hash{{buyTopic, sellTopic}},
		}

		// Two attempts at most; a fresh context per attempt keeps one slow call
		// from eating into the time budget of the next.
		var (
			entries  []types.Log
			fetchErr error
		)
		for attempt := 1; attempt <= 2; attempt++ {
			ctx, cancel := context.WithTimeout(context.Background(), chunkTimeout)
			entries, fetchErr = client.FilterLogs(ctx, query)
			cancel()
			if fetchErr == nil {
				break
			}
			log.Printf("[Volume] %s: filter logs [%d-%d] attempt %d error: %v",
				asset.AssetCode, lo, hi, attempt, fetchErr)
		}
		if fetchErr != nil {
			log.Printf("[Volume] %s: skipping chunk [%d-%d] after retries", asset.AssetCode, lo, hi)
			continue
		}

		logCount += len(entries)
		for i := range entries {
			entry := &entries[i]

			// Pick the event definition and the position of the USDC amount
			// among the non-indexed fields:
			//   Buy:  [0]=usdcAmount, [1]=ytAmount
			//   Sell: [0]=ytAmount,  [1]=usdcAmount
			var (
				def     abi.Event
				usdcIdx int
			)
			switch entry.Topics[0] {
			case buyTopic:
				def, usdcIdx = buyEvent, 0
			case sellTopic:
				def, usdcIdx = sellEvent, 1
			default:
				continue
			}

			fields, err := def.Inputs.NonIndexed().Unpack(entry.Data)
			if err != nil || len(fields) < 2 {
				continue
			}
			volume += bigIntToFloat(fields[usdcIdx].(*big.Int), usdcDecimals)
		}
	}

	log.Printf("[Volume] %s: total logs found=%d, volume=$%.2f", asset.AssetCode, logCount, volume)
	return volume
}
// bigIntToFloat interprets n as a fixed-point number with the given count of
// decimal places and returns n / 10^decimals as a float64.
//
// A nil n yields 0 rather than a nil-pointer panic. The division is carried
// out with 256 bits of precision; the final conversion to float64 rounds to
// the nearest representable value.
func bigIntToFloat(n *big.Int, decimals int64) float64 {
	if n == nil {
		return 0
	}
	scale := new(big.Int).Exp(big.NewInt(10), big.NewInt(decimals), nil)
	quotient := new(big.Float).SetPrec(256).Quo(
		new(big.Float).SetPrec(256).SetInt(n),
		new(big.Float).SetPrec(256).SetInt(scale),
	)
	// The accuracy indicator is deliberately dropped: callers want the closest
	// float64 and have no use for the rounding direction.
	f, _ := quotient.Float64()
	return f
}