I’ve been looking for a way to quickly scan a file system and gather information about the files in directories contained within. I had been doing this with multiprocessing in Python, but figured Go could speed up my performance by a lot. What I discovered when I went down this path was the sync.ErrGroup
, an extension of the sync.WaitGroup
that helps manage the complexity of multiple go routines but also includes error handling!
The end result of this exploration was a utility called urfs — which you can install on your system to take a uniform random sample of files in a directory or to compute the number of files and bytes per directory. This utility is also extensible to a large number of functionality that requires rapid walking of a file system like search or other utilities.
This post is therefore a bit of a walkthrough on using sync.ErrGroup
for scanning a file system and applying arbitrary functions. First a couple of types:
type WalkFunc func(path string) (string, error)
type FSWalker struct {
Workers int
SkipHidden bool
SkipDirs bool
Match string
root string
paths chan string
nPaths uint64
results chan string
nResults uint64
group *errgroup.Group
ctx context.Context
started time.Time
duration time.Duration
}
The first type is a generic function that can be passed to the Walk
method of the FSWalker
. The FSWalker
maintains state with a variety of channels, and of course the errgroup.Group
object. The SkipHidden, SkipDirs, and Match properties allow us to filter path types being passed to Walk.
To initialize FSWalker
:
func (fs *FSWalker) Init(ctx context.Context) {
// Set up FSWalker defaults
fs.Workers = DefaultWorkers
fs.SkipHidden = true
fs.SkipDirs = true
fs.Match = "*"
// Create the context for the errgroup
if ctx == nil {
// Create a new context
ctx = context.Background()
deadline, ok := fs.ctx.Deadline()
if ok {
ctx, _ = context.WithDeadline(ctx, deadline)
}
}
// Create the err group
fs.group, fs.ctx = errgroup.WithContext(ctx)
// Create channels and instantiate other statistics variables
fs.paths = make(chan string, DefaultBuffer)
fs.results = make(chan string, DefaultBuffer)
fs.nPaths = 0
fs.nResults = 0
fs.started = time.Time{}
fs.duration = time.Duration(0)
}
Ok, so we’re doing a lot of work here, but things get paid off in the Walk function where we keep track of the number of paths we’ve seen at a root directory, passing them off to a WalkFunc
using a variety of Go routines:
func (fs *FSWalker) Walk(path string, walkFn WalkFunc) error {
// Compute the duration of the walk
fs.started = time.Now()
defer func() { fs.duration = time.Since(fs.started) }()
// Set the root path for the walk
fs.root = path
// Launch the goroutine that populates the paths
fs.group.Go(func() error {
// Ensure that the channel is closed when all paths loaded
defer close(fs.paths)
// Apply the path filter to the filepath.Walk function
return filepath.Walk(fs.root, fs.filterPaths)
})
// Create the worker function and allocate pool
worker := fs.worker(walkFn)
for w := 0; w < fs.Workers; w++ {
fs.group.Go(worker)
}
// Wait for the workers to complete, then close the results channel
go func() {
fs.group.Wait()
close(fs.results)
}()
// Start gathering the results
for _ = range fs.results {
fs.nResults++
}
return fs.group.Wait()
}
So this is a lot of code, let’s step through it. The first thing we do is set the started time to now, and defer a function to compute the duration as the difference between the time at the end of the function and the start function. We also set the root value. We then launch a go routine in the ErrGroup
by using fs.group.Go(func)
— this function must have the signature func() error
, so we use an anonymous function to kick off the filepath.Walk
, which starts walking the directory structure, adding paths that match the filter criteria to a buffered channel called fs.paths
, more on this later. This channel must be closed on complete so that our worker go routines complete, more on that later.
Next we create a worker function using our worker method and walk function. The workers read paths off the fs.paths
channel, and apply the walkFn to each path individually. Note that we use a pool-like structure here, limiting the number of workers to 5000 — this is so we don’t get a “too many files open” error when we exhaust the number of file descriptors since Go has unlimited go routines. The worker definitions is here:
func (fs *FSWalker) worker(walkFn WalkFunc) func() error {
return func() error {
// Apply the function all paths in the channel
for path := range fs.paths {
// avoid race condition
p := path
// apply the walk function to the path and return errors
r, err := walkFn(p)
if err != nil {
return err
}
// store the result and check the context
if r != "" {
select {
case fs.results <- r:
case <-fs.ctx.Done():
return fs.ctx.Err()
}
}
}
return nil
}
}
As you can see, the worker function just creates a closure with the signature of our ErrGroup
function, so that we can pass it to the wait group. All the worker function does is range over the paths channel, applying the path to the walkFn
.
Finally, we kick off another go routine that waits until all the workers have stopped, and when it does, we close our results channel. We do this so that we can start gathering results, immediately; we don’t have to wait. We can do this by simply ranging over the results channel and adding the number of results. A final wait at the end means that we can wait for all go routines to complete.
Lastly the filter function. We want to ignore files and directories that are hidden, e.g. start with a “.” or a “~” on Unix systems. We also want to be able to pass a glob like matcher, e.g. "*.txt"
to only match text files. The filter function is here:
// Internal filter paths function that is passed to filepath.Walk
func (fs *FSWalker) filterPaths(path string, info os.FileInfo, err error) error {
// Propagate any errors
if err != nil {
return err
}
// Check to ensure that no mode bits are set
if !info.Mode().IsRegular() {
return nil
}
// Get the name of the file without the complete path
name := info.Name()
// Skip hidden files or directories if required.
if fs.SkipHidden {
if strings.HasPrefix(name, ".") || strings.HasPrefix(name, "~") {
return nil
}
}
// Skip directories if required
if fs.SkipDirs {
if info.IsDir() {
return nil
}
}
// Check to see if the pattern matches the file
match, err := filepath.Match(fs.Match, name)
if err != nil {
return err
} else if !match {
return nil
}
// Increment the total number of paths we've seen.
atomic.AddUint64(&fs.nPaths, 1)
select {
case fs.paths <- path:
case <-fs.ctx.Done():
return fs.ctx.Err()
}
return nil
}
And that’s it, with this simple framework, you can apply an arbitrary walkFn
to all paths in a directory, matching a specific criteria. The big win here is to manage all of the go routines using the ErrGroup
and a context.Context
object.
The following post: Run strikingly fast parallel file searches in Go with sync.ErrGroup by Brian Ketelsen was the primary inspiration for the use of sync.ErrGroup.