Files
assetx/webapp-back/holders/scanner.go

617 lines
19 KiB
Go
Raw Normal View History

package holders
import (
"context"
"fmt"
"log"
"math/big"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/gothinkster/golang-gin-realworld-example-app/models"
db "github.com/gothinkster/golang-gin-realworld-example-app/common"
)
// Config holds scanner configuration.
type Config struct {
	ChainID          int              // EVM chain ID this scanner instance tracks
	RPCURL           string           // Ethereum JSON-RPC endpoint to dial
	YTVaults         []VaultConfig    // YT vault token contracts to scan for Transfer events
	YTLPAddress      string           // YT LP token contract address; empty disables LP scanning
	DeploymentBlocks DeploymentBlocks // deployment blocks for non-vault contracts (see YTLP)
	PollInterval     time.Duration    // delay between incremental scans
	BatchSize        int64            // max blocks per eth_getLogs query (RPC provider limit)
}
// VaultConfig identifies a single YT vault contract and where it was deployed.
type VaultConfig struct {
	Name        string // token-type label stored in holder_snapshots rows
	Address     string // hex contract address of the vault token
	DeployBlock uint64 // deployment block; scanning for this vault starts here
}
// DeploymentBlocks records deployment block numbers for contracts that are not
// part of the per-vault configuration.
type DeploymentBlocks struct {
	YTLP uint64 // deployment block of the YT LP token contract
}
// Scanner handles blockchain event scanning
type Scanner struct {
	config           Config
	client           *ethclient.Client
	lastScannedBlock uint64     // checkpoint: all contracts have been scanned up to this block
	isScanning       bool       // lease flag preventing overlapping full/incremental scans
	mu               sync.Mutex // protects lastScannedBlock and isScanning
	// Address tracking. NOTE(review): these maps are not guarded by mu; this
	// assumes all scans run on a single goroutine (the mu+isScanning lease
	// enforces that) — confirm no other goroutine reads them concurrently.
	ytAddresses map[string]map[string]int64 // vault -> address -> firstSeen
	lpAddresses map[string]int64            // address -> firstSeen
}
// Event topics
var (
	// transferEventTopic is keccak256 of the canonical ERC-20 Transfer event
	// signature; used as topic[0] when filtering logs.
	transferEventTopic = crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)"))
)
// NewScanner dials the configured RPC endpoint and returns a scanner with
// empty in-memory address maps. It does not start any scanning itself.
func NewScanner(config Config) (*Scanner, error) {
	c, err := ethclient.Dial(config.RPCURL)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to Ethereum client: %w", err)
	}
	s := &Scanner{
		config:      config,
		client:      c,
		ytAddresses: map[string]map[string]int64{},
		lpAddresses: map[string]int64{},
	}
	return s, nil
}
// loadStateFromDB returns the last scanned block recorded for this chain's
// holder scanner, or 0 when no prior state exists (or the lookup fails).
func (s *Scanner) loadStateFromDB() uint64 {
	var state models.ScannerState
	err := db.GetDB().
		Where("chain_id = ? AND scanner_type = ?", s.config.ChainID, "holder").
		First(&state).Error
	if err != nil {
		return 0
	}
	return state.LastScannedBlock
}
// saveStateToDB persists the last scanned block to database.
// Best-effort checkpoint: database errors are intentionally ignored so a
// transient DB failure never aborts a scan.
func (s *Scanner) saveStateToDB(block uint64) {
	database := db.GetDB()
	state := models.ScannerState{
		ScannerType: "holder",
		ChainID: s.config.ChainID,
		LastScannedBlock: block,
	}
	// Create the (chain_id, "holder") row if missing, applying state's fields
	// when it already exists.
	database.Where("chain_id = ? AND scanner_type = ?", s.config.ChainID, "holder").Assign(state).FirstOrCreate(&state)
	// NOTE(review): Assign+FirstOrCreate should already have written the
	// column, so this second Update looks redundant — confirm whether it
	// guards against a GORM version where Assign did not persist on find.
	database.Model(&state).Update("last_scanned_block", block)
}
// loadAddressesFromDB restores the in-memory address maps from previously
// persisted holder_snapshots rows for this chain, so a restarted scanner can
// resume without re-scanning history.
func (s *Scanner) loadAddressesFromDB() {
	var snapshots []models.HolderSnapshot
	db.GetDB().Where("chain_id = ?", s.config.ChainID).Find(&snapshots)

	for _, snap := range snapshots {
		// LP holders live in their own flat map.
		if snap.TokenType == "ytLP" {
			s.lpAddresses[snap.HolderAddress] = snap.FirstSeen
			continue
		}
		// Vault holders are keyed by contract address; match the snapshot's
		// token-type label against every configured vault name.
		for _, vault := range s.config.YTVaults {
			if vault.Name != snap.TokenType {
				continue
			}
			perVault := s.ytAddresses[vault.Address]
			if perVault == nil {
				perVault = make(map[string]int64)
				s.ytAddresses[vault.Address] = perVault
			}
			perVault[snap.HolderAddress] = snap.FirstSeen
		}
	}
	log.Printf("▶️ 从数据库加载了 %d 个历史地址", len(snapshots))
}
// Start creates a scanner for config, performs (or resumes) the initial scan,
// and then polls for new blocks forever. It returns only on a setup error or
// a failed initial scan; the polling loop itself never exits.
func Start(config Config) error {
	s, err := NewScanner(config)
	if err != nil {
		return err
	}
	log.Println("=== Holder Scanner Started ===")
	log.Printf("RPC: %s\n", config.RPCURL)
	log.Printf("Poll Interval: %v\n", config.PollInterval)

	// Resume from a saved checkpoint when one exists; otherwise do a full
	// initial scan starting at the deployment blocks.
	if last := s.loadStateFromDB(); last > 0 {
		log.Printf("▶️ 发现上次扫描记录,从区块 %d 续扫...", last)
		s.loadAddressesFromDB()
		s.lastScannedBlock = last
	} else {
		log.Println("📊 首次扫描,从部署区块开始...")
		began := time.Now()
		if err := s.scanAll(context.Background(), true); err != nil {
			return fmt.Errorf("initial scan failed: %w", err)
		}
		log.Printf("✓ 初始扫描完成,耗时 %v\n", time.Since(began))
	}

	// Poll indefinitely; individual scan failures are logged, not fatal.
	ticker := time.NewTicker(config.PollInterval)
	defer ticker.Stop()
	log.Printf("⏰ 开始轮询,每 %v 扫一次新区块...\n", config.PollInterval)
	for range ticker.C {
		if err := s.incrementalScan(context.Background()); err != nil {
			log.Printf("✗ Incremental scan error: %v\n", err)
		}
	}
	return nil
}
// minUint64 returns the smaller of two uint64 values.
func minUint64(a, b uint64) uint64 {
	if b < a {
		return b
	}
	return a
}
// scanBatch scans ALL contract types for a single block range [fromBlock, toBlock].
// The range must be ≤ BatchSize. After this call, every contract has been scanned to toBlock.
// Returned errors from the individual scanners are intentionally ignored: the
// ones visible in this file always return nil and log failures internally.
// NOTE(review): confirm scanSwapEvents (defined elsewhere) follows the same
// log-and-continue convention.
func (s *Scanner) scanBatch(ctx context.Context, fromBlock, toBlock uint64) {
	log.Printf(" [Batch] Scanning blocks %d → %d\n", fromBlock, toBlock)
	s.scanYTVaultTransfers(ctx, fromBlock, toBlock)
	s.scanYTLPTransfers(ctx, fromBlock, toBlock)
	s.scanSwapEvents(ctx, fromBlock, toBlock)
	// Checkpoint: all contracts have been scanned up to toBlock, so it is safe
	// to advance both the in-memory and the persisted cursor.
	s.mu.Lock()
	s.lastScannedBlock = toBlock
	s.mu.Unlock()
	s.saveStateToDB(toBlock)
}
// scanAll performs a full scan from deployment blocks, saving a checkpoint after every batch.
// All contract types are scanned together per batch, so the checkpoint is always safe.
//
// When isInitial is true the scan starts at the earliest deploy block across
// all configured contracts; otherwise it resumes at lastScannedBlock+1.
// Returns an error if another scan is already running or the chain head
// cannot be fetched; per-batch failures are logged inside the scanners.
func (s *Scanner) scanAll(ctx context.Context, isInitial bool) error {
	// Take the isScanning lease so full and incremental scans never overlap.
	s.mu.Lock()
	if s.isScanning {
		s.mu.Unlock()
		return fmt.Errorf("scan already in progress")
	}
	s.isScanning = true
	s.mu.Unlock()
	// Always release the lease, even on error paths.
	defer func() {
		s.mu.Lock()
		s.isScanning = false
		s.mu.Unlock()
	}()
	latestBlock, err := s.client.BlockNumber(ctx)
	if err != nil {
		return err
	}
	log.Printf("Current block: %d\n", latestBlock)
	// Determine the earliest block we need to scan across all contracts
	startBlock := s.lastScannedBlock + 1
	if isInitial {
		startBlock = latestBlock // will be lowered below
		for _, v := range s.config.YTVaults {
			startBlock = minUint64(startBlock, v.DeployBlock)
		}
		if s.config.YTLPAddress != "" && s.config.DeploymentBlocks.YTLP > 0 {
			startBlock = minUint64(startBlock, s.config.DeploymentBlocks.YTLP)
		}
	}
	if startBlock > latestBlock {
		log.Printf("📌 No new blocks to scan (startBlock %d > latestBlock %d)\n", startBlock, latestBlock)
		return nil
	}
	log.Printf("📊 Scanning blocks %d → %d in batches of %d\n", startBlock, latestBlock, s.config.BatchSize)
	// Outer loop: one checkpoint per batch, all contracts scanned together.
	// NOTE(review): a BatchSize < 1 would underflow batchTo here and loop
	// forever — confirm config validation guarantees BatchSize ≥ 1.
	for batchFrom := startBlock; batchFrom <= latestBlock; {
		batchTo := minUint64(batchFrom+uint64(s.config.BatchSize)-1, latestBlock)
		s.scanBatch(ctx, batchFrom, batchTo)
		batchFrom = batchTo + 1
		// Rate limiting between batches to stay friendly to the RPC provider.
		if batchFrom <= latestBlock {
			time.Sleep(100 * time.Millisecond)
		}
	}
	// Balance snapshots: run once after all event scanning is done
	log.Printf("📊 Updating balance snapshots...\n")
	for _, vault := range s.config.YTVaults {
		if err := s.saveYTHolders(ctx, vault); err != nil {
			log.Printf(" [Snapshot] %s error: %v", vault.Name, err)
		}
	}
	if err := s.saveYTLPHolders(ctx); err != nil {
		log.Printf(" [Snapshot] ytLP error: %v", err)
	}
	log.Printf("📌 Scan complete. Last scanned block: %d\n", latestBlock)
	return nil
}
// incrementalScan scans new blocks since last scan.
// Incremental ranges are small (seconds of blocks), so a single batch suffices.
// If the service was down for a while the range can still exceed BatchSize;
// the loop below splits it into batches. Safe to call from a ticker: returns
// nil immediately when a previous scan is still running.
func (s *Scanner) incrementalScan(ctx context.Context) error {
	// Acquire the isScanning lease and snapshot the cursor under the lock.
	s.mu.Lock()
	if s.isScanning {
		s.mu.Unlock()
		log.Println("⏰ Skipping scan (previous scan still running)")
		return nil
	}
	s.isScanning = true
	lastBlock := s.lastScannedBlock
	s.mu.Unlock()
	// Always release the lease, even on error paths.
	defer func() {
		s.mu.Lock()
		s.isScanning = false
		s.mu.Unlock()
	}()
	latestBlock, err := s.client.BlockNumber(ctx)
	if err != nil {
		return err
	}
	if latestBlock <= lastBlock {
		log.Printf("⏰ [%s] No new blocks (current: %d)\n", time.Now().Format("15:04:05"), latestBlock)
		return nil
	}
	log.Printf("\n%s\n", strings.Repeat("=", 60))
	log.Printf("⏰ [%s] Found new blocks %d → %d\n", time.Now().Format("15:04:05"), lastBlock+1, latestBlock)
	log.Printf("%s\n", strings.Repeat("=", 60))
	startTime := time.Now()
	fromBlock := lastBlock + 1
	// Incremental range may exceed BatchSize if server was down for a while; use batches
	for batchFrom := fromBlock; batchFrom <= latestBlock; {
		batchTo := minUint64(batchFrom+uint64(s.config.BatchSize)-1, latestBlock)
		s.scanBatch(ctx, batchFrom, batchTo)
		batchFrom = batchTo + 1
		// Rate limiting between batches.
		if batchFrom <= latestBlock {
			time.Sleep(100 * time.Millisecond)
		}
	}
	// Update balance snapshots once after all batches, not per batch.
	for _, vault := range s.config.YTVaults {
		if err := s.saveYTHolders(ctx, vault); err != nil {
			log.Printf(" [Snapshot] %s error: %v", vault.Name, err)
		}
	}
	if err := s.saveYTLPHolders(ctx); err != nil {
		log.Printf(" [Snapshot] ytLP error: %v", err)
	}
	log.Printf("✓ Incremental scan completed in %v\n", time.Since(startTime))
	return nil
}
// scanYTVaultTransfers scans Transfer events for all YT vaults in the given block range.
// fromBlock/toBlock must already be a single batch (≤ BatchSize blocks).
// Each vault skips blocks before its own deployBlock.
// Does NOT save balance snapshots (call saveYTHolders separately after all batches complete).
// Always returns nil: per-vault query failures are logged and skipped.
func (s *Scanner) scanYTVaultTransfers(ctx context.Context, fromBlock, toBlock uint64) error {
	// Cache block timestamps so several transfers in the same block (possibly
	// across different vaults) cost only one RPC round-trip.
	blockTime := make(map[uint64]int64)
	for _, vault := range s.config.YTVaults {
		// Skip if vault not yet deployed in this range
		if vault.DeployBlock > toBlock {
			continue
		}
		effectiveFrom := fromBlock
		if vault.DeployBlock > effectiveFrom {
			effectiveFrom = vault.DeployBlock
		}
		logs, err := s.queryBlockRange(ctx, common.HexToAddress(vault.Address), transferEventTopic, effectiveFrom, toBlock)
		if err != nil {
			log.Printf(" [Transfer] %s query error: %v", vault.Name, err)
			continue
		}
		if s.ytAddresses[vault.Address] == nil {
			s.ytAddresses[vault.Address] = make(map[string]int64)
		}
		for _, l := range logs {
			// BUGFIX: a standard ERC-20 Transfer has 3 topics (sig, from, to);
			// a non-standard event sharing the same topic0 hash could have
			// fewer, and indexing Topics[2] unchecked would panic.
			if len(l.Topics) < 3 {
				continue
			}
			toAddress := common.BytesToAddress(l.Topics[2].Bytes()).Hex()
			// Ignore burns (transfers to the zero address).
			if toAddress == "0x0000000000000000000000000000000000000000" {
				continue
			}
			if _, exists := s.ytAddresses[vault.Address][toAddress]; exists {
				continue
			}
			ts, ok := blockTime[l.BlockNumber]
			if !ok {
				// HeaderByNumber is cheaper than BlockByNumber: only the
				// header's Time field is needed, not full transaction bodies.
				header, err := s.client.HeaderByNumber(ctx, big.NewInt(int64(l.BlockNumber)))
				if err != nil {
					// Best-effort, matching previous behavior: skip the
					// address on timestamp-fetch failure.
					continue
				}
				ts = int64(header.Time)
				blockTime[l.BlockNumber] = ts
			}
			s.ytAddresses[vault.Address][toAddress] = ts
		}
	}
	return nil
}
// scanYTLPTransfers scans Transfer events for the YT LP token in the given block range.
// Does NOT save balance snapshots (call saveYTLPHolders separately after all batches complete).
// Always returns nil: query failures are logged and treated as best-effort.
func (s *Scanner) scanYTLPTransfers(ctx context.Context, fromBlock, toBlock uint64) error {
	// No LP token configured — nothing to do.
	if s.config.YTLPAddress == "" {
		return nil
	}
	lpDeployBlock := s.config.DeploymentBlocks.YTLP
	if lpDeployBlock > toBlock {
		return nil
	}
	effectiveFrom := fromBlock
	if lpDeployBlock > effectiveFrom {
		effectiveFrom = lpDeployBlock
	}
	logs, err := s.queryBlockRange(ctx, common.HexToAddress(s.config.YTLPAddress), transferEventTopic, effectiveFrom, toBlock)
	if err != nil {
		log.Printf(" [Transfer] ytLP query error: %v", err)
		return nil
	}
	// Cache block timestamps: several transfers often share a block, so fetch
	// each block header at most once.
	blockTime := make(map[uint64]int64)
	for _, l := range logs {
		// BUGFIX: a standard ERC-20 Transfer carries 3 topics (sig, from, to);
		// indexing Topics[2] without this guard panics on non-standard logs
		// that share the same topic0 hash.
		if len(l.Topics) < 3 {
			continue
		}
		toAddress := common.BytesToAddress(l.Topics[2].Bytes()).Hex()
		// Ignore burns (transfers to the zero address).
		if toAddress == "0x0000000000000000000000000000000000000000" {
			continue
		}
		if _, exists := s.lpAddresses[toAddress]; exists {
			continue
		}
		ts, ok := blockTime[l.BlockNumber]
		if !ok {
			// HeaderByNumber is lighter than BlockByNumber; only Time is needed.
			header, err := s.client.HeaderByNumber(ctx, big.NewInt(int64(l.BlockNumber)))
			if err != nil {
				// Best-effort, matching previous behavior: skip on error.
				continue
			}
			ts = int64(header.Time)
			blockTime[l.BlockNumber] = ts
		}
		s.lpAddresses[toAddress] = ts
	}
	return nil
}
// queryBlockRange fetches all logs emitted by contractAddr with the given
// topic0 within the inclusive range [fromBlock, toBlock]. It performs no
// internal batching; the caller keeps the range within RPC limits (BatchSize).
func (s *Scanner) queryBlockRange(ctx context.Context, contractAddr common.Address, topic common.Hash, fromBlock, toBlock uint64) ([]types.Log, error) {
	filter := ethereum.FilterQuery{
		FromBlock: new(big.Int).SetUint64(fromBlock),
		ToBlock:   new(big.Int).SetUint64(toBlock),
		Addresses: []common.Address{contractAddr},
		Topics:    [][]common.Hash{{topic}},
	}
	logs, err := s.client.FilterLogs(ctx, filter)
	if err != nil {
		return nil, fmt.Errorf("failed to query logs [%d-%d]: %w", fromBlock, toBlock, err)
	}
	return logs, nil
}
// queryLogsInBatches queries logs in batches to avoid RPC limits.
// Used only by incremental scans where range size is unpredictable.
// The range [fromBlock, toBlock] is inclusive; results are concatenated in
// block order. Returns the first query error encountered.
func (s *Scanner) queryLogsInBatches(ctx context.Context, contractAddr common.Address, topic common.Hash, fromBlock, toBlock uint64) ([]types.Log, error) {
	// Guard against a zero/negative BatchSize, which would otherwise make the
	// end-block arithmetic below underflow and loop forever.
	batchSize := s.config.BatchSize
	if batchSize < 1 {
		batchSize = 1
	}
	var allLogs []types.Log
	currentBlock := fromBlock
	log.Printf(" Querying blocks %d -> %d (total: %d blocks)\n", fromBlock, toBlock, toBlock-fromBlock+1)
	for currentBlock <= toBlock {
		// BUGFIX: the range is inclusive, so a batch of N blocks ends at
		// currentBlock+N-1. The previous "+BatchSize" queried BatchSize+1
		// blocks per call, exceeding the intended RPC limit and diverging
		// from the batching used in scanAll/incrementalScan.
		endBlock := currentBlock + uint64(batchSize) - 1
		if endBlock > toBlock {
			endBlock = toBlock
		}
		log.Printf(" Querying blocks %d - %d...\n", currentBlock, endBlock)
		logs, err := s.queryBlockRange(ctx, contractAddr, topic, currentBlock, endBlock)
		if err != nil {
			return nil, err
		}
		allLogs = append(allLogs, logs...)
		log.Printf(" ✓ Got %d events\n", len(logs))
		currentBlock = endBlock + 1
		// Rate limiting between RPC calls.
		if currentBlock <= toBlock {
			time.Sleep(100 * time.Millisecond)
		}
	}
	log.Printf(" Total: %d events\n\n", len(allLogs))
	return allLogs, nil
}
// saveYTHolders queries balances and saves to database.
// For every address ever seen receiving this vault's token, it calls
// balanceOf via eth_call and upserts rows with a non-zero balance into
// holder_snapshots (creating new rows, or refreshing balance/last_updated on
// existing ones). Per-address RPC/pack failures are counted and skipped.
// Returns an error only if the balanceOf ABI fails to parse.
func (s *Scanner) saveYTHolders(ctx context.Context, vault VaultConfig) error {
	// ERC20 balanceOf ABI
	balanceOfABI, err := abi.JSON(strings.NewReader(`[{"constant":true,"inputs":[{"name":"account","type":"address"}],"name":"balanceOf","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"}]`))
	if err != nil {
		// BUGFIX: this error was previously discarded with `_`; surface it
		// instead of proceeding with a zero-value ABI.
		return fmt.Errorf("parsing balanceOf ABI: %w", err)
	}
	database := db.GetDB()
	now := time.Now().Unix()
	holders := 0
	totalAddresses := len(s.ytAddresses[vault.Address])
	contractAddr := common.HexToAddress(vault.Address)
	log.Printf("📞 [%s] 开始查询 %d 个地址的余额", vault.Name, totalAddresses)
	log.Printf("📍 [%s] 合约地址: %s", vault.Name, vault.Address)
	processedCount := 0
	errorCount := 0
	zeroBalanceCount := 0
	for address, firstSeen := range s.ytAddresses[vault.Address] {
		processedCount++
		// Encode balanceOf(address) calldata.
		data, err := balanceOfABI.Pack("balanceOf", common.HexToAddress(address))
		if err != nil {
			log.Printf("❌ [%s] Pack balanceOf 失败: %s - %v", vault.Name, address, err)
			errorCount++
			continue
		}
		// eth_call against the latest block (nil block number).
		result, err := s.client.CallContract(ctx, ethereum.CallMsg{
			To:   &contractAddr,
			Data: data,
		}, nil)
		if err != nil {
			log.Printf("❌ [%s] CallContract 失败: %s - %v", vault.Name, address, err)
			errorCount++
			continue
		}
		balance := new(big.Int).SetBytes(result)
		// Log the first few results plus every non-zero balance.
		if processedCount <= 5 || balance.Sign() > 0 {
			log.Printf(" [%s] 地址: %s, 余额: %s", vault.Name, address[:10]+"...", balance.String())
		}
		if balance.Sign() == 0 {
			zeroBalanceCount++
			continue
		}
		holders++
		// Upsert to database: create if not exists, only update balance/last_updated if exists
		holder := models.HolderSnapshot{
			HolderAddress: address,
			TokenType:     vault.Name,
			TokenAddress:  vault.Address,
			Balance:       balance.String(),
			ChainID:       s.config.ChainID,
			FirstSeen:     firstSeen,
			LastUpdated:   now,
		}
		var existing models.HolderSnapshot
		res := database.Where("holder_address = ? AND token_type = ?", address, vault.Name).First(&existing)
		if res.Error != nil {
			database.Create(&holder)
		} else {
			database.Model(&existing).Updates(map[string]interface{}{
				"balance":      balance.String(),
				"last_updated": now,
			})
		}
	}
	log.Printf("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
	log.Printf("📊 [%s] 查询完成统计:", vault.Name)
	log.Printf(" • 总地址数: %d", totalAddresses)
	log.Printf(" • 已处理: %d", processedCount)
	log.Printf(" • 余额>0: %d ✅", holders)
	log.Printf(" • 余额=0: %d", zeroBalanceCount)
	log.Printf(" • 错误数: %d", errorCount)
	log.Printf(" • 已保存到数据库: %d", holders)
	log.Printf("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n")
	return nil
}
// saveYTLPHolders saves ytLP holders to database.
// Mirrors saveYTHolders for the LP token: queries balanceOf for every known
// LP address via eth_call and upserts non-zero balances into holder_snapshots.
// Per-address failures are counted and skipped; returns an error only if the
// balanceOf ABI fails to parse.
func (s *Scanner) saveYTLPHolders(ctx context.Context) error {
	balanceOfABI, err := abi.JSON(strings.NewReader(`[{"constant":true,"inputs":[{"name":"account","type":"address"}],"name":"balanceOf","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"}]`))
	if err != nil {
		// BUGFIX: this error was previously discarded with `_`; surface it
		// instead of proceeding with a zero-value ABI.
		return fmt.Errorf("parsing balanceOf ABI: %w", err)
	}
	database := db.GetDB()
	now := time.Now().Unix()
	holders := 0
	totalAddresses := len(s.lpAddresses)
	contractAddr := common.HexToAddress(s.config.YTLPAddress)
	log.Printf("📞 [ytLP] 开始查询 %d 个地址的余额", totalAddresses)
	log.Printf("📍 [ytLP] 合约地址: %s", s.config.YTLPAddress)
	processedCount := 0
	errorCount := 0
	zeroBalanceCount := 0
	for address, firstSeen := range s.lpAddresses {
		processedCount++
		// Encode balanceOf(address) calldata.
		data, err := balanceOfABI.Pack("balanceOf", common.HexToAddress(address))
		if err != nil {
			log.Printf("❌ [ytLP] Pack balanceOf 失败: %s - %v", address, err)
			errorCount++
			continue
		}
		// eth_call against the latest block (nil block number).
		result, err := s.client.CallContract(ctx, ethereum.CallMsg{
			To:   &contractAddr,
			Data: data,
		}, nil)
		if err != nil {
			log.Printf("❌ [ytLP] CallContract 失败: %s - %v", address, err)
			errorCount++
			continue
		}
		balance := new(big.Int).SetBytes(result)
		// Log the first few results plus every non-zero balance.
		if processedCount <= 5 || balance.Sign() > 0 {
			log.Printf(" [ytLP] 地址: %s, 余额: %s", address[:10]+"...", balance.String())
		}
		if balance.Sign() == 0 {
			zeroBalanceCount++
			continue
		}
		holders++
		// Upsert: create the row if missing, otherwise refresh balance and
		// last_updated (firstSeen is preserved on existing rows).
		holder := models.HolderSnapshot{
			HolderAddress: address,
			TokenType:     "ytLP",
			TokenAddress:  s.config.YTLPAddress,
			Balance:       balance.String(),
			ChainID:       s.config.ChainID,
			FirstSeen:     firstSeen,
			LastUpdated:   now,
		}
		var existing models.HolderSnapshot
		res := database.Where("holder_address = ? AND token_type = ?", address, "ytLP").First(&existing)
		if res.Error != nil {
			database.Create(&holder)
		} else {
			database.Model(&existing).Updates(map[string]interface{}{
				"balance":      balance.String(),
				"last_updated": now,
			})
		}
	}
	log.Printf("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
	log.Printf("📊 [ytLP] 查询完成统计:")
	log.Printf(" • 总地址数: %d", totalAddresses)
	log.Printf(" • 已处理: %d", processedCount)
	log.Printf(" • 余额>0: %d ✅", holders)
	log.Printf(" • 余额=0: %d", zeroBalanceCount)
	log.Printf(" • 错误数: %d", errorCount)
	log.Printf(" • 已保存到数据库: %d", holders)
	log.Printf("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n")
	return nil
}