Lock Diagnostics in Go

By now it’s pretty clear that I’ve just had a bear of a time with locks and synchronization inside of multi-threaded environments with Go. Probably most gophers would simply tell me that I should share memory by communicating rather than communicate by sharing memory, and frankly I’m in that camp too. The issue is that:

  • Mutexes can be more expressive than channels
  • Channels are fairly heavyweight

So to be honest, there are situations where a mutex is a better choice than a channel, and I believe that one of those situations is when dealing with replicated state machines … which is what I’ve been working on the past few months. The state of the replica has to be kept consistent across a variety of events: timers and remote messages. The trouble is that the timers and network traffic are all handled by go routines, and there can be a lot of them running in the system at a time.

Of course you could simply create a channel and push event objects onto it to serialize all events. The problem with that is that events generate other events that have to be ordered with respect to the parent event. For example, one event might require generating messages to be sent to remote replicas, which requires reading per-remote state that varies from peer to peer. Said another way, the state can be read locked by all go routines operating at that time, but no write locks can be acquired in the meantime. Hence the last post.

Things got complicated. Lock contention was a thing.

So I had to diagnose who was trying to acquire locks and when and why they were contending. For reference, the most common issues were:

  1. A global read lock was being released before all sub read locks were finished.
  2. A struct with an embedded RWMutex was itself embedded in another object that only had a Mutex, but the outer object still exposed RLock() methods as a result (or vice versa), as the sketch below shows.
  3. The wrong lock was being called on embedded structs.

The primary lesson I learned was this: when embedding synchronized objects, only embed the mutex on the child object. Hopefully that rule of thumb lasts.
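
To see how the second and third problems sneak in, here is a minimal illustration (the types are made up) of promoted lock methods:

type Inner struct {
	sync.RWMutex
	value uint64
}

type Outer struct {
	sync.Mutex
	Inner
}

func (o *Outer) Do() {
	// Locks the Mutex embedded directly in Outer
	o.Lock()
	defer o.Unlock()

	// Promoted from Inner's RWMutex: this compiles, but is it the lock
	// you meant to take? The write below is only protected by a read lock.
	o.RLock()
	defer o.RUnlock()

	o.value++
}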

I learned these lessons using a handy little diagnostic tool that this snippet is about. Basically I wanted to track who was acquiring locks and who was waiting on locks. I could then print out a report when I thought something was contending (e.g. on an Interrupt signal) and figure things out.

First step, figure out the name of the calling method:

// caller returns the name of the function that called the function
// which called caller().
func caller() string {
	pc, _, _, ok := runtime.Caller(2)
	details := runtime.FuncForPC(pc)
	if ok && details != nil {
		return details.Name()
	}
	return UnknownCaller
}

This handy little snippet uses the runtime package to detect the caller two steps above the caller() function in the stack. This allows you to call caller() inside of a function to get the name of the function that’s calling the function calling caller(). Confusing? Try this:

func outer() string {
    return inner()
}

func inner() string {
    return caller()
}

Calling outer() will return something like main.outer, the name of the function that called inner().

With that in hand we can simply create a map[string]int64, increment the count for the caller’s name before Lock(), and decrement it after Unlock().

But … that’s actually a little more complicated than I let on!

The problem is that we definitely have multiple go routines acquiring locks on the lockable struct. However, if we simply try to access the map in the MutexD, then we can panic on concurrent map reads and writes. So now I use the share memory by communicating technique and pass signals via an internal channel, which is read by a go routine ranging over it.
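
The real implementation is linked at the end of this section; a minimal sketch of that channel-based MutexD (the helper names and buffer sizes here are illustrative, not the exact code) might look like:

import (
	"bytes"
	"fmt"
	"sync"
)

// lockEvent signals a lock request (+1) or a release (-1) by caller name.
type lockEvent struct {
	caller string
	delta  int64
}

// MutexD wraps a sync.Mutex and tracks outstanding lock requests by caller.
type MutexD struct {
	sync.Mutex
	once     sync.Once
	events   chan lockEvent
	requests chan chan map[string]int64
}

// init lazily creates the channels and starts the collector go routine.
func (m *MutexD) init() {
	m.once.Do(func() {
		m.events = make(chan lockEvent, 64)
		m.requests = make(chan chan map[string]int64)
		go m.collect()
	})
}

// collect owns the counts map so that only one go routine ever touches it.
func (m *MutexD) collect() {
	locks := make(map[string]int64)
	for {
		select {
		case e := <-m.events:
			locks[e.caller] += e.delta
		case req := <-m.requests:
			// Copy the counts so the reader cannot race with updates.
			snapshot := make(map[string]int64, len(locks))
			for name, count := range locks {
				snapshot[name] = count
			}
			req <- snapshot
		}
	}
}

func (m *MutexD) Lock() {
	m.init()
	m.events <- lockEvent{caller(), 1}
	m.Mutex.Lock()
}

func (m *MutexD) Unlock() {
	m.init()
	m.Mutex.Unlock()
	m.events <- lockEvent{caller(), -1}
}

// String reports the outstanding lock requests by caller name.
func (m *MutexD) String() string {
	m.init()
	req := make(chan map[string]int64)
	m.requests <- req

	var buf bytes.Buffer
	for name, count := range <-req {
		fmt.Fprintf(&buf, "%d locks requested by %s\n", count, name)
	}
	return buf.String()
}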

How to use it? Well, do something like this:

type StateMachine struct {
    MutexD
}

func (s *StateMachine) Alpha() {
    s.Lock()
    defer s.Unlock()
    time.Sleep(1*time.Second)
}

func (s *StateMachine) Bravo() {
    s.Lock()
    defer s.Unlock()
    time.Sleep(100*time.Millisecond)
}

func main() {
    m := new(StateMachine)
    go m.Alpha()
    time.Sleep(100*time.Millisecond)
    for i := 0; i < 2; i++ {
        go m.Bravo()
    }

    // Give the Bravo go routines a moment to request the lock
    time.Sleep(100 * time.Millisecond)

    fmt.Println(m.MutexD.String())
}

You should see something like:

1 locks requested by main.(*StateMachine).Alpha
2 locks requested by main.(*StateMachine).Bravo

Obviously you can do the same thing for RWMutex objects, and it’s easy to swap them in and out of code by changing the package and adding or removing a “D”. My implementation is here: github.com/bbengfort/x/lock.

Lock Queuing in Go

In Go, you can use sync.Mutex and sync.RWMutex objects to create thread-safe data structures in memory as discussed in “Synchronizing Structs for Safe Concurrency in Go”. When using the sync.RWMutex in Go, there are two kinds of locks: read locks and write locks. The basic difference is that many read locks can be acquired at the same time, but only one write lock can be acquired at a time.

This means that if a thread attempts to acquire a read lock on an object that is already read locked, then it will not block and it will acquire its own read lock. If a thread attempts to acquire a read or a write lock on a write locked object, then it will block until it is unlocked (as will a write lock acquisition on a read locked object).

Lock grants can be prioritized according to different access policies. These policies balance the trade-off between concurrency and starvation as follows:

  1. Read-Preferring RW allows new read locks to be acquired as long as the lock is read-locked, forcing the write-lock acquirer to wait until there are no more read-locks. In high contention environments, this might lead to write-starvation.

  2. Write-Preferring RW prevents a read-lock acquisition if a writer is queued and waiting for the lock. This reduces concurrency, because new read locks have to wait for the write lock, but prevents starvation.

So which of these does Go implement? According to the documentation:

If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking. This is to ensure that the lock eventually becomes available; a blocked Lock call excludes new readers from acquiring the lock. — godoc

My initial read of this made me think that Go implements write-preferring mutexes. However, this was not the behavior that I observed.

Consider the following locker:

var delay time.Duration
var started time.Time

// Locker holds values that are threadsafe
type Locker struct {
	sync.RWMutex
	value  uint64    // the current value of the locker
	access time.Time // time of the last access
}

// Write to the value of the locker in a threadsafe fashion.
func (l *Locker) Write(value uint64) {
	l.Lock()
	defer l.Unlock()

	// Arbitrary amount of work
	time.Sleep(delay)

	l.value = value
	l.access = time.Now()
	l.log("written")
}

// Read the value of the locker in a thread-safe fashion.
func (l *Locker) Read() uint64 {
	l.RLock()
	defer l.RUnlock()

	// Arbitrary amount of work
	time.Sleep(delay / 2)
	l.access = time.Now()
	l.log("read")
	return l.value
}

// Log the access (not thread-safe)
func (l *Locker) log(method string) {
	after := l.access.Sub(started)
	log.Printf(
		"%d %s after %s\n", l.value, method, after,
	)
}

This locker holds a value and logs all accesses to it after the start time. If we run a few threads to read and write to it we can see concurrent reads in action:

func main() {
	delay = 1 * time.Second
	started = time.Now()
	group := new(errgroup.Group)
	locker := new(Locker)

	// Straightforward: a write, three reads, and another write
	group.Go(func() error { locker.Write(42); return nil })
	group.Go(func() error { locker.Read(); return nil })
	group.Go(func() error { locker.Read(); return nil })
	group.Go(func() error { locker.Read(); return nil })
	group.Go(func() error { locker.Write(101); return nil })

	group.Wait()
}

The output is as follows:

$ go run locker.go
2017/09/08 12:26:32 101 written after 1.005058824s
2017/09/08 12:26:33 101 read after 1.50770225s
2017/09/08 12:26:33 101 read after 1.507769109s
2017/09/08 12:26:33 101 read after 1.50773587s
2017/09/08 12:26:34 42 written after 2.511968581s

Note that the go routine launched last (the Write(101)) actually managed to acquire the lock first, after which the three readers acquired their read locks, and finally the first writer (the Write(42)) ran. Now if we interleave the read and write accesses, adding a sleep between the kick-off of each go routine to ensure that the preceding thread has time to request the lock:

func main() {
	delay = 1 * time.Second
	started = time.Now()
	group := new(errgroup.Group)
	locker := new(Locker)

	// Interleave writes and reads, pausing so each can request the lock
	group.Go(func() error { locker.Write(42); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Write(101); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Write(3); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Write(18); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })

	group.Wait()
}

We get the following output:

$ go run locker.go
2017/09/08 12:29:28 42 written after 1.000178155s
2017/09/08 12:29:28 42 read after 1.500703007s
2017/09/08 12:29:28 42 read after 1.500691088s
2017/09/08 12:29:28 42 read after 1.500756144s
2017/09/08 12:29:28 42 read after 1.500648159s
2017/09/08 12:29:28 42 read after 1.500762323s
2017/09/08 12:29:28 42 read after 1.500679533s
2017/09/08 12:29:28 42 read after 1.500795204s
2017/09/08 12:29:29 101 written after 2.500971593s
2017/09/08 12:29:30 3 written after 3.505325487s
2017/09/08 12:29:31 18 written after 4.50594131s

This suggests that the reads continue to acquire locks as long as the Locker is read locked, forcing the writes to happen at the end.

I found one Stack Overflow post, “Read preferring RW mutex lock in Golang”, that seems to suggest that sync.RWMutex can implement both read- and write-preferred locking, but it doesn’t really explain how external callers can choose between them.

Finally consider the following:

func main() {
	delay = 1 * time.Second
	started = time.Now()
	group := new(errgroup.Group)
	locker := new(Locker)

	// Kick off four writes, then a stream of reads
	group.Go(func() error { locker.Write(42); return nil })
	group.Go(func() error { locker.Write(101); return nil })
	group.Go(func() error { locker.Write(3); return nil })
	group.Go(func() error { locker.Write(18); return nil })

	for i := 0; i < 22; i++ {
		group.Go(func() error {
			locker.Read()
			return nil
		})
		time.Sleep(delay / 4)
	}

	group.Wait()
}

Given the loop issuing 22 read locks that sleep only a quarter of the time of the write lock, we might expect that this code will issue 22 read locks then all the write locks will occur at the end (and if we put this in a forever loop, then the writes would never occur). However, the output of this is as follows:

2017/09/08 12:43:40 18 written after 1.004461829s
2017/09/08 12:43:40 18 read after 1.508343716s
2017/09/08 12:43:40 18 read after 1.50842899s
2017/09/08 12:43:40 18 read after 1.508362345s
2017/09/08 12:43:40 18 read after 1.508339659s
2017/09/08 12:43:40 18 read after 1.50852229s
2017/09/08 12:43:41 42 written after 2.513789339s
2017/09/08 12:43:42 42 read after 3.0163191s
2017/09/08 12:43:42 42 read after 3.016330534s
2017/09/08 12:43:42 42 read after 3.016355628s
2017/09/08 12:43:42 42 read after 3.016371381s
2017/09/08 12:43:42 42 read after 3.016316992s
2017/09/08 12:43:43 3 written after 4.017954589s
2017/09/08 12:43:43 3 read after 4.518495233s
2017/09/08 12:43:43 3 read after 4.518523255s
2017/09/08 12:43:43 3 read after 4.518537387s
2017/09/08 12:43:43 3 read after 4.518540397s
2017/09/08 12:43:43 3 read after 4.518543262s
2017/09/08 12:43:43 3 read after 4.51863128s
2017/09/08 12:43:44 101 written after 5.521872765s
2017/09/08 12:43:45 101 read after 6.023207828s
2017/09/08 12:43:45 101 read after 6.023225272s
2017/09/08 12:43:45 101 read after 6.023249529s
2017/09/08 12:43:45 101 read after 6.023190828s
2017/09/08 12:43:45 101 read after 6.023243032s
2017/09/08 12:43:45 101 read after 6.023190457s
2017/09/08 12:43:45 101 read after 6.04455716s
2017/09/08 12:43:45 101 read after 6.29457923s

What Go implements is actually something else: new read locks can be acquired only while the initial read lock is still held (the word “initial” being critical in the documentation). As soon as that first read lock is released, the queued write lock gets priority. In the run above, the first read lock lasts approximately 500ms, which is enough time for another 4-5 readers to acquire read locks of their own; once the first read lock completes, the waiting write is given priority.

Messaging Throughput gRPC vs. ZMQ

Building distributed systems in Go requires an RPC or message framework of some sort. In the systems I build I prefer to pass messages serialized with protocol buffers, so a natural choice for me is grpc. The grpc library uses HTTP/2 as a transport layer and provides a code generator based on the protocol buffer syntax, making it very simple to use.

For more detailed control, the ZMQ library is an excellent, low latency socket framework. ZMQ provides several communication patterns from basic REQ/REP (request/reply) to PUB/SUB (publish/subscribe). ZMQ is used at a lower level though, so more infrastructure per app needs to be built.

This leads to the obvious question: which RPC framework is faster? Here are the results:

Echo Server Throughput

These results show the message throughput of three echo servers that respond to a simple message with a response including a sequence number. Each server is running on its own EC2 micro instance with 1GB of memory and 1 vCPU. Each client is running on an EC2 nano instance with 0.5GB of memory and 1 vCPU and constantly sends messages to the server. The throughput is the number of messages per second the server can handle.

The servers are as follows:

  1. rep: a server that implements a REQ/REP socket and simple handler.
  2. router: a server that implements a REQ/ROUTER socket along with a DEALER/REP socket for 16 workers, connected via a proxy.
  3. grpc: implements a gRPC service.

The runner and results can be found here.

Discussion

All the figures exhibit a standard throughput shape: as more clients are added the throughput increases, but it begins to tail off toward an asymptote. The asymptote represents the maximum number of messages per second a server can respond to without building up message latency. Generally speaking, if a server can handle multiple clients at once, the throughput is higher.

The ZMQ REQ/ROUTER/PROXY/DEALER/REP server with 16 workers outperforms the gRPC server (it has a higher overall throughput). I hypothesize that this is because ZMQ does not have the overhead of HTTP and is in fact lighter weight code than gRPC since none of it is generated. It’s unclear if adding more workers would improve the throughput of the ZMQ router server.

The performance of the REQ/REP server is a mystery: it does far better than the other two. This socket has very little overhead, so with fewer clients it makes sense that it performs well; however, it also blocks on a per-client basis, while both grpc and router are asynchronous and can handle multiple clients at a time, which suggests they should be much faster.

Online Distribution

This post started out as a discussion of a struct in Go that could keep track of online statistics without keeping an array of values. It ended up being a lesson on over-engineering for concurrency.

The spec of the routine was to build a data structure that could keep track of internal statistics of values over time in a space-saving fashion. The primary interface was a method, Update(sample float64), so that a new sample could be passed to the structure, updating internal parameters. At conclusion, the structure should be able to describe the mean, variance, and range of all values passed to the update method. I created two versions:

  1. A thread-safe version using mutexes, but blocking on Update()
  2. A thread-safe version using a channel and a go routine so that Update() was non-blocking.

I ran some benchmarking, and discovered that the blocking implementation of Update was actually far faster than the non-blocking version. Here are the numbers:

BenchmarkBlocking-8      	20000000            81.1 ns/op
BenchmarkNonBlocking-8   	10000000	       140 ns/op

Apparently, putting a float on a channel, even a buffered channel, incurs some overhead that is more expensive than simply incrementing and summing a few integers and floats. I will present both methods here, but note that the first method (blocking update) should be implemented in production.

You can find this code at github.com/bbengfort/x/stats if you would like to use it in your work.

Online Descriptive Statistics (Blocking)

To track statistics in an online fashion, you need to keep track of the various aggregates that are used to compute the final descriptive statistics of the distribution. For simple statistics such as the minimum, maximum, standard deviation, and mean you need to track the number of samples, the sum of samples, and the sum of the squares of all samples (along with the minimum and maximum value seen). Here is how you do that:
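
The actual code is in the package linked above; a minimal sketch of the blocking version, with field and method names of my own choosing, might look like this:

import (
	"math"
	"sync"
)

// Statistics keeps running aggregates so that descriptive statistics can
// be computed without storing every sample.
type Statistics struct {
	sync.Mutex
	samples uint64  // number of samples seen
	total   float64 // sum of all samples
	squares float64 // sum of the squares of all samples
	minimum float64 // smallest sample seen
	maximum float64 // largest sample seen
}

// Update the aggregates with a new sample, blocking while the lock is held.
func (s *Statistics) Update(sample float64) {
	s.Lock()
	defer s.Unlock()

	s.samples++
	s.total += sample
	s.squares += sample * sample

	if s.samples == 1 || sample < s.minimum {
		s.minimum = sample
	}
	if s.samples == 1 || sample > s.maximum {
		s.maximum = sample
	}
}

// Mean of all samples seen so far.
func (s *Statistics) Mean() float64 {
	s.Lock()
	defer s.Unlock()
	if s.samples == 0 {
		return 0
	}
	return s.total / float64(s.samples)
}

// StdDev is the sample standard deviation, computed from the aggregates.
func (s *Statistics) StdDev() float64 {
	s.Lock()
	defer s.Unlock()
	if s.samples < 2 {
		return 0
	}
	n := float64(s.samples)
	return math.Sqrt((s.squares - (s.total*s.total)/n) / (n - 1))
}

// Range returns the minimum and maximum values seen so far.
func (s *Statistics) Range() (float64, float64) {
	s.Lock()
	defer s.Unlock()
	return s.minimum, s.maximum
}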

I use this data structure as a lightweight mechanism to keep track of online statistics for experimental results or latency. It gives a good overall view of incoming values at very little expense.

Non-blocking Update

In an attempt to improve the performance of this method, I envisioned a mechanism where I could simply dump values into a buffered channel then run an updater go routine to collect values and perform the online computation. The updater function can simply range over the channel, and the channel can be closed to stop the goroutine and finalize anything still on the channel. This is written as follows:
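
A sketch of that non-blocking variant, building on the blocking Statistics sketched above (again, the names are mine rather than the package’s), might be:

// BufferedStatistics pushes samples onto a buffered channel so that Update
// does not block; a collector go routine applies them to the aggregates.
type BufferedStatistics struct {
	Statistics
	values chan float64
	done   chan bool
}

// Init creates the buffered channel and starts the collector go routine.
func (s *BufferedStatistics) Init(buffer int) {
	s.values = make(chan float64, buffer)
	s.done = make(chan bool)
	go s.collect()
}

// Update is non-blocking so long as the buffer is not full.
func (s *BufferedStatistics) Update(sample float64) {
	s.values <- sample
}

// Close the channel to stop the collector, finalizing buffered samples.
func (s *BufferedStatistics) Close() {
	close(s.values)
	<-s.done
}

// collect ranges over the channel, applying each sample to the aggregates.
func (s *BufferedStatistics) collect() {
	for sample := range s.values {
		s.Statistics.Update(sample)
	}
	s.done <- true
}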

The lesson was that this is actually less performant, no matter how large the buffer is. I increased the buffer size to 10000001 to ensure that the sender could not block, but I still received 116 ns/op benchmarks. Generally, this style is what I use when the function being implemented is actually pretty heavy (e.g. writes to disk). In this case, the function was too lightweight to matter!

Rapid FS Walks with ErrGroup

I’ve been looking for a way to quickly scan a file system and gather information about the files in directories contained within. I had been doing this with multiprocessing in Python, but figured Go could speed up my performance by a lot. What I discovered when I went down this path was errgroup.Group (from golang.org/x/sync/errgroup), an extension of sync.WaitGroup that helps manage the complexity of multiple go routines and also includes error handling!

The end result of this exploration was a utility called urfs — which you can install on your system to take a uniform random sample of files in a directory or to compute the number of files and bytes per directory. This utility is also extensible to other functionality that requires rapidly walking a file system, such as search.

This post is therefore a bit of a walkthrough on using errgroup.Group for scanning a file system and applying arbitrary functions. First a couple of types:

type WalkFunc func(path string) (string, error)

type FSWalker struct {
	Workers    int             
	SkipHidden bool            
	SkipDirs   bool            
	Match      string          
	root       string          
	paths      chan string     
	nPaths     uint64          
	results    chan string     
	nResults   uint64        
	group      *errgroup.Group
	ctx        context.Context
	started    time.Time       
	duration   time.Duration   
}

The first type is a generic function that can be passed to the Walk method of the FSWalker. The FSWalker maintains state with a variety of channels, and of course the errgroup.Group object. The SkipHidden, SkipDirs, and Match properties allow us to filter path types being passed to Walk.

To initialize FSWalker:

func (fs *FSWalker) Init(ctx context.Context) {
	// Set up FSWalker defaults
	fs.Workers = DefaultWorkers
	fs.SkipHidden = true
	fs.SkipDirs = true
	fs.Match = "*"

	// Create the context for the errgroup
	if ctx == nil {
		// Create a new context, preserving the deadline of any previous one
		ctx = context.Background()
		if fs.ctx != nil {
			if deadline, ok := fs.ctx.Deadline(); ok {
				ctx, _ = context.WithDeadline(ctx, deadline)
			}
		}
	}

	// Create the err group
	fs.group, fs.ctx = errgroup.WithContext(ctx)

    // Create channels and instantiate other statistics variables
	fs.paths = make(chan string, DefaultBuffer)
	fs.results = make(chan string, DefaultBuffer)
	fs.nPaths = 0
	fs.nResults = 0
	fs.started = time.Time{}
	fs.duration = time.Duration(0)
}

Ok, so we’re doing a lot of work here, but it pays off in the Walk function, where we keep track of the number of paths we’ve seen under a root directory and pass them off to a WalkFunc using a number of go routines:

func (fs *FSWalker) Walk(path string, walkFn WalkFunc) error {
	// Compute the duration of the walk
	fs.started = time.Now()
	defer func() { fs.duration = time.Since(fs.started) }()

	// Set the root path for the walk
	fs.root = path

	// Launch the goroutine that populates the paths
	fs.group.Go(func() error {
		// Ensure that the channel is closed when all paths are loaded
		defer close(fs.paths)

		// Apply the path filter to the filepath.Walk function
		return filepath.Walk(fs.root, fs.filterPaths)
	})

	// Create the worker function and allocate pool
	worker := fs.worker(walkFn)
	for w := 0; w < fs.Workers; w++ {
		fs.group.Go(worker)
	}

	// Wait for the workers to complete, then close the results channel
	go func() {
		fs.group.Wait()
		close(fs.results)
	}()

	// Start gathering the results
	for range fs.results {
		fs.nResults++
	}

	return fs.group.Wait()
}

This is a lot of code, so let’s step through it. The first thing we do is set the started time to now and defer a function that computes the duration as the difference between the time at the end of the function and the start time. We also set the root value. We then launch a go routine in the ErrGroup by using fs.group.Go(func); this function must have the signature func() error, so we use an anonymous function to kick off filepath.Walk, which starts walking the directory structure and adds paths that match the filter criteria (more on the filter below) to a buffered channel called fs.paths. This channel must be closed when the walk is complete so that our worker go routines can terminate.

Next we create a worker function using our worker method and the walk function. The workers read paths off the fs.paths channel and apply the walkFn to each path individually. Note that we use a pool-like structure here, limiting the number of workers to 5000; this is so we don’t get a “too many open files” error from exhausting the available file descriptors, since Go places no limit on the number of go routines. The worker definition is here:

func (fs *FSWalker) worker(walkFn WalkFunc) func() error {
	return func() error {
		// Apply the function to all paths in the channel
		for path := range fs.paths {
			// avoid race condition
			p := path

			// apply the walk function to the path and return errors
			r, err := walkFn(p)
			if err != nil {
				return err
			}

			// store the result and check the context
			if r != "" {

				select {
				case fs.results <- r:
				case <-fs.ctx.Done():
					return fs.ctx.Err()
				}
			}

		}
		return nil
	}
}

As you can see, the worker function just creates a closure with the signature of our ErrGroup function, so that we can pass it to the group. All the worker function does is range over the paths channel, applying the walkFn to each path.

Finally, we kick off another go routine that waits until all the workers have stopped and then closes our results channel. We do this so that we can start gathering results immediately, without waiting for the walk to finish, by simply ranging over the results channel and counting the number of results. A final group.Wait() at the end lets us wait for all go routines to complete and returns any error they produced.

Lastly, the filter function. We want to ignore files and directories that are hidden, e.g. those that start with a “.” or a “~” on Unix systems. We also want to be able to pass a glob-like matcher, e.g. "*.txt" to only match text files. The filter function is here:

// Internal filter paths function that is passed to filepath.Walk
func (fs *FSWalker) filterPaths(path string, info os.FileInfo, err error) error {
	// Propagate any errors
	if err != nil {
		return err
	}

	// Check to ensure that no mode bits are set
	if !info.Mode().IsRegular() {
		return nil
	}

	// Get the name of the file without the complete path
	name := info.Name()

	// Skip hidden files or directories if required.
	if fs.SkipHidden {
		if strings.HasPrefix(name, ".") || strings.HasPrefix(name, "~") {
			return nil
		}
	}

	// Skip directories if required
	if fs.SkipDirs {
		if info.IsDir() {
			return nil
		}
	}

	// Check to see if the pattern matches the file
	match, err := filepath.Match(fs.Match, name)
	if err != nil {
		return err
	} else if !match {
		return nil
	}

	// Increment the total number of paths we've seen.
	atomic.AddUint64(&fs.nPaths, 1)

	select {
	case fs.paths <- path:
	case <-fs.ctx.Done():
		return fs.ctx.Err()
	}

	return nil
}

And that’s it: with this simple framework you can apply an arbitrary walkFn to all paths in a directory that match specific criteria. The big win here is managing all of the go routines using the ErrGroup and a context.Context object.
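
For example, a walk function that sums file sizes could be wired up roughly like this (this assumes it lives in the same package as the types above; the constant values are made up, other than the worker cap mentioned earlier):

import (
	"context"
	"fmt"
	"log"
	"os"
	"sync/atomic"
)

const (
	DefaultWorkers = 5000 // cap the worker pool so file descriptors are not exhausted
	DefaultBuffer  = 1024 // hypothetical buffer size for the paths and results channels
)

func main() {
	walker := new(FSWalker)
	walker.Init(context.Background())

	// Sum the size of every file under the current working directory.
	var total uint64
	err := walker.Walk(".", func(path string) (string, error) {
		info, err := os.Stat(path)
		if err != nil {
			return "", err
		}
		atomic.AddUint64(&total, uint64(info.Size()))
		return path, nil
	})

	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%d bytes in %d files\n", total, walker.nResults)
}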

The following post: Run strikingly fast parallel file searches in Go with sync.ErrGroup by Brian Ketelsen was the primary inspiration for the use of sync.ErrGroup.

Buffered Write Performance

This is just a quick note on the performance of writing to a file on disk using Go, and it raises a question about a common programming paradigm that I am now suspicious of. I discovered that when I stopped wrapping the open file object with a bufio.Writer, the performance of my writes to disk significantly increased. Ok, so this isn’t about simple file writing to disk; this is about a complex writer that seeks to write at different positions in the file and maintains the overall state of what’s on disk in memory. However, the question remains:

Why do we buffer our writes to a file?

A couple of answers come to mind: safety (the buffer ensures that writes to the underlying writer are not flushed when an error occurs); helpers (there may be methods on the buffer struct that are not available on a native writer); and concurrency (the buffer can be appended to while another part of the buffer is being flushed).

However, we determined that in performance critical applications (file systems, databases) the buffer abstraction adds an unacceptable performance overhead. Here are the results.

Results

First, we’re not doing a simple write: we’re appending to a write-ahead log that has fixed-length metadata at the top of the file. This means that a single operation to append data to the log consists of the following steps:

  1. Marshal data to bytes
  2. Write data to end of the log file (possibly sync the file)
  3. Seek to the top of the file
  4. Marshal and write the fixed-length metadata header
  5. Seek to the bottom of the file
  6. Sync the file to disk

So there is a bit more work here than simply throwing data at disk. We can see in the following graph that the performance of the machine (CPU, memory, and disk) plays a huge role in determining how many of these writes the machine is able to do per second:

Write Throughput (ops/sec)

In the above graph, Hyperion and Lagoon are Dell Optiplex servers and Antigua, Curacao, and Nevis are Intel NUCs. They all have different processors and SSDs, but all have 16GB memory. For throughput, bigger is better (you can do more operations per second). As you can see on all of the servers, there is about a 1.6x increase in throughput using unbuffered writes to the file over buffered writes to the file.

We can inspect the distribution of the latency of each individual operation as follows (with latency, smaller is better — you’re doing operations faster):

Operation Latency Distribution

The boxplot shows the distribution of latency such that the box is between the 25th and 75th percentile (with a bisecting line at the median) - the lines are from the 5th to the 95th percentile, and anything outside the lines are considered outliers and are visualized as diamonds.

We can see the shift not just in the mean, but also the median; a 1.6x increase in speed (decrease in latency) from buffered to unbuffered writes. More importantly, we can see that unbuffered writes are more consistent; they have a tighter distribution and less variable operational latency. I suspect this means that while both types of writes are bound by disk accesses from other processes, buffered writes are also bound by CPU whereas unbuffered writes are less so.

Method

The idea here is that we are going to open a file and append data to it, tracking what we’re doing with a fixed-length metadata header at the beginning of the file. Creating a struct to wrap the file and open, sync, and close it is pretty straightforward:

type Log struct {
    path string
    file *os.File
}

func (l *Log) Open(path string) (err error) {
    l.path = path
    l.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0644)
    if err != nil {
        return err
    }
    return nil
}

func (l *Log) Close() error {
    err := l.file.Close()
    l.file = nil
    return err
}

func (l *Log) Sync() error {
    return l.file.Sync()
}

Now let’s say that we have an entry that knows how to write itself to an io.Writer interface as follows:

type Entry struct {
    Version uint64    `json:"version"`
    Key     string    `json:"key"`
    Value   []byte    `json:"value"`
    Created time.Time `json:"created"`
}

func (e *Entry) Dump(w io.Writer) (int64, error) {
    // Encode entry as JSON data (base64 encoded bytes value)
    data, err := json.Marshal(e)
    if err != nil {
        return -1, err
    }

    // Add a newline to the data for json lines format
    data = append(data, byte('\n'))

    // Write the data to the writer and return the number of bytes written.
    n, err := w.Write(data)
    return int64(n), err
}

So the question is, if we have a list of entries we want to append to the log, how do we pass the io.Writer to the Entry.Dump method in order to write them one at a time?

The first method is the standard method, buffered, using bufio.Writer:

func (l *Log) Append(entries ...*Entry) (size int64, err error) {
    // Create the buffer and track the bytes written per entry
    var bytes int64
    buffer := bufio.NewWriter(l.file)

    // Write each entry keeping track of the amount of data written
    for _, entry := range entries {
        if bytes, err = entry.Dump(buffer); err != nil {
            return -1, err
        } else {
            size += bytes
        }
    }

    // Flush the buffer
    if err = buffer.Flush(); err != nil {
        return -1, err
    }

    // Sync the underlying file
    if err = l.Sync(); err != nil {
        return -1, err
    }

    return size, nil
}

As you can see, even though we’re getting a buffered write to disk, we’re not actually leveraging any of the benefits of the buffered write. By eliminating the middleman with an unbuffered approach:

func (l *Log) Append(entries ...*Entry) (size int64, err error) {
    // Write each entry directly to the file, tracking the amount written
    for _, entry := range entries {
        if bytes, err := entry.Dump(l.file); err != nil {
            return -1, err
        } else {
            size += bytes
        }
    }

    // Sync the underlying file
    if err = l.Sync(); err != nil {
        return -1, err
    }

    return size, nil
}

We get the performance benefit as shown above. Now, I’m not sure if this is obvious or not, but I do know that it’s commonly taught to wrap the file object with a buffer. The unbuffered approach may be simpler and faster, but it may also be less safe; it depends on your use case.

Event Dispatcher in Go

The event dispatcher pattern is extremely common in software design, particularly in languages like JavaScript that are primarily used for user interface work. The dispatcher is an object (usually a mixin to other objects) that can register callback functions for particular events. Then, when a dispatch method is called with an event, the dispatcher calls each callback function in order of registration and passes them a copy of the event. In fact, I’ve already written a version of this pattern in Python: “Implementing Observers with Events”. In this snippet, I’m presenting a version in Go that has been incredibly stable and useful in my code.

There are four types in the snippet below:

  • EventType is a uint16 that represents the type of event that occurs; several constants in the code declare event types, along with a String() method for human readability. Using a small integer type keeps comparing and dispatching on event types cheap compared to comparing strings.
  • Callback defines the signature of a function that can be registered.
  • Dispatcher is the core of the code and wraps a source — that is the actual object that is doing the dispatching.
  • Event is an interface for event types that has a type, source, and value.

For example, if you want to watch a directory for new files being created, you could do something like this:


type DirectoryWatcher struct {
    Dispatcher
    path string // path to directory on disk
}

func (w *DirectoryWatcher) Init(path string) {
    w.path = path
    w.Dispatcher.Init(w)
}

// Watch the given directory and dispatch new file events
func (w *DirectoryWatcher) Watch() error {
    for {
        files, _ := ioutil.ReadDir(w.path)
        for _, file := range files {
            if w.Unseen(file) {
                w.Dispatch(NewFile, file)
            }
        }
        time.Sleep(100 * time.Millisecond)
    }
}

This initializes the DirectoryWatcher dispatcher with the source as the watcher (so you can refer to exactly which directory was being watched). The watcher then checks the directory for new files every 100 milliseconds, and if it sees any files that were Unseen(), it dispatches a NewFile event.

The dispatcher code is as follows:
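
A compact sketch of the shape that dispatcher takes (illustrative rather than the exact gist code) is below:

// EventType is a uint16 that identifies the kind of event being dispatched.
type EventType uint16

// Illustrative event type constants; real code declares its own.
const (
	NewFile EventType = iota + 1
)

// String returns a human readable representation of the event type.
func (t EventType) String() string {
	switch t {
	case NewFile:
		return "new file"
	default:
		return "unknown"
	}
}

// Event is passed to callbacks and carries its type, source, and value.
type Event interface {
	Type() EventType
	Source() interface{}
	Value() interface{}
}

// Callback is the signature of functions that can be registered for events.
type Callback func(Event) error

// event is the internal Event implementation created by Dispatch.
type event struct {
	etype  EventType
	source interface{}
	value  interface{}
}

func (e *event) Type() EventType     { return e.etype }
func (e *event) Source() interface{} { return e.source }
func (e *event) Value() interface{}  { return e.value }

// Dispatcher registers callbacks and dispatches events to them in order.
type Dispatcher struct {
	source    interface{}
	callbacks map[EventType][]Callback
}

// Init the dispatcher with the source object doing the dispatching.
func (d *Dispatcher) Init(source interface{}) {
	d.source = source
	d.callbacks = make(map[EventType][]Callback)
}

// Register a callback for the given event type.
func (d *Dispatcher) Register(etype EventType, callback Callback) {
	d.callbacks[etype] = append(d.callbacks[etype], callback)
}

// Dispatch an event to every callback registered for its type, in
// registration order; the first error terminates event handling.
func (d *Dispatcher) Dispatch(etype EventType, value interface{}) error {
	e := &event{etype: etype, source: d.source, value: value}
	for _, callback := range d.callbacks[etype] {
		if err := callback(e); err != nil {
			return err
		}
	}
	return nil
}

With this in place, the directory watcher above would attach a handler with something like w.Register(NewFile, func(e Event) error { ... }) before calling Watch().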

So this works very well but there are a couple of key points:

  • When dispatching the event, a single error terminates all event handling. It might be better to create a specific error type that terminates event handling (e.g. do not propagate) and then collect all other errors into a slice and return them from the dispatcher.
  • The event can technically be modified by callback functions since it’s a pointer. It might be better to pass by value to guarantee that all callbacks see the original event.
  • Callback handling is in order of registration, which gets to point number one about canceling event propagation. An alternative is to do all the callbacks concurrently using go routines, which is something I want to investigate further.

Lazy Pirate Client

In the last post I discussed a simple REQ/REP pattern for ZMQ. However, by itself REQ/REP is pretty fragile. First, every REQ requires a REP and a server can only handle one request at a time. Moreover, if the server fails in the middle of a reply, then everything is hung. We need more reliable REQ/REP, which is actually the subject of an entire chapter in the ZMQ book.

For my purposes, I want to ensure that repliers (servers) can fail without taking out the client. The server can simply send with the zmq.DONTWAIT flag to deal with clients that drop out before the communication is complete. Server failure is a bit more difficult to deal with, however. Client-side reliability is based on timeouts and retries to deal with failed messages. ZMQ calls this the Lazy Pirate Pattern.

This is a pretty big chunk of code, but it creates a Client object that wraps a socket and performs lazy pirate sends. The primary code is in the Reset() and Send() methods. The Reset() method sets the linger to zero in order to close the connection immediately without errors; it then closes the connection and reconnects thereby resetting the state to be able to send messages again. This is “brute force but effective and reliable”.

The Send() method fires off a message, then uses a zmq.Poller with a timeout to keep checking whether a reply has been received within that time limit. If it was successful, then great! Otherwise we decrement our retries and try again. If we’re out of retries there is nothing to do but return an error. The code is here:
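
A stripped-down sketch of that client, assuming the github.com/pebbe/zmq4 bindings (the field names, timeout, and retry handling here are illustrative), looks like this:

import (
	"errors"
	"time"

	zmq "github.com/pebbe/zmq4"
)

// Client wraps a REQ socket and performs lazy pirate sends with retries.
type Client struct {
	endpoint string
	socket   *zmq.Socket
	timeout  time.Duration
	retries  int
}

// Reset is the brute force approach: set linger to zero so the socket
// closes immediately, then close and reconnect to reset the REQ state.
func (c *Client) Reset() (err error) {
	if c.socket != nil {
		c.socket.SetLinger(0)
		c.socket.Close()
	}

	if c.socket, err = zmq.NewSocket(zmq.REQ); err != nil {
		return err
	}
	return c.socket.Connect(c.endpoint)
}

// Send fires off the message, then polls for a reply with a timeout,
// resetting the connection and retrying if no reply arrives in time.
func (c *Client) Send(msg string) (string, error) {
	if c.socket == nil {
		if err := c.Reset(); err != nil {
			return "", err
		}
	}

	for attempt := 0; attempt < c.retries; attempt++ {
		if _, err := c.socket.Send(msg, 0); err != nil {
			return "", err
		}

		// Poll the socket for a reply within the timeout.
		poller := zmq.NewPoller()
		poller.Add(c.socket, zmq.POLLIN)
		polled, err := poller.Poll(c.timeout)
		if err != nil {
			return "", err
		}

		// If a reply arrived, receive and return it.
		if len(polled) > 0 {
			return c.socket.Recv(0)
		}

		// No reply in time: reset the socket and try again.
		if err := c.Reset(); err != nil {
			return "", err
		}
	}

	return "", errors.New("no reply received: retries exceeded")
}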

This code is fairly lengthy, but as it turns out, most of the content for both clients and servers on either side of REQ/REP have similar wrapper code for context, socket, and connection/bind wrapping. So far it’s been very reliable in my code to allow servers to drop out and fail without blocking clients or other nodes in the network.

Simple ZMQ Message Passing

There are many ways to create RPCs and send messages between nodes in a distributed system. Typically when we think about messaging, we think about a transport layer (TCP, IP) and a protocol layer (HTTP) along with some message serialization. Perhaps best known are RESTful APIs which allow us to GET, POST, PUT, and DELETE JSON data to a server. Other methods include gRPC which uses HTTP and protocol buffers for interprocess communication.

ZMQ is a bit different. It provides an abstraction for sockets that look like embedded networking but can actually be used for in-process and inter-process channels, multicast, TCP, and more. ZMQ has many patterns, starting with simple REQ/REP (request/reply), where a client connects to a socket that a server is bound on; the client sends a REQ and waits for a response, the REP from the server.

The interesting thing about this (pretty standard) network communication is that the server doesn’t have to be up for the client to connect; the client will just wait until the server is available. Moreover, there is no need for multiplexing because ZMQ buffers messages under the hood. The pattern is incredibly failure resistant. ZMQ is not HTTP; it is something different with its own protocol, and even though it’s a lower-level networking abstraction, it can be used for very powerful distributed systems design.

This is just a snippet with a bare bones REQ/REP message server and client that passes strings back and forth.
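
A sketch of what the two ends look like, again assuming the github.com/pebbe/zmq4 bindings (the endpoint and reply format are placeholders), is below:

import (
	"fmt"
	"log"
	"os"

	zmq "github.com/pebbe/zmq4"
)

// serve binds a REP socket and sends a reply for every request received.
func serve() error {
	sock, err := zmq.NewSocket(zmq.REP)
	if err != nil {
		return err
	}
	defer sock.Close()

	if err := sock.Bind("tcp://*:5555"); err != nil {
		return err
	}

	for {
		// Wait for the next request from a client.
		msg, err := sock.Recv(0)
		if err != nil {
			return err
		}
		fmt.Printf("received: %s\n", msg)

		// Send the reply back to the client.
		if _, err := sock.Send("reply to "+msg, 0); err != nil {
			return err
		}
	}
}

// send connects a REQ socket and sends each message, waiting for a reply.
func send(messages []string) error {
	sock, err := zmq.NewSocket(zmq.REQ)
	if err != nil {
		return err
	}
	defer sock.Close()

	if err := sock.Connect("tcp://localhost:5555"); err != nil {
		return err
	}

	for _, msg := range messages {
		if _, err := sock.Send(msg, 0); err != nil {
			return err
		}

		reply, err := sock.Recv(0)
		if err != nil {
			return err
		}
		fmt.Println(reply)
	}
	return nil
}

func main() {
	var err error
	switch {
	case len(os.Args) > 1 && os.Args[1] == "serve":
		err = serve()
	case len(os.Args) > 2 && os.Args[1] == "send":
		err = send(os.Args[2:])
	default:
		err = fmt.Errorf("usage: %s [serve|send msg ...]", os.Args[0])
	}
	if err != nil {
		log.Fatal(err)
	}
}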

To use this code, download the gist and run the server and client in two different terminal windows with go run. To run the server:

$ go run zmqmsg.go serve

And to send messages:

$ go run zmqmsg.go send "first message" "second message" "third message"

You should see messages received at the server and replies sent back to the client. Of course this is pretty much the hello world of the ZMQ REQ/REP model and there are many other networking patterns and sockets provided by ZMQ to check out. In particular, there is a PUB/SUB pattern where clients can connect to a publisher to receive updates pushed to them. More to come!

PID File Management

In this discussion, I want to propose some code to perform PID file management in a Go program. When a program is backgrounded or daemonized we need some way to communicate with it in order to stop it. All active processes are assigned a unique process id by the operating system and that ID can be used to send signals to the program. Therefore a PID file:

The pid files contains the process id (a number) of a given program. For example, Apache HTTPD may write it’s main process number to a pid file - which is a regular text file, nothing more than that - and later use the information there contained to stop itself. You can also use that information (just do a cat filename.pid) to kill the process yourself, using echo filename.pid | xargs kill.

Rafael Steil

From a Go program we can use the PID to get access to the process and send it a signal, such as SIGTERM to terminate it:

import (
	"os"
	"syscall"

	"github.com/bbengfort/x/pid"
	"github.com/urfave/cli"
)

// Send a kill signal to the process defined by the PID
func stop(c *cli.Context) error {
	pid := pid.New()
	if err := pid.Load(); err != nil {
		return cli.NewExitError(err.Error(), 1)
	}

	// Get the process from the os
	proc, err := os.FindProcess(pid.PID)
	if err != nil {
		return cli.NewExitError(err.Error(), 1)
	}

	// Kill the process
	if err := proc.Signal(syscall.SIGTERM); err != nil {
		return cli.NewExitError(err.Error(), 1)
	}

	return nil
}

Using the PID file within a program requires a bit of forethought. Where do you store the PID file? Do you only allow one running instance of the program? If so, the program needs to throw an error if it starts up and a PID file already exists; if not, how do you name multiple PID files? When exiting, how do you make sure that the PID file is deleted?

Some of these questions are addressed by my initial implementation of the PID file in the github.com/bbengfort/x/pid package. The stub of that implementation is as follows:
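
A trimmed sketch of that stub (New, Load, and the PID field match the usage above; the default path and the Save and Free helpers are assumptions on my part) looks like:

import (
	"encoding/json"
	"io/ioutil"
	"os"
)

// PID stores the process and parent process IDs along with the path of
// the PID file on disk.
type PID struct {
	PID  int `json:"pid"`
	PPID int `json:"ppid"`

	path string
}

// New returns a PID that reads and writes a default location.
func New() *PID {
	return &PID{path: "/var/run/myapp.pid"} // hypothetical default path
}

// Save writes the current process and parent process IDs as JSON.
func (p *PID) Save() error {
	p.PID = os.Getpid()
	p.PPID = os.Getppid()

	data, err := json.Marshal(p)
	if err != nil {
		return err
	}
	return ioutil.WriteFile(p.path, data, 0644)
}

// Load reads the PID file from disk and unmarshals it.
func (p *PID) Load() error {
	data, err := ioutil.ReadFile(p.path)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, p)
}

// Free removes the PID file, e.g. on graceful shutdown.
func (p *PID) Free() error {
	return os.Remove(p.path)
}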

This implementation stores both the PID and the parent PID (if the process forks) in the PID file in JSON format. JSON is not necessarily required, but it does make the format a bit simpler to understand and also allows the addition of other process information.

So why talk about PIDs? Well I’m writing some programs that need to be run in the background and always started up. I’m investigating systemd for Ubuntu and launchtl for OS X in order to manage the processes. But more on that in a future post.