// Package holders scans ERC-20 Transfer events for the configured YT vaults and
// the YT LP token, tracks every recipient address it sees, and stores holder
// balance snapshots and scan progress in the database.
package holders

import (
	"context"
	"fmt"
	"log"
	"math/big"
	"strings"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/accounts/abi"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/gothinkster/golang-gin-realworld-example-app/models"

	db "github.com/gothinkster/golang-gin-realworld-example-app/common"
)

// Config holds scanner configuration.
type Config struct {
	ChainID          int              // stored with, and used to filter, persisted scanner state and snapshots
	RPCURL           string           // endpoint passed to ethclient.Dial
	YTVaults         []VaultConfig    // vault contracts whose Transfer events are scanned
	YTLPAddress      string           // LP token contract; an empty string disables LP transfer scanning
	DeploymentBlocks DeploymentBlocks // deployment blocks of the non-vault contracts
	PollInterval     time.Duration    // delay between incremental scans; must be positive
	BatchSize        int64            // number of blocks covered by one scan batch
}

// VaultConfig describes a single YT vault contract.
type VaultConfig struct {
	Name        string // also used as the token_type of the vault's holder snapshots
	Address     string // contract address; also the key into Scanner.ytAddresses
	DeployBlock uint64 // blocks before this one are never queried for the vault
}

// DeploymentBlocks holds deployment block numbers for contracts that are not vaults.
type DeploymentBlocks struct {
	YTLP uint64
}

// Scanner handles blockchain event scanning.
type Scanner struct {
	config           Config
	client           *ethclient.Client
	lastScannedBlock uint64 // last block for which a checkpoint was written
	isScanning       bool   // true while scanAll or incrementalScan is running
	mu               sync.Mutex

	// Address tracking
	ytAddresses map[string]map[string]int64 // vault address -> holder address -> firstSeen (block timestamp)
	lpAddresses map[string]int64            // holder address -> firstSeen (block timestamp)
}

// Event topics
var (
	transferEventTopic = crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)"))
)

// NewScanner creates a new blockchain scanner connected to config.RPCURL.
// It returns an error if the RPC endpoint cannot be dialed.
func NewScanner(config Config) (*Scanner, error) {
	client, err := ethclient.Dial(config.RPCURL)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to Ethereum client: %w", err)
	}

	return &Scanner{
		config:      config,
		client:      client,
		ytAddresses: make(map[string]map[string]int64),
		lpAddresses: make(map[string]int64),
	}, nil
}

// loadStateFromDB loads the last scanned block from the database.
// Any lookup error (including "no row yet") yields 0, which Start treats as
// "no previous scan".
func (s *Scanner) loadStateFromDB() uint64 {
	database := db.GetDB()
	var state models.ScannerState
	result := database.Where("chain_id = ? AND scanner_type = ?", s.config.ChainID, "holder").First(&state)
	if result.Error != nil {
		return 0
	}
	return state.LastScannedBlock
}

// saveStateToDB persists the last scanned block to the database.
// Failures are logged rather than returned because the method has no error result.
func (s *Scanner) saveStateToDB(block uint64) {
	database := db.GetDB()
	state := models.ScannerState{
		ScannerType:      "holder",
		ChainID:          s.config.ChainID,
		LastScannedBlock: block,
	}

	upsert := database.
		Where("chain_id = ? AND scanner_type = ?", s.config.ChainID, "holder").
		Assign(state).
		FirstOrCreate(&state)
	if upsert.Error != nil {
		// Without a loaded row the Update below would not target this scanner's
		// record, so stop here.
		log.Printf("✗ Failed to upsert scanner state at block %d: %v", block, upsert.Error)
		return
	}

	// Explicit column update kept from the original.
	// NOTE(review): presumably here so the value is written even when the
	// struct-based Assign skips it — confirm against the GORM version in use.
	if err := database.Model(&state).Update("last_scanned_block", block).Error; err != nil {
		log.Printf("✗ Failed to update last_scanned_block to %d: %v", block, err)
	}
}

