Git-Style File Editing in CLI

A recent application I was working on required the management of several configuration and list files that needed to be validated. Rather than have the user find and edit these files directly, I wanted to create an editing workflow similar to crontab -e or git commit — the user would call the application, which would redirect to a text editor like vim, then when editing was complete, the application would take over again.

This happened to be a Go app, so the following code is in Go, but it would work with any programming language. The workflow is as follows:

  1. Find an editor executable
  2. Copy the original to a temporary file
  3. Exec the editor on the temporary file
  4. Wait for the editor to be done
  5. Validate the temporary file
  6. Copy the temporary file to the original location

This worked surprisingly well, especially for things like YAML files, which are structured enough to be validated easily but human readable enough to edit.

First up, finding an editor executable. I used a three-part strategy: first, the user can specify the path to an editor in the configuration file (like git); second, the user can set the $EDITOR environment variable; and third, the code looks for common editors on the $PATH. Here’s the code:

var editors = []string{"vim", "emacs", "nano"}

func findEditor() (string, error) {

	config, err := LoadConfig()
	if err != nil {
		return "", err
	}

	if config.Editor != "" {
		return config.Editor, nil
	}

	if editor := os.Getenv("EDITOR"); editor != "" {
		return editor, nil
	}

	for _, name := range editors {
		path, err := exec.LookPath(name)
		if err == nil {
			return path, nil
		}
	}

	return "", errors.New("no editor found")
}

The crucial part of this is exec.LookPath, which searches the $PATH for the named editor and returns the full path needed to exec it. Next up is copying the file:

func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer out.Close()

	if _, err = io.Copy(out, in); err != nil {
		return err
	}

	return nil
}

Finally the full editor workflow:

func EditFile(path string) error {
	// Find the editor to use
	editor, err := findEditor()
	if err != nil {
		return err
	}

	// Create a temporary directory and ensure we clean up when done.
	// Note: os.TempDir() returns the system temp directory, which we must
	// never RemoveAll; create a dedicated directory instead.
	tmpDir, err := ioutil.TempDir("", "edit")
	if err != nil {
		return err
	}
	defer os.RemoveAll(tmpDir)

	// Get the temporary file location
	tmpFile := filepath.Join(tmpDir, filepath.Base(path))

	// Copy the original file to the tmpFile
	if err = copyFile(path, tmpFile); err != nil {
		return err
	}

	// Create the editor command
	cmd := exec.Command(editor, tmpFile)
	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	// Start the editor command and wait for it to finish
	if err = cmd.Start(); err != nil {
		return err
	}

	if err = cmd.Wait(); err != nil {
		return err
	}

	// Copy the tmp file back to the original file
	return copyFile(tmpFile, path)
}

This workflow assumes that the file being edited already exists, but of course you could modify it any number of ways. For example, you could use a template to populate the temporary file (similar to what git does for a commit message), or you could add more validation around input and output.
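For example, if the files are YAML, the validation step might be as simple as checking that the edited file still parses. Here is a minimal sketch, assuming the gopkg.in/yaml.v2 package (the validateYAML helper is illustrative, not part of the original application):

// validateYAML checks that the edited file still parses as YAML
// before it is copied back over the original.
func validateYAML(path string) error {
	data, err := ioutil.ReadFile(path)
	if err != nil {
		return err
	}

	var doc map[string]interface{}
	if err := yaml.Unmarshal(data, &doc); err != nil {
		return fmt.Errorf("edited file is not valid YAML: %s", err)
	}
	return nil
}

A call to a validator like this would slot in between cmd.Wait() and the final copyFile.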

Transaction Handling with Psycopg2

Databases are essential to most applications; however, the details of database interaction are often hidden from Python developers by higher-level libraries like Django or SQLAlchemy. We use and love PostgreSQL with psycopg2, but I recently realized that I didn’t have a good grasp on how exactly psycopg2 implemented core database concepts: particularly transaction isolation and thread safety.

Here’s what the documentation says regarding transactions:

Transactions are handled by the connection class. By default, the first time a command is sent to the database (using one of the cursors created by the connection), a new transaction is created. The following database commands will be executed in the context of the same transaction – not only the commands issued by the first cursor, but the ones issued by all the cursors created by the same connection. Should any command fail, the transaction will be aborted and no further command will be executed until a call to the rollback() method.

Transactions are therefore connection specific. When you create a connection, you can create multiple cursors; the transaction begins when the first cursor issues an execute, and all commands executed by all cursors after that are part of the same transaction until commit or rollback. After either of those methods is called, the next transaction is started on the next execute call.

This brings up a very important point:

By default even a simple SELECT will start a transaction: in long-running programs, if no further action is taken, the session will remain “idle in transaction”, an undesirable condition for several reasons (locks are held by the session, tables bloat…). For long lived scripts, either make sure to terminate a transaction as soon as possible or use an autocommit connection.

This seems to indicate that when working directly with psycopg2, understanding transactions is essential to writing stable scripts. This post therefore details my notes and techniques for working more effectively with PostgreSQL from Python.

Database Preliminaries

In order to demonstrate the code in this blog post, we need a database. The classic database example taught to undergraduates is that of a bank account, so we’ll continue with that theme here! Sorry if this part is tedious, feel free to skip ahead. In a file, schema.sql, I defined the following schema as DDL (data definition language):

DROP TABLE IF EXISTS users CASCADE;
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(255) UNIQUE,
    pin SMALLINT NOT NULL
);


DROP TYPE IF EXISTS account_type CASCADE;
CREATE TYPE account_type AS ENUM ('checking', 'savings');

DROP TABLE IF EXISTS accounts CASCADE;
CREATE TABLE accounts (
    id SERIAL PRIMARY KEY,
    type account_type,
    owner_id INTEGER NOT NULL,
    balance NUMERIC DEFAULT 0.0,
    CONSTRAINT positive_balance CHECK (balance >= 0),
    FOREIGN KEY (owner_id) REFERENCES users (id)
);

DROP TYPE IF EXISTS ledger_type CASCADE;
CREATE TYPE ledger_type AS ENUM ('credit', 'debit');

DROP TABLE IF EXISTS ledger;
CREATE TABLE ledger (
    id SERIAL PRIMARY KEY,
    account_id INTEGER NOT NULL,
    date DATE NOT NULL DEFAULT CURRENT_DATE,
    type ledger_type NOT NULL,
    amount NUMERIC NOT NULL,
    FOREIGN KEY (account_id) REFERENCES accounts (id)
);

This creates a simple database with three tables. The users table contains a PIN code for verification. Users can have one or more accounts, and accounts have the constraint that the balance can never fall below $0.00; the ledger records every credit and debit applied to an account. We can also seed the database with some initial data:

INSERT INTO users (id, username, pin) VALUES
    (1, 'alice', 1234),
    (2, 'bob', 9999);

INSERT INTO accounts (type, owner_id, balance) VALUES
    ('checking', 1, 250.0),
    ('savings', 1, 5.00),
    ('checking', 2, 100.0),
    ('savings', 2, 2342.13);

Moving to Python code we can add some template code to allow us to connect to the database and execute the SQL in our file above:

import os
import psycopg2 as pg


def connect(env="DATABASE_URL"):
    url = os.getenv(env)
    if not url:
        raise ValueError("no database url specified")
    return pg.connect(url)


def createdb(conn, schema="schema.sql"):
    with open(schema, 'r') as f:
        sql = f.read()

    try:
        with conn.cursor() as curs:
            curs.execute(sql)
            conn.commit()
    except Exception as e:
        conn.rollback()
        raise e

The connect function looks for the database connection string in the environment variable $DATABASE_URL. Because database configuration code can contain passwords and network information it is always best to store it in the environment or in a local, secure configuration file that can only be accessed by the process and not checked in with code. The connection string should look something like: postgresql://user@localhost:5432/dbname.

The createdb function reads the SQL from the schema.sql file and executes it against the database. Note this is why we have the DROP TABLE IF EXISTS statements, so we can guarantee we always start with a fresh database when we run this script. This function also gives us our first glance at transactions and database interaction with Python.

Complying with PEP 249 we create a connection to the database, then create a cursor from the connection. Cursors manage the execution of SQL against the database as well as data retrieval. We execute the SQL in our schema file, committing the transaction if no exceptions are raised, and rolling back if it fails. We will explore this more in the next section.

Transaction Management

A transaction consists of one or more related operations that represent a single unit of work. For example, in the bank account example you might have a deposit transaction that executes queries to look up the account and verify the user, add a record to a list of daily deposits, check if the daily deposit limit has been reached, then modify the account balance. All of these operations represent all of the steps required to perform a deposit.

The goal of a transaction is that when the transaction is complete, the database remains in a single consistent state. Consistency is often defined by invariants or constraints that describe at a higher level how the database should maintain information. From a programming perspective, if those constraints are violated, an exception is raised. For example, the database has a positive_balance constraint; if the balance for an account goes below zero, an exception is raised. When this constraint is violated the database must remain unchanged and all operations performed by the transaction must be rolled back. If the transaction is successful we can then commit the changes, which guarantees that the database has successfully applied our operations.

So why do we need to manage transactions? Consider the following code:

conn = connect()
curs = conn.cursor()

try:
    # Execute a command that will violate a constraint
    curs.execute("UPDATE accounts SET balance=%s", (-130.935,))
except Exception as e:
    print(e) # Constraint exception

# Execute another command; because of the previous exception it will fail:
curs = conn.cursor()
try:
    curs.execute("SELECT id, type FROM accounts WHERE owner_id=%s", (1,))
except pg.InternalError as e:
    print(e)

The first curs.execute triggers the constraint exception, which is caught and printed. However, the database is now in an inconsistent state. When you try to execute the second query, a psycopg2.InternalError is raised: "current transaction is aborted, commands ignored until end of transaction block". In order to continue with the application, conn.rollback() needs to be called to end the transaction and start a new one.

NOTE: Using with conn.cursor() as curs: exhibits the same behavior; the context manager does not automatically clean up the state of the transaction.

This essentially means all transactions should be wrapped in a try block: if they conclude successfully they can be committed, but if they raise an exception they must be rolled back. A basic decorator that does this is as follows:

from functools import wraps

def transaction(func):
    @wraps(func)
    def inner(*args, **kwargs):
        conn = connect()
        try:
            func(conn, *args, **kwargs)
            conn.commit()
        except Exception as e:
            conn.rollback()
            log.error("{} error: {}".format(func.__name__, e))
        finally:
            conn.close()
    return inner

This decorator wraps the specified function, returning an inner function that injects a new connection as the first argument to the decorated function. If the decorated function raises an exception, the transaction is rolled back and the error is logged.

The decorator method is nice but the connection injection can be a bit weird. An alternative is a context manager that ensures the connection is committed or rolled back in a similar fashion:

from contextlib import contextmanager

@contextmanager
def transaction():
    # Connect outside the try block so conn is always defined in the
    # except and finally clauses below
    conn = connect()
    try:
        yield conn
        conn.commit()
    except Exception as e:
        conn.rollback()
        log.error("db error: {}".format(e))
    finally:
        conn.close()

This allows you to write code using with as follows:

with transaction() as conn:
    # do transaction

The context manager makes it easy to compose two transactions inside a single function, though doing so may defeat the purpose of a single unit of work. It is also no problem to combine the decorator and the context manager methods (more on this in the section on isolation levels).
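For instance, the two might be combined by building the decorator on top of the context manager. This is a minimal sketch; the transact name is illustrative, not from the original code:

from functools import wraps

def transact(func):
    """
    Wrap the decorated function in the transaction context manager,
    injecting the connection as its first argument.
    """
    @wraps(func)
    def inner(*args, **kwargs):
        with transaction() as conn:
            return func(conn, *args, **kwargs)
    return inner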

ATM Application

So let’s talk about two specific transactions for an imaginary database application: deposit and withdraw. Each of these operations has several steps:

  1. Validate the user with the associated PIN
  2. Ensure the user owns the account being modified
  3. Write a ledger record with the credit or debit being applied
  4. On credit, ensure the daily deposit limit isn’t exceeded
  5. Modify the balance of the account
  6. Fetch the current balance to display to the user

Each transaction will perform 6-7 distinct SQL queries: SELECT, INSERT, and UPDATE. If any of them fails, then the database should remain completely unchanged. Failure in this case means an exception is raised, which is potentially the easiest signal to handle when you have a stack of functions calling other functions. Let’s look at deposit first:

@transaction
def deposit(conn, user, pin, account, amount):
    # Step 1: authenticate the user via pin and verify account ownership
    authenticate(conn, user, pin, account)

    # Step 2: add the ledger record with the credit
    ledger(conn, account, "credit", amount)

    # Step 3: update the account value by adding the amount
    update_balance(conn, account, amount)

    # Fetch the current balance in the account and log it
    record = "withdraw ${:0.2f} from account {} | current balance: ${:0.2f}"
    log.info(record.format(amount, account, balance(conn, account)))

This function simply calls other functions, passing the transaction context (in this case a connection, along with the input details) to functions which may or may not raise exceptions. Here are the authentication functions:

def authenticate(conn, user, pin, account=None):
    """
    Returns True if the user is found and the PIN matches, verifying
    account ownership if an account id is provided.
    """
    with conn.cursor() as curs:
        sql = "SELECT 1 AS authd FROM users WHERE username=%s AND pin=%s"
        curs.execute(sql, (user, pin))
        if curs.fetchone() is None:
            raise ValueError("could not validate user via PIN")

    if account:
        # Verify account ownership if account is provided
        verify_account(conn, user, account)

    return True


def verify_account(conn, user, account):
    """
    Verify that the account is held by the user.
    """
    with conn.cursor() as curs:
        sql = (
            "SELECT 1 AS verified FROM accounts a "
            "JOIN users u on u.id = a.owner_id "
            "WHERE u.username=%s AND a.id=%s"
        )
        curs.execute(sql, (user, account))

        if curs.fetchone() is None:
            raise ValueError("account belonging to user not found")
        return True

The authenticate and verify_account functions basically look in the database to see if there is a record that matches the conditions: a user with a matching PIN in authenticate and a (user, account_id) pair in verify_account. Both of these functions rely on the uniqueness constraints in the database for usernames and account ids. This example shows how the function call stack can get arbitrarily deep; verify_account is called by authenticate, which is called by deposit. By raising an exception at any point in the stack, the transaction proceeds no further, protecting us from harm later in the transaction.

Note also that neither of these functions has an @transaction decorator; they are expected to be called from within another transaction. They are independent operations, but they can still be run on their own by wrapping them in the transaction context manager.

Next we insert a ledger record:

MAX_DEPOSIT_LIMIT = 1000.00

def ledger(conn, account, record, amount):
    """
    Add a ledger record with the amount being credited or debited.
    """
    # Perform the insert
    with conn.cursor() as curs:
        sql = "INSERT INTO ledger (account_id, type, amount) VALUES (%s, %s, %s)"
        curs.execute(sql, (account, record, amount))

    # If we are crediting the account, perform daily deposit verification
    if record == "credit":
        check_daily_deposit(conn, account)

def check_daily_deposit(conn, account):
    """
    Raise an exception if the deposit limit has been exceeded.
    """
    with conn.cursor() as curs:
        sql = (
            "SELECT amount FROM ledger "
            "WHERE date=now()::date AND type='credit' AND account_id=%s"
        )
        curs.execute(sql, (account,))
        total = sum(row[0] for row in curs.fetchall())
        if total > MAX_DEPOSIT_LIMIT:
            raise Exception("daily deposit limit has been exceeded!")

This is the first place that we modify the state of the database, by inserting a ledger record. If, when we check_daily_deposit, we discover that our deposit limit has been exceeded for the day, an exception is raised that rolls back the transaction, ensuring the ledger record is never committed to disk. Finally we update the account balance:

from decimal import Decimal

def update_balance(conn, account, amount):
    """
    Add the amount (or subtract if negative) to the account balance.
    """
    amount = Decimal(amount)
    with conn.cursor() as curs:
        current = balance(conn, account)
        sql = "UPDATE accounts SET balance=%s WHERE id=%s"
        curs.execute(sql, (current+amount, account))


def balance(conn, account):
    with conn.cursor() as curs:
        curs.execute("SELECT balance FROM accounts WHERE id=%s", (account,))
        return curs.fetchone()[0]
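Note in passing that update_balance reads the balance and then writes it back; under concurrent transactions this read-modify-write can race. A sketch of an alternative (update_balance_atomic is an illustrative name) pushes the arithmetic into the database so the read and write cannot be interleaved:

def update_balance_atomic(conn, account, amount):
    """
    Apply the delta in a single statement, avoiding the
    read-modify-write race in update_balance.
    """
    with conn.cursor() as curs:
        sql = "UPDATE accounts SET balance = balance + %s WHERE id=%s"
        curs.execute(sql, (Decimal(amount), account))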

I’ll have more to say on update_balance when we discuss isolation levels, but suffice it to say, this is another place where if the transaction fails we want to ensure that our account is not modified! In order to complete the example, here is the withdraw transaction:

@transaction
def withdraw(conn, user, pin, account, amount):
    # Step 1: authenticate the user via pin and verify account ownership
    authenticate(conn, user, pin, account)

    # Step 2: add the ledger record with the debit
    ledger(conn, account, "debit", amount)

    # Step 3: update the account value by subtracting the amount
    update_balance(conn, account, amount * -1)

    # Fetch the current balance in the account and log it
    record = "withdraw ${:0.2f} from account {} | current balance: ${:0.2f}"
    log.info(record.format(amount, account, balance(conn, account)))

This is similar to deposit, but it writes a debit ledger record and subtracts the amount from the account balance. We can run:

if __name__ == '__main__':
    conn = connect()
    createdb(conn)

    # Successful deposit
    deposit('alice', 1234, 1, 785.0)

    # Unsuccessful withdrawal (wrong PIN)
    withdraw('alice', 9999, 1, 230.0)

    # Successful withdrawal
    withdraw('alice', 1234, 1, 230.0)

    # Unsuccessful deposit (daily deposit limit exceeded)
    deposit('alice', 1234, 1, 489.0)

    # Successful deposit
    deposit('alice', 1234, 2, 220.23)

And we should see the following log records:

2017-12-06 20:01:00,086 deposit $785.00 to account 1 | current balance: $1035.00
2017-12-06 20:01:00,094 withdraw error: could not validate user via PIN
2017-12-06 20:01:00,103 withdraw $230.00 from account 1 | current balance: $805.00
2017-12-06 20:01:00,118 deposit error: daily deposit limit has been exceeded!
2017-12-06 20:01:00,130 deposit $220.23 to account 2 | current balance: $225.23

This should set a baseline for creating simple and easy to use transactions in Python. However, if you remember your databases class as an undergraduate, things get more interesting when two transactions are occurring at the same time. We’ll explore that from a single process by looking at multi-threaded database connections.

Threads

Let’s consider how to run two transactions at the same time from within the same application. The simplest way to do this is to use the threading library to execute transactions simultaneously. How do you achieve thread safety when accessing the database? Back to the docs:

Connection objects are thread-safe: many threads can access the same database either using separate sessions and creating a connection per thread or using the same connection and creating separate cursors. In DB API 2.0 parlance, Psycopg is level 2 thread safe.

This means that every thread must have its own conn object (which we explore in the connection pools section). Any cursor created from the same connection object will be in the same transaction, no matter the thread. We also want to consider how concurrent transactions influence each other, and we’ll take a look at that first by exploring isolation levels and session state.

Session State

Let’s say that Alice and Charlie have a joint account, under Alice’s name. They both show up to ATMs at the same time, Alice tries to deposit $75 and then withdraw $25 and Charlie attempts to withdraw $300. We can simulate this with threads as follows:

import time
import random
import threading

def op1():
    time.sleep(random.random())
    withdraw('alice', 1234, 1, 300.0)

def op2():
    time.sleep(random.random())
    deposit('alice', 1234, 1, 75.0)
    withdraw('alice', 1234, 1, 25.0)


threads = [
    threading.Thread(target=op1),
    threading.Thread(target=op2),
]

for t in threads:
    t.start()

for t in threads:
    t.join()

Depending on the timing, one of two things can happen: Charlie can get rejected for not having enough money in his account, leaving the final balance at $300, or all transactions can succeed, leaving the final balance at $0. There are three transactions happening: two withdrawals and a deposit. Each of these transactions runs in isolation, meaning it sees the database as it was when it started, plus its own changes; so if Charlie’s withdrawal and Alice’s deposit happen simultaneously, Charlie will be rejected since his transaction doesn’t see the deposit until it has committed. No matter what, the database will be left in a consistent state.

However, for performance reasons, you may want to modify the isolation level for a particular transaction. Possible levels are as follows:

  1. READ UNCOMMITTED: lowest isolation level, transaction may read values that are not yet committed (and may never be committed).
  2. READ COMMITTED: write locks are maintained but read locks are released after select, meaning two different values can be read in different parts of the transaction.
  3. REPEATABLE READ: keep both read and write locks so multiple reads return same values but phantom reads can occur.
  4. SERIALIZABLE: the highest isolation level: read, write, and range locks are maintained until the end of the transaction.
  5. DEFAULT: set by the server configuration, not by Python; usually READ COMMITTED.

Note that as the isolation level increases, the number of locks being maintained also increases, which can severely impact performance if there is lock contention or deadlock. It is possible to set the isolation level on a per-transaction basis in order to improve the performance of transactions happening concurrently. To do this we must modify the session parameters on the connection, which change the behavior of the transactions or statements that follow in that session. Additionally we can set the session to readonly, which disallows writes except to temporary tables (useful for performance and security), or mark the session as deferrable.

Deferrability is very interesting in a transaction because it modifies how database constraints are checked. Non-deferred constraints are checked immediately after each statement is executed; this means that UPDATE accounts SET balance=-5.45 will immediately raise an exception. Deferred constraints are instead checked when the transaction concludes, which allows you to write multiple overlapping operations that may put the database into a correct state by the end of the transaction, but potentially not during it (this also interacts with the performance of the various isolation levels). Note that in PostgreSQL only constraints declared DEFERRABLE, such as foreign keys and unique constraints, can be deferred; CHECK constraints like positive_balance are always checked immediately.
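For example, here is a sketch of redeclaring the ledger’s foreign key as deferrable, assuming PostgreSQL’s default constraint name ledger_account_id_fkey:

ALTER TABLE ledger
    DROP CONSTRAINT ledger_account_id_fkey,
    ADD CONSTRAINT ledger_account_id_fkey
        FOREIGN KEY (account_id) REFERENCES accounts (id)
        DEFERRABLE INITIALLY DEFERRED;

With this in place, a ledger record could reference an account created later in the same transaction, and the check would only run at commit.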

In order to change the session, we’ll use a context manager as we did before to modify the session for the transaction, then reset the session back to the defaults:

@contextmanager
def session(conn, isolation_level=None, readonly=None, deferrable=None):
    try:
        conn.set_session(
            isolation_level=isolation_level,
            readonly=readonly,
            deferrable=deferrable
        )
        yield conn
    finally:
        # Reset the session to the defaults; passing None values to
        # set_session leaves settings unchanged, so use reset() instead
        conn.reset()

We can then use with to conduct transactions with different isolation levels:

with transaction() as conn:
    with session(conn, isolation_level="READ COMMITTED") as conn:
        # Do transaction

NOTE: There cannot be an ongoing transaction when the session is set; therefore it is more common for me to set the isolation level, readonly, and deferrable parameters inside the transaction decorator rather than using two separate context managers as shown above. Frankly, it is also common to set these properties on a per-process rather than per-transaction basis, in which case the session is set in connect.

Connection Pools

Although connections are thread-safe, a connection shared between threads means a transaction shared between threads. In the threading example above, if we remove the @transaction decorator and pass the same connection into both operations as follows:

conn = connect()

def op1():
    time.sleep(random.random())
    withdraw(conn, 'alice', 1234, 1, 300.0)

def op2():
    time.sleep(random.random())
    deposit(conn, 'alice', 1234, 1, 75.0)
    withdraw(conn, 'alice', 1234, 1, 25.0)

If the op1 withdraw fires first, the exception will cause all of the op2 statements to also fail, since they are in the same transaction. This essentially means that both op1 and op2 are in the same transaction even though they are in different threads!

We’ve avoided this so far by creating a new connection every time a transaction runs. However, connecting to the database can be expensive and in high-transaction workloads we may want to simply keep the connection open, but ensure they are only used by one transaction at a time. The solution is to use connection pools. We can modify our connect function as follows:

from psycopg2.pool import ThreadedConnectionPool

def connect(env="DATABASE_URL", connections=2):
    """
    Connect to the database using an environment variable.
    """
    url = os.getenv(env)
    if not url:
        raise ValueError("no database url specified")

    minconns = connections
    maxconns = connections * 2
    return ThreadedConnectionPool(minconns, maxconns, url)

This creates a thread-safe connection pool that establishes at least 2 connections and will go up to a maximum of 4 connections on demand. To use the pool object in our transaction context manager, we have to connect when the module is imported, creating a global pool object:

pool = connect()

@contextmanager
def transaction(name="transaction", **kwargs):
    # Get the session parameters from the kwargs
    options = {
        "isolation_level": kwargs.get("isolation_level", None),
        "readonly": kwargs.get("readonly", None),
        "deferrable": kwargs.get("deferrable", None),
    }

    # Get the connection outside the try so conn is always defined below
    conn = pool.getconn()
    try:
        conn.set_session(**options)
        yield conn
        conn.commit()
    except Exception as e:
        conn.rollback()
        log.error("{} error: {}".format(name, e))
    finally:
        conn.reset()
        pool.putconn(conn)

Using pool.getconn retrieves a connection from the pool (blocking until one is available); when we’re done, pool.putconn releases the connection object back to the pool.

Conclusion

This has been a ton of notes on more direct usage of psycopg2. Sorry I couldn’t write a more conclusive conclusion but it’s late and this post is now close to 4k words. Time to go get dinner!

Notes

I used logging as the primary output to this application. The logging was set up as follows:

import logging

LOG_FORMAT = "%(asctime)s %(message)s"

logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
log = logging.getLogger('balance')

For the complete code, see this gist.

Lock Diagnostics in Go

By now it’s pretty clear that I’ve just had a bear of a time with locks and synchronization inside of multi-threaded environments with Go. Probably most gophers would simply tell me that I should share memory by communicating rather than communicate by sharing memory — and frankly I’m in that camp too. The issue is that:

  • Mutexes can be more expressive than channels
  • Channels are fairly heavyweight

So to be honest, there are situations where a mutex is a better choice than a channel. I believe that one of those situations is when dealing with replicated state machines … which is what I’ve been working on the past few months. The issue is that the state of the replica has to be consistent across a variety of events: timers and remote messages. The problem is that the timers and network traffic are all go routines, and there can be a lot of them running in the system at a time.

Of course you could simply create a channel and push event objects to it to serialize all events. The problem with that is that events generate other events that have to be ordered with respect to the parent event. For example, one event might require the generation of messages to be sent to remote replicas, which requires a per-remote state read that is variable. Said another way, the state can be read locked for all go routines operating at that time, but no write locks can be acquired. Hence the last post.

Things got complicated. Lock contention was a thing.

So I had to diagnose who was trying to acquire locks and when and why they were contending. For reference, the most common issues were:

  1. A global read lock was being released before all sub read locks were finished.
  2. A struct with an embedded RWMutex was then embedded by another object with only a Mutex but it still had RLock() methods as a result (or vice versa).
  3. The wrong lock was being called on embedded structs.

The primary lesson I learned was this: when embedding synchronized objects, only embed the mutex on the child object. Hopefully that rule of thumb lasts.

I learned these lessons using a handy little diagnostic tool that this snippet is about. Basically I wanted to track who was acquiring locks and who was waiting on locks. I could then print out a report when I thought something was contending (e.g. on an Interrupt signal) and figure things out.

First step, figure out the name of the calling method:

// UnknownCaller is returned when the caller cannot be determined.
const UnknownCaller = "unknown"

// caller returns the name of the function that called the function
// which called caller() (two frames up the stack).
func caller() string {
	pc, _, _, ok := runtime.Caller(2)
	details := runtime.FuncForPC(pc)
	if ok && details != nil {
		return details.Name()
	}
	return UnknownCaller
}

This handy little snippet uses the runtime package to detect the caller two steps above the caller() function in the stack. This allows you to call caller() inside of a function to get the name of the function that’s calling the function calling caller(). Confusing? Try this:

func outer() string {
    return inner()
}

func inner() string {
    return caller()
}

Calling outer() will return something like main.outer — the function that called the inner() function. Here is a runnable example.

With that in hand we can simply create a map[string]int64, incrementing the count by caller name before Lock() and decrementing it after Unlock().

But … that’s actually a little more complicated than I let on!

The problem is that we definitely have multiple go routines calling locks on the lockable struct. However, if we simply try to access the map in the MutexD, then we can have a panic for concurrent map reads and writes. So now, I use the share memory by communicating technique and pass signals via an internal channel, which is read by a go routine ranging over it.
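Here is a minimal sketch of what that looks like; the real implementation lives at github.com/bbengfort/x/lock, and the names here are illustrative:

// MutexD wraps sync.Mutex to count lock requests by caller name.
type MutexD struct {
	sync.Mutex
	once    sync.Once
	signals chan string      // lock acquisitions by caller name
	reports chan chan string // requests for a report of the counts
}

func (m *MutexD) init() {
	m.signals = make(chan string, 64)
	m.reports = make(chan chan string)
	go m.track()
}

// track owns the counts map; all access is serialized through the
// channels, avoiding panics from concurrent map reads and writes.
func (m *MutexD) track() {
	locks := make(map[string]int64)
	for {
		select {
		case name := <-m.signals:
			locks[name]++
		case out := <-m.reports:
			report := make([]string, 0, len(locks))
			for name, count := range locks {
				report = append(report, fmt.Sprintf("%d locks requested by %s", count, name))
			}
			out <- strings.Join(report, "\n")
		}
	}
}

// Lock records the caller then acquires the underlying mutex.
func (m *MutexD) Lock() {
	m.once.Do(m.init)
	m.signals <- caller()
	m.Mutex.Lock()
}

// String reports how many locks were requested by each caller.
func (m *MutexD) String() string {
	m.once.Do(m.init)
	out := make(chan string)
	m.reports <- out
	return <-out
}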

How to use it? Well, do something like this:

type StateMachine struct {
    MutexD
}

func (s *StateMachine) Alpha() {
    s.Lock()
    defer s.Unlock()
    time.Sleep(1*time.Second)
}

func (s *StateMachine) Bravo() {
    s.Lock()
    defer s.Unlock()
    time.Sleep(100*time.Millisecond)
}

func main() {
    m := new(StateMachine)
    go m.Alpha()
    time.Sleep(100 * time.Millisecond)
    for i := 0; i < 2; i++ {
        go m.Bravo()
    }

    // Give the Bravo goroutines a moment to request their locks
    time.Sleep(100 * time.Millisecond)
    fmt.Println(m.MutexD.String())
}

You should see something like:

1 locks requested by main.(*StateMachine).Alpha
2 locks requested by main.(*StateMachine).Bravo

Obviously you can do the same thing for RWMutex objects, and it’s easy to swap them in and out of code by changing the package and adding or removing a “D”. My implementation is here: github.com/bbengfort/x/lock.

Lock Queuing in Go

In Go, you can use sync.Mutex and sync.RWMutex objects to create thread-safe data structures in memory as discussed in “Synchronizing Structs for Safe Concurrency in Go”. When using the sync.RWMutex in Go, there are two kinds of locks: read locks and write locks. The basic difference is that many read locks can be acquired at the same time, but only one write lock can be acquired at a time.

This means that if a thread attempts to acquire a read lock on an object that is already read locked, then it will not block and it will acquire its own read lock. If a thread attempts to acquire a read or a write lock on a write locked object, then it will block until it is unlocked (as will a write lock acquisition on a read locked object).

Granting a lock can be prioritized depending on different access policies. These policies balance the trade-off between concurrency and starvation as follows:

  1. Read-Preferring RW allows new read locks to be acquired as long as the lock is read-locked, forcing the write-lock acquirer to wait until there are no more read-locks. In high contention environments, this might lead to write-starvation.

  2. Write-Preferring RW prevents a read-lock acquisition if a writer is queued and waiting for the lock. This reduces concurrency, because new read locks have to wait for the write lock, but prevents starvation.

So which of these does Go implement? According to the documentation:

If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking. This is to ensure that the lock eventually becomes available; a blocked Lock call excludes new readers from acquiring the lock. — godoc

My initial read of this made me think that Go implements write-preferring mutexes. However, this was not the behavior that I observed.

Consider the following locker:

var delay time.Duration
var started time.Time

// Locker holds values that are threadsafe
type Locker struct {
	sync.RWMutex
	value  uint64    // the current value of the locker
	access time.Time // time of the last access
}

// Write to the value of the locker in a threadsafe fashion.
func (l *Locker) Write(value uint64) {
	l.Lock()
	defer l.Unlock()

	// Arbitrary amount of work
	time.Sleep(delay)

	l.value = value
	l.access = time.Now()
	l.log("written")
}

// Read the value of the locker in a threadsafe fashion.
func (l *Locker) Read() uint64 {
	l.RLock()
	defer l.RUnlock()

	// Arbitrary amount of work
	time.Sleep(delay / 2)
	l.access = time.Now()
	l.log("read")
	return l.value
}

// Log the access (not thread-safe)
func (l *Locker) log(method string) {
	after := l.access.Sub(started)
	log.Printf(
		"%d %s after %s\n", l.value, method, after,
	)
}

This locker holds a value and logs all accesses to it after the start time. If we run a few threads to read and write to it we can see concurrent reads in action:

func main() {
	delay = 1 * time.Second
	started = time.Now()
	group := new(errgroup.Group)
	locker := new(Locker)

	// Kick off a write, three reads, then another write
	group.Go(func() error { locker.Write(42); return nil })
	group.Go(func() error { locker.Read(); return nil })
	group.Go(func() error { locker.Read(); return nil })
	group.Go(func() error { locker.Read(); return nil })
	group.Go(func() error { locker.Write(101); return nil })

	group.Wait()
}

The output is as follows:

$ go run locker.go
2017/09/08 12:26:32 101 written after 1.005058824s
2017/09/08 12:26:33 101 read after 1.50770225s
2017/09/08 12:26:33 101 read after 1.507769109s
2017/09/08 12:26:33 101 read after 1.50773587s
2017/09/08 12:26:34 42 written after 2.511968581s

Note that the last go routine actually managed to acquire the lock first, after which the three readers acquired the lock, then finally the first writer. Now if we interleave the read and write accesses, adding a sleep between the kick-off of each go routine to ensure that the preceding thread has time to acquire the lock:

func main() {
	delay = 1 * time.Second
	started = time.Now()
	group := new(errgroup.Group)
	locker := new(Locker)

	// Interleave the writes with reads
	group.Go(func() error { locker.Write(42); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Write(101); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Write(3); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Write(18); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })

	group.Wait()
}

We get the following output:

$ go run locker.go
2017/09/08 12:29:28 42 written after 1.000178155s
2017/09/08 12:29:28 42 read after 1.500703007s
2017/09/08 12:29:28 42 read after 1.500691088s
2017/09/08 12:29:28 42 read after 1.500756144s
2017/09/08 12:29:28 42 read after 1.500648159s
2017/09/08 12:29:28 42 read after 1.500762323s
2017/09/08 12:29:28 42 read after 1.500679533s
2017/09/08 12:29:28 42 read after 1.500795204s
2017/09/08 12:29:29 101 written after 2.500971593s
2017/09/08 12:29:30 3 written after 3.505325487s
2017/09/08 12:29:31 18 written after 4.50594131s

This suggests that the reads continue to acquire locks as long as the Locker is read locked, forcing the writes to happen at the end.

I found one Stack Overflow post: “Read preferring RW mutex lock in Golang” that seems to suggest that sync.RWMutex can implement both read and write preferred locking, but doesn’t really give an explanation about how external callers can implement it.
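One possible approach is to guard the RWMutex with a gate mutex that writers hold while they wait, so new readers queue behind the writer. This is my own sketch, not from the Stack Overflow post:

// WritePreferred is a write-preferring lock built on sync.RWMutex:
// readers pass through a gate mutex before read-locking, so a writer
// holding the gate blocks new readers from entering.
type WritePreferred struct {
	gate sync.Mutex
	rw   sync.RWMutex
}

func (w *WritePreferred) Lock() {
	// Hold the gate while waiting for current readers to drain
	w.gate.Lock()
	w.rw.Lock()
}

func (w *WritePreferred) Unlock() {
	w.rw.Unlock()
	w.gate.Unlock()
}

func (w *WritePreferred) RLock() {
	// Wait behind any queued writer before acquiring the read lock
	w.gate.Lock()
	w.gate.Unlock()
	w.rw.RLock()
}

func (w *WritePreferred) RUnlock() {
	w.rw.RUnlock()
}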

Finally consider the following:

func main() {
	delay = 1 * time.Second
	started = time.Now()
	group := new(errgroup.Group)
	locker := new(Locker)

	// Kick off four writes, then stream reads in the loop below
	group.Go(func() error { locker.Write(42); return nil })
	group.Go(func() error { locker.Write(101); return nil })
	group.Go(func() error { locker.Write(3); return nil })
	group.Go(func() error { locker.Write(18); return nil })

	for i := 0; i < 22; i++ {
		group.Go(func() error {
			locker.Read()
			return nil
		})
		time.Sleep(delay / 4)
	}

	group.Wait()
}

Given the loop issuing 22 read locks, kicking one off every quarter of the write delay while each read holds the lock for half the delay, we might expect that this code would continuously hold read locks, pushing all the write locks to the end (and if we put this in a forever loop, the writes would never occur). However, the output is as follows:

2017/09/08 12:43:40 18 written after 1.004461829s
2017/09/08 12:43:40 18 read after 1.508343716s
2017/09/08 12:43:40 18 read after 1.50842899s
2017/09/08 12:43:40 18 read after 1.508362345s
2017/09/08 12:43:40 18 read after 1.508339659s
2017/09/08 12:43:40 18 read after 1.50852229s
2017/09/08 12:43:41 42 written after 2.513789339s
2017/09/08 12:43:42 42 read after 3.0163191s
2017/09/08 12:43:42 42 read after 3.016330534s
2017/09/08 12:43:42 42 read after 3.016355628s
2017/09/08 12:43:42 42 read after 3.016371381s
2017/09/08 12:43:42 42 read after 3.016316992s
2017/09/08 12:43:43 3 written after 4.017954589s
2017/09/08 12:43:43 3 read after 4.518495233s
2017/09/08 12:43:43 3 read after 4.518523255s
2017/09/08 12:43:43 3 read after 4.518537387s
2017/09/08 12:43:43 3 read after 4.518540397s
2017/09/08 12:43:43 3 read after 4.518543262s
2017/09/08 12:43:43 3 read after 4.51863128s
2017/09/08 12:43:44 101 written after 5.521872765s
2017/09/08 12:43:45 101 read after 6.023207828s
2017/09/08 12:43:45 101 read after 6.023225272s
2017/09/08 12:43:45 101 read after 6.023249529s
2017/09/08 12:43:45 101 read after 6.023190828s
2017/09/08 12:43:45 101 read after 6.023243032s
2017/09/08 12:43:45 101 read after 6.023190457s
2017/09/08 12:43:45 101 read after 6.04455716s
2017/09/08 12:43:45 101 read after 6.29457923s

What Go implements is actually something else: read locks can only be acquired so long as the initial read lock is maintained (the word “initial” being critical in the documentation). As soon as the first read lock is released, the queued write lock gets priority. The first read lock lasts approximately 500ms, which is enough time for 4-5 other go routines to acquire their own read locks; as soon as the first read lock completes, the write is given priority.

Messaging Throughput gRPC vs. ZMQ

Building distributed systems in Go requires an RPC or messaging framework of some sort. In the systems I build I prefer to pass messages serialized with protocol buffers, so a natural choice for me is grpc. The grpc library uses HTTP/2 as a transport layer and provides a code generator based on the protocol buffer syntax, making it very simple to use.

For more detailed control, the ZMQ library is an excellent, low-latency socket framework. ZMQ provides several communication patterns from basic REQ/REP (request/reply) to PUB/SUB (publish/subscribe). ZMQ operates at a lower level, though, so more infrastructure needs to be built per application.

This leads to the obvious question: which RPC framework is faster? Here are the results:

Echo Server Throughput

These results show the message throughput of three echo servers that respond to a simple message with a response including a sequence number. Each server runs on its own EC2 micro instance with 1GB of memory and 1 vCPU. Each client runs on an EC2 nano instance with 0.5GB of memory and 1 vCPU and constantly sends messages to the server. The throughput is the number of messages per second the server can handle.

The servers are as follows:

  1. rep: a server that implements a REQ/REP socket and simple handler (sketched below).
  2. router: a server that implements a REQ/ROUTER socket along with a DEALER/REP socket for 16 workers, connected via a proxy.
  3. grpc: implements a gRPC service.
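For a flavor of the implementations, here is a minimal sketch of the rep echo server, assuming the github.com/pebbe/zmq4 bindings (the actual servers live in the benchmark runner):

package main

import (
	"fmt"

	zmq "github.com/pebbe/zmq4"
)

func main() {
	// Create the REP socket and bind it so clients can connect
	sock, err := zmq.NewSocket(zmq.REP)
	if err != nil {
		panic(err)
	}
	defer sock.Close()

	if err := sock.Bind("tcp://*:5555"); err != nil {
		panic(err)
	}

	// Reply to each request with an echo that includes a sequence number
	for seq := uint64(1); ; seq++ {
		msg, err := sock.Recv(0)
		if err != nil {
			panic(err)
		}
		if _, err := sock.Send(fmt.Sprintf("%s (seq=%d)", msg, seq), 0); err != nil {
			panic(err)
		}
	}
}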

The runner and results can be found here.

Discussion

All the figures exhibit a standard shape for throughput: as more clients are added the throughput increases, but it begins to tail off toward an asymptote. The asymptote represents the maximum number of messages a server can respond to without adding message latency. Generally speaking, if a server can handle multiple clients at once, the throughput is higher.

The ZMQ REQ/ROUTER/PROXY/DEALER/REP server with 16 workers outperforms the gRPC server (it has a higher overall throughput). I hypothesize that this is because ZMQ does not have the overhead of HTTP and is in fact lighter weight code than gRPC since none of it is generated. It’s unclear if adding more workers would improve the throughput of the ZMQ router server.

The performance of the REQ/REP server is a mystery: it does far better than the other two. This socket has very little overhead, so it makes sense that it performs well with fewer clients. However, this socket also blocks on a per-client basis, while both grpc and router are asynchronous and can handle multiple clients at a time, which suggests they should be much faster.

Online Distribution

This post started out as a discussion of a struct in Go that could keep track of online statistics without keeping an array of values. It ended up being a lesson on over-engineering for concurrency.

The spec of the routine was to build a data structure that could keep track of internal statistics of values over time in a space-saving fashion. The primary interface was a method, Update(sample float64), so that a new sample could be passed to the structure, updating internal parameters. At conclusion, the structure should be able to describe the mean, variance, and range of all values passed to the update method. I created two versions:

  1. A thread-safe version using mutexes, but blocking on Update()
  2. A thread-safe version using a channel and a go routine so that Update() was non-blocking.

I ran some benchmarks and discovered that the blocking implementation of Update was actually far faster than the non-blocking version. Here are the numbers:

BenchmarkBlocking-8      	20000000            81.1 ns/op
BenchmarkNonBlocking-8   	10000000	       140 ns/op

Apparently, putting a float on a channel, even a buffered channel, incurs some overhead that is more expensive than simply incrementing and summing a few integers and floats. I will present both methods here, but note that the first method (blocking update) should be implemented in production.

You can find this code at github.com/bbengfort/x/stats if you would like to use it in your work.

Online Descriptive Statistics (Blocking)

To track statistics in an online fashion, you need to keep track of the various aggregates that are used to compute the final descriptive statistics of the distribution. For simple statistics such as the minimum, maximum, standard deviation, and mean, you need to track the number of samples, the sum of samples, and the sum of the squares of all samples (along with the minimum and maximum values seen). Here is how you do that:
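Here is a minimal sketch of the blocking version; the full implementation is at github.com/bbengfort/x/stats, and the field and method names here are illustrative:

// Statistics tracks online descriptive statistics, guarded by a
// mutex so that Update blocks.
type Statistics struct {
	sync.RWMutex
	samples uint64  // number of samples seen
	total   float64 // sum of all samples
	squares float64 // sum of the squares of all samples
	minimum float64 // minimum sample seen
	maximum float64 // maximum sample seen
}

// Update the aggregates with the new sample (blocking).
func (s *Statistics) Update(sample float64) {
	s.Lock()
	defer s.Unlock()

	s.samples++
	s.total += sample
	s.squares += sample * sample

	if s.samples == 1 || sample < s.minimum {
		s.minimum = sample
	}
	if s.samples == 1 || sample > s.maximum {
		s.maximum = sample
	}
}

// Mean returns the average of all samples seen so far.
func (s *Statistics) Mean() float64 {
	s.RLock()
	defer s.RUnlock()
	if s.samples == 0 {
		return 0
	}
	return s.total / float64(s.samples)
}

// StdDev returns the sample standard deviation from the aggregates.
func (s *Statistics) StdDev() float64 {
	s.RLock()
	defer s.RUnlock()
	if s.samples < 2 {
		return 0
	}
	n := float64(s.samples)
	return math.Sqrt((s.squares - (s.total*s.total)/n) / (n - 1))
}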

I use this data structure as a lightweight mechanism to keep track of online statistics for experimental results or latency. It gives a good overall view of incoming values at very little expense.

Non-blocking Update

In an attempt to improve the performance of this method, I envisioned a mechanism where I could simply dump values into a buffered channel then run an updater go routine to collect values and perform the online computation. The updater function can simply range over the channel, and the channel can be closed to stop the goroutine and finalize anything still on the channel. This is written as follows:
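Here is a sketch of that non-blocking version, built on the blocking Statistics type above (the names are illustrative):

// BufferedStatistics wraps Statistics with a channel so that Update
// is non-blocking; only the updater goroutine touches the aggregates.
type BufferedStatistics struct {
	Statistics
	values chan float64
	done   chan struct{}
}

// Init creates the buffered channel and starts the updater.
func (s *BufferedStatistics) Init(buffer int) {
	s.values = make(chan float64, buffer)
	s.done = make(chan struct{})
	go s.updater()
}

// updater ranges over the channel, applying the online computation.
func (s *BufferedStatistics) updater() {
	for sample := range s.values {
		s.Statistics.Update(sample)
	}
	close(s.done)
}

// Update drops the sample on the buffered channel and returns.
func (s *BufferedStatistics) Update(sample float64) {
	s.values <- sample
}

// Close stops the updater, finalizing anything still buffered.
func (s *BufferedStatistics) Close() {
	close(s.values)
	<-s.done
}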

The lesson was that this is actually less performant, no matter how large the buffer is. I increased the buffer size to 10000001 to ensure that the sender could never block, but I still measured 116 ns/op in the benchmarks. Generally, this style is what I use when the function being implemented is actually pretty heavy (e.g. writes to disk). In this case, the function was too lightweight to matter!

Rapid FS Walks with ErrGroup

I’ve been looking for a way to quickly scan a file system and gather information about the files in directories contained within. I had been doing this with multiprocessing in Python, but figured Go could speed up my performance by a lot. What I discovered when I went down this path was errgroup.Group (from the golang.org/x/sync/errgroup package), an extension of sync.WaitGroup that helps manage the complexity of multiple go routines and also includes error handling!

The end result of this exploration was a utility called urfs, which you can install on your system to take a uniform random sample of files in a directory or to compute the number of files and bytes per directory. The same framework extends to any functionality that requires rapidly walking a file system, such as search or other utilities.

This post is therefore a bit of a walkthrough on using sync.ErrGroup for scanning a file system and applying arbitrary functions. First a couple of types:

type WalkFunc func(path string) (string, error)

type FSWalker struct {
	Workers    int             
	SkipHidden bool            
	SkipDirs   bool            
	Match      string          
	root       string          
	paths      chan string     
	nPaths     uint64          
	results    chan string     
	nResults   uint64        
	group      *errgroup.Group
	ctx        context.Context
	started    time.Time       
	duration   time.Duration   
}

The first type is a generic function that can be passed to the Walk method of the FSWalker. The FSWalker maintains state with a variety of channels, and of course the errgroup.Group object. The SkipHidden, SkipDirs, and Match properties allow us to filter path types being passed to Walk.

To initialize FSWalker:

func (fs *FSWalker) Init(ctx context.Context) {
	// Set up FSWalker defaults
	fs.Workers = DefaultWorkers
	fs.SkipHidden = true
	fs.SkipDirs = true
	fs.Match = "*"

	// Create a background context for the errgroup if none was supplied
	if ctx == nil {
		ctx = context.Background()
	}

	// Create the err group from the context
	fs.group, fs.ctx = errgroup.WithContext(ctx)

	// Create channels and instantiate other statistics variables
	fs.paths = make(chan string, DefaultBuffer)
	fs.results = make(chan string, DefaultBuffer)
	fs.nPaths = 0
	fs.nResults = 0
	fs.started = time.Time{}
	fs.duration = time.Duration(0)
}

Ok, so we’re doing a lot of work here, but it pays off in the Walk function, where we keep track of the number of paths we’ve seen at a root directory, passing them off to a WalkFunc using a variety of go routines:

func (fs *FSWalker) Walk(path string, walkFn WalkFunc) error {
	// Compute the duration of the walk
	fs.started = time.Now()
	defer func() { fs.duration = time.Since(fs.started) }()

	// Set the root path for the walk
	fs.root = path

	// Launch the goroutine that populates the paths
	fs.group.Go(func() error {
		// Ensure that the channel is closed when all paths are loaded
		defer close(fs.paths)

		// Apply the path filter to the filepath.Walk function
		return filepath.Walk(fs.root, fs.filterPaths)
	})

	// Create the worker function and allocate pool
	worker := fs.worker(walkFn)
	for w := 0; w < fs.Workers; w++ {
		fs.group.Go(worker)
	}

	// Wait for the workers to complete, then close the results channel
	go func() {
		fs.group.Wait()
		close(fs.results)
	}()

	// Start gathering the results
	for range fs.results {
		fs.nResults++
	}

	return fs.group.Wait()
}

So this is a lot of code; let’s step through it. The first thing we do is set the started time to now, and defer a function that computes the duration as the difference between the time at the end of the walk and the start time. We also set the root value. We then launch a go routine in the ErrGroup by using fs.group.Go(func) — this function must have the signature func() error, so we use an anonymous function to kick off filepath.Walk, which starts walking the directory structure, adding paths that match the filter criteria to a buffered channel called fs.paths. This channel must be closed when the walk completes so that the worker go routines terminate.

Next we create a worker function using our worker method and the walk function. The workers read paths off the fs.paths channel and apply the walkFn to each path individually. Note that we use a pool-like structure here, limiting the number of workers to 5000 — this is so we don’t get a “too many files open” error from exhausting the available file descriptors, since Go will happily run an unbounded number of go routines. The worker definition is here:

func (fs *FSWalker) worker(walkFn WalkFunc) func() error {
	return func() error {
		// Apply the function all paths in the channel
		for path := range fs.paths {
			// avoid race condition
			p := path

			// apply the walk function to the path and return errors
			r, err := walkFn(p)
			if err != nil {
				return err
			}

			// store the result and check the context
			if r != "" {

				select {
				case fs.results <- r:
				case <-fs.ctx.Done():
					return fs.ctx.Err()
				}
			}

		}
		return nil
	}
}

As you can see, the worker function just creates a closure with the signature of our ErrGroup function, so that we can pass it to the wait group. All the worker function does is range over the paths channel, applying the path to the walkFn.

Finally, we kick off another go routine that waits until all the workers have stopped, and when they do, closes the results channel. We do this so that we can start gathering results immediately rather than waiting for the walk to finish; we simply range over the results channel, counting the results. A final group.Wait at the end ensures that all go routines complete and returns any error they raised.

Lastly, the filter function. We want to ignore files and directories that are hidden, e.g. that start with a “.” or a “~” on Unix systems. We also want to be able to pass a glob-like matcher, e.g. "*.txt", to only match text files. The filter function is here:

// Internal filter paths function that is passed to filepath.Walk
func (fs *FSWalker) filterPaths(path string, info os.FileInfo, err error) error {
	// Propagate any errors
	if err != nil {
		return err
	}

	// Skip directories if required; otherwise skip anything that is
	// not a regular file (symlinks, sockets, devices, etc.)
	if info.IsDir() {
		if fs.SkipDirs {
			return nil
		}
	} else if !info.Mode().IsRegular() {
		return nil
	}

	// Get the name of the file without the complete path
	name := info.Name()

	// Skip hidden files or directories if required.
	if fs.SkipHidden {
		if strings.HasPrefix(name, ".") || strings.HasPrefix(name, "~") {
			return nil
		}
	}

	// Check to see if the pattern matches the file
	match, err := filepath.Match(fs.Match, name)
	if err != nil {
		return err
	} else if !match {
		return nil
	}

	// Increment the total number of paths we've seen.
	atomic.AddUint64(&fs.nPaths, 1)

	select {
	case fs.paths <- path:
	case <-fs.ctx.Done():
		return fs.ctx.Err()
	}

	return nil
}

And that’s it: with this simple framework, you can apply an arbitrary walkFn to all paths in a directory that match specific criteria. The big win here is managing all of the go routines with the ErrGroup and a context.Context object.
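As a usage sketch, assuming everything above lives in a single package (nResults and duration are unexported), here is a walk that counts the files and bytes under a directory given on the command line:

func main() {
	fs := new(FSWalker)
	fs.Init(context.Background())

	// Sum the size in bytes of every file visited by the walk
	var bytes uint64
	err := fs.Walk(os.Args[1], func(path string) (string, error) {
		info, err := os.Stat(path)
		if err != nil {
			return "", err
		}
		atomic.AddUint64(&bytes, uint64(info.Size()))
		return path, nil
	})

	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%d files, %d bytes in %s\n", fs.nResults, bytes, fs.duration)
}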

The post “Run strikingly fast parallel file searches in Go with sync.ErrGroup” by Brian Ketelsen was the primary inspiration for the use of the errgroup here.

Buffered Write Performance

This is just a quick note on the performance of writing to a file on disk using Go, and it raises a question about a common programming paradigm that I am now suspicious of. I discovered that when I wrapped the open file object with a bufio.Writer, the performance of my writes to disk significantly decreased. Ok, so this isn’t about simple file writing to disk: this is a complex writer that does some seeking in the file, writing to different positions, and maintains the overall state of what’s on disk in memory. However, the question remains:

Why do we buffer our writes to a file?

A couple of answers come to mind: safety, because the buffer ensures that writes to the underlying writer are not flushed when an error occurs; helpers, because there may be methods on the buffered writer that are not available on the native writer; and concurrency, because the buffer can be appended to while another part of the buffer is being flushed.

However, we determined that in performance critical applications (file systems, databases) the buffer abstraction adds an unacceptable performance overhead. Here are the results.

Results

First, we’re not doing a simple write: we’re appending to a write-ahead log that has a fixed-length metadata header at the top of the file. This means that a single operation to append data to the log consists of the following steps:

  1. Marshal data to bytes
  2. Write data to end of the log file (possibly sync the file)
  3. Seek to the top of the file
  4. Marshal and write the fixed-length metadata header
  5. Seek to the bottom of the file
  6. Sync the file to disk
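
Steps 3–5 are where the extra seeking happens. As a hedged sketch (the 16-byte header layout and the nEntries parameter are assumptions for illustration), the header rewrite might look like this:

    // Rewrite the fixed-length header at the top of the file, then
    // seek back to the end so that appends can continue.
    func writeHeader(f *os.File, nEntries uint64) error {
        // Remember where we are so we can seek back afterward.
        pos, err := f.Seek(0, io.SeekCurrent)
        if err != nil {
            return err
        }

        // Seek to the top of the file and write the header.
        if _, err = f.Seek(0, io.SeekStart); err != nil {
            return err
        }
        header := make([]byte, 16)
        binary.BigEndian.PutUint64(header[0:8], nEntries)
        binary.BigEndian.PutUint64(header[8:16], uint64(pos))
        if _, err = f.Write(header); err != nil {
            return err
        }

        // Seek back to the bottom of the file to continue appending.
        _, err = f.Seek(pos, io.SeekStart)
        return err
    }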

So there is a bit more work here than simply throwing data at disk. We can see in the following graph that the performance of the machine (CPU, Memory, and Disk) plays a huge role in determining the performance of these operations in terms of the number of these writes the machine is able to do per second:

Write Throughput (ops/sec)

In the above graph, Hyperion and Lagoon are Dell Optiplex servers and Antigua, Curacao, and Nevis are Intel NUCs. They all have different processors and SSDs, but all have 16GB memory. For throughput, bigger is better (you can do more operations per second). As you can see on all of the servers, there is about a 1.6x increase in throughput using unbuffered writes to the file over buffered writes to the file.

We can inspect the distribution of the latency of each individual operation as follows (with latency, smaller is better — you’re doing operations faster):

Operation Latency Distribution

The boxplot shows the distribution of latency such that the box is between the 25th and 75th percentile (with a bisecting line at the median) - the lines are from the 5th to the 95th percentile, and anything outside the lines are considered outliers and are visualized as diamonds.

We can see the shift not just in the mean but also the median; a 1.6x increase in speed (decrease in latency) from buffered to unbuffered writes. More importantly, we can see that unbuffered writes are more consistent, i.e. they have a tighter distribution and less variable operational latency. I suspect this means that while both types of writes are bound by disk accesses from other processes, buffered writes are also CPU bound whereas unbuffered writes are less so.

Method

The idea here is that we are going to open a file and append data to it, tracking what we’re doing with a fixed-length metadata header at the beginning of the file. Creating a struct to wrap the file and open, sync, and close it is pretty straightforward:

type Log struct {
    path string
    file *os.File
}

func (l *Log) Open(path string) (err error) {
    l.path = path
    l.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0644)
    if err != nil {
        return err
    }
    return nil
}

func (l *Log) Close() error {
    err := l.file.Close()
    l.file = nil
    return err
}

func (l *Log) Sync() error {
    return l.file.Sync()
}
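
Usage is what you’d expect (the path here is illustrative):

    var wal Log
    if err := wal.Open("/var/data/app.wal"); err != nil {
        log.Fatal(err)
    }
    defer wal.Close()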

Now let’s say that we have an entry that knows how to write itself to an io.Writer interface as follows:

type Entry struct {
    Version uint64    `json:"version"`
    Key     string    `json:"key"`
    Value   []byte    `json:"value"`
    Created time.Time `json:"created"`
}

func (e *Entry) Dump(w io.Writer) (int64, error) {
    // Encode entry as JSON data (base64 encoded bytes value)
    data, err := json.Marshal(e)
    if err != nil {
        return -1, err
    }

    // Add a newline to the data for json lines format
    data = append(data, byte('\n'))

    // Write the data to the writer, returning the bytes written as int64
    n, err := w.Write(data)
    return int64(n), err
}
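
For example, dumping an entry to stdout produces a single JSON line (note the base64 encoded value):

    entry := &Entry{
        Version: 1,
        Key:     "color",
        Value:   []byte("blue"),
        Created: time.Now(),
    }
    if _, err := entry.Dump(os.Stdout); err != nil {
        log.Fatal(err)
    }
    // Output (created elided):
    // {"version":1,"key":"color","value":"Ymx1ZQ==","created":"..."}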

So the question is, if we have a list of entries we want to append to the log, how do we pass the io.Writer to the Entry.Dump method in order to write them one at a time?

The first method is the standard method, buffered, using bufio.Writer:

func (l *Log) Append(entries ...*Entry) (size int64, err error) {
    // Create the buffer and declare the number of bytes written
    var bytes int64
    buffer := bufio.NewWriter(l.file)

    // Write each entry, keeping track of the amount of data written
    for _, entry := range entries {
        if bytes, err = entry.Dump(buffer); err != nil {
            return -1, err
        }
        size += bytes
    }

    // Flush the buffer
    if err = buffer.Flush(); err != nil {
        return -1, err
    }

    // Sync the underlying file
    if err = l.Sync(); err != nil {
        return -1, err
    }

    return size, nil
}

As you can see, even though we’re getting a buffered write to disk, we’re not actually leveraging any of the benefits of the buffered write. So let’s eliminate the middleman with an unbuffered approach:

func (l *Log) Append(entries ...*Entry) (size int64, err error) {
    // Write each entry directly to the file, keeping track of the
    // amount of data written
    var bytes int64
    for _, entry := range entries {
        if bytes, err = entry.Dump(l.file); err != nil {
            return -1, err
        }
        size += bytes
    }

    // Sync the underlying file
    if err = l.Sync(); err != nil {
        return -1, err
    }

    return size, nil
}

We get the performance benefit shown above. Now, I’m not sure whether this is obvious or not, but I do know that it’s commonly taught to wrap the file object with a buffer. The unbuffered approach may be simpler and faster, but it may also be less safe; it depends on your use case.

Event Dispatcher in Go

The event dispatcher pattern is extremely common in software design, particularly in languages like JavaScript that are primarily used for user interface work. The dispatcher is an object (usually a mixin to other objects) that can register callback functions for particular events. Then when a dispatch method is called with an event, the dispatcher calls each callback function in order of registration and passes them a copy of the event. In fact, I’ve already written a version of this pattern in Python: Implementing Observers with Events. In this snippet, I’m presenting a version in Go that has been incredibly stable and useful in my code.

There are four types in the snippet below:

  • EventType is a uint16 that represents the type of event that occurs; several constants in the code declare event types, along with a String method for human readability. Typing events as small integer constants rather than strings keeps comparing and dispatching them cheap.
  • Callback defines the signature of a function that can be registered.
  • Dispatcher is the core of the code and wraps a source — that is the actual object that is doing the dispatching.
  • Event is an interface for event types that has a type, source, and value.

For example, if you want to watch a directory for new files being created, you could do something like this:

type DirectoryWatcher struct {
    Dispatcher
    path string // path to directory on disk
}

func (w *DirectoryWatcher) Init(path string) {
    w.path = path
    w.Dispatcher.Init(w)
}

// Watch the given directory and dispatch new file events
func (w *DirectoryWatcher) Watch() error {
    for {
        files, err := ioutil.ReadDir(w.path)
        if err != nil {
            return err
        }
        for _, file := range files {
            if w.Unseen(file) {
                w.Dispatch(NewFile, file)
            }
        }
        time.Sleep(100 * time.Millisecond)
    }
}

This initializes the DirectoryWatcher dispatcher with the source as the watcher (so you can refer to exactly which directory was being watched). The watcher then checks the directory every 100 milliseconds, and if it sees any files that are Unseen(), it dispatches the NewFile event.

The dispatcher code follows this shape (a minimal sketch consistent with the types described above; the internal details are illustrative rather than the exact original code):
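
    // EventType represents the type of event that occurs.
    type EventType uint16

    const (
        NewFile EventType = iota + 1
    )

    func (t EventType) String() string {
        switch t {
        case NewFile:
            return "new file"
        default:
            return "unknown"
        }
    }

    // Event is the interface passed to registered callbacks.
    type Event interface {
        Type() EventType
        Source() interface{}
        Value() interface{}
    }

    type event struct {
        etype  EventType
        source interface{}
        value  interface{}
    }

    func (e *event) Type() EventType     { return e.etype }
    func (e *event) Source() interface{} { return e.source }
    func (e *event) Value() interface{}  { return e.value }

    // Callback defines the signature of a registerable function.
    type Callback func(Event) error

    // Dispatcher wraps a source object and dispatches its events
    // to registered callbacks.
    type Dispatcher struct {
        source    interface{}
        callbacks map[EventType][]Callback
    }

    func (d *Dispatcher) Init(source interface{}) {
        d.source = source
        d.callbacks = make(map[EventType][]Callback)
    }

    // Register a callback function for the given event type.
    func (d *Dispatcher) Register(etype EventType, callback Callback) {
        d.callbacks[etype] = append(d.callbacks[etype], callback)
    }

    // Dispatch an event to callbacks in registration order; a single
    // error stops all further event handling.
    func (d *Dispatcher) Dispatch(etype EventType, value interface{}) error {
        e := &event{etype: etype, source: d.source, value: value}
        for _, callback := range d.callbacks[etype] {
            if err := callback(e); err != nil {
                return err
            }
        }
        return nil
    }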

So this works very well, but there are a couple of key points:

  • When dispatching the event, a single error terminates all event handling. It might be better to create a specific error type that terminates event handling (e.g. do not propagate) and then collect all other errors into a slice and return them from the dispatcher.
  • The event can technically be modified by callback functions since it’s a pointer. It might be better to pass by value to guarantee that all callbacks see the original event.
  • Callback handling is in order of registration, which gets back to point number one about canceling event propagation. An alternative is to run all the callbacks concurrently in goroutines, which is something I want to investigate further.

Lazy Pirate Client

In the last post I discussed a simple REQ/REP pattern for ZMQ. However, by itself REQ/REP is pretty fragile: every REQ requires a REP, and a server can only handle one request at a time. Moreover, if the server fails in the middle of a reply, everything hangs. We need more reliable REQ/REP, which is actually the subject of an entire chapter in the ZMQ book.

For my purposes, I want to ensure that repliers (servers) can fail without taking out the client. The server can simply send with the zmq.DONTWAIT flag to deal with clients that drop out before the communication is complete. Server failure is a bit more difficult to deal with, however: client-side reliability is based on timeouts and retries to deal with failed messages. ZMQ calls this the Lazy Pirate Pattern.

This is a pretty big chunk of code, but it creates a Client object that wraps a socket and performs lazy pirate sends. The primary code is in the Reset() and Send() methods. The Reset() method sets the linger to zero in order to close the connection immediately without errors; it then closes the connection and reconnects thereby resetting the state to be able to send messages again. This is “brute force but effective and reliable”.

The Send() method fires off a message, then uses a zmq.Poller with a timeout to keep checking whether a reply has been received within that time limit. If it was successful, then great! Otherwise we decrement our retries and try again. If we’re out of retries, there is nothing to do but return an error. A condensed sketch of the client is below (assuming the pebbe/zmq4 binding; the original code is longer):
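
    // Client wraps a REQ socket and performs lazy pirate sends.
    type Client struct {
        endpoint string
        sock     *zmq.Socket
        timeout  time.Duration
        retries  int
    }

    // Reset is "brute force but effective and reliable": set linger
    // to zero, close the socket immediately, and reconnect.
    func (c *Client) Reset() (err error) {
        if c.sock != nil {
            c.sock.SetLinger(0)
            c.sock.Close()
        }
        if c.sock, err = zmq.NewSocket(zmq.REQ); err != nil {
            return err
        }
        return c.sock.Connect(c.endpoint)
    }

    // Send fires off a message, then polls for a reply with a timeout,
    // resetting the socket and retrying until retries are exhausted.
    func (c *Client) Send(msg string) (string, error) {
        for retries := c.retries; retries > 0; retries-- {
            if _, err := c.sock.Send(msg, 0); err != nil {
                return "", err
            }

            poller := zmq.NewPoller()
            poller.Add(c.sock, zmq.POLLIN)

            sockets, err := poller.Poll(c.timeout)
            if err != nil {
                return "", err
            }

            // A reply arrived within the timeout; receive and return it.
            if len(sockets) > 0 {
                return c.sock.Recv(0)
            }

            // No reply: reset the connection and try again.
            if err = c.Reset(); err != nil {
                return "", err
            }
        }
        return "", errors.New("no reply after all retries")
    }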

The full code is fairly lengthy, but as it turns out, most of the code for clients and servers on either side of REQ/REP shares similar wrapper code for the context, socket, and connect/bind handling. So far it has been very reliable in my code, allowing servers to drop out and fail without blocking clients or other nodes in the network.