Add per-file locking to prevent concurrent copy races

Introduced a sync.Map of mutexes in FileCopyManager to serialize concurrent file copy operations for the same file cache key. Updated GetTempCopy to use double-checked locking, ensuring only one goroutine creates or updates a cached file at a time. Improved atomicCopyFile to handle destination file existence gracefully, avoiding errors when concurrent processes create the same file.
This commit is contained in:
lx1056758714-glitch
2025-12-21 10:02:03 +08:00
parent 6c04569123
commit 1cbad609fa
2 changed files with 111 additions and 26 deletions
+44
View File
@@ -0,0 +1,44 @@
[2025-12-21 09:58:29.440] [INFO] 正在进行第 1 轮内存扫描... 请打开任意图片以触发密钥加载
[2025-12-21 09:58:29.473] [DEBUG] 内存扫描结束,共检查了 894 个候选图片密钥字符串
[2025-12-21 09:58:29.473] [DEBUG] 内存扫描结束,共检查了 1124 个候选图片密钥字符串
[2025-12-21 09:58:29.473] [DEBUG] 内存扫描结束,共检查了 630 个候选图片密钥字符串
[2025-12-21 09:58:29.473] [DEBUG] 内存扫描结束,共检查了 535 个候选图片密钥字符串
[2025-12-21 09:58:29.473] [DEBUG] 内存扫描结束,共检查了 462 个候选图片密钥字符串
[2025-12-21 09:58:29.473] [DEBUG] 内存扫描结束,共检查了 702 个候选图片密钥字符串
[2025-12-21 09:58:29.473] [DEBUG] 内存扫描结束,共检查了 599 个候选图片密钥字符串
[2025-12-21 09:58:29.473] [DEBUG] 内存扫描结束,共检查了 2072 个候选图片密钥字符串
[2025-12-21 09:58:30.506] [DEBUG] 内存扫描结束,共检查了 2038 个候选图片密钥字符串
[2025-12-21 09:58:30.506] [DEBUG] 内存扫描结束,共检查了 472 个候选图片密钥字符串
[2025-12-21 09:58:30.506] [DEBUG] 内存扫描结束,共检查了 702 个候选图片密钥字符串
[2025-12-21 09:58:30.506] [DEBUG] 内存扫描结束,共检查了 903 个候选图片密钥字符串
[2025-12-21 09:58:30.506] [DEBUG] 内存扫描结束,共检查了 963 个候选图片密钥字符串
[2025-12-21 09:58:30.506] [DEBUG] 内存扫描结束,共检查了 373 个候选图片密钥字符串
[2025-12-21 09:58:30.506] [DEBUG] 内存扫描结束,共检查了 1124 个候选图片密钥字符串
[2025-12-21 09:58:30.506] [DEBUG] 内存扫描结束,共检查了 443 个候选图片密钥字符串
[2025-12-21 09:58:31.539] [DEBUG] 内存扫描结束,共检查了 532 个候选图片密钥字符串
[2025-12-21 09:58:31.539] [DEBUG] 内存扫描结束,共检查了 936 个候选图片密钥字符串
[2025-12-21 09:58:31.539] [DEBUG] 内存扫描结束,共检查了 1124 个候选图片密钥字符串
[2025-12-21 09:58:31.539] [DEBUG] 内存扫描结束,共检查了 470 个候选图片密钥字符串
[2025-12-21 09:58:31.539] [DEBUG] 内存扫描结束,共检查了 560 个候选图片密钥字符串
[2025-12-21 09:58:31.539] [DEBUG] 内存扫描结束,共检查了 737 个候选图片密钥字符串
[2025-12-21 09:58:31.539] [DEBUG] 内存扫描结束,共检查了 621 个候选图片密钥字符串
[2025-12-21 09:58:31.539] [DEBUG] 内存扫描结束,共检查了 2038 个候选图片密钥字符串
[2025-12-21 09:58:32.576] [DEBUG] 内存扫描结束,共检查了 520 个候选图片密钥字符串
[2025-12-21 09:58:32.576] [DEBUG] 内存扫描结束,共检查了 902 个候选图片密钥字符串
[2025-12-21 09:58:32.576] [DEBUG] 内存扫描结束,共检查了 1114 个候选图片密钥字符串
[2025-12-21 09:58:32.576] [DEBUG] 内存扫描结束,共检查了 2072 个候选图片密钥字符串
[2025-12-21 09:58:32.576] [DEBUG] 内存扫描结束,共检查了 232 个候选图片密钥字符串
[2025-12-21 09:58:32.576] [DEBUG] 内存扫描结束,共检查了 702 个候选图片密钥字符串
[2025-12-21 09:58:32.576] [DEBUG] 内存扫描结束,共检查了 896 个候选图片密钥字符串
[2025-12-21 09:58:32.576] [DEBUG] 内存扫描结束,共检查了 568 个候选图片密钥字符串
[2025-12-21 09:58:33.576] [INFO] 正在进行第 5 轮内存扫描... 请打开任意图片以触发密钥加载
[2025-12-21 09:58:33.623] [DEBUG] 内存扫描结束,共检查了 816 个候选图片密钥字符串
[2025-12-21 09:58:33.623] [DEBUG] 内存扫描结束,共检查了 817 个候选图片密钥字符串
[2025-12-21 09:58:33.623] [DEBUG] 内存扫描结束,共检查了 665 个候选图片密钥字符串
[2025-12-21 09:58:33.623] [DEBUG] 内存扫描结束,共检查了 417 个候选图片密钥字符串
[2025-12-21 09:58:33.623] [DEBUG] 内存扫描结束,共检查了 374 个候选图片密钥字符串
[2025-12-21 09:58:33.623] [DEBUG] 内存扫描结束,共检查了 893 个候选图片密钥字符串
[2025-12-21 09:58:33.623] [DEBUG] 内存扫描结束,共检查了 2328 个候选图片密钥字符串
[2025-12-21 09:58:33.623] [DEBUG] 内存扫描结束,共检查了 1025 个候选图片密钥字符串
[2025-12-21 09:58:34.659] [SUCCESS] 通过字符串扫描找到图片密钥! (在检查了 700 个候选后) Key: 34643931386533626538323732313734
[2025-12-21 09:58:34.661] [DEBUG] 内存扫描结束,共检查了 1010 个候选图片密钥字符串
+67 -26
View File
@@ -71,6 +71,7 @@ type FileCopyManager struct {
cancel context.CancelFunc // Cancel function for graceful shutdown
wg sync.WaitGroup // WaitGroup for goroutine synchronization
cacheSize int64 // Current number of cached entries (atomic)
locks sync.Map // Locks for concurrent file copies: key -> *sync.Mutex
}
// FileIndexEntry represents an indexed temporary file with comprehensive metadata.
@@ -749,6 +750,13 @@ func (fm *FileCopyManager) cleanupOrphanedFilesInternal() {
}
}
// getLock returns the mutex associated with key, creating it on first use.
//
// Callers lock the returned mutex to serialize operations that share the same key.
// Every call with the same key returns the same *sync.Mutex. When several goroutines
// race to create the mutex for a new key, LoadOrStore keeps exactly one of the
// candidates and all callers receive that one.
//
// NOTE(review): this method never removes entries from fm.locks, so the map holds
// one mutex per distinct key seen — confirm whether keys are pruned elsewhere.
func (fm *FileCopyManager) getLock(key string) *sync.Mutex {
	// Fast path: the mutex already exists, so skip allocating a throwaway one.
	if v, ok := fm.locks.Load(key); ok {
		return v.(*sync.Mutex)
	}
	// Slow path: LoadOrStore returns the existing value if another goroutine
	// stored one between the Load above and this call.
	v, _ := fm.locks.LoadOrStore(key, new(sync.Mutex))
	return v.(*sync.Mutex)
}
// GetTempCopy creates or retrieves a temporary copy of the specified file.
// It provides persistent caching with instance-based isolation.
//
@@ -800,30 +808,65 @@ func (fm *FileCopyManager) GetTempCopy(originalPath string) (string, error) {
// Use unified cache key generation
cacheKey := fm.generateCacheKey(fm.instanceID, baseName, ext, currentHash, expectedDataHash)
var oldTempPath string // Track old file to delete after successful creation
if value, exists := fm.fileIndex.Load(cacheKey); exists {
entry := value.(*FileIndexEntry)
// Found cached file, verify it still exists and matches
if _, err := os.Stat(entry.TempPath); err == nil && currentSize == entry.Size {
// File exists and size matches, reuse cached copy
entry.SetLastAccess(now) // Update access time atomically
entry.SetOriginalPath(originalPath) // Update original path thread-safely
return entry.TempPath, nil
} else {
// Cached file is missing or size mismatch, remove from index
fm.fileIndex.Delete(cacheKey)
atomic.AddInt64(&fm.cacheSize, -1)
if err == nil {
// File exists but size mismatch, mark for cleanup
oldTempPath = entry.TempPath
// Helper function to check cache
checkCache := func() (string, bool) {
if value, exists := fm.fileIndex.Load(cacheKey); exists {
entry := value.(*FileIndexEntry)
// Found cached file, verify it still exists and matches
if _, err := os.Stat(entry.TempPath); err == nil && currentSize == entry.Size {
// File exists and size matches, reuse cached copy
entry.SetLastAccess(now) // Update access time atomically
entry.SetOriginalPath(originalPath) // Update original path thread-safely
return entry.TempPath, true
} else {
// Cached file is missing or size mismatch, remove from index
fm.fileIndex.Delete(cacheKey)
atomic.AddInt64(&fm.cacheSize, -1)
if err == nil {
// File exists but size mismatch, mark for cleanup
// The cache key includes the data hash, so a size mismatch under a matching key
// is unexpected: it suggests either a key collision or that the temp file was
// modified on disk. In either case the stale index entry has been removed above.
// Note: the file is not deleted synchronously here, to avoid a race; its path is
// handed to the asynchronous deletion worker instead.
select {
case fm.deletionChan <- entry.TempPath:
default:
// Channel full: the path is dropped and no deletion is attempted here.
}
}
}
}
return "", false
}
// First check (Fast Path)
if path, ok := checkCache(); ok {
return path, nil
}
// Slow Path: Acquire lock and double check
lock := fm.getLock(cacheKey)
lock.Lock()
defer lock.Unlock()
// Double Check (after acquiring lock)
if path, ok := checkCache(); ok {
return path, nil
}
// Strategy 2: No valid cached file found, create new one
tempPath := fm.generateTempPath(originalPath)
// Before creating new copy, clean up old versions of the same file
// Note: cleanupOldVersions works on the version key (without the data hash), while
// the lock held here is keyed on cacheKey (with the data hash). Different versions
// of the same file can therefore be created concurrently, which is acceptable.
// cleanupOldVersions only deletes entries whose data hash differs from the current
// one, so it will not delete the copy being built here.
// NOTE(review): it may still delete an older version that another goroutine is
// using, and its safety while iterating fileIndex is assumed — confirm.
fm.cleanupOldVersions(baseName, ext, currentHash, expectedDataHash)
// Perform atomic file copy
@@ -852,16 +895,6 @@ func (fm *FileCopyManager) GetTempCopy(originalPath string) (string, error) {
go fm.performCacheCleanup()
}
// Clean up old version after successful creation to prevent race conditions
if oldTempPath != "" {
select {
case fm.deletionChan <- oldTempPath:
default:
// Channel full, delete synchronously as fallback
os.Remove(oldTempPath)
}
}
return tempPath, nil
}
@@ -966,6 +999,14 @@ func (fm *FileCopyManager) atomicCopyFile(src, dst string) error {
// Atomic rename to final destination
if err = os.Rename(tempDst, dst); err != nil {
// If rename failed, check if the destination file already exists
if _, statErr := os.Stat(dst); statErr == nil {
// Destination exists, likely created by a concurrent process.
// We can consider this a success (reuse existing file).
// Clean up our temporary file since we don't need it.
os.Remove(tempDst)
return nil
}
return fmt.Errorf("failed to rename temporary file: %w", err)
}