// loadAddressesFromDB restores the in-memory address maps from holder_snapshots
// rows belonging to the configured chain. Rows with token type "ytLP" go to the
// LP map; every other row is matched against the configured vault names.
func (s *Scanner) loadAddressesFromDB() {
	database := db.GetDB()
	var snapshots []models.HolderSnapshot
	if err := database.Where("chain_id = ?", s.config.ChainID).Find(&snapshots).Error; err != nil {
		log.Printf("✗ Failed to load holder snapshots for chain %d: %v", s.config.ChainID, err)
		return
	}

	for _, snap := range snapshots {
		switch snap.TokenType {
		case "ytLP":
			s.lpAddresses[snap.HolderAddress] = snap.FirstSeen
		default:
			for _, vault := range s.config.YTVaults {
				if vault.Name == snap.TokenType {
					if s.ytAddresses[vault.Address] == nil {
						s.ytAddresses[vault.Address] = make(map[string]int64)
					}
					s.ytAddresses[vault.Address][snap.HolderAddress] = snap.FirstSeen
				}
			}
		}
	}
	log.Printf("▶️ 从数据库加载了 %d 个历史地址", len(snapshots))
}

// Start begins the scanning process and then polls for new blocks every
// config.PollInterval. It resumes from the stored checkpoint when one exists;
// otherwise it runs a full initial scan first.
//
// It returns an error if the poll interval is not positive, the RPC endpoint
// cannot be dialed, or the initial scan fails. Once polling has started it does
// not return.
func Start(config Config) error {
	// time.NewTicker panics on a non-positive interval; fail early with an error instead.
	if config.PollInterval <= 0 {
		return fmt.Errorf("invalid poll interval %v: must be positive", config.PollInterval)
	}

	scanner, err := NewScanner(config)
	if err != nil {
		return err
	}
	// Release the RPC connection on every return path.
	defer scanner.client.Close()

	log.Println("=== Holder Scanner Started ===")
	log.Printf("RPC: %s\n", config.RPCURL)
	log.Printf("Poll Interval: %v\n", config.PollInterval)

	// Check if we can resume from a previous scan
	lastBlock := scanner.loadStateFromDB()
	if lastBlock > 0 {
		log.Printf("▶️ 发现上次扫描记录,从区块 %d 续扫...", lastBlock)
		scanner.loadAddressesFromDB()
		scanner.lastScannedBlock = lastBlock
	} else {
		// Fresh start: full initial scan from deploy blocks
		log.Println("📊 首次扫描,从部署区块开始...")
		startTime := time.Now()
		if err := scanner.scanAll(context.Background(), true); err != nil {
			return fmt.Errorf("initial scan failed: %w", err)
		}
		log.Printf("✓ 初始扫描完成,耗时 %v\n", time.Since(startTime))
	}

	// Start polling
	ticker := time.NewTicker(config.PollInterval)
	defer ticker.Stop()

	log.Printf("⏰ 开始轮询,每 %v 扫一次新区块...\n", config.PollInterval)

	for range ticker.C {
		// A failed incremental scan is logged and retried on the next tick.
		if err := scanner.incrementalScan(context.Background()); err != nil {
			log.Printf("✗ Incremental scan error: %v\n", err)
		}
	}

	return nil
}

