# Streaming Remote Throughput

To improve the performance of asynchronous message passing in Alia, I’m using gRPC bidirectional streaming to create the peer-to-peer connections. When a replica is initialized, it creates a remote connection to each of its peers, and each connection lives in its own go routine. Any other thread can send messages by passing them to that go routine through a channel; replies are then dispatched via another channel and directed to the requesting thread via an actor dispatching model.

This post is about the performance of the remote sending go routine, particularly with respect to how many go routines the remote uses. Here is some basic stub code for the messenger go routine that listens for incoming messages on a buffered channel and sends them to the remote via the stream:

func (r *Remote) messenger() {
    // Attempt to establish a connection to the remote peer
    var err error
    if err = r.connect(); err != nil {
        out.Warn(err.Error())
    }

    // Send all messages in the order they arrive on the channel
    for msg := range r.messages {
        // If we're not online try to re-establish the connection
        if !r.online {
            if err = r.connect(); err != nil {
                out.Warn(
                    "dropped %s message to %s (%s): could not connect",
                    msg.Type, r.Name, r.Endpoint(),
                )

                // close the connection and go to the next message
                r.close()
                continue
            }
        }

        // Send the message on the remote stream
        if err = r.stream.Send(msg); err != nil {
            out.Warn(
                "dropped %s message to %s (%s): could not send: %s",
                msg.Type, r.Name, r.Endpoint(), err.Error(),
            )

            // go offline if there was an error sending a message
            r.close()
            continue
        }
    }
}


The question is, how do we receive the reply from the remote?

In sync mode, we can simply receive the reply before we send the next message. This has the benefit that no further synchronization is required on connect and close; however, as shown in the graph below, it does not perform well at all.

In async mode, we can launch another go routine to handle all the incoming replies and dispatch them:

func (r *Remote) listener() {
    for {
        if r.online {
            // Block until the next reply arrives on the stream
            rep, err := r.stream.Recv()
            if err != nil {
                out.Warn(
                    "no response from %s (%s): %s",
                    r.Name, r.Endpoint(), err,
                )
                return
            }

            r.Dispatcher.Dispatch(events.New(rep.EventType(), r, rep))
        }
    }
}


This performs much better; however, there is a race condition: the listener checks r.online before it accesses r.stream, and in between the messenger routine may close the connection and set r.stream to nil.
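
One way to close that race, sketched here under assumptions, is to guard both fields with a single sync.RWMutex and hand the listener a snapshot of the stream. The names guardedRemote, stream, fakeStream, setStream, and current are hypothetical stand-ins rather than Alia's actual types, and the gRPC stream is replaced by a tiny interface so that the example is self-contained.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

// stream is a hypothetical stand-in for the gRPC bidirectional stream.
type stream interface {
    Recv() (string, error)
}

// fakeStream returns a fixed reply so the sketch runs without a network.
type fakeStream struct{}

func (fakeStream) Recv() (string, error) { return "reply", nil }

// guardedRemote protects online and stream with one lock so the listener
// can never observe online == true together with a nil stream.
type guardedRemote struct {
    mu     sync.RWMutex
    online bool
    stream stream
}

// setStream is called by the messenger on connect (s != nil) or close (nil).
func (r *guardedRemote) setStream(s stream) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.stream = s
    r.online = s != nil
}

// current returns a snapshot of the stream, or an error if offline.
func (r *guardedRemote) current() (stream, error) {
    r.mu.RLock()
    defer r.mu.RUnlock()
    if !r.online || r.stream == nil {
        return nil, errors.New("remote is offline")
    }
    return r.stream, nil
}

func main() {
    r := &guardedRemote{}
    if _, err := r.current(); err != nil {
        fmt.Println("before connect:", err)
    }

    r.setStream(fakeStream{})
    s, _ := r.current()
    rep, _ := s.Recv()
    fmt.Println("after connect:", rep)
}
```

In this sketch the listener would call current() on each iteration and then call Recv on the snapshot outside the lock, so a slow receive never blocks the messenger from closing the connection.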

To test this, I ran a benchmark that sends 5000 messages, each in its own go routine, and waits until all responses are dispatched before computing the throughput. The iorder mode is there to show that, even in async mode, the order is preserved if the messages are sent one at a time (i.e. not each in its own go routine).

At first, I thought the size of the message buffer might be causing the bottleneck (hence the x-axis). The buffer prevents back-pressure from the message sender, and it does appear to have some influence on sync and async mode (but less of an impact in iorder mode). From these numbers, however, it’s clear that we need to run the listener in its own routine.

Notes:

• With sender and receiver go routines, the message order is preserved
• There is a race condition between sender and receiver
• Buffer size only has a small impact

# Future Date Script

This is kind of a dumb post, but it’s something I’m sure I’ll look up in the future. I have a lot of emails where I have to send a date that’s sometime in the future, e.g. six weeks from the end of a class to specify a deadline … I’ve just been opening a Python terminal and importing datetime and timedelta but I figured this quick script on the command line would make my life a bit easier:

And that’s all there is to it, not very interesting, but something I will probably have in my bin for the rest of my life.

# Aggregating Reads from a Go Channel

Here’s the scenario: we have a buffered channel that’s read by a single go routine and written to by multiple go routines. For simplicity, we’ll say that the channel accepts events and that the other routines generate events of specific types, A, B, and C. If there are more generators of one type of event (or some producers are faster than others), we may end up with a run of the same event on the buffered channel. What we would like to do is read all of the events of the same type that are on the buffered channel at once and handle them together, i.e. aggregate the read of our events.

An initial solution is composed of two loops. The first loop has a select that performs a blocking read of either the msgs channel or a done channel, which determines when to exit the go routine. If a msg is received, a second loop labeled grouper starts a non-blocking read of the msgs channel. The loop keeps track of a current and a next value: as long as next and current are the same, it continues reading off the channel; once they differ, or there is nothing left to read, it breaks out and both current and next are handled.

func consumeAggregate(msgs <-chan string, done <-chan bool) {
    var current, next string
    var count int

    for {
        // Outer select does a blocking read on both channels
        select {
        case current = <-msgs:
            // count our current event
            count = 1
        grouper:
            // continue reading events off the msgs channel
            for {
                select {
                case next = <-msgs:
                    if next != current {
                        // exit grouper loop and handle next and current
                        break grouper
                    } else {
                        // keep track of the number of similar events
                        count++
                    }
                default:
                    // nothing is on the channel, break grouper and
                    // only handle current by setting next to empty
                    next = ""
                    break grouper
                }
            }
        case <-done:
            // done consuming exit go routine
            return
        }

        // This section happens after select is complete
        // handle the current messages with the aggregate count
        handle(current, count)

        // handle next if one exists
        if next != "" {
            handle(next, 1)
        }
    }
}


This solution does have one obvious problem: the next value is not aggregated with similar values that come after it. E.g. for the event stream aaaabbb, the calls to handle will be (a, 4), (b, 1), (b, 2). The good news, though, is that testing with the race and deadlock detectors shows that this method is correct. Possible improvements for a future post include:

• Aggregate the next value
• Read val, ok from the channel to detect if it’s closed to exit
• Convert the outer loop to a range to complete when the channel is closed

Here is the Aggregating Channel Gist that contains the complete code and tests.
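
As a rough sketch of the first two improvements (not the code from the gist), the consumer below carries a differing event over as the new current value instead of handling it immediately, and uses the val, ok form of the receive to exit when the channel is closed. The function name aggregate and the handle callback parameter are assumptions made for this example.

```go
package main

import "fmt"

// aggregate reads msgs until it is closed, calling handle once per run of
// identical events with the number of events in that run.
func aggregate(msgs <-chan string, handle func(event string, count int)) {
    var (
        current string
        count   int
    )

    // flush handles the pending run, if there is one
    flush := func() {
        if count > 0 {
            handle(current, count)
            count = 0
        }
    }

    for {
        if count == 0 {
            // Nothing pending: block until an event arrives or msgs closes
            msg, ok := <-msgs
            if !ok {
                return
            }
            current, count = msg, 1
            continue
        }

        select {
        case msg, ok := <-msgs:
            if !ok {
                // Channel closed: handle what we have and exit
                flush()
                return
            }
            if msg == current {
                count++
            } else {
                // A different event starts a new run rather than being
                // handled on its own
                flush()
                current, count = msg, 1
            }
        default:
            // Channel is momentarily empty: handle the pending run
            flush()
        }
    }
}

func main() {
    msgs := make(chan string, 16)
    for _, e := range []string{"a", "a", "a", "a", "b", "b", "b"} {
        msgs <- e
    }
    close(msgs)

    // With every event already buffered, this prints (a, 4) then (b, 3)
    aggregate(msgs, func(event string, count int) {
        fmt.Printf("(%s, %d)\n", event, count)
    })
}
```

When producers are still running, a run can still be split wherever the channel happens to be empty, so the counts depend on timing just as they do in the original version.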

# The Actor Model

Building correct concurrent programs in a distributed system with multiple threads and processes can quickly become very complex to reason about. For performance, we want each thread in a single process to operate as independently as possible; however anytime the shared state of the system is modified synchronization is required. Primitives like mutexes can ensure structs are thread-safe, however in Go, the strong preference for synchronization is communication. In either case Go programs can quickly become locks upon locks or morasses of channels, incurring performance penalties at each synchronization point.

The Actor Model is a solution for reasoning about concurrency in distributed systems that helps eliminate unnecessary synchronization. In the actor model, we consider our system to be composed of actors: computational primitives that have a private state, can send and receive messages, and perform computations based on those messages. The key is that a system is composed of many actors and actors do not share memory; they have to communicate with messaging. Although Go does not provide first-class actor primitives the way Erlang or the Akka toolkit do, the model does fit in well with the CSP principle.

In the next few posts, I’ll explore implementing the Actor model in Go for a simple distributed system that allows clients to make requests and periodically synchronizes its state to its peers. The model is shown below:

## Actors

An actor is a process or a thread that has the ability to send and receive messages. When an actor receives a message it can do one of three things:

1. Create new actors
2. Send messages to known actors
3. Designate how to handle the next message

At first glance we may think that actors are only created at the beginning of a program, e.g. the “main” actor or the instantiation of a program-long ticker actor that sends periodic messages and can receive start and stop messages. However, anytime a go programmer executes a new go routine, there is the possibility of a new actor being created. In our example, we’ll explore how a server creates temporary actors to handle single requests from clients.

Sending messages to known actors allows an actor to synchronize or share state with other go routines in the same process, other processes on the same machine, or even processes on other machines. As a result, actors are a natural framework for creating distributed systems. In our example we’ll send messages both with channels as well as using gRPC for network communications.

The most important thing to understand about actor communication is that although actors run concurrently, they will only process messages sequentially, in the order in which they are received. Actors send messages asynchronously (i.e. an actor isn’t blocked while waiting for another actor to receive the message). This means that messages need to be stored while the actor is processing other messages; this storage is usually called a “mailbox”. We’ll implement mailboxes with buffered channels in this post.

Deciding how to handle the next message is a general way of saying that actors “do something” with messages, usually by modifying their state, and that it is something “interesting enough” that it impacts how the next message is handled. This implies a couple of things:

• Actors have an internal state and memory
• Actors mutate their state based on messages
• How an actor responds depends on the order of messages received
• Actors can shutdown or stop

For the rest of the posts, we’ll consider a simple service called Ipseity that hands out monotonically increasing, unique identities to clients. If the actor receives a next() message, it increments its local counter (mutating its internal state), ensuring that the next message always returns a monotonically increasing number. If it receives an update(id) message, it updates its internal state to the specified id if that id is larger than its internal id, allowing it to synchronize with remote peers (in an eventually consistent fashion).
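
To make those two state transitions concrete, here is a minimal sketch of just the state handling; the type name ipseity and the method signatures are assumptions for illustration, not the service's actual code. No locks are needed as long as only the actor's own event loop calls these methods.

```go
package main

import "fmt"

// ipseity is a hypothetical stand-in for the actor's private state.
type ipseity struct {
    sequence int64
}

// next mutates the state and returns the newly issued identity.
func (a *ipseity) next() int64 {
    a.sequence++
    return a.sequence
}

// update adopts a peer's id only if it is ahead of the local sequence, so
// the sequence never moves backwards.
func (a *ipseity) update(id int64) {
    if id > a.sequence {
        a.sequence = id
    }
}

func main() {
    a := &ipseity{}
    fmt.Println(a.next(), a.next()) // 1 2

    a.update(10) // a peer is ahead: jump forward
    a.update(5)  // a peer is behind: ignored
    fmt.Println(a.next()) // 11
}
```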

### Event Model

In order to reduce confusion between network messages and actor messages, I prefer to use the term “event” when referring to messages sent between actors. This also allows us to reason about actors as implementing an event loop, another common distributed systems design paradigm. It is important to note that “actors are a specialized, opinionated implementation of an event driven architecture”, which means the actor model is a subset of event architectures, such as the dispatcher model described earlier in this journal.

I realize this does cause a bit of cognitive overhead, but this pays off when complex systems with many event types can be traced, showing a serial order of events handled by an actor. So for now, we’ll consider an event a message that can be “dispatched” (sent) to other actors, and “handled” (received) by an actor, one at a time.

Events are described by their type, which determines what data the event contains and how it should be handled by the actor. In Go, event types can be implemented as an enumeration by extending the uint16 type as follows:

// Event types represented in Ipseity
const (
    UnknownEvent EventType = iota
    IdentityRequest
    SyncTimeout
    SyncRequest
)

// String names of event types, in the same order as the constants above
var eventTypeStrings = [...]string{
    "unknown", "identityRequest", "syncTimeout", "syncRequest",
}

// EventType is an enumeration of the kind of events that actors handle
type EventType uint16

// String returns the human readable name of the event type
func (t EventType) String() string {
    if int(t) < len(eventTypeStrings) {
        return eventTypeStrings[t]
    }
    return eventTypeStrings[0]
}


Events themselves are usually represented by an interface to allow for multiple event types with specialized functionality to be created in code. For simplicity here, however, I’ll simply define a single event struct and we’ll use type assertions later in the code:

type Event struct {
    Type   EventType
    Source interface{}
    Value  interface{}
}


The Source of the event is the actor that is dispatching the event, and we’ll primarily use this to store channels so that we can send messages (events) back to the actor. The Value of the event is any associated data that needs to be used by the actor processing the event.
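
As a sketch of how a handler might use Source to reply, the helper below type-asserts the source to a channel and sends a value back on it. The reply function and the choice of chan int64 as the reply channel type are assumptions for this example; the event types are trimmed to what the example needs.

```go
package main

import (
    "errors"
    "fmt"
)

type EventType uint16

const (
    UnknownEvent EventType = iota
    IdentityRequest
)

type Event struct {
    Type   EventType
    Source interface{}
    Value  interface{}
}

// reply sends value back to whoever dispatched e, assuming the dispatcher
// stored a chan int64 in Source (a hypothetical convention).
func reply(e Event, value int64) error {
    ch, ok := e.Source.(chan int64)
    if !ok {
        return errors.New("event source is not a reply channel")
    }
    ch <- value
    return nil
}

func main() {
    // A buffer of one means the handler is not blocked by a slow caller
    source := make(chan int64, 1)
    e := Event{Type: IdentityRequest, Source: source}

    if err := reply(e, 42); err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(<-source) // 42
}
```

Because Source is an interface{}, the assertion fails for anything that is not exactly a chan int64, which is why the error path is checked rather than assumed away.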

### Actor Interface

There are a lot of different types of actors including:

• Actors that run for the duration of the program
• Actors that generate events but do not receive them
• Actors that exist ephemerally to handle one or few events

As a result it is difficult to describe an interface that handles all of these types generically. Instead we’ll focus on the central actor of our application (called the “Local Actor” in the diagram above), which fulfills the first role (runs the duration of the program) and most completely describes the actor design.

type Actor interface {
    Listen(addr string) error // Run the actor to listen for messages
    Dispatch(Event) error     // Outside callers dispatch events to actor
    Handle(Event) error       // Handle each event sequentially
}


As noted in the introduction and throughput appendix below, there are a number of ways to implement the actor interface that ensure events received by the Dispatch method are handled one at a time, in sequential order. Here, we’ll use a buffered channel as a mailbox of a fixed size, so that other actors that are dispatching events to this actor aren’t blocked while the actor is handling other messages.

type ActorServer struct {
    pid      int64      // unique identity of the actor
    events   chan Event // mailbox to receive event dispatches
    sequence int64      // internal state, monotonically increasing identity
}


The Listen method starts the actor (as well as a gRPC server on the specified addr, which we’ll discuss later) and reads messages off the channel one at a time, executing the Handle method for each message before moving to the next message. Listen runs until the events channel is closed, e.g. when the program exits.

func (a *ActorServer) Listen(addr string) error {
    // Initialize the events channel able to buffer 1024 messages
    a.events = make(chan Event, 1024)

    // Read events off of the channel sequentially
    for event := range a.events {
        if err := a.Handle(event); err != nil {
            return err
        }
    }

    return nil
}


The Handle method can create new actors, send messages, and determine how to respond to the next event. Generally it is just a jump table, passing the event to the correct event handling method:

func (a *ActorServer) Handle(e Event) error {
    // Type is a field of the Event struct, not a method
    switch e.Type {
    case IdentityRequest:
        return a.onIdentityRequest(e)
    case SyncTimeout:
        return a.onSyncTimeout(e)
    case SyncRequest:
        return a.onSyncRequest(e)
    default:
        return fmt.Errorf("no handler identified for event %s", e.Type)
    }
}


The Dispatch method allows other actors to send events to the actor, by simply putting the event on the channel. When other go routines call Dispatch they won’t be blocked, waiting for the actor to handle the event because of the buffer … unless the actor has been backed up so the buffer is full.

func (a *ActorServer) Dispatch(e Event) error {
    a.events <- e
    return nil
}
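
If blocking on a full mailbox is not acceptable, one alternative is a non-blocking send that reports the back-pressure to the caller. This is only a sketch: the mailbox type, the TryDispatch method, and the ErrMailboxFull error are hypothetical names, not part of the Actor interface above.

```go
package main

import (
    "errors"
    "fmt"
)

// Event is trimmed down to a single field for this example.
type Event struct {
    Value interface{}
}

// ErrMailboxFull is returned instead of blocking the caller.
var ErrMailboxFull = errors.New("mailbox is full")

type mailbox struct {
    events chan Event
}

// TryDispatch puts the event on the channel if there is room in the
// buffer, and otherwise returns immediately with an error.
func (m *mailbox) TryDispatch(e Event) error {
    select {
    case m.events <- e:
        return nil
    default:
        return ErrMailboxFull
    }
}

func main() {
    // A tiny buffer makes the full-mailbox case easy to trigger
    m := &mailbox{events: make(chan Event, 2)}

    for i := 0; i < 3; i++ {
        fmt.Println(i, m.TryDispatch(Event{Value: i}))
    }
}
```

The third call fails because nothing is draining the channel; whether the caller should then retry, drop the event, or fall back to a blocking send is a design decision that depends on the event type.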


## Next Steps

In the next post (or two) we’ll hook up a gRPC server to the actor so that it can serve identity requests to clients as well as send and respond to synchronization requests for remote actors. We’ll also create a second go routine next to the actor process that issues synchronization timeouts on a periodic interval. Together, the complete system will be able to issue monotonically increasing identities in an eventually consistent fashion.

## Other Resources

For any discussion of Actors, it seems obligatory to include this very entertaining video of Carl Hewitt, the inventor of the actor model, describing them on a white board with Erik Meijer and Clemens Szyperski.

Other blog posts:

## Appendix: Throughput

One of the biggest questions I had was whether or not the actor model introduced any performance issues over a regular mutex by serializing a wrapper event over a channel instead of directly locking the actor state. I tested the throughput for the following types of ipseity servers:

• Simple: locks the whole server to increment the sequence and create the response to the client.
• Sequence: creates a sequence struct that is locked when incremented, but not when creating the response to the client.
• Actor: Uses the buffered channel actor model as described in this post.
• Locker: Implements the actor interface but instead of a buffered channel uses a mutex to serialize events.

As you can see from the above benchmark, it does not appear that the actor model described in these posts adds overhead that penalizes performance.

The code for both the benchmark and the implementations of the servers above can be found at: https://github.com/bbengfort/ipseity/tree/multiactor

# Syntax Parsing with CoreNLP and NLTK

Syntactic parsing is a technique by which segmented, tokenized, and part-of-speech tagged text is assigned a structure that reveals the relationships between tokens governed by syntax rules, e.g. by grammars. Consider the sentence:

The factory employs 12.8 percent of Bradford County.

A syntax parse produces a tree that might help us understand that the subject of the sentence is “the factory”, the predicate is “employs”, and the target is “12.8 percent”, which in turn is modified by “Bradford County”. Syntax parses are often a first step toward deep information extraction or semantic understanding of text. Note however, that syntax parsing methods suffer from structural ambiguity, that is the possibility that there exists more than one correct parse for a given sentence. Attempting to select the most likely parse for a sentence is incredibly difficult.

The best general syntax parser that exists for English, Arabic, Chinese, French, German, and Spanish is currently the blackbox parser found in Stanford’s CoreNLP library. This parser is a Java library, however, and requires Java 1.8 to be installed. Luckily it also comes with a server that can be run and accessed from Python using NLTK 3.2.3 or later. Once you have downloaded the JAR files from the CoreNLP download page and installed Java 1.8 as well as pip installed nltk, you can run the server as follows:

import os

from nltk.parse.corenlp import CoreNLPServer

# The server needs to know the location of the following files:
#   - stanford-corenlp-X.X.X.jar
#   - stanford-corenlp-X.X.X-models.jar
STANFORD = os.path.join("models", "stanford-corenlp-full-2018-02-27")

# Create the server
server = CoreNLPServer(
    os.path.join(STANFORD, "stanford-corenlp-3.9.1.jar"),
    os.path.join(STANFORD, "stanford-corenlp-3.9.1-models.jar"),
)

# Start the server in the background
server.start()


The server needs to know the location of the JAR files you downloaded, either by adding them to your Java $CLASSPATH or, like me, storing them in a models directory that you can access from your project. When you start the server, it runs in the background, ready for parsing.

To get constituency parses from the server, instantiate a CoreNLPParser and parse raw text as follows:

from nltk.parse.corenlp import CoreNLPParser

parser = CoreNLPParser()
parse = next(parser.raw_parse("I put the book in the box on the table."))


If you’re in a Jupyter notebook, the tree will be drawn as above. Note that the CoreNLPParser can take a URL to the CoreNLP server, so if you’re deploying this in production, you can run the server in a docker container, etc. and access it for multiple parses. The raw_parse method expects a single sentence as a string; you can also use the parse method to pass in tokenized and tagged text using other NLTK methods. Parses are also handy for identifying questions:

next(parser.raw_parse("What is the longest river in the world?"))


Note the SBARQ representing the question; this data can be used to create a classifier that can detect what type of question is being asked, which can then in turn be used to transform the question into a database query!

I should also point out why we’re using next(); the parser actually returns a generator of parses, starting with the most likely. By using next, we’re selecting only the first, most likely parse.

Constituency parses are deep and contain a lot of information, but often dependency parses are more useful for text analytics and information extraction. To get a Stanford dependency parse with Python:

from nltk.parse.corenlp import CoreNLPDependencyParser

parser = CoreNLPDependencyParser()
parse = next(parser.raw_parse("I put the book in the box on the table."))


Once you’re done parsing, don’t forget to stop the server!

# Stop the CoreNLP server
server.stop()


To ensure that the server is stopped even when an exception occurs, you can also use the CoreNLPServer context manager as follows:

jars = (
    "stanford-corenlp-3.9.1.jar",
    "stanford-corenlp-3.9.1-models.jar"
)

with CoreNLPServer(*jars):
    parser = CoreNLPParser()

    text = "The runner scored from second on a base hit"
    parse = next(parser.parse_text(text))
    parse.draw()


Note that the parse_text function in the above code allows a string to be passed that might contain multiple sentences and returns a parse for each sentence it segments. Additionally the tokenize and tag methods can be used on the parser to get the Stanford part of speech tags from the text.

Unfortunately there isn’t much documentation on this, but for more check out the NLTK CoreNLP API documentation.

# Continuing Outer Loops with for/else

When you have an outer and an inner loop, how do you continue the outer loop from a condition inside the inner loop? Consider the following code:

for i in range(10):
    for j in range(9):
        if i <= j:
            # break out of inner loop
            # continue outer loop
        print(i,j)

    # don't print unless inner loop completes,
    # e.g. outer loop is not continued
    print("inner complete!")


Here, for every i ∈ [0,10) we want to print each j ∈ [0,9) that is less than i, and we want to print “inner complete!” only when every j meets that criterion, i.e. when the inner loop runs to completion. While this seems like a fairly contrived example, I’ve actually encountered this exact situation in several places in code this week, and I’ll provide a real example in a bit.

My first instinct is simply to wrap the inner loop in a function and use return to do a “hard break” out of the loop. This allows us to short-circuit by exiting the function, but it doesn’t actually provide continue functionality, which is the goal in the above example. The technique does work, however, and in multi-loop situations it is probably the best bet.

def inner(i):
    for j in range(9):
        if i <= j:
            # Note if this was break, the print statement would execute
            return
        print(i,j)

    print("inner complete")

for i in range(10):
    inner(i)


Much neater, however, is using for/else. The else block fires iff the for loop it is connected with completes without hitting a break. This was very weird to me at first; I thought else should trigger on break. Think of it this way though:

You’re searching through a list of things, for item in collection and you plan to break when you’ve found the item you’re looking for, else you do something if you exhaust the collection and didn’t find what you were looking for.

Therefore we can code our loop as follows:

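for i in range(10):
    for j in range(9):
        if i <= j:
            # Breaking skips the else block below,
            # so the outer loop is simply continued
            break
        print(i,j)
    else:
        # Only runs if the inner loop completed without a break
        print("inner complete!")


This is a little strange at first, because there is no explicit continue: a break skips the else block, so breaking out of the inner loop is all it takes to continue the outer loop, and putting our print in the else block meets the spec of only printing when the inner loop completes.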

Here’s a better example with date parsing:

# Try to parse a timestamp with a bunch of formats
for fmt in (JSON, PG, ISO, RFC, HUMAN):
    try:
        ts = datetime.strptime(ts, fmt)
        break
    except ValueError:
        continue
else:
    # Could not parse with any of the formats required
    raise ValueError("could not parse timestamp")


Is this better or worse than the function version of this?

def parse_timestamp(ts):
    for fmt in (JSON, PG, ISO, RFC, HUMAN):
        try:
            return datetime.strptime(ts, fmt)
        except ValueError:
            continue

    raise ValueError("could not parse timestamp")

ts = parse_timestamp(ts)


Let’s go to the benchmarks:

So basically, there is no meaningful difference, but depending on the context of implementation, using for/else may be a bit more meaningful or easy to test than having to implement another function.

Benchmark code can be found here.

# Predicted Class Balance

This is a follow on to the prediction distribution visualization presented in the last post. This visualization shows a bar chart with the number of predicted and number of actual values for each class, e.g. a class balance chart with predicted balance as well.

This visualization actually came before the prior visualization, but I was more excited about that one because it showed where error was occurring, similar to a classification report or confusion matrix. Recently, however, I’ve been using this chart more for initial spot checking, since it gives me a general feel for how balanced both the class and the classifier are with respect to each other. It has also helped diagnose what is being displayed in the heat map chart of the other post.

The code follows, again prototype code. However in this code I made an effort to use more scikit-learn tooling in the visualization, including their validation and checking code. Hopefully this will help us eliminate problems with various types of input.

This code also shows a cross-validation strategy for getting y_true and y_pred from a classifier. I think this type of code will become a cornerstone in Yellowbrick, so please let us know in the YB issues if you see anything fishy with this methodology!

# Class Balance Prediction Distribution

In this quick snippet I present an alternative to the confusion matrix or classification report visualizations in order to judge the efficacy of multi-class classifiers:

The base of the visualization is a class balance chart: the x-axis is the actual (or true) class and the height of each bar is the number of instances that match that class in the dataset. The difference here is that each bar is a stacked chart representing the percentage of the predicted class given the actual value. If the predicted color matches the actual color then the classifier was correct, otherwise it was wrong.

The code to do this follows. This is simple prototype code that we’ll be including in Yellowbrick soon and may not work in all cases; nor does it include features for doing cross-validation and putting together the two vectors required for visualization.

Other interesting things that can be done with this: make the x-axis the predicted class instead of the actual class, if classes are ordinal use a heatmap to show predictions over or under the specified class, or find a better way to show “correct” values without discrete color values. More investigation on this and an implementation in Yellowbrick soon!

Update: Thanks @lwgray for putting together the pull request for this!

# Synchronization in Write Throughput

This post serves as a reminder of how to perform benchmarks when accounting for synchronized writing in Go. The normal benchmarking process involves running a command a large number of times and determining the average amount of time that operation took. When threads come into play, we consider throughput - that is the number of operations that can be conducted per second. However, in order to successfully measure this without duplicating time, the throughput must be measured from the server’s perspective.

Let w be the amount of time a single operation takes and n be the number of operations per thread. Given t threads, the cost of each operation from the perspective of the client thread will be t*w because the server is synchronizing writes as shown in the figure above, i.e. the thread has to wait for t-1 other writes to complete before conducting its write. This means that each thread reports a latency of n*t*w from its perspective. If these latencies are summed across threads, the total time is computed as n*w*t^2, even though the real time that has passed is actually n*t*w, the same as if a single thread had performed all n*t operations.
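
Written out with the same symbols, the double counting looks like this. The numbers in the worked example afterwards are purely illustrative and are not taken from the benchmarks.

```latex
\begin{aligned}
\text{latency seen by one thread} &= n \cdot t \cdot w \\
\text{sum over all } t \text{ threads} &= t \cdot (n \cdot t \cdot w) = n \cdot w \cdot t^{2} \\
\text{elapsed wall-clock time} &= n \cdot t \cdot w
\end{aligned}
```

For example, with n = 100 operations per thread, t = 4 threads, and w = 10 µs, each thread reports 4 ms, the sum of the reported latencies is 16 ms, and the elapsed time is 4 ms. Dividing the 400 operations by the elapsed 4 ms gives 100,000 operations per second, which equals 1/w; dividing by the summed 16 ms gives 25,000, an underestimate by a factor of t.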

For normal server-client throughput we measure the start time of the first access on the server end and the end time of the last access and the duration as the difference between these two timestamps. We then compute the number of operations conducted in that time to measure throughput. This only works if the server is pegged, e.g. it is not waiting for requests.

However, for synchronization we can’t measure at the server since we’re trying to determine the cost of locks and scheduling. We’ve used an external benchmarking procedure that may underestimate throughput but allows us to measure from the client-side rather than at the server. I’ve put up a simple library called syncwrite to benchmark this code.

Here is the use case: consider a system that has multiple goroutines, each of which wants to append to a log file on disk. The append-only log determines the sequence or ordering of events in the system, so appends must be atomic and reads of an index in the log must be idempotent. The interface of the Log is as follows:

type Log interface {
    Open(path string) error
    Append(value []byte) error
    Get(index int) (*Entry, error)
    Close() error
}


The log embeds sync.RWMutex to ensure no race conditions occur and that our read/write invariants are met. The Open, Append, and Close methods are all protected by a write lock, and the Get method is protected with a read lock. The structs that implement Log all deal with the disk and in-memory storage in different ways:

• InMemoryLog: appends to an in-memory slice and does not write to disk.
• FileLog: on open, reads entries from file into in-memory slice and reads from it, writes append to both the slice and the file.
• LevelDBLog: both writes and reads go to a LevelDB database.
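
As a rough illustration of the locking discipline described above, here is what the simplest of these, the in-memory log, might look like. This is a sketch rather than the syncwrite implementation; in particular the fields of Entry are assumptions, since the post does not show them.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

// Entry is a hypothetical log entry; the real struct may differ.
type Entry struct {
    Index int
    Value []byte
}

// InMemoryLog embeds sync.RWMutex so the lock methods are available
// directly on the log.
type InMemoryLog struct {
    sync.RWMutex
    entries []*Entry
}

// Open resets the in-memory slice; the path is ignored.
func (l *InMemoryLog) Open(path string) error {
    l.Lock()
    defer l.Unlock()
    l.entries = make([]*Entry, 0)
    return nil
}

// Append takes the write lock so that appends are atomic and ordered.
func (l *InMemoryLog) Append(value []byte) error {
    l.Lock()
    defer l.Unlock()
    l.entries = append(l.entries, &Entry{Index: len(l.entries), Value: value})
    return nil
}

// Get takes the read lock so concurrent readers do not block each other.
func (l *InMemoryLog) Get(index int) (*Entry, error) {
    l.RLock()
    defer l.RUnlock()
    if index < 0 || index >= len(l.entries) {
        return nil, errors.New("index out of range")
    }
    return l.entries[index], nil
}

// Close drops the entries under the write lock.
func (l *InMemoryLog) Close() error {
    l.Lock()
    defer l.Unlock()
    l.entries = nil
    return nil
}

func main() {
    log := &InMemoryLog{}
    log.Open("")
    log.Append([]byte("foo"))
    log.Append([]byte("bar"))

    entry, err := log.Get(1)
    fmt.Println(entry.Index, string(entry.Value), err) // 1 bar <nil>
}
```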

In the future (probably in the next post), I will also implement an AsyncLog that wraps a Log and causes write operations to be asynchronous by storing them on a channel and allowing the goroutine to immediately return.

## Benchmarks

The benchmarks are associated with an action, which can be one or more operations to disk. In this benchmark we simply evaluate an action that calls the Append method of a log with "foo" as the value. Per-action benchmarks are computed using go-bench, which computes the average time it takes to run the action once:

BenchmarkInMemoryLog-8   	10000000	       210 ns/op
BenchmarkFileLog-8       	  200000	      7456 ns/op
BenchmarkLevelDBLog-8    	  100000	     12379 ns/op


Writing to the in-memory log is by far the fastest, while writing to the LevelDB log is the slowest operation. We expect throughput, that is the number of operations per second, to be consistent with these per-action benchmarks in a single thread. The theoretical throughput is simply 1/(w*1e-9) operations per second, where w is the per-action time in nanoseconds (the 1e-9 factor converts nanoseconds to seconds). The question is how throughput changes with more threads and in a real workload.

Throughput benchmarks are conducted by running t threads, each of which runs n actions, and measuring the amount of time it takes all threads to run the n*t actions. As t increases, the workload stays static, i.e. n becomes smaller to keep n*t constant. The throughput is the number of operations divided by the duration in seconds.

Note that the y-axis is on a logarithmic scale, and because of the magnitude of in-memory writes, the chart is a bit difficult to read. Therefore the next chart shows the percentage of the theoretical throughput (as computed by w) the real system achieves:

## Observations

Looking at the percent theoretical chart, I believe the reason that both In-Memory and LevelDB achieve >100% for up to 4 threads is that the benchmark that computes w has such high variability, though this does not explain why the File log has dramatically lower throughput.

Because the benchmarks were run on a 4 core machine, up to 4 threads for In-Memory and LevelDB can operate without a noticeable decrease in throughput since there is no scheduling issue. However, at 8 threads and above, there is a noticeable drop in the percent of theoretical throughput, probably due to synchronization or scheduling issues. This same drop does not occur in the File log because it was already below its theoretical maximum throughput.

The way that the Set implementation works is that it defines a base data structure that is private, set, as well as an interface (set.Interface) that describes the methods a set is expected to have. The set methods are all private; two data structures are then composed that embed the set: Set and SetNonTS, the thread-safe and non-thread-safe versions of set. In this snippet I’ll just show a bit of boilerplate code that does this for reference; see the full set implementation for more detail.

In the implementation above, the set object provides four internal methods: init() creates the internal map data structure, add updates the map with one or more items, remove deletes one or more items from the map, and contains does a simple check to see if the item is in the internal map. All of these methods are private to the set package.

The SetNonTS and Set structs embed the set object and add some additional functionality. Both implement a constructor, NewNonTS and New respectively, which call the internal init function. Both also implement Add and Remove, which silently exit if no items are passed, the difference being that Set write locks the data structure after performing that check. Contains is also implemented, which the Set data structure read locks before checking.

The only small problem with this implementation is that there is a little bit of code duplication (e.g. the checks for no items in the Add and Remove methods). However, I’ve noticed in my code that often there are tasks that are done in either thread-safe or non-thread-safe versions but not both (like marking a flag or sending data to a channel). Because of this, it’s often better to keep those methods separate rather than relying solely on embedding.
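
Since the snippet itself is not reproduced here, the following is a reconstruction from the description above rather than the original code. The type and method names follow the prose; details such as using interface{} items backed by a map of empty structs are assumptions.

```go
package main

import (
    "fmt"
    "sync"
)

// set is the private base structure; none of its methods lock.
type set struct {
    items map[interface{}]struct{}
}

func (s *set) init() { s.items = make(map[interface{}]struct{}) }

func (s *set) add(items ...interface{}) {
    for _, item := range items {
        s.items[item] = struct{}{}
    }
}

func (s *set) remove(items ...interface{}) {
    for _, item := range items {
        delete(s.items, item)
    }
}

func (s *set) contains(item interface{}) bool {
    _, ok := s.items[item]
    return ok
}

// SetNonTS is the non-thread-safe set; it only adds the empty-items check.
type SetNonTS struct{ set }

func NewNonTS() *SetNonTS {
    s := &SetNonTS{}
    s.init()
    return s
}

func (s *SetNonTS) Add(items ...interface{}) {
    if len(items) == 0 {
        return
    }
    s.add(items...)
}

func (s *SetNonTS) Contains(item interface{}) bool { return s.contains(item) }

// Set is the thread-safe set; it locks after the empty-items check.
type Set struct {
    set
    sync.RWMutex
}

func New() *Set {
    s := &Set{}
    s.init()
    return s
}

func (s *Set) Add(items ...interface{}) {
    if len(items) == 0 {
        return
    }
    s.Lock()
    defer s.Unlock()
    s.add(items...)
}

func (s *Set) Remove(items ...interface{}) {
    if len(items) == 0 {
        return
    }
    s.Lock()
    defer s.Unlock()
    s.remove(items...)
}

func (s *Set) Contains(item interface{}) bool {
    s.RLock()
    defer s.RUnlock()
    return s.contains(item)
}

func main() {
    s := New()
    s.Add("a", "b")
    s.Remove("a")
    fmt.Println(s.Contains("a"), s.Contains("b")) // false true
}
```

The duplicated length checks in Add and Remove are the code duplication mentioned above; Remove is omitted from SetNonTS only to keep the sketch short.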