# Synchronization in Write Throughput

This post serves as a reminder of how to perform benchmarks when accounting for synchronized writing in Go. The normal benchmarking process involves running an operation a large number of times and determining the average amount of time each run took. When threads come into play, we instead consider throughput: the number of operations that can be conducted per second. However, in order to measure this without double-counting time, the throughput must be measured from the server's perspective.

Let w be the amount of time a single operation takes and n be the number of operations per thread. Given t threads, the cost of each operation from the perspective of a client thread will be t*w because the server is synchronizing writes as shown in the figure above, i.e. the thread has to wait for t-1 other writes to complete before conducting its own write. This means that each thread reports a latency of n*t*w from its perspective. If this is aggregated across threads, the total time computed is n*w*t^2, even though the real time that has passed is actually n*t*w, as in the single-threaded case.
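To make the arithmetic concrete, here is a small sketch with hypothetical numbers (the values of w, t, and n are assumptions chosen purely for illustration):

```go
package main

import "fmt"

func main() {
	// Hypothetical numbers: each write takes w = 10µs, and
	// t = 4 threads each perform n = 1000 operations.
	w := 10e-6 // seconds per operation
	t := 4.0   // number of threads
	n := 1000.0

	perThread := n * t * w      // latency each client thread observes
	aggregated := t * perThread // naive sum across threads: n*w*t^2
	elapsed := n * t * w        // real wall-clock time for all operations

	fmt.Printf("per-thread: %.2fs aggregated: %.2fs real: %.2fs\n",
		perThread, aggregated, elapsed)
}
```

Summing the per-thread latencies quadruples the apparent time, which is exactly the double-counting the post warns about.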

For normal server-client throughput we record the start time of the first access on the server end and the end time of the last access, taking the duration as the difference between these two timestamps. We then compute the number of operations conducted in that time to measure throughput. This only works if the server is pegged, i.e. it is never waiting for requests.

However, for synchronization we can’t measure at the server since we’re trying to determine the cost of locks and scheduling. We’ve used an external benchmarking procedure that may underestimate throughput but allows us to measure from the client-side rather than at the server. I’ve put up a simple library called syncwrite to benchmark this code.

Here is the use case: consider a system that has multiple goroutines, each of which want to append to a log file on disk. The append-only log determines the sequence or ordering of events in the system, so appends must be atomic and reads to an index in the log, idempotent. The interface of the Log is as follows:

type Log interface {
	Open(path string) error
	Append(value []byte) error
	Get(index int) (*Entry, error)
	Close() error
}


The log embeds sync.RWMutex to ensure no race conditions occur and that our read/write invariants are met. The Open, Append, and Close methods are all protected by a write lock, and the Get method is protected with a read lock. The structs that implement Log all deal with the disk and in-memory storage in different ways:

• InMemoryLog: appends to an in-memory slice and does not write to disk.
• FileLog: on open, reads entries from file into in-memory slice and reads from it, writes append to both the slice and the file.
• LevelDBLog: both writes and reads go to a LevelDB database.
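As a rough sketch of the pattern (the field names and details here are my assumptions, not the actual syncwrite code), the InMemoryLog looks something like this:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Entry is a single record in the log.
type Entry struct {
	Index int
	Value []byte
}

// InMemoryLog appends to an in-memory slice and never touches disk.
type InMemoryLog struct {
	sync.RWMutex
	entries []*Entry
}

func (l *InMemoryLog) Open(path string) error { return nil }

// Append is protected by the write lock so appends are atomic.
func (l *InMemoryLog) Append(value []byte) error {
	l.Lock()
	defer l.Unlock()
	l.entries = append(l.entries, &Entry{Index: len(l.entries), Value: value})
	return nil
}

// Get is protected by the read lock so concurrent reads don't block each other.
func (l *InMemoryLog) Get(index int) (*Entry, error) {
	l.RLock()
	defer l.RUnlock()
	if index < 0 || index >= len(l.entries) {
		return nil, errors.New("index out of range")
	}
	return l.entries[index], nil
}

func (l *InMemoryLog) Close() error { return nil }

func main() {
	log := new(InMemoryLog)
	log.Append([]byte("foo"))
	entry, _ := log.Get(0)
	fmt.Println(entry.Index, string(entry.Value))
}
```

The FileLog and LevelDBLog implementations follow the same locking discipline but swap in the storage layer.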

In the future (probably in the next post), I will also implement an AsyncLog that wraps a Log and causes write operations to be asynchronous by storing them on a channel and allowing the goroutine to immediately return.

## Benchmarks

The benchmarks are associated with an action, which can be one or more operations to disk. In this benchmark we simply evaluate an action that calls the Append method of a log with "foo" as the value. Per-action benchmarks are computed using go test -bench, which computes the average time it takes to run the action once:

BenchmarkInMemoryLog-8   	10000000	       210 ns/op
BenchmarkFileLog-8       	  200000	      7456 ns/op
BenchmarkLevelDBLog-8    	  100000	     12379 ns/op


Writing to the in-memory log is by far the fastest, while writing to the LevelDB log is by far the slowest. We expect throughput, that is the number of operations per second, to be consistent with these per-action benchmarks in a single thread. The theoretical throughput is simply 1/(w * 1e-9), converting w from nanoseconds to seconds. The question is how throughput changes with more threads and in a real workload.

Throughput benchmarks are conducted by running t threads, each of which runs n actions, and returning the amount of time it takes all threads to run the n*t actions. As t increases, the workload stays static, i.e. n becomes smaller to keep n*t constant. The throughput is the number of operations divided by the duration in seconds.

Note that the y-axis is on a logarithmic scale, and because of the magnitude of in-memory writes, the chart is a bit difficult to read. Therefore the next chart shows the percentage of the theoretical throughput (as computed by w) that the real system achieves:

## Observations

Looking at the percent-theoretical chart, I believe the reason that both In-Memory and LevelDB achieve >100% for up to 4 threads is that the benchmark that computes w has such high variability; though this does not explain why the File log has dramatically lower throughput.

Because the benchmarks were run on a 4-core machine, up to 4 threads for In-Memory and LevelDB can operate without a noticeable decrease in throughput since there is no scheduling issue. However, at 8 threads and above, there is a noticeable drop in the percent of theoretical throughput, probably due to synchronization or scheduling issues. This same drop does not occur in the File log because it was already below its theoretical maximum throughput.

I came across a now-archived project that implements a set data structure in Go and was intrigued that it provides both thread-safe and non-thread-safe implementations of the same data structure. Recently I’ve been attempting to get rid of locks in my code in favor of one master data structure that does all of the synchronization, so having multiple options for thread safety is useful. Previously I did this by having a lower-case (private) method that was non-thread-safe and an upper-case (public) method that implemented thread safety. However, as I’ve started to reorganize my packages, this no longer works.

The way the Set implementation works is that it defines a private base data structure, set, as well as an interface (set.Interface) that describes the methods a set is expected to have. The set methods are all private; two further data structures, Set and SetNonTS (the thread-safe and non-thread-safe versions), are then composed by embedding set. In this snippet I’ll just show a bit of the boilerplate code that does this for reference; see the full set implementation for more detail.
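Since I can’t reproduce the whole library here, the boilerplate below is a hedged reconstruction of the pattern (the names follow the description above, but the details are my assumptions, not the library’s actual code):

```go
package main

import (
	"fmt"
	"sync"
)

// set is the private base data structure; its methods are not thread-safe.
type set struct {
	m map[interface{}]struct{}
}

func (s *set) init()                   { s.m = make(map[interface{}]struct{}) }
func (s *set) add(item interface{})    { s.m[item] = struct{}{} }
func (s *set) remove(item interface{}) { delete(s.m, item) }
func (s *set) contains(item interface{}) bool {
	_, ok := s.m[item]
	return ok
}

// SetNonTS embeds set and exposes it without any locking.
type SetNonTS struct {
	set
}

func NewNonTS() *SetNonTS {
	s := new(SetNonTS)
	s.init()
	return s
}

func (s *SetNonTS) Add(items ...interface{}) {
	if len(items) == 0 {
		return
	}
	for _, item := range items {
		s.add(item)
	}
}

// Set embeds both set and a RWMutex for the thread-safe version.
type Set struct {
	sync.RWMutex
	set
}

func New() *Set {
	s := new(Set)
	s.init()
	return s
}

func (s *Set) Add(items ...interface{}) {
	if len(items) == 0 {
		return
	}
	s.Lock()
	defer s.Unlock()
	for _, item := range items {
		s.add(item)
	}
}

func (s *Set) Contains(item interface{}) bool {
	s.RLock()
	defer s.RUnlock()
	return s.contains(item)
}

func main() {
	s := New()
	s.Add("a", "b")
	fmt.Println(s.Contains("a"), s.Contains("c"))
}
```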

In the implementation above, the set object provides four internal methods: init() creates the internal map data structure, add updates the map with one or more items, remove deletes one or more items from the map, and contains does a simple check to see if the item is in the internal map. All of these methods are private to the set package.

The SetNonTS and Set types embed the set object and add some additional functionality. Both implement a constructor, NewNonTS and New respectively, which calls the internal init function. Both also implement Add and Remove, which silently return if no items are given; the difference is that Set write-locks the data structure after performing that check. Contains is also implemented, and the Set version read-locks before checking.

The only small problem with this implementation is a little bit of code duplication (e.g. the checks for no items in the Add and Remove methods). However, I’ve noticed in my code that often there are tasks done in either the thread-safe or the non-thread-safe version but not both (like marking a flag or sending data to a channel). Because of this, it’s often better to keep those methods separate rather than relying solely on embedding.

# Git-Style File Editing in CLI

A recent application I was working on required the management of several configuration and list files that needed to be validated. Rather than have the user find and edit these files directly, I wanted to create an editing workflow similar to crontab -e or git commit — the user would call the application, which would redirect to a text editor like vim, then when editing was complete, the application would take over again.

This happened to be a Go app, so the following code is in Go, but it would work with any programming language. The workflow is as follows:

1. Find an editor executable
2. Copy the original to a temporary file
3. Exec the editor on the temporary file
4. Wait for the editor to be done
5. Validate the temporary file
6. Copy the temporary file to the original location

This worked surprisingly well especially for things like YAML files which are structured enough to be validated easily, but human readable enough to edit.

First up, finding an editor executable. I used a three-part strategy: first, the user can specify the path to an editor in the configuration file (like git); second, the user can set the $EDITOR environment variable; and third, I look for common editors. Here’s the code:

var editors = []string{"vim", "emacs", "nano"}

func findEditor() (string, error) {
	config, err := LoadConfig()
	if err != nil {
		return "", err
	}

	if config.Editor != "" {
		return config.Editor, nil
	}

	if editor := os.Getenv("EDITOR"); editor != "" {
		return editor, nil
	}

	for _, name := range editors {
		path, err := exec.LookPath(name)
		if err == nil {
			return path, nil
		}
	}

	return "", errors.New("no editor found")
}

The crucial part of this is exec.LookPath, which searches the $PATH for the editor and returns the full path to exec it. Next up is copying the file:

func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer out.Close()

	if _, err = io.Copy(out, in); err != nil {
		return err
	}

	return nil
}


Finally the full editor workflow:

func EditFile(path string) error {
	// Find the editor to use
	editor, err := findEditor()
	if err != nil {
		return err
	}

	// Create a temporary directory and ensure we clean up when done.
	tmpDir, err := ioutil.TempDir("", "edit")
	if err != nil {
		return err
	}
	defer os.RemoveAll(tmpDir)

	// Get the temporary file location
	tmpFile := filepath.Join(tmpDir, filepath.Base(path))

	// Copy the original file to the tmpFile
	if err = copyFile(path, tmpFile); err != nil {
		return err
	}

	// Create the editor command
	cmd := exec.Command(editor, tmpFile)
	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	// Start the editor command and wait for it to finish
	if err = cmd.Start(); err != nil {
		return err
	}

	if err = cmd.Wait(); err != nil {
		return err
	}

	// Validate the temporary file here before copying it back.

	// Copy the tmp file back to the original file
	return copyFile(tmpFile, path)
}


This workflow assumes that the file being edited already exists, but of course you could modify it any number of ways. For example, you could use a template to populate the temporary file (similar to what git does for a commit message), or you could add more validation around input and output.

# Transaction Handling with Psycopg2

Databases are essential to most applications, yet most database interaction is abstracted away from Python developers by higher-level libraries like Django or SQLAlchemy. We use and love PostgreSQL with Psycopg2, but I recently realized that I didn’t have a good grasp on how exactly psycopg2 implemented core database concepts: particularly transaction isolation and thread safety.

Here’s what the documentation says regarding transactions:

Transactions are handled by the connection class. By default, the first time a command is sent to the database (using one of the cursors created by the connection), a new transaction is created. The following database commands will be executed in the context of the same transaction – not only the commands issued by the first cursor, but the ones issued by all the cursors created by the same connection. Should any command fail, the transaction will be aborted and no further command will be executed until a call to the rollback() method.

Transactions are therefore connection-specific. When you create a connection you can create multiple cursors; the transaction begins when the first cursor issues an execute, and all commands executed by all cursors after that are part of the same transaction until commit or rollback. After either of those methods is called, the next transaction starts on the next execute call.

This brings up a very important point:

By default even a simple SELECT will start a transaction: in long-running programs, if no further action is taken, the session will remain “idle in transaction”, an undesirable condition for several reasons (locks are held by the session, tables bloat…). For long lived scripts, either make sure to terminate a transaction as soon as possible or use an autocommit connection.

This seems to indicate that when working directly with psycopg2, understanding transactions is essential to writing stable scripts. This post therefore details my notes and techniques for working more effectively with PostgreSQL from Python.

## Database Preliminaries

In order to demonstrate the code in this blog post, we need a database. The classic database example taught to undergraduates is that of a bank account, so we’ll continue with that theme here! Sorry if this part is tedious, feel free to skip ahead. In a file, schema.sql, I defined the following schema as DDL (data definition language):

DROP TABLE IF EXISTS users CASCADE;
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(255) NOT NULL UNIQUE,
    pin SMALLINT NOT NULL
);

DROP TYPE IF EXISTS account_type CASCADE;
CREATE TYPE account_type AS ENUM ('checking', 'savings');

DROP TABLE IF EXISTS accounts CASCADE;
CREATE TABLE accounts (
    id SERIAL PRIMARY KEY,
    type account_type,
    owner_id INTEGER NOT NULL,
    balance NUMERIC DEFAULT 0.0,
    CONSTRAINT positive_balance CHECK (balance >= 0),
    FOREIGN KEY (owner_id) REFERENCES users (id)
);

DROP TYPE IF EXISTS ledger_type CASCADE;
CREATE TYPE ledger_type AS ENUM ('credit', 'debit');

DROP TABLE IF EXISTS ledger;
CREATE TABLE ledger (
    id SERIAL PRIMARY KEY,
    account_id INTEGER NOT NULL,
    date DATE NOT NULL DEFAULT CURRENT_DATE,
    type ledger_type NOT NULL,
    amount NUMERIC NOT NULL,
    FOREIGN KEY (account_id) REFERENCES accounts (id)
);


This creates a simple database with three tables. The users table contains a username and a PIN code for verification. Users can have one or more accounts, and accounts have the constraint that the balance can never fall below $0.00. The ledger tracks the credits and debits applied to each account. We can also seed the database with some initial data:

INSERT INTO users (id, username, pin) VALUES
    (1, 'alice', 1234),
    (2, 'bob', 9999);

INSERT INTO accounts (type, owner_id, balance) VALUES
    ('checking', 1, 250.0),
    ('savings', 1, 5.00),
    ('checking', 2, 100.0),
    ('savings', 2, 2342.13);

Moving to Python code, we can add some template code to allow us to connect to the database and execute the SQL in our file above:

import os
import psycopg2 as pg

def connect(env="DATABASE_URL"):
    url = os.getenv(env)
    if not url:
        raise ValueError("no database url specified")
    return pg.connect(url)

def createdb(conn, schema="schema.sql"):
    with open(schema, 'r') as f:
        sql = f.read()

    try:
        with conn.cursor() as curs:
            curs.execute(sql)
        conn.commit()
    except Exception as e:
        conn.rollback()
        raise e

The connect function looks for the database connection string in the environment variable $DATABASE_URL. Because database configuration can contain passwords and network information, it is always best to store it in the environment or in a local, secure configuration file that can only be accessed by the process and not checked in with code. The connection string should look something like: postgresql://user@localhost:5432/dbname.

The createdb function reads the SQL from the schema.sql file and executes it against the database. Note this is why we have the DROP TABLE IF EXISTS statements, so we can guarantee we always start with a fresh database when we run this script. This function also gives us our first glance at transactions and database interaction with Python.

Complying with PEP 249 we create a connection to the database, then create a cursor from the connection. Cursors manage the execution of SQL against the database as well as data retrieval. We execute the SQL in our schema file, committing the transaction if no exceptions are raised, and rolling back if it fails. We will explore this more in the next section.

## Transaction Management

A transaction consists of one or more related operations that represent a single unit of work. For example, in the bank account example you might have a deposit transaction that executes queries to look up the account and verify the user, add a record to a list of daily deposits, check if the daily deposit limit has been reached, then modify the account balance. All of these operations represent all of the steps required to perform a deposit.

The goal of a transaction is that when the transaction is complete, the database remains in a single consistent state. Consistency is often defined by invariants or constraints that describe at a higher level how the database should maintain information. From a programming perspective, if those constraints are violated an exception is raised. For example, the database has a positive_balance constraint: if the balance for an account goes below zero, an exception is raised. When this constraint is violated the database must remain unchanged and all operations performed by the transaction must be rolled back. If the transaction is successful we can then commit the changes, which guarantees that the database has durably applied our operations.

So why do we need to manage transactions? Consider the following code:

conn = connect()
curs = conn.cursor()

try:
    # Execute a command that will violate a constraint
    curs.execute("UPDATE accounts SET balance=%s", (-130.935,))
except Exception as e:
    print(e) # Constraint exception

# Execute another command, but because of the previous exception:
curs = conn.cursor()
try:
    curs.execute("SELECT id, type FROM accounts WHERE owner_id=%s", (1,))
except pg.InternalError as e:
    print(e)


The first curs.execute triggers the constraint exception, which is caught and printed. However, the database is now in an inconsistent state. When you try to execute the second query, a psycopg2.InternalError is raised: "current transaction is aborted, commands ignored until end of transaction block". In order to continue with the application, conn.rollback() needs to be called to end the transaction and start a new one.

NOTE: Using with conn.cursor() as curs: causes the same behavior, the context manager does not automatically clean up the state of the transaction.

This essentially means that all transactions should be wrapped in a try block: if they conclude successfully they can be committed, but if they raise an exception they must be rolled back. A basic decorator that does this is as follows:

from functools import wraps

def transaction(func):
    @wraps(func)
    def inner(*args, **kwargs):
        conn = connect()
        try:
            func(conn, *args, **kwargs)
            conn.commit()
        except Exception as e:
            conn.rollback()
            log.error("{} error: {}".format(func.__name__, e))
        finally:
            conn.close()
    return inner


This decorator wraps the specified function, returning an inner function that injects a new connection as the first argument to the decorated function. If the decorated function raises an exception, the transaction is rolled back and the error is logged.

The decorator method is nice but the connection injection can be a bit weird. An alternative is a context manager that ensures the connection is committed or rolled back in a similar fashion:

from contextlib import contextmanager

@contextmanager
def transaction():
    conn = connect()
    try:
        yield conn
        conn.commit()
    except Exception as e:
        conn.rollback()
        log.error("db error: {}".format(e))
    finally:
        conn.close()


This allows you to write code using with as follows:

with transaction() as conn:
    # do transaction


The context manager allows you to easily compose two transactions inside a single function, though doing so may defeat the purpose of a single atomic unit of work. However, it is no problem to combine both the decorator and the context manager methods in two steps (more on this in isolation levels).
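To see the commit/rollback control flow without a live database, here is a sketch against a stub connection (the StubConnection class is an assumption for illustration only, it is not part of psycopg2):

```python
from contextlib import contextmanager

class StubConnection:
    """Stands in for a psycopg2 connection to show the control flow."""

    def __init__(self):
        self.events = []

    def commit(self):
        self.events.append("commit")

    def rollback(self):
        self.events.append("rollback")

    def close(self):
        self.events.append("close")

@contextmanager
def transaction(conn):
    # Same shape as the context manager above, with the connection injected.
    try:
        yield conn
        conn.commit()
    except Exception as e:
        conn.rollback()
        print("db error: {}".format(e))
    finally:
        conn.close()

good = StubConnection()
with transaction(good):
    pass  # a successful unit of work
print(good.events)   # ['commit', 'close']

bad = StubConnection()
with transaction(bad):
    raise ValueError("constraint violated")
print(bad.events)    # ['rollback', 'close']
```

The successful block commits then closes; the failing block rolls back, logs, and still closes the connection.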

### ATM Application

So let’s talk about two specific transactions for an imaginary database application: deposit and withdraw. Each of these operations has several steps:

1. Validate the user with the associated PIN
2. Ensure the user owns the account being modified
3. Write a ledger record with the credit or debit being applied
4. On credit, ensure the daily deposit limit isn’t reached
5. Modify the balance of the account
6. Fetch the current balance to display to the user

Each transaction will perform 6-7 distinct SQL queries: SELECT, INSERT, and UPDATE. If any of them fails, then the database should remain completely unchanged. Failure in this case is that an exception is raised, which is potentially the easiest thing to do when you have a stack of functions calling other functions. Let’s look at deposit first:

@transaction
def deposit(conn, user, pin, account, amount):
    # Step 1: authenticate the user via pin and verify account ownership
    authenticate(conn, user, pin, account)

    # Step 2: add the ledger record with the credit
    ledger(conn, account, "credit", amount)

    # Step 3: update the account value by adding the amount
    update_balance(conn, account, amount)

    # Fetch the current balance in the account and log it
    record = "deposit ${:0.2f} to account {} | current balance:${:0.2f}"
    log.info(record.format(amount, account, balance(conn, account)))


This function simply calls other functions, passing the transaction context (in this case a connection as well as input details) to other functions which may or may not raise exceptions. Here are the two authenticate methods:

def authenticate(conn, user, pin, account=None):
    """
    Raises an exception if the user cannot be validated by the PIN.
    """
    with conn.cursor() as curs:
        sql = "SELECT 1 AS authd FROM users WHERE username=%s AND pin=%s"
        curs.execute(sql, (user, pin))
        if curs.fetchone() is None:
            raise ValueError("could not validate user via PIN")

    if account:
        # Verify account ownership if account is provided
        verify_account(conn, user, account)

    return True

def verify_account(conn, user, account):
    """
    Verify that the account is held by the user.
    """
    with conn.cursor() as curs:
        sql = (
            "SELECT 1 AS verified FROM accounts a "
            "JOIN users u ON u.id = a.owner_id "
            "WHERE u.username=%s AND a.id=%s"
        )
        curs.execute(sql, (user, account))

        if curs.fetchone() is None:
            raise ValueError("account is not held by this user")

    return True


The authenticate and verify_account functions basically look in the database to see if there is a record that matches the conditions — a user with a matching PIN in authenticate and a (user, account_id) pair in verify_account. Both of these functions rely on the UNIQUE constraint in the database for usernames and account ids. This example shows how the function call stack can get arbitrarily deep; verify_account is called by authenticate which is called by deposit. By raising an exception at any point in the stack, the transaction will proceed no further, protecting us from harm later in the transaction.

Note also that neither of these functions has an @transaction decorator; this is because they are expected to be called from within another transaction. They are still independent operations, however, and can be run on their own in a transaction using the context manager.

Next we insert a ledger record:

MAX_DEPOSIT_LIMIT = 1000.00

def ledger(conn, account, record, amount):
    """
    Add a ledger record with the amount being credited or debited.
    """
    # Perform the insert
    with conn.cursor() as curs:
        sql = "INSERT INTO ledger (account_id, type, amount) VALUES (%s, %s, %s)"
        curs.execute(sql, (account, record, amount))

    # If we are crediting the account, perform daily deposit verification
    if record == "credit":
        check_daily_deposit(conn, account)

def check_daily_deposit(conn, account):
    """
    Raise an exception if the deposit limit has been exceeded.
    """
    with conn.cursor() as curs:
        sql = (
            "SELECT amount FROM ledger "
            "WHERE date=now()::date AND type='credit' AND account_id=%s"
        )
        curs.execute(sql, (account,))
        total = sum(row[0] for row in curs.fetchall())
        if total > MAX_DEPOSIT_LIMIT:
            raise Exception("daily deposit limit has been exceeded!")


This is the first place that we modify the state of the database by inserting a ledger record. If, when we check_daily_deposit, we discover that our deposit limit has been exceeded for the day, an exception is raised that will rollback the transaction. This will ensure that the ledger record is not accidentally stored on disk. Finally we update the account balance:

from decimal import Decimal

def update_balance(conn, account, amount):
    """
    Add the amount (or subtract if negative) to the account balance.
    """
    amount = Decimal(amount)
    with conn.cursor() as curs:
        current = balance(conn, account)
        sql = "UPDATE accounts SET balance=%s WHERE id=%s"
        curs.execute(sql, (current+amount, account))

def balance(conn, account):
    with conn.cursor() as curs:
        curs.execute("SELECT balance FROM accounts WHERE id=%s", (account,))
        return curs.fetchone()[0]


I’ll have more to say on update_balance when we discuss isolation levels, but suffice it to say, this is another place where if the transaction fails we want to ensure that our account is not modified! In order to complete the example, here is the withdraw transaction:

@transaction
def withdraw(conn, user, pin, account, amount):
    # Step 1: authenticate the user via pin and verify account ownership
    authenticate(conn, user, pin, account)

    # Step 2: add the ledger record with the debit
    ledger(conn, account, "debit", amount)

    # Step 3: update the account value by subtracting the amount
    update_balance(conn, account, amount * -1)

    # Fetch the current balance in the account and log it
    record = "withdraw ${:0.2f} from account {} | current balance:${:0.2f}"
    log.info(record.format(amount, account, balance(conn, account)))


This is similar but modifies the inputs to the various operations to decrease the account balance with a debit ledger record. We can run:

if __name__ == '__main__':
    conn = connect()
    createdb(conn)

    # Successful deposit
    deposit('alice', 1234, 1, 785.0)

    # Successful withdrawal
    withdraw('alice', 1234, 1, 230.0)

    # Unsuccessful deposit (exceeds the daily deposit limit)
    deposit('alice', 1234, 1, 489.0)

    # Successful deposit
    deposit('bob', 9999, 3, 220.23)


And we should see the following log records:

2017-12-06 20:01:00,086 deposit $785.00 to account 1 | current balance:$1035.00
2017-12-06 20:01:00,103 withdraw $230.00 from account 1 | current balance:$805.00
2017-12-06 20:01:00,118 deposit error: daily deposit limit has been exceeded!
2017-12-06 20:01:00,130 deposit $220.23 to account 3 | current balance:$320.23


This should set a baseline for creating simple and easy to use transactions in Python. However, if you remember your databases class as an undergraduate, things get more interesting when two transactions are occurring at the same time. We’ll explore that from a single process by looking at multi-threaded database connections.

Let’s consider how to run two transactions at the same time from within the same application. The simplest way to do this is to use the threading library to execute transactions simultaneously. How do you achieve thread safety when accessing the database? Back to the docs:

Connection objects are thread-safe: many threads can access the same database either using separate sessions and creating a connection per thread or using the same connection and creating separate cursors. In DB API 2.0 parlance, Psycopg is level 2 thread safe.

This means that every thread must have its own conn object (which we explore in the connection pool section). Any cursor created from the same connection object will be in the same transaction, no matter the thread. We also want to consider how transactions influence each other, and we’ll look at that first by exploring isolation levels and session state.

Let’s say that Alice and Charlie have a joint account, under Alice’s name. They both show up to ATMs at the same time: Alice tries to deposit $75 and then withdraw $25, while Charlie attempts to withdraw $300. We can simulate this with threads as follows:

import time
import random
import threading

def op1():
    time.sleep(random.random())
    withdraw('alice', 1234, 1, 300.0)

def op2():
    time.sleep(random.random())
    deposit('alice', 1234, 1, 75.0)
    withdraw('alice', 1234, 1, 25.0)

threads = [
    threading.Thread(target=op1),
    threading.Thread(target=op2),
]

for t in threads:
    t.start()

for t in threads:
    t.join()

Depending on the timing, one of two things can happen: Charlie can be rejected for not having enough money in his account, leaving the final balance at $300, or all transactions can succeed, leaving the final balance at $0.

There are three transactions happening: two withdrawals and a deposit. Each of these transactions runs in isolation, meaning that it sees the database as it was when the transaction started, plus any changes the transaction itself makes; so if Charlie’s withdrawal and Alice’s deposit happen simultaneously, Charlie will be rejected since his transaction doesn’t know about the deposit until it’s finished. No matter what, the database is left in a consistent state.

However, for performance reasons, you may want to modify the isolation level for a particular transaction. The possible levels are as follows:

1. READ UNCOMMITTED: the lowest isolation level; a transaction may read values that are not yet committed (and may never be committed).
2. READ COMMITTED: write locks are maintained, but read locks are released after each select, meaning two different values can be read in different parts of the same transaction.
3. REPEATABLE READ: both read and write locks are kept so multiple reads return the same values, but phantom reads can occur.
4. SERIALIZABLE: the highest isolation level; read, write, and range locks are maintained until the end of the transaction.
5. DEFAULT: set by the server configuration, not by Python; usually READ COMMITTED.

Note that as the isolation level increases, the number of locks being maintained also increases, which can severely impact performance if there is lock contention or deadlock. It is possible to set the isolation level on a per-transaction basis in order to improve the performance of all transactions happening concurrently. To do this we must modify the session parameters on the connection, which change the behavior of the transactions or statements that follow in that particular session. Additionally, we can set the session to readonly (which disallows writes, except to temporary tables, for performance and security) or to deferrable.

Deferrability is very interesting in a transaction because it modifies how database constraints are checked. Non-deferrable transactions check constraints immediately after a statement is executed. This means that UPDATE accounts SET balance=-5.45 will immediately raise an exception. Deferrable transactions instead wait until the transaction is concluded before checking the constraints. This allows you to write multiple overlapping operations that may put the database into a correct state by the end of the transaction, but potentially not during it (this also interacts with the performance of the various isolation levels).
In order to change the session, we’ll use a context manager as we did before to modify the session for the transaction, then reset the session back to the defaults:

@contextmanager
def session(conn, isolation_level=None, readonly=None, deferrable=None):
    try:
        conn.set_session(
            isolation_level=isolation_level,
            readonly=readonly,
            deferrable=deferrable
        )
        yield conn
    finally:
        # Reset the session to defaults
        conn.set_session(None, None, None, None)

We can then use with to conduct transactions at different isolation levels:

with transaction() as conn:
    with session(conn, isolation_level="READ COMMITTED") as conn:
        # Do transaction

NOTE: There cannot be an ongoing transaction when the session is set, therefore it is more common for me to set the isolation level, readonly, and deferrable inside of the transaction decorator rather than using two separate context managers as shown above. Frankly, it is also common to set these properties on a per-process basis rather than per-transaction, in which case the session is set in connect.

## Connection Pools

Connections cannot be shared across threads. In the threading example above, if we remove the @transaction decorator and pass the same connection into both operations as follows:

conn = connect()

def op1():
    time.sleep(random.random())
    withdraw(conn, 'alice', 1234, 1, 300.0)

def op2():
    time.sleep(random.random())
    deposit(conn, 'alice', 1234, 1, 75.0)
    withdraw(conn, 'alice', 1234, 1, 25.0)

If the op1 withdraw fires first, its exception will cause all of the op2 statements to also fail, since they are in the same transaction. This essentially means that op1 and op2 are in the same transaction even though they are in different threads! We’ve avoided this so far by creating a new connection every time a transaction runs. However, connecting to the database can be expensive, and in high-transaction workloads we may want to keep connections open but ensure each is used by only one transaction at a time.
The solution is to use connection pools. We can modify our connect function as follows:

```python
from psycopg2.pool import ThreadedConnectionPool

def connect(env="DATABASE_URL", connections=2):
    """
    Connect to the database using an environment variable.
    """
    url = os.getenv(env)
    if not url:
        raise ValueError("no database url specified")

    minconns = connections
    maxconns = connections * 2
    return ThreadedConnectionPool(minconns, maxconns, url)
```

This creates a thread-safe connection pool that establishes at least 2 connections and will go up to a maximum of 4 connections on demand. In order to use the pool object in our transaction decorator, we will have to connect when the decorator is imported, creating a global pool object:

```python
pool = connect()

@contextmanager
def transaction(name="transaction", **kwargs):
    # Get the session parameters from the kwargs
    options = {
        "isolation_level": kwargs.get("isolation_level", None),
        "readonly": kwargs.get("readonly", None),
        "deferrable": kwargs.get("deferrable", None),
    }

    try:
        conn = pool.getconn()
        conn.set_session(**options)
        yield conn
        conn.commit()
    except Exception as e:
        conn.rollback()
        log.error("{} error: {}".format(name, e))
    finally:
        conn.reset()
        pool.putconn(conn)
```

Using pool.getconn retrieves a connection from the pool (blocking until one is available), then when we're done we can use pool.putconn to release the connection object.

## Conclusion

This has been a ton of notes on more direct usage of psycopg2. Sorry I couldn't write a more conclusive conclusion, but it's late and this post is now close to 4k words. Time to go get dinner!

## Notes

I used logging as the primary output for this application. The logging was set up as follows:

```python
import logging

LOG_FORMAT = "%(asctime)s %(message)s"
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
log = logging.getLogger('balance')
```

For the complete code, see this gist.
# Lock Diagnostics in Go

By now it's pretty clear that I've just had a bear of a time with locks and synchronization inside of multi-threaded environments with Go. Probably most gophers would simply tell me that I should share memory by communicating rather than communicate by sharing memory — and frankly I'm in that camp too. The issue is that:

- Mutexes can be more expressive than channels
- Channels are fairly heavyweight

So to be honest, there are situations where a mutex is a better choice than a channel. I believe that one of those situations is when dealing with replicated state machines … which is what I've been working on for the past few months. The issue is that the state of the replica has to be kept consistent across a variety of events: timers and remote messages. The problem is that the timers and network traffic are all go routines, and there can be a lot of them running in the system at a time.

Of course you could simply create a channel and push event objects to it to serialize all events. The problem with that is that events generate other events that have to be ordered with respect to the parent event. For example, one event might require the generation of messages to be sent to remote replicas, which requires a per-remote state read that is variable. Said another way, the state can be read-locked for all go routines operating at that time, but no write locks can be acquired. Hence the last post.

Things got complicated. Lock contention was a thing. So I had to diagnose who was trying to acquire locks, and when, and why they were contending. For reference, the most common issues were:

1. A global read lock was being released before all sub read locks were finished.
2. A struct with an embedded RWMutex was then embedded by another object with only a Mutex, but it still had RLock() methods as a result (or vice versa).
3. The wrong lock was being called on embedded structs.
The primary lesson I learned was this: when embedding synchronized objects, only embed the mutex on the child object. Hopefully that rule of thumb lasts. I learned these lessons using a handy little diagnostic tool that this snippet is about. Basically I wanted to track who was acquiring locks and who was waiting on locks. I could then print out a report when I thought something was contending (e.g. on an Interrupt signal) and figure things out.

First step, figure out the name of the calling method:

```go
// caller returns the name of the function that called the function
// which called the caller function.
func caller() string {
	pc, _, _, ok := runtime.Caller(2)
	details := runtime.FuncForPC(pc)
	if ok && details != nil {
		return details.Name()
	}
	return UnknownCaller
}
```

This handy little snippet uses the runtime package to detect the caller two steps above the caller() function in the stack. This allows you to call caller() inside of a function to get the name of the function that's calling the function calling caller(). Confusing? Try this:

```go
func outer() string { return inner() }

func inner() string { return caller() }
```

Calling outer() will return something like main.outer — the function that called the inner() function. Here is a runnable example.

With that in hand, we can simply create a map[string]int64 and increment any calls by caller name before Lock() and decrement any calls by caller name after Unlock(). Here is the example:

But … that's actually a little more complicated than I let on! The problem is that we definitely have multiple go routines calling locks on the lockable struct. However, if we simply try to access the map in the MutexD, then we can have a panic for concurrent map reads and writes. So now, I use the share memory by communicating technique and pass signals via an internal channel, which is read by a go routine ranging over it.

How to use it?
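The diagnostic type itself was embedded in the original post; here is a minimal sketch of what such a mutex might look like. The MutexD name comes from the text, but the internals are my reconstruction (not necessarily the bbengfort/x/lock implementation), and for brevity it only counts lock requests rather than also decrementing after Unlock:

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

const UnknownCaller = "unknown"

// caller returns the name of the function two frames up the stack.
func caller() string {
	pc, _, _, ok := runtime.Caller(2)
	details := runtime.FuncForPC(pc)
	if ok && details != nil {
		return details.Name()
	}
	return UnknownCaller
}

// MutexD wraps a sync.Mutex and counts lock requests by caller name.
// The counts map is owned by a single goroutine ranging over an
// internal channel, so no extra lock is needed to protect the map.
type MutexD struct {
	sync.Mutex
	init   sync.Once
	names  chan string
	counts map[string]int64
	done   chan struct{}
}

func (m *MutexD) setup() {
	m.names = make(chan string, 64)
	m.counts = make(map[string]int64)
	m.done = make(chan struct{})
	go func() {
		// Single goroutine owns the counts map: no concurrent access
		for name := range m.names {
			m.counts[name]++
		}
		close(m.done)
	}()
}

// Lock records the caller's name, then acquires the underlying mutex.
func (m *MutexD) Lock() {
	m.init.Do(m.setup)
	m.names <- caller()
	m.Mutex.Lock()
}

// String closes the internal channel, waits for the counter goroutine
// to drain it, and reports the counts. Call it once, after all locking
// activity is finished.
func (m *MutexD) String() string {
	m.init.Do(m.setup)
	close(m.names)
	<-m.done

	s := ""
	for name, count := range m.counts {
		s += fmt.Sprintf("%d locks requested by %s\n", count, name)
	}
	return s
}

func main() {
	m := new(MutexD)
	m.Lock()
	m.Unlock()
	m.Lock()
	m.Unlock()
	fmt.Print(m.String())
}
```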
Well, do something like this:

```go
type StateMachine struct {
	MutexD
}

func (s *StateMachine) Alpha() {
	s.Lock()
	defer s.Unlock()
	time.Sleep(1 * time.Second)
}

func (s *StateMachine) Bravo() {
	s.Lock()
	defer s.Unlock()
	time.Sleep(100 * time.Millisecond)
}

func main() {
	m := new(StateMachine)
	go m.Alpha()

	time.Sleep(100 * time.Millisecond)
	for i := 0; i < 2; i++ {
		go m.Bravo()
	}

	fmt.Println(m.MutexD.String())
}
```

You should see something like:

```
1 locks requested by main.(*StateMachine).Alpha
2 locks requested by main.(*StateMachine).Bravo
```

Obviously you can do the same thing for RWMutex objects, and it's easy to swap them in and out of code by changing the package and adding or removing a "D". My implementation is here: github.com/bbengfort/x/lock.

# Lock Queuing in Go

In Go, you can use sync.Mutex and sync.RWMutex objects to create thread-safe data structures in memory, as discussed in "Synchronizing Structs for Safe Concurrency in Go". When using the sync.RWMutex in Go, there are two kinds of locks: read locks and write locks. The basic difference is that many read locks can be acquired at the same time, but only one write lock can be acquired at a time. This means that if a thread attempts to acquire a read lock on an object that is already read-locked, it will not block and it will acquire its own read lock. If a thread attempts to acquire a read or a write lock on a write-locked object, then it will block until it is unlocked (as will a write lock acquisition on a read-locked object).

Granting a lock can be prioritized depending on different policies for accesses. Priorities balance the trade-off between concurrency and starvation as follows:

1. Read-preferring RW allows new read locks to be acquired as long as the lock is read-locked, forcing the write-lock acquirer to wait until there are no more read locks. In high contention environments, this might lead to write starvation.
2. Write-preferring RW prevents a read-lock acquisition if a writer is queued and waiting for the lock. This reduces concurrency, because new read locks have to wait for the write lock, but prevents starvation.

So which of these does Go implement? According to the documentation:

> If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking. This is to ensure that the lock eventually becomes available; a blocked Lock call excludes new readers from acquiring the lock. — godoc

My initial read of this made me think that Go implements write-preferring mutexes. However, this was not the behavior that I observed. Consider the following locker:

```go
var delay time.Duration
var started time.Time

// Locker holds values that are threadsafe
type Locker struct {
	sync.RWMutex
	value  uint64    // the current value of the locker
	access time.Time // time of the last access
}

// Write to the value of the locker in a threadsafe fashion.
func (l *Locker) Write(value uint64) {
	l.Lock()
	defer l.Unlock()

	// Arbitrary amount of work
	time.Sleep(delay)

	l.value = value
	l.access = time.Now()
	l.log("written")
}

// Read the value of the locker in a threadsafe fashion.
func (l *Locker) Read() uint64 {
	l.RLock()
	defer l.RUnlock()

	// Arbitrary amount of work
	time.Sleep(delay / 2)

	l.access = time.Now()
	l.log("read")
	return l.value
}

// Log the access (not thread-safe)
func (l *Locker) log(method string) {
	after := l.access.Sub(started)
	log.Printf("%d %s after %s\n", l.value, method, after)
}
```

This locker holds a value and logs all accesses to it after the start time.
If we run a few threads to read and write to it, we can see concurrent reads in action:

```go
func main() {
	delay = 1 * time.Second
	started = time.Now()
	group := new(errgroup.Group)
	locker := new(Locker)

	// Straightforward: a write, three reads, and a write
	group.Go(func() error { locker.Write(42); return nil })
	group.Go(func() error { locker.Read(); return nil })
	group.Go(func() error { locker.Read(); return nil })
	group.Go(func() error { locker.Read(); return nil })
	group.Go(func() error { locker.Write(101); return nil })

	group.Wait()
}
```

The output is as follows:

```
$ go run locker.go
2017/09/08 12:26:32 101 written after 1.005058824s
2017/09/08 12:26:33 101 read after 1.50770225s
2017/09/08 12:26:33 101 read after 1.507769109s
2017/09/08 12:26:33 101 read after 1.50773587s
2017/09/08 12:26:34 42 written after 2.511968581s
```
Note that the last go routine (writing 101) actually managed to acquire the lock first, after which the three readers acquired their locks, and finally the first writer (writing 42) completed. Now if we interleave the read and write accesses, adding a sleep between the kick-off of each go routine to ensure that the preceding thread has time to acquire the lock:

```go
func main() {
	delay = 1 * time.Second
	started = time.Now()
	group := new(errgroup.Group)
	locker := new(Locker)

	// Interleave writes and reads
	group.Go(func() error { locker.Write(42); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Write(101); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Write(3); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Write(18); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })

	group.Wait()
}
```


We get the following output:

```
$ go run locker.go
2017/09/08 12:29:28 42 written after 1.000178155s
2017/09/08 12:29:28 42 read after 1.500703007s
2017/09/08 12:29:28 42 read after 1.500691088s
2017/09/08 12:29:28 42 read after 1.500756144s
2017/09/08 12:29:28 42 read after 1.500648159s
2017/09/08 12:29:28 42 read after 1.500762323s
2017/09/08 12:29:28 42 read after 1.500679533s
2017/09/08 12:29:28 42 read after 1.500795204s
2017/09/08 12:29:29 101 written after 2.500971593s
2017/09/08 12:29:30 3 written after 3.505325487s
2017/09/08 12:29:31 18 written after 4.50594131s
```


This suggests that the reads continue to acquire locks as long as the Locker is read locked, forcing the writes to happen at the end.

I found one Stack Overflow post: “Read preferring RW mutex lock in Golang” that seems to suggest that sync.RWMutex can implement both read and write preferred locking, but doesn’t really give an explanation about how external callers can implement it.

Finally consider the following:

```go
func main() {
	delay = 1 * time.Second
	started = time.Now()
	group := new(errgroup.Group)
	locker := new(Locker)

	// Four writes up front
	group.Go(func() error { locker.Write(42); return nil })
	group.Go(func() error { locker.Write(101); return nil })
	group.Go(func() error { locker.Write(3); return nil })
	group.Go(func() error { locker.Write(18); return nil })

	// A stream of reads, each sleeping a quarter of the write delay
	for i := 0; i < 22; i++ {
		group.Go(func() error {
			locker.Read()
			return nil
		})
		time.Sleep(delay / 4)
	}

	group.Wait()
}
```


Given the loop issuing 22 read locks that sleep only a quarter of the time of the write lock, we might expect that this code will issue 22 read locks and that all the write locks will occur at the end (and if we put this in a forever loop, the writes would never occur). However, the output is as follows:

```
2017/09/08 12:43:40 18 written after 1.004461829s
2017/09/08 12:43:40 18 read after 1.508343716s
2017/09/08 12:43:40 18 read after 1.50842899s
2017/09/08 12:43:40 18 read after 1.508362345s
2017/09/08 12:43:40 18 read after 1.508339659s
2017/09/08 12:43:40 18 read after 1.50852229s
2017/09/08 12:43:41 42 written after 2.513789339s
2017/09/08 12:43:42 42 read after 3.0163191s
2017/09/08 12:43:42 42 read after 3.016330534s
2017/09/08 12:43:42 42 read after 3.016355628s
2017/09/08 12:43:42 42 read after 3.016371381s
2017/09/08 12:43:42 42 read after 3.016316992s
2017/09/08 12:43:43 3 written after 4.017954589s
2017/09/08 12:43:43 3 read after 4.518495233s
2017/09/08 12:43:43 3 read after 4.518523255s
2017/09/08 12:43:43 3 read after 4.518537387s
2017/09/08 12:43:43 3 read after 4.518540397s
2017/09/08 12:43:43 3 read after 4.518543262s
2017/09/08 12:43:43 3 read after 4.51863128s
2017/09/08 12:43:44 101 written after 5.521872765s
2017/09/08 12:43:45 101 read after 6.023207828s
2017/09/08 12:43:45 101 read after 6.023225272s
2017/09/08 12:43:45 101 read after 6.023249529s
2017/09/08 12:43:45 101 read after 6.023190828s
2017/09/08 12:43:45 101 read after 6.023243032s
2017/09/08 12:43:45 101 read after 6.023190457s
2017/09/08 12:43:45 101 read after 6.04455716s
2017/09/08 12:43:45 101 read after 6.29457923s
```


What Go implements is actually something else: read locks can only be acquired so long as the initial read lock is maintained (the word "initial" being critical in the documentation). As soon as the first read lock is released, the queued write lock gets priority. The first read lock lasts approximately 500ms, which is enough time for four or five other goroutines to acquire read locks; as soon as that first read lock completes, the write is given priority.

# Messaging Throughput gRPC vs. ZMQ

Building distributed systems in Go requires an RPC or message framework of some sort. In the systems I build I prefer to pass messages serialized with protocol buffers therefore a natural choice for me is grpc. The grpc library uses HTTP2 as a transport layer and provides a code generator based on the protocol buffer syntax making it very simple to use.

For more detailed control, the ZMQ library is an excellent, low latency socket framework. ZMQ provides several communication patterns from basic REQ/REP (request/reply) to PUB/SUB (publish/subscribe). ZMQ is used at a lower level though, so more infrastructure per app needs to be built.

This leads to the obvious question: which RPC framework is faster? Here are the results:

These results show the message throughput of three echo servers that respond to a simple message with a response including a sequence number. Each server is running on its own EC2 micro instance with 1GB of memory and 1 vCPU. Each client is running on an EC2 nano instance with 0.5GB of memory and 1 vCPU, constantly sending messages to the server. The throughput is the number of messages per second the server can handle.

The servers are as follows:

1. rep: a server that implements a REQ/REP socket and simple handler.
2. router: a server that implements a REQ/ROUTER socket along with a DEALER/REP socket for 16 workers, connected via a proxy.
3. grpc: implements a gRPC service.

The runner and results can be found here.

## Discussion

All the figures exhibit a standard shape for throughput - namely as more clients are added the throughput increases, but begins to tail off toward an asymptote. The asymptote represents the maximum number of messages a server can respond to without message latency. Generally speaking if a server can handle multiple clients at once, the throughput is higher.

The ZMQ REQ/ROUTER/PROXY/DEALER/REP server with 16 workers outperforms the gRPC server (it has a higher overall throughput). I hypothesize that this is because ZMQ does not have the overhead of HTTP and is in fact lighter weight code than gRPC since none of it is generated. It’s unclear if adding more workers would improve the throughput of the ZMQ router server.

The performance of the REQ/REP server is a mystery. It’s doing way better than the other two. This socket has very little overhead, so for fewer clients it should be performing better. However, this socket also blocks on a per-client basis. Both grpc and router are asynchronous and can handle multiple clients at a time suggesting that they should be much faster.

# Online Distribution

This post started out as a discussion of a struct in Go that could keep track of online statistics without keeping an array of values. It ended up being a lesson on over-engineering for concurrency.

The spec of the routine was to build a data structure that could keep track of internal statistics of values over time in a space-saving fashion. The primary interface was a method, Update(sample float64), so that a new sample could be passed to the structure, updating internal parameters. At conclusion, the structure should be able to describe the mean, variance, and range of all values passed to the update method. I created two versions:

1. A thread-safe version using mutexes, but blocking on Update()
2. A thread-safe version using a channel and a go routine so that Update() was non-blocking.

I ran some benchmarking, and discovered that the blocking implementation of Update was actually far faster than the non-blocking version. Here are the numbers:

```
BenchmarkBlocking-8      20000000     81.1 ns/op
BenchmarkNonBlocking-8   10000000    140 ns/op
```


Apparently, putting a float on a channel, even a buffered channel, incurs some overhead that is more expensive than simply incrementing and summing a few integers and floats. I will present both methods here, but note that the first method (blocking update) should be implemented in production.

You can find this code at github.com/bbengfort/x/stats if you would like to use it in your work.

## Online Descriptive Statistics (Blocking)

To track statistics in an online fashion, you need to keep track of the various aggregates that are used to compute the final descriptive statistics of the distribution. For simple statistics such as the minimum, maximum, standard deviation, and mean, you need to track the number of samples, the sum of samples, and the sum of the squares of all samples (along with the minimum and maximum values seen). Here is how you do that:
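The code was embedded in the original post; here is a minimal sketch of the blocking version. The `Statistics` type and method names are my guesses, not necessarily those in bbengfort/x/stats:

```go
package main

import (
	"fmt"
	"math"
	"sync"
)

// Statistics tracks online aggregates of a stream of samples. It keeps
// no array of values: just the count, sum, sum of squares, min and max.
type Statistics struct {
	sync.RWMutex
	samples uint64
	total   float64
	squares float64
	minimum float64
	maximum float64
}

// Update the aggregates with the new sample (blocking).
func (s *Statistics) Update(sample float64) {
	s.Lock()
	defer s.Unlock()

	s.samples++
	s.total += sample
	s.squares += sample * sample

	if s.samples == 1 || sample < s.minimum {
		s.minimum = sample
	}
	if s.samples == 1 || sample > s.maximum {
		s.maximum = sample
	}
}

// Mean of all samples seen so far.
func (s *Statistics) Mean() float64 {
	s.RLock()
	defer s.RUnlock()
	if s.samples == 0 {
		return 0
	}
	return s.total / float64(s.samples)
}

// Variance computes the sample variance from the running sums.
func (s *Statistics) Variance() float64 {
	s.RLock()
	defer s.RUnlock()
	if s.samples < 2 {
		return 0
	}
	mean := s.total / float64(s.samples)
	num := s.squares - float64(s.samples)*math.Pow(mean, 2)
	return num / float64(s.samples-1)
}

func main() {
	stats := &Statistics{}
	for _, v := range []float64{1, 2, 3, 4, 5} {
		stats.Update(v)
	}
	fmt.Println(stats.Mean(), stats.Variance()) // 3 2.5
}
```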

I use this data structure as a lightweight mechanism to keep track of online statistics for experimental results or latency. It gives a good overall view of incoming values at very little expense.

## Non-blocking Update

In an attempt to improve the performance of this method, I envisioned a mechanism where I could simply dump values into a buffered channel then run an updater go routine to collect values and perform the online computation. The updater function can simply range over the channel, and the channel can be closed to stop the goroutine and finalize anything still on the channel. This is written as follows:
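Again the embedded code is not reproduced here; a sketch of the non-blocking variant (my reconstruction, tracking only the count and sum for brevity) looks roughly like this:

```go
package main

import "fmt"

// OnlineStats performs updates off a buffered channel so that the
// Update call itself does not hold a mutex.
type OnlineStats struct {
	values  chan float64
	done    chan struct{}
	samples uint64
	total   float64
}

func NewOnlineStats() *OnlineStats {
	s := &OnlineStats{
		values: make(chan float64, 1024),
		done:   make(chan struct{}),
	}
	go s.updater()
	return s
}

// updater ranges over the channel, performing the online computation
// in a single goroutine that owns the aggregate fields.
func (s *OnlineStats) updater() {
	for value := range s.values {
		s.samples++
		s.total += value
	}
	close(s.done)
}

// Update sends the sample to the updater; it does not block while the
// channel buffer has room.
func (s *OnlineStats) Update(sample float64) {
	s.values <- sample
}

// Close stops the updater, finalizing anything still on the channel.
func (s *OnlineStats) Close() {
	close(s.values)
	<-s.done
}

// Mean is only safe to call after Close.
func (s *OnlineStats) Mean() float64 {
	if s.samples == 0 {
		return 0
	}
	return s.total / float64(s.samples)
}

func main() {
	stats := NewOnlineStats()
	for i := 1; i <= 10; i++ {
		stats.Update(float64(i))
	}
	stats.Close()
	fmt.Println(stats.Mean()) // 5.5
}
```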

The lesson was that this is actually less performant, no matter how large the buffer is. I increased the buffer size to 10000001 to ensure that the sender could not block, but I still received 116 ns/op benchmarks. Generally, this style is what I use when the function being implemented is actually pretty heavy (e.g. writes to disk). In this case, the function was too lightweight to matter!

# Rapid FS Walks with ErrGroup

I’ve been looking for a way to quickly scan a file system and gather information about the files in directories contained within. I had been doing this with multiprocessing in Python, but figured Go could speed up my performance by a lot. What I discovered when I went down this path was the sync.ErrGroup, an extension of the sync.WaitGroup that helps manage the complexity of multiple go routines but also includes error handling!

The end result of this exploration was a utility called urfs — which you can install on your system to take a uniform random sample of files in a directory or to compute the number of files and bytes per directory. This utility is also extensible to other functionality that requires rapidly walking a file system, such as search.

This post is therefore a bit of a walkthrough on using sync.ErrGroup for scanning a file system and applying arbitrary functions. First a couple of types:

```go
type WalkFunc func(path string) (string, error)

type FSWalker struct {
	Workers    int
	SkipHidden bool
	SkipDirs   bool
	Match      string
	root       string
	paths      chan string
	nPaths     uint64
	results    chan string
	nResults   uint64
	group      *errgroup.Group
	ctx        context.Context
	started    time.Time
	duration   time.Duration
}
```


The first type is a generic function that can be passed to the Walk method of the FSWalker. The FSWalker maintains state with a variety of channels, and of course the errgroup.Group object. The SkipHidden, SkipDirs, and Match properties allow us to filter path types being passed to Walk.

To initialize FSWalker:

```go
func (fs *FSWalker) Init(ctx context.Context) {
	// Set up FSWalker defaults
	fs.Workers = DefaultWorkers
	fs.SkipHidden = true
	fs.SkipDirs = true
	fs.Match = "*"

	// Create the context for the errgroup
	if ctx == nil {
		ctx = context.Background()
	}

	// Create the err group
	fs.group, fs.ctx = errgroup.WithContext(ctx)

	// Create channels and instantiate other statistics variables
	fs.paths = make(chan string, DefaultBuffer)
	fs.results = make(chan string, DefaultBuffer)
	fs.nPaths = 0
	fs.nResults = 0
	fs.started = time.Time{}
	fs.duration = time.Duration(0)
}
```


Ok, so we're doing a lot of work here, but it pays off in the Walk function, where we keep track of the number of paths we've seen at a root directory, passing them off to a WalkFunc using a variety of Go routines:

```go
func (fs *FSWalker) Walk(path string, walkFn WalkFunc) error {
	// Compute the duration of the walk
	fs.started = time.Now()
	defer func() { fs.duration = time.Since(fs.started) }()

	// Set the root path for the walk
	fs.root = path

	// Launch the goroutine that populates the paths
	fs.group.Go(func() error {
		// Ensure that the channel is closed when all paths are loaded
		defer close(fs.paths)

		// Apply the path filter to the filepath.Walk function
		return filepath.Walk(fs.root, fs.filterPaths)
	})

	// Create the worker function and allocate the pool
	worker := fs.worker(walkFn)
	for w := 0; w < fs.Workers; w++ {
		fs.group.Go(worker)
	}

	// Wait for the workers to complete, then close the results channel
	go func() {
		fs.group.Wait()
		close(fs.results)
	}()

	// Start gathering the results
	for range fs.results {
		fs.nResults++
	}

	return fs.group.Wait()
}
```


So this is a lot of code; let's step through it. The first thing we do is set the started time to now and defer a function that computes the duration as the difference between the time at the end of the function and the start time. We also set the root value. We then launch a go routine in the ErrGroup by using fs.group.Go(func) — this function must have the signature func() error, so we use an anonymous function to kick off filepath.Walk, which starts walking the directory structure, adding paths that match the filter criteria to a buffered channel called fs.paths. This channel must be closed when the walk completes so that our worker go routines terminate, more on that later.

Next we create a worker function using our worker method and walk function. The workers read paths off the fs.paths channel and apply the walkFn to each path individually. Note that we use a pool-like structure here, limiting the number of workers to 5000 — this is so we don't get a "too many files open" error when we exhaust the number of file descriptors, since Go places no limit on the number of go routines. The worker definition is here:

```go
func (fs *FSWalker) worker(walkFn WalkFunc) func() error {
	return func() error {
		// Apply the function to all paths in the channel
		for path := range fs.paths {
			// avoid race condition
			p := path

			// apply the walk function to the path and return errors
			r, err := walkFn(p)
			if err != nil {
				return err
			}

			// store the result and check the context
			if r != "" {
				select {
				case fs.results <- r:
				case <-fs.ctx.Done():
					return fs.ctx.Err()
				}
			}
		}
		return nil
	}
}
```


As you can see, the worker function just creates a closure with the signature of our ErrGroup function, so that we can pass it to the wait group. All the worker function does is range over the paths channel, applying the path to the walkFn.

Finally, we kick off another go routine that waits until all the workers have stopped, and when they do, we close our results channel. We do this so that we can start gathering results immediately; we don't have to wait. We gather them by simply ranging over the results channel and counting the number of results. A final wait at the end ensures that all go routines complete.

Lastly, the filter function. We want to ignore files and directories that are hidden, e.g. that start with a "." or a "~" on Unix systems. We also want to be able to pass a glob-like matcher, e.g. "*.txt", to only match text files. The filter function is here:

// Internal filter paths function that is passed to filepath.Walk
func (fs *FSWalker) filterPaths(path string, info os.FileInfo, err error) error {
// Propagate any errors
if err != nil {
return err
}

// Check to ensure that no mode bits are set
if !info.Mode().IsRegular() {
return nil
}

// Get the name of the file without the complete path
name := info.Name()

// Skip hidden files or directories if required.
if fs.SkipHidden {
if strings.HasPrefix(name, ".") || strings.HasPrefix(name, "~") {
return nil
}
}

// Skip directories if required
if fs.SkipDirs {
if info.IsDir() {
return nil
}
}

// Check to see if the pattern matches the file
match, err := filepath.Match(fs.Match, name)
if err != nil {
return err
} else if !match {
return nil
}

// Increment the total number of paths we've seen.

select {
case fs.paths <- path:
case <-fs.ctx.Done():
return fs.ctx.Err()
}

return nil
}


And that’s it, with this simple framework, you can apply an arbitrary walkFn to all paths in a directory, matching a specific criteria. The big win here is to manage all of the go routines using the ErrGroup and a context.Context object.

The following post: Run strikingly fast parallel file searches in Go with sync.ErrGroup by Brian Ketelsen was the primary inspiration for the use of sync.ErrGroup.

# Buffered Write Performance

This is just a quick note on the performance of writing to a file on disk using Go, and it raises a question about a common programming paradigm that I am now suspicious of. I discovered that when I wrapped the open file object with a bufio.Writer, the performance of my writes to disk significantly increased. Ok, so this isn't about simple file writing to disk; this is about a complex writer that seeks to different positions in the file and maintains the overall state of what's on disk in memory. However, the question remains:

Why do we buffer our writes to a file?

A couple of answers come to mind: safety, the buffer ensures that writes to the underlying writer are not flushed when an error occurs; helpers, there may be some methods in the buffer struct not available to a native writer; concurrency, the buffer can be appended to concurrently with another part of the buffer being flushed.

However, we determined that in performance critical applications (file systems, databases) the buffer abstraction adds an unacceptable performance overhead. Here are the results.

## Results

First, we’re not doing a simple write - we’re appending to a write-ahead log that has fixed length metadata at the top of the file. This means that a single operation to append data to the log consists of the following steps:

1. Marshal data to bytes
2. Write data to end of the log file (possibly sync the file)
3. Seek to the top of the file
4. Marshal and write the fixed-length metadata header
5. Seek to the bottom of the file
6. Sync the file to disk

So there is a bit more work here than simply throwing data at disk. We can see in the following graph that the performance of the machine (CPU, Memory, and Disk) plays a huge role in determining the performance of these operations in terms of the number of these writes the machine is able to do per second:

In the above graph, Hyperion and Lagoon are Dell Optiplex servers and Antigua, Curacao, and Nevis are Intel NUCs. They all have different processors and SSDs, but all have 16GB memory. For throughput, bigger is better (you can do more operations per second). As you can see on all of the servers, there is about a 1.6x increase in throughput using unbuffered writes to the file over buffered writes to the file.

We can inspect the distribution of the latency of each individual operation as follows (with latency, smaller is better — you’re doing operations faster):

The boxplot shows the distribution of latency such that the box is between the 25th and 75th percentile (with a bisecting line at the median) - the lines are from the 5th to the 95th percentile, and anything outside the lines are considered outliers and are visualized as diamonds.

We can see the shift not just in the mean, but also the median; a 1.6x increase in speed (decrease in latency) from buffered to unbuffered writes. More importantly, we can see that unbuffered writes are more consistent; e.g. they have a tighter distribution and less variable operational latency. I suspect this means that while both types of writes are bound by disk accesses from other processes, buffered writes are also bound by CPU, whereas unbuffered writes are less so.

## Method

The idea here is that we are going to open a file and append data to it, tracking what we’re doing with a fixed length metadata header at the beginning of the file. Creating a struct to wrap the file and open, sync, and close it is pretty straight forward:

```go
type Log struct {
	path string
	file *os.File
}

func (l *Log) Open(path string) (err error) {
	l.path = path
	l.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0644)
	if err != nil {
		return err
	}
	return nil
}

func (l *Log) Close() error {
	err := l.file.Close()
	l.file = nil
	return err
}

func (l *Log) Sync() error {
	return l.file.Sync()
}
```

Now let’s say that we have an entry that knows how to write itself to an io.Writer interface as follows:

```go
type Entry struct {
	Version uint64    `json:"version"`
	Key     string    `json:"key"`
	Value   []byte    `json:"value"`
	Created time.Time `json:"created"`
}

func (e *Entry) Dump(w io.Writer) (int64, error) {
	// Encode entry as JSON data (base64 encoded bytes value)
	data, err := json.Marshal(e)
	if err != nil {
		return -1, err
	}

	// Add a newline to the data for json lines format
	data = append(data, byte('\n'))

	// Write the data to the writer and return.
	n, err := w.Write(data)
	return int64(n), err
}
```


So the question is, if we have a list of entries we want to append to the log, how do we pass the io.Writer to the Entry.Dump method in order to write them one at a time?

The first method is the standard method, buffered, using bufio.Writer:

```go
func (l *Log) Append(entries ...*Entry) (size int64, err error) {
	// Create the buffer and define the bytes written
	var bytes int64
	buffer := bufio.NewWriter(l.file)

	// Write each entry, keeping track of the amount of data written
	for _, entry := range entries {
		if bytes, err = entry.Dump(buffer); err != nil {
			return -1, err
		}
		size += bytes
	}

	// Flush the buffer
	if err = buffer.Flush(); err != nil {
		return -1, err
	}

	// Sync the underlying file
	if err = l.Sync(); err != nil {
		return -1, err
	}

	return size, nil
}
```


As you can see, even though we’re getting a buffered write to disk, we’re not actually leveraging any of the benefits of the buffered write. By eliminating the middleman with an unbuffered approach:

```go
func (l *Log) Append(entries ...*Entry) (size int64, err error) {
	// Write each entry directly to the file, tracking data written
	for _, entry := range entries {
		bytes, err := entry.Dump(l.file)
		if err != nil {
			return -1, err
		}
		size += bytes
	}

	// Sync the underlying file
	if err = l.Sync(); err != nil {
		return -1, err
	}

	return size, nil
}
```


We get the performance benefit as shown above. Now, I'm not sure if this is obvious or not, but I do know that it's commonly taught to wrap the file object with a buffer. The unbuffered approach may be simpler and faster, but it may also be less safe; it depends on your use case.