// minUint64 returns the smaller of two uint64 values.
func minUint64(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

// scanBatch scans ALL contract types for a single block range [fromBlock, toBlock]
// and then records toBlock as the checkpoint, both in memory and in the database.
// The range must be ≤ BatchSize.
//
// NOTE(review): the results of the individual scans are not inspected, so the
// checkpoint advances to toBlock even when one of them could not query its logs.
func (s *Scanner) scanBatch(ctx context.Context, fromBlock, toBlock uint64) {
	log.Printf(" [Batch] Scanning blocks %d → %d\n", fromBlock, toBlock)

	s.scanYTVaultTransfers(ctx, fromBlock, toBlock)
	s.scanYTLPTransfers(ctx, fromBlock, toBlock)
	s.scanSwapEvents(ctx, fromBlock, toBlock)

	// Checkpoint: all contracts have been scanned up to toBlock
	s.mu.Lock()
	s.lastScannedBlock = toBlock
	s.mu.Unlock()
	s.saveStateToDB(toBlock)
}

// scanAll scans every configured contract up to the current chain head, saving a
// checkpoint after every batch. When isInitial is true the scan starts at the
// earliest configured deployment block; otherwise it resumes from lastScannedBlock+1.
// All contract types are scanned together per batch before the checkpoint is written.
func (s *Scanner) scanAll(ctx context.Context, isInitial bool) error { s.mu.Lock() if s.isScanning { s.mu.Unlock() return fmt.Errorf("scan already in progress") } s.isScanning = true s.mu.Unlock() defer func() { s.mu.Lock() s.isScanning = false s.mu.Unlock() }() latestBlock, err := s.client.BlockNumber(ctx) if err != nil { return err } log.Printf("Current block: %d\n", latestBlock) // Determine the earliest block we need to scan across all contracts startBlock := s.lastScannedBlock + 1 if isInitial { startBlock = latestBlock // will be lowered below for _, v := range s.config.YTVaults { startBlock = minUint64(startBlock, v.DeployBlock) } if s.config.YTLPAddress != "" && s.config.DeploymentBlocks.YTLP > 0 { startBlock = minUint64(startBlock, s.config.DeploymentBlocks.YTLP) } } if startBlock > latestBlock { log.Printf("📌 No new blocks to scan (startBlock %d > latestBlock %d)\n", startBlock, latestBlock) return nil } log.Printf("📊 Scanning blocks %d → %d in batches of %d\n", startBlock, latestBlock, s.config.BatchSize) // Outer loop: one checkpoint per batch, all contracts scanned together for batchFrom := startBlock; batchFrom <= latestBlock; { batchTo := minUint64(batchFrom+uint64(s.config.BatchSize)-1, latestBlock) s.scanBatch(ctx, batchFrom, batchTo) batchFrom = batchTo + 1 // Rate limiting between batches if batchFrom <= latestBlock { time.Sleep(100 * time.Millisecond) } } // Balance snapshots: run once after all event scanning is done log.Printf("📊 Updating balance snapshots...\n") for _, vault := range s.config.YTVaults { if err := s.saveYTHolders(ctx, vault); err != nil { log.Printf(" [Snapshot] %s error: %v", vault.Name, err) } } if err := s.saveYTLPHolders(ctx); err != nil { log.Printf(" [Snapshot] ytLP error: %v", err) } log.Printf("📌 Scan complete. Last scanned block: %d\n", latestBlock) return nil } // incrementalScan scans new blocks since last scan. // Incremental ranges are small (seconds of blocks), so a single batch suffices. 
func (s *Scanner) incrementalScan(ctx context.Context) error { s.mu.Lock() if s.isScanning { s.mu.Unlock() log.Println("⏰ Skipping scan (previous scan still running)") return nil } s.isScanning = true lastBlock := s.lastScannedBlock s.mu.Unlock() defer func() { s.mu.Lock() s.isScanning = false s.mu.Unlock() }() latestBlock, err := s.client.BlockNumber(ctx) if err != nil { return err } if latestBlock <= lastBlock { log.Printf("⏰ [%s] No new blocks (current: %d)\n", time.Now().Format("15:04:05"), latestBlock) return nil } log.Printf("\n%s\n", strings.Repeat("=", 60)) log.Printf("⏰ [%s] Found new blocks %d → %d\n", time.Now().Format("15:04:05"), lastBlock+1, latestBlock) log.Printf("%s\n", strings.Repeat("=", 60)) startTime := time.Now() fromBlock := lastBlock + 1 // Incremental range may exceed BatchSize if server was down for a while; use batches for batchFrom := fromBlock; batchFrom <= latestBlock; { batchTo := minUint64(batchFrom+uint64(s.config.BatchSize)-1, latestBlock) s.scanBatch(ctx, batchFrom, batchTo) batchFrom = batchTo + 1 if batchFrom <= latestBlock { time.Sleep(100 * time.Millisecond) } } // Update balance snapshots for _, vault := range s.config.YTVaults { if err := s.saveYTHolders(ctx, vault); err != nil { log.Printf(" [Snapshot] %s error: %v", vault.Name, err) } } if err := s.saveYTLPHolders(ctx); err != nil { log.Printf(" [Snapshot] ytLP error: %v", err) } log.Printf("✓ Incremental scan completed in %v\n", time.Since(startTime)) return nil } // scanYTVaultTransfers scans Transfer events for all YT vaults in the given block range. // fromBlock/toBlock must already be a single batch (≤ BatchSize blocks). // Each vault skips blocks before its own deployBlock. // Does NOT save balance snapshots (call saveYTHolders separately after all batches complete). 
func (s *Scanner) scanYTVaultTransfers(ctx context.Context, fromBlock, toBlock uint64) error { for _, vault := range s.config.YTVaults { // Skip if vault not yet deployed in this range if vault.DeployBlock > toBlock { continue } effectiveFrom := fromBlock if vault.DeployBlock > effectiveFrom { effectiveFrom = vault.DeployBlock } logs, err := s.queryBlockRange(ctx, common.HexToAddress(vault.Address), transferEventTopic, effectiveFrom, toBlock) if err != nil { log.Printf(" [Transfer] %s query error: %v", vault.Name, err) continue } if s.ytAddresses[vault.Address] == nil { s.ytAddresses[vault.Address] = make(map[string]int64) } for _, l := range logs { toAddress := common.BytesToAddress(l.Topics[2].Bytes()).Hex() if toAddress == "0x0000000000000000000000000000000000000000" { continue } if _, exists := s.ytAddresses[vault.Address][toAddress]; !exists { block, err := s.client.BlockByNumber(ctx, big.NewInt(int64(l.BlockNumber))) if err == nil { s.ytAddresses[vault.Address][toAddress] = int64(block.Time()) } } } } return nil } // scanYTLPTransfers scans Transfer events for the YT LP token in the given block range. // Does NOT save balance snapshots (call saveYTLPHolders separately after all batches complete). 
func (s *Scanner) scanYTLPTransfers(ctx context.Context, fromBlock, toBlock uint64) error { if s.config.YTLPAddress == "" { return nil } lpDeployBlock := s.config.DeploymentBlocks.YTLP if lpDeployBlock > toBlock { return nil } effectiveFrom := fromBlock if lpDeployBlock > effectiveFrom { effectiveFrom = lpDeployBlock } logs, err := s.queryBlockRange(ctx, common.HexToAddress(s.config.YTLPAddress), transferEventTopic, effectiveFrom, toBlock) if err != nil { log.Printf(" [Transfer] ytLP query error: %v", err) return nil } for _, l := range logs { toAddress := common.BytesToAddress(l.Topics[2].Bytes()).Hex() if toAddress == "0x0000000000000000000000000000000000000000" { continue } if _, exists := s.lpAddresses[toAddress]; !exists { block, err := s.client.BlockByNumber(ctx, big.NewInt(int64(l.BlockNumber))) if err == nil { s.lpAddresses[toAddress] = int64(block.Time()) } } } return nil } // queryBlockRange queries logs for a single block range (no internal batching). // The caller is responsible for keeping the range within RPC limits (BatchSize). func (s *Scanner) queryBlockRange(ctx context.Context, contractAddr common.Address, topic common.Hash, fromBlock, toBlock uint64) ([]types.Log, error) { query := ethereum.FilterQuery{ FromBlock: big.NewInt(int64(fromBlock)), ToBlock: big.NewInt(int64(toBlock)), Addresses: []common.Address{contractAddr}, Topics: [][]common.Hash{{topic}}, } logs, err := s.client.FilterLogs(ctx, query) if err != nil { return nil, fmt.Errorf("failed to query logs [%d-%d]: %w", fromBlock, toBlock, err) } return logs, nil } // queryLogsInBatches queries logs in batches to avoid RPC limits. // Used only by incremental scans where range size is unpredictable. 
func (s *Scanner) queryLogsInBatches(ctx context.Context, contractAddr common.Address, topic common.Hash, fromBlock, toBlock uint64) ([]types.Log, error) { var allLogs []types.Log currentBlock := fromBlock log.Printf(" Querying blocks %d -> %d (total: %d blocks)\n", fromBlock, toBlock, toBlock-fromBlock+1) for currentBlock <= toBlock { endBlock := currentBlock + uint64(s.config.BatchSize) if endBlock > toBlock { endBlock = toBlock } log.Printf(" Querying blocks %d - %d...\n", currentBlock, endBlock) logs, err := s.queryBlockRange(ctx, contractAddr, topic, currentBlock, endBlock) if err != nil { return nil, err } allLogs = append(allLogs, logs...) log.Printf(" ✓ Got %d events\n", len(logs)) currentBlock = endBlock + 1 // Rate limiting if currentBlock <= toBlock { time.Sleep(100 * time.Millisecond) } } log.Printf(" Total: %d events\n\n", len(allLogs)) return allLogs, nil } // saveYTHolders queries balances and saves to database func (s *Scanner) saveYTHolders(ctx context.Context, vault VaultConfig) error { // ERC20 balanceOf ABI balanceOfABI, _ := abi.JSON(strings.NewReader(`[{"constant":true,"inputs":[{"name":"account","type":"address"}],"name":"balanceOf","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"}]`)) database := db.GetDB() now := time.Now().Unix() holders := 0 totalAddresses := len(s.ytAddresses[vault.Address]) contractAddr := common.HexToAddress(vault.Address) log.Printf("📞 [%s] 开始查询 %d 个地址的余额", vault.Name, totalAddresses) log.Printf("📍 [%s] 合约地址: %s", vault.Name, vault.Address) processedCount := 0 errorCount := 0 zeroBalanceCount := 0 for address, firstSeen := range s.ytAddresses[vault.Address] { processedCount++ // Call balanceOf data, err := balanceOfABI.Pack("balanceOf", common.HexToAddress(address)) if err != nil { log.Printf("❌ [%s] Pack balanceOf 失败: %s - %v", vault.Name, address, err) errorCount++ continue } result, err := s.client.CallContract(ctx, ethereum.CallMsg{ To: &contractAddr, Data: data, 
}, nil) if err != nil { log.Printf("❌ [%s] CallContract 失败: %s - %v", vault.Name, address, err) errorCount++ continue } balance := new(big.Int).SetBytes(result) // Log balance query result if processedCount <= 5 || balance.Cmp(big.NewInt(0)) > 0 { log.Printf(" [%s] 地址: %s, 余额: %s", vault.Name, address[:10]+"...", balance.String()) } if balance.Cmp(big.NewInt(0)) == 0 { zeroBalanceCount++ continue } holders++ // Upsert to database: create if not exists, only update balance/last_updated if exists holder := models.HolderSnapshot{ HolderAddress: address, TokenType: vault.Name, TokenAddress: vault.Address, Balance: balance.String(), ChainID: s.config.ChainID, FirstSeen: firstSeen, LastUpdated: now, } var existing models.HolderSnapshot res := database.Where("holder_address = ? AND token_type = ?", address, vault.Name).First(&existing) if res.Error != nil { database.Create(&holder) } else { database.Model(&existing).Updates(map[string]interface{}{ "balance": balance.String(), "last_updated": now, }) } } log.Printf("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") log.Printf("📊 [%s] 查询完成统计:", vault.Name) log.Printf(" • 总地址数: %d", totalAddresses) log.Printf(" • 已处理: %d", processedCount) log.Printf(" • 余额>0: %d ✅", holders) log.Printf(" • 余额=0: %d", zeroBalanceCount) log.Printf(" • 错误数: %d", errorCount) log.Printf(" • 已保存到数据库: %d", holders) log.Printf("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n") return nil } // saveYTLPHolders saves ytLP holders to database func (s *Scanner) saveYTLPHolders(ctx context.Context) error { balanceOfABI, _ := abi.JSON(strings.NewReader(`[{"constant":true,"inputs":[{"name":"account","type":"address"}],"name":"balanceOf","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"}]`)) database := db.GetDB() now := time.Now().Unix() holders := 0 totalAddresses := len(s.lpAddresses) contractAddr := common.HexToAddress(s.config.YTLPAddress) log.Printf("📞 [ytLP] 开始查询 %d 个地址的余额", totalAddresses) log.Printf("📍 [ytLP] 合约地址: 
%s", s.config.YTLPAddress) processedCount := 0 errorCount := 0 zeroBalanceCount := 0 for address, firstSeen := range s.lpAddresses { processedCount++ data, err := balanceOfABI.Pack("balanceOf", common.HexToAddress(address)) if err != nil { log.Printf("❌ [ytLP] Pack balanceOf 失败: %s - %v", address, err) errorCount++ continue } result, err := s.client.CallContract(ctx, ethereum.CallMsg{ To: &contractAddr, Data: data, }, nil) if err != nil { log.Printf("❌ [ytLP] CallContract 失败: %s - %v", address, err) errorCount++ continue } balance := new(big.Int).SetBytes(result) // Log balance query result if processedCount <= 5 || balance.Cmp(big.NewInt(0)) > 0 { log.Printf(" [ytLP] 地址: %s, 余额: %s", address[:10]+"...", balance.String()) } if balance.Cmp(big.NewInt(0)) == 0 { zeroBalanceCount++ continue } holders++ holder := models.HolderSnapshot{ HolderAddress: address, TokenType: "ytLP", TokenAddress: s.config.YTLPAddress, Balance: balance.String(), ChainID: s.config.ChainID, FirstSeen: firstSeen, LastUpdated: now, } var existing models.HolderSnapshot res := database.Where("holder_address = ? AND token_type = ?", address, "ytLP").First(&existing) if res.Error != nil { database.Create(&holder) } else { database.Model(&existing).Updates(map[string]interface{}{ "balance": balance.String(), "last_updated": now, }) } } log.Printf("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") log.Printf("📊 [ytLP] 查询完成统计:") log.Printf(" • 总地址数: %d", totalAddresses) log.Printf(" • 已处理: %d", processedCount) log.Printf(" • 余额>0: %d ✅", holders) log.Printf(" • 余额=0: %d", zeroBalanceCount) log.Printf(" • 错误数: %d", errorCount) log.Printf(" • 已保存到数据库: %d", holders) log.Printf("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n") return nil }