Rate Limiting Channels In Go
Introduction
I was recently working on a disk space analysis tool built into Filejitsu, my command line toolkit written in Go. I call this tool the Space Analyzer. It creates a JSON (or streaming JSON) file with details of your disk's contents, which can be visualized in any kind of UI. I made a quick web-based UI that I plan to improve over time, but for now it does what I need it to do. While scanning a very large disk with A LOT of files, and trying to be slick, I was spinning up goroutines to calculate file hashes. I quickly ran into a panic for creating too many threads. That got me thinking about how to limit the number of goroutines being spun up in an application.
Rate Limiting With Buffered Channels
To limit the number of concurrent goroutines, I used a buffered channel. The comments in the code below explain how this was accomplished.
func (cfs *concurrentFSScanner) Scan(logger *slog.Logger, entityPath, rootID string, shouldCalculateFileHashes bool, maxRecursion int) (FSEntity, error) {
	logger.Info("starting concurrent scan",
		slog.Int("concurrencyLimit", cfs.concurrencyLimit),
		slog.String("entityPath", entityPath),
		slog.String("rootID", rootID),
		slog.Bool("shouldCalculateFileHashes", shouldCalculateFileHashes),
		slog.Int("maxRecursion", maxRecursion),
	)
	files := make(map[string][]FSEntity)
	dirs := make(map[string][]FSEntity)
	// This channel is my rate limiter. cfs.concurrencyLimit is the max number of
	// goroutines I want running from this function at a time.
	limiter := make(chan bool, cfs.concurrencyLimit)
	// This wait group makes sure all of my goroutines are finished before
	// the function returns, as it needs the data from the goroutines to return.
	wg := sync.WaitGroup{}
	if len(rootID) == 0 {
		logger.Warn("creating new id because one provided was blank")
		rootID = defaultRootID
	}
	// One goroutine is spun up per item published on this channel.
	// The number running at once is limited by the limiter channel above.
	jobsChan := enumerateScanTargets(logger, entityPath, rootParentID, rootID, maxRecursion)
	// This mutex guards the files and dirs maps, which the job goroutines
	// write to concurrently.
	mutex := sync.Mutex{}
	// Adding 1 to the wait group here so that wg.Wait below does not return
	// before the dispatching goroutine has finished.
	wg.Add(1)
	go func() {
		// We have an infinite loop that runs until the jobsChan closes.
		for {
			j, ok := <-jobsChan
			// Check for a closed channel before taking a limiter slot. A receive
			// from a closed channel yields a zero value that must not be processed,
			// and taking a slot first would leave a slot that is never released.
			if !ok {
				logger.Warn("jobs channel closed")
				break
			}
			// Here we write to the buffered channel. If the buffer is full this will
			// block until items are read from the limiter channel.
			limiter <- true
			// We add one more to the wait group so that if the channel closes the
			// goroutine below will finish before the parent function returns.
			wg.Add(1)
			go func() {
				defer func() {
					// We read one from the limiter channel to
					// free up a slot in the buffered channel.
					<-limiter
					// We decrement the wg because the job goroutine is complete.
					wg.Done()
				}()
				if j.FailedScan {
					logger.Warn("failed scan job",
						slog.String("errorMessage", j.Error.Error()),
						slog.String("id", j.ID),
						slog.String("parentID", j.ParentID),
						slog.String("fullPath", j.FullPath),
					)
				}
				if j.IsDir {
					dir := FileInfoToFSEntry(logger, j.Info, j.ParentID, j.ID, j.FullPath, shouldCalculateFileHashes, j.Depth)
					dir.Depth = j.Depth
					logger.Debug("received dir job", slog.Any("dirEntity", dir))
					mutex.Lock()
					dirs[dir.ParentID] = append(dirs[dir.ParentID], dir)
					mutex.Unlock()
				} else {
					file := FileInfoToFSEntry(logger, j.Info, j.ParentID, j.ID, j.FullPath, shouldCalculateFileHashes, j.Depth)
					file.Depth = j.Depth
					mutex.Lock()
					files[j.ParentID] = append(files[j.ParentID], file)
					mutex.Unlock()
				}
			}()
		}
		// When we break out of the above infinite loop we make the final call
		// to wg.Done so the parent function can return.
		wg.Done()
	}()
	// This keeps us from exiting the function until all of our jobs are complete.
	wg.Wait()
	close(limiter)
	rootEntity := dirs[rootParentID][0]
	delete(dirs, rootParentID)
	logger.Info("collating entities")
	entity := collateEntities(logger, rootEntity, files, dirs)
	logger.Info("finished collating entities")
	return entity, nil
}
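The scanner above depends on types from my project, so here is a stripped-down sketch of the same pattern that can be run on its own. The names runLimited, jobs, limit and work are made up for this example and are not part of Filejitsu. The peak counter exists only to show that the limit holds. It uses chan struct{} instead of chan bool, which behaves the same way here but carries no data.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited (a hypothetical helper) calls work once per job index in its own
// goroutine, with at most limit goroutines running at a time. It returns the
// highest number of job goroutines observed running at once.
func runLimited(jobs, limit int, work func(job int)) int {
	// The buffered channel is the limiter: one slot per allowed goroutine.
	limiter := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var running, peak atomic.Int64

	for job := 0; job < jobs; job++ {
		// Blocks while limit goroutines are already in flight.
		limiter <- struct{}{}
		wg.Add(1)
		go func(job int) {
			defer func() {
				<-limiter // free the slot for the next job
				wg.Done()
			}()
			n := running.Add(1)
			// Record the peak, only for demonstration purposes.
			for {
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			work(job)
			running.Add(-1)
		}(job)
	}
	wg.Wait()
	return int(peak.Load())
}

func main() {
	peak := runLimited(20, 3, func(job int) {
		time.Sleep(10 * time.Millisecond) // stand-in for hashing a file
	})
	fmt.Println("peak concurrent goroutines:", peak)
}
```

The slot is taken before the goroutine starts and released only after the work is done, so the number of running job goroutines can never exceed the capacity of the channel.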
Additional Notes
The limit on the number of operating system threads a Go program can create can be modified with the SetMaxThreads function in the runtime/debug package.
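As a small sketch of what that looks like: SetMaxThreads lives in runtime/debug, takes the new limit and returns the previous one (the documented initial setting is 10,000 threads). The helper name raiseThreadLimit and the value 20000 are only placeholders for this example, not a recommendation.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// raiseThreadLimit (a hypothetical helper) sets the maximum number of
// operating system threads the program may use and returns the previous
// setting.
func raiseThreadLimit(limit int) int {
	return debug.SetMaxThreads(limit)
}

func main() {
	// 20000 is an arbitrary value chosen for illustration.
	prev := raiseThreadLimit(20000)
	fmt.Println("previous thread limit:", prev)
}
```

Raising the limit only moves the point where the program crashes. Limiting the number of goroutines doing blocking work, as in the scanner above, addresses the cause.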
You can see how many logical CPUs are available with the NumCPU function in the runtime package. You do not need to limit the number of goroutines to the number of logical CPUs available, but how you limit the number of goroutines will depend on where your bottleneck is. In my case the work was heavily IO bound on the disk being scanned, so spinning up 1000 goroutines would not be helpful, but running more than one at a time greatly increases the speed of processing. If you are CPU bound, then limiting to the number of logical processors available could make sense.
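One way to turn that reasoning into a starting value is sketched below. The function concurrencyLimit and its ioMultiplier parameter are assumptions made up for this example. The right multiplier depends on your disk and workload and is best found by measuring.

```go
package main

import (
	"fmt"
	"runtime"
)

// concurrencyLimit (a hypothetical helper) returns a starting point for the
// size of the limiter channel. CPU bound work gets one goroutine per logical
// CPU. IO bound work gets ioMultiplier goroutines per logical CPU, since most
// of them spend their time waiting on the disk.
func concurrencyLimit(ioBound bool, ioMultiplier int) int {
	cpus := runtime.NumCPU()
	if !ioBound || ioMultiplier < 1 {
		return cpus
	}
	return cpus * ioMultiplier
}

func main() {
	fmt.Println("logical CPUs:", runtime.NumCPU())
	fmt.Println("CPU bound limit:", concurrencyLimit(false, 0))
	fmt.Println("IO bound limit:", concurrencyLimit(true, 4))
}
```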