# Syntax Parsing with CoreNLP and NLTK

Syntactic parsing is a technique by which segmented, tokenized, and part-of-speech tagged text is assigned a structure that reveals the relationships between tokens, as governed by syntax rules such as grammars. Consider the sentence:

The factory employs 12.8 percent of Bradford County.

A syntax parse produces a tree that might help us understand that the subject of the sentence is “the factory”, the predicate is “employs”, and the target is “12.8 percent”, which in turn is modified by “Bradford County”. Syntax parses are often a first step toward deep information extraction or semantic understanding of text. Note, however, that syntax parsing methods suffer from structural ambiguity, that is, the possibility that more than one correct parse exists for a given sentence. Selecting the most likely parse for a sentence is incredibly difficult.
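To get a feel for how quickly structural ambiguity grows, note that the number of distinct binary bracketings of a sentence of n words is the Catalan number C(n-1). The sketch below is my own back-of-the-envelope illustration, not part of CoreNLP:

```python
from math import comb

def catalan(n):
    # C(n) = (2n choose n) / (n + 1)
    return comb(2 * n, n) // (n + 1)

# Possible binary parse trees over a sentence of n words: C(n - 1)
for n in (3, 5, 10, 20):
    print(n, catalan(n - 1))
```

Even a 20-word sentence admits over 1.7 billion binary bracketings, which is why statistical parsers rank candidate parses by likelihood rather than enumerating them.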

The best general syntax parser that exists for English, Arabic, Chinese, French, German, and Spanish is currently the blackbox parser found in Stanford’s CoreNLP library. This parser is a Java library, however, and requires Java 1.8 to be installed. Luckily, it also ships with a server that can be run and accessed from Python using NLTK 3.2.3 or later. Once you have downloaded the JAR files from the CoreNLP download page, installed Java 1.8, and pip-installed nltk, you can run the server as follows:

```python
import os

from nltk.parse.corenlp import CoreNLPServer

# The server needs to know the location of the following files:
#   - stanford-corenlp-X.X.X.jar
#   - stanford-corenlp-X.X.X-models.jar
STANFORD = os.path.join("models", "stanford-corenlp-full-2018-02-27")

# Create the server
server = CoreNLPServer(
    os.path.join(STANFORD, "stanford-corenlp-3.9.1.jar"),
    os.path.join(STANFORD, "stanford-corenlp-3.9.1-models.jar"),
)

# Start the server in the background
server.start()
```


The server needs to know the location of the JAR files you downloaded, either by adding them to your Java $CLASSPATH or, as I do, by storing them in a models directory that you can access from your project. When you start the server, it runs in the background, ready for parsing. To get constituency parses from the server, instantiate a CoreNLPParser and parse raw text as follows:

```python
from nltk.parse.corenlp import CoreNLPParser

parser = CoreNLPParser()
parse = next(parser.raw_parse("I put the book in the box on the table."))
```

If you’re in a Jupyter notebook, the tree will be drawn as above. Note that the CoreNLPParser can take a URL to the CoreNLP server, so if you’re deploying this in production, you can run the server in a Docker container, etc. and access it for multiple parses. The raw_parse method expects a single sentence as a string; you can also use the parse method to pass in text tokenized and tagged with other NLTK methods. Parses are also handy for identifying questions:

```python
next(parser.raw_parse("What is the longest river in the world?"))
```

Note the SBARQ representing the question; this data can be used to build a classifier that detects what type of question is being asked, which in turn can be used to transform the question into a database query! I should also point out why we’re using next(): the parser actually returns a generator of parses, starting with the most likely. By using next, we select only the first, most likely parse.

Constituency parses are deep and contain a lot of information, but dependency parses are often more useful for text analytics and information extraction. To get a Stanford dependency parse with Python:

```python
from nltk.parse.corenlp import CoreNLPDependencyParser

parser = CoreNLPDependencyParser()
parse = next(parser.raw_parse("I put the book in the box on the table."))
```

Once you’re done parsing, don’t forget to stop the server!
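Dependency parses are typically consumed as (governor, relation, dependent) triples, the shape that CoreNLPDependencyParser's parse graphs yield via triples(). As a sketch of what you might do with them, the snippet below walks a hand-written list of triples (the words and relation labels here are my own illustration, not live server output) to pull out a crude subject-verb-object summary:

```python
# Hand-written triples in the ((gov, tag), rel, (dep, tag)) shape
# produced by a dependency parse; labels are assumed for illustration
triples = [
    (("put", "VBD"), "nsubj", ("I", "PRP")),
    (("put", "VBD"), "obj", ("book", "NN")),
    (("book", "NN"), "det", ("the", "DT")),
    (("put", "VBD"), "obl", ("box", "NN")),
]

def subject_verb_object(triples):
    # Scan for the nsubj relation, then an obj hanging off the same verb
    subj = verb = obj = None
    for (gov, _), rel, (dep, _) in triples:
        if rel == "nsubj":
            subj, verb = dep, gov
        elif rel in ("obj", "dobj") and gov == verb:
            obj = dep
    return subj, verb, obj

print(subject_verb_object(triples))  # ('I', 'put', 'book')
```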
```python
# Stop the CoreNLP server
server.stop()
```

To ensure that the server is stopped even when an exception occurs, you can also use the CoreNLPServer context manager as follows:

```python
jars = (
    "stanford-corenlp-3.9.1.jar",
    "stanford-corenlp-3.9.1-models.jar",
)

with CoreNLPServer(*jars):
    parser = CoreNLPParser()
    text = "The runner scored from second on a base hit"
    parse = next(parser.parse_text(text))
    parse.draw()
```

Note that the parse_text function in the above code accepts a string that may contain multiple sentences and returns a parse for each sentence it segments. Additionally, the tokenize and tag methods can be used on the parser to get the Stanford part-of-speech tags from the text. Unfortunately there isn’t much documentation on this, but for more, check out the NLTK CoreNLP API documentation.

# Continuing Outer Loops with for/else

When you have an outer and an inner loop, how do you continue the outer loop from a condition inside the inner loop? Consider the following code:

```python
for i in range(10):
    for j in range(9):
        if i <= j:
            # break out of inner loop
            # continue outer loop
            ...
        print(i, j)
    # don't print unless inner loop completes,
    # e.g. outer loop is not continued
    print("inner complete!")
```

Here, we want to print, for all i in [0, 10), all numbers j in [0, 9) that are less than i, and we want to print "inner complete!" once we’ve found an entire list of j that meets the criteria. While this seems like a fairly contrived example, I’ve actually encountered this exact situation in several places in code this week, and I’ll provide a real example in a bit.

My first instinct was to wrap the inner loop in a function and use return to do a “hard break” out of the loop. This short-circuits by exiting the function, but doesn’t actually provide continue behavior, which is the goal in the example above. The technique does work, however, and in multi-loop situations is probably the best bet.
```python
def inner(i):
    for j in range(9):
        if i <= j:
            # Note: if this was break, the print statement would execute
            return
        print(i, j)
    print("inner complete!")

for i in range(10):
    inner(i)
```

Much neater, however, is using for/else. The else block fires iff the for loop it is attached to completes without breaking. This was very weird to me at first; I thought else should trigger on break. Think of it this way, though: you’re searching through a list of things, for item in collection, and you plan to break when you’ve found the item you’re looking for; else, you do something if you exhaust the collection and didn’t find what you were looking for. Therefore we can code our loop as follows:

```python
for i in range(10):
    for j in range(9):
        if i <= j:
            break
        print(i, j)
    else:
        # Outer loop is continued
        continue
    print("inner complete!")
```

This is a little strange, because it would probably be more appropriate to put our print in the else block, but this was the spec: continue the outer loop if the inner loop gets broken. Here’s a better example with date parsing:

```python
# Try to parse a timestamp with a bunch of formats
for fmt in (JSON, PG, ISO, RFC, HUMAN):
    try:
        ts = datetime.strptime(ts, fmt)
        break
    except ValueError:
        continue
else:
    # Could not parse with any of the formats required
    raise ValueError("could not parse timestamp")
```

Is this better or worse than the function version?

```python
def parse_timestamp(ts):
    for fmt in (JSON, PG, ISO, RFC, HUMAN):
        try:
            return datetime.strptime(ts, fmt)
        except ValueError:
            continue
    raise ValueError("could not parse timestamp")

ts = parse_timestamp(ts)
```

Let’s go to the benchmarks: basically, there is no meaningful difference, but depending on the context of the implementation, using for/else may be a bit more meaningful or easier to test than having to implement another function. Benchmark code can be found here.

# Predicted Class Balance

This is a follow-on to the prediction distribution visualization presented in the last post.
This visualization shows a bar chart with the number of predicted and number of actual values for each class, e.g. a class balance chart with predicted balance as well.

This visualization actually came before the prior one, but I was more excited about that one because it showed where error was occurring, similar to a classification report or confusion matrix. I’ve recently been using this chart more for initial spot checking, however, since it gives me a general feel for how balanced both the classes and the classifier are with respect to each other. It has also helped diagnose what is being displayed in the heat map chart of the other post.

The code follows; again, prototype code. However, in this code I made an effort to use more scikit-learn tooling in the visualization, including their validation and checking code. Hopefully this will help us eliminate problems with various types of input. This code also shows a cross-validation strategy for getting y_true and y_pred from a classifier. I think this type of code will become a cornerstone in Yellowbrick, so please let us know in the YB issues if you see anything fishy with this methodology!

# Class Balance Prediction Distribution

In this quick snippet I present an alternative to the confusion matrix or classification report visualizations for judging the efficacy of multi-class classifiers. The base of the visualization is a class balance chart: the x-axis is the actual (or true) class and the height of each bar is the number of instances that match that class in the dataset. The difference here is that each bar is a stacked chart representing the percentage of the predicted class given the actual value. If the predicted color matches the actual color then the classifier was correct, otherwise it was wrong. The code to do this follows.
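The plotting code isn't reproduced here, but the bookkeeping behind the stacked bars is simple to sketch: for each actual class, count how the classifier distributed its predictions. A minimal stdlib version, with made-up y_true/y_pred vectors standing in for cross-validated output:

```python
from collections import Counter, defaultdict

# Hypothetical vectors standing in for cross-validated predictions
y_true = ["a", "a", "a", "b", "b", "c", "c", "c", "c"]
y_pred = ["a", "a", "b", "b", "b", "c", "a", "c", "c"]

# For each actual class, tally which classes were predicted
stacks = defaultdict(Counter)
for actual, predicted in zip(y_true, y_pred):
    stacks[actual][predicted] += 1

# Each bar's height is the class support; its segments are the
# fraction of each predicted class given the actual class
for actual in sorted(stacks):
    support = sum(stacks[actual].values())
    segments = {cls: n / support for cls, n in sorted(stacks[actual].items())}
    print(actual, support, segments)
```

A plotting layer would then draw one stacked bar per actual class from these segment fractions.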
This is simple prototype code that we’ll be including in Yellowbrick soon and may not work in all cases; nor does it include features for doing cross-validation and putting together the two vectors required for visualization. Other interesting things that can be done with this: make the x-axis the predicted class instead of the actual class; if classes are ordinal, use a heatmap to show predictions over or under the specified class; or find a better way to show “correct” values without discrete color values. More investigation on this and an implementation in Yellowbrick soon!

Update: Thanks @lwgray for putting together the pull request for this!

# Synchronization in Write Throughput

This post serves as a reminder of how to perform benchmarks when accounting for synchronized writing in Go. The normal benchmarking process involves running a command a large number of times and determining the average amount of time that operation took. When threads come into play, we consider throughput instead, that is, the number of operations that can be conducted per second. However, in order to measure this without double-counting time, the throughput must be measured from the server’s perspective.

Let w be the amount of time a single operation takes and n be the number of operations per thread. Given t threads, the cost of each operation from the perspective of a client thread will be t*w, because the server is synchronizing writes as shown in the figure above, e.g. the thread has to wait for t-1 other writes to complete before conducting its own write. This means that each thread observes a total latency of n*t*w from its perspective. If this is aggregated across threads, the total time computed is n*w*t^2, even though the real time that has passed is only n*t*w, as in the single-threaded case.
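To see the double-counting concretely, here is the arithmetic with made-up numbers (the values of w and n are assumptions for illustration): summing per-thread latencies inflates the total by a factor of t over the wall-clock time.

```python
w = 200e-9  # assumed cost of one synchronized write (200 ns)
n = 1000    # operations per thread

for t in (1, 2, 4, 8):
    per_op = t * w               # each write waits behind t-1 others
    per_thread = n * per_op      # latency one client thread reports: n*t*w
    aggregated = t * per_thread  # naive sum over threads: n*w*t^2
    wall_clock = n * t * w       # time that actually elapsed
    print(t, per_thread, aggregated, wall_clock)
```

At t=8 the naive aggregate is eight times the elapsed wall-clock time, which is why latencies summed at the client cannot be converted directly into throughput.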
For normal server-client throughput we record the start time of the first access at the server, the end time of the last access, and the duration as the difference between these two timestamps. We then compute the number of operations conducted in that time to measure throughput. This only works if the server is pegged, e.g. it is never waiting for requests. However, for synchronization we can’t measure at the server, since we’re trying to determine the cost of locks and scheduling. We’ve used an external benchmarking procedure that may underestimate throughput but allows us to measure from the client side rather than at the server. I’ve put up a simple library called syncwrite to benchmark this code.

Here is the use case: consider a system with multiple goroutines, each of which wants to append to a log file on disk. The append-only log determines the sequence or ordering of events in the system, so appends must be atomic and reads of an index in the log idempotent. The interface of the Log is as follows:

```go
type Log interface {
	Open(path string) error
	Append(value []byte) error
	Get(index int) (*Entry, error)
	Close() error
}
```

The log embeds a sync.RWMutex to ensure no race conditions occur and that our read/write invariants are met. The Open, Append, and Close methods are all protected by a write lock, and the Get method is protected by a read lock. The structs that implement Log all deal with disk and in-memory storage in different ways:

- InMemoryLog: appends to an in-memory slice and does not write to disk.
- FileLog: on open, reads entries from the file into an in-memory slice and reads from it; writes append to both the slice and the file.
- LevelDBLog: both writes and reads go to a LevelDB database.

In the future (probably in the next post), I will also implement an AsyncLog that wraps a Log and makes write operations asynchronous by storing them on a channel and allowing the goroutine to return immediately.
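To make the locking discipline concrete without the Go scaffolding, here is a rough Python analogue of the in-memory variant. This is a sketch of my own, using a plain threading.Lock since the Python stdlib has no reader-writer lock:

```python
import threading

class InMemoryLog:
    """Rough analogue of the Go InMemoryLog: a lock-protected append-only log."""

    def __init__(self):
        self._mu = threading.Lock()  # stands in for sync.RWMutex
        self._entries = []

    def append(self, value: bytes) -> int:
        # Appends are atomic: only one writer may extend the log at a time
        with self._mu:
            self._entries.append(value)
            return len(self._entries) - 1

    def get(self, index: int) -> bytes:
        # Reads of an index are idempotent: entries are never mutated in place
        with self._mu:
            return self._entries[index]

log = InMemoryLog()
log.append(b"foo")
print(log.get(0))  # b'foo'
```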
## Benchmarks

The benchmarks are associated with an action, which can be one or more operations to disk. In this benchmark we simply evaluate an action that calls the Append method of a log with "foo" as the value. Per-action benchmarks are computed using go-bench, which reports the average time it takes to run the action once:

```
BenchmarkInMemoryLog-8    10000000      210 ns/op
BenchmarkFileLog-8          200000     7456 ns/op
BenchmarkLevelDBLog-8       100000    12379 ns/op
```

Writing to the in-memory log is by far the fastest, while writing to the LevelDB log is by far the slowest. In a single thread, we expect throughput, that is, the number of operations per second, to track these per-action benchmarks: the theoretical throughput is simply 1/(w * 1e-9), converting nanoseconds to seconds. The question is how throughput changes with more threads and a real workload.

Throughput benchmarks are conducted by running t threads, each of which runs n actions, and recording the amount of time it takes all threads to run n*t actions. As t increases, the workload stays static, e.g. n becomes smaller to keep n*t constant. The throughput is the number of operations divided by the duration in seconds.

Note that the y-axis is on a logarithmic scale, and because of the magnitude of in-memory writes, the chart is a bit difficult to read. Therefore the next chart shows the percentage of the theoretical throughput (as computed from w) that the real system achieves:

## Observations

Looking at the percent-theoretical chart, I believe the reason both In-Memory and LevelDB achieve >100% for up to 4 threads is that the benchmark that computes w has such high variability, though this does not explain why the File log has dramatically lower throughput. Because the benchmarks were run on a 4-core machine, up to 4 threads for In-Memory and LevelDB can operate without a noticeable decrease in throughput, since there is no scheduling issue.
However, at 8 threads and above there is a noticeable drop in the percent of theoretical throughput, probably due to synchronization or scheduling overhead. The same drop does not occur in the File log because it was already below its theoretical maximum throughput.

# Thread and Non-Thread-Safe Go Set

I came across this now-archived project that implements a set data structure in Go and was intrigued that it provides both thread-safe and non-thread-safe implementations of the same data structure. Recently I’ve been attempting to get rid of locks in my code in favor of one master data structure that does all of the synchronization, so having multiple options for thread safety is useful. Previously I did this by giving a method a lower-case (private) name for the non-thread-safe version and an upper-case (public) name for the thread-safe version. However, as I’ve started to reorganize my packages, this no longer works.

The Set implementation works by defining a private base data structure, set, as well as an interface (set.Interface) that describes the methods a set is expected to have. The set methods are all private; two data structures, Set and SetNonTS (the thread-safe and non-thread-safe versions), are then composed by embedding set. In this snippet I’ll just show a bit of the boilerplate code that does this for reference; see the full set implementation for more detail.

In the implementation above, the set object provides four internal methods: init() creates the internal map data structure, add updates the map with one or more items, remove deletes one or more items from the map, and contains does a simple check to see if an item is in the internal map. All of these methods are private to the set package. The SetNonTS and Set types embed the set object and add some additional functionality. Both implement a constructor, NewNonTS and New respectively, which call the internal init functions.
Both also implement Add and Remove, which silently return if no items are given; the difference is that Set write-locks the data structure after performing that check. Contains is also implemented, which the Set data structure read-locks before checking. The only small problem with this implementation is a little bit of code duplication (e.g. the checks for no items in the Add and Remove methods). However, I’ve noticed in my code that often there are tasks done in either the thread-safe or the non-thread-safe version but not both (like marking a flag or sending data to a channel). Because of this, it’s often better to keep those methods separate rather than relying solely on embedding.

# Git-Style File Editing in CLI

A recent application I was working on required the management of several configuration and list files that needed to be validated. Rather than have the user find and edit these files directly, I wanted to create an editing workflow similar to crontab -e or git commit: the user calls the application, which redirects to a text editor like vim, and when editing is complete, the application takes over again. This happened to be a Go app, so the following code is in Go, but the pattern would work in any programming language. The workflow is as follows:

1. Find an editor executable
2. Copy the original to a temporary file
3. Exec the editor on the temporary file
4. Wait for the editor to be done
5. Validate the temporary file
6. Copy the temporary file to the original location

This worked surprisingly well, especially for things like YAML files, which are structured enough to be validated easily but human-readable enough to edit. First up, finding an editor executable. I used a three-part strategy: first, the user can specify the path to an editor in the configuration file (like git); second, the user can set the $EDITOR environment variable; and third, I look for common editors. Here’s the code:

```go
var editors = [3]string{"vim", "emacs", "nano"}

func findEditor() (string, error) {
	config, err := LoadConfig()
	if err != nil {
		return "", err
	}

	if config.Editor != "" {
		return config.Editor, nil
	}

	if editor := os.Getenv("EDITOR"); editor != "" {
		return editor, nil
	}

	for _, name := range editors {
		path, err := exec.LookPath(name)
		if err == nil {
			return path, nil
		}
	}

	return "", errors.New("no editor found")
}
```


The crucial part of this is exec.LookPath, which searches the $PATH for the editor and returns the full path needed to exec it. Next up is copying the file:

```go
func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer out.Close()

	if _, err = io.Copy(out, in); err != nil {
		return err
	}
	return nil
}
```

Finally, the full editor workflow:

```go
func EditFile(path string) error {
	// Find the editor to use
	editor, err := findEditor()
	if err != nil {
		return err
	}

	// Create a temporary directory and ensure we clean up when done.
	tmpDir, err := ioutil.TempDir("", "edit")
	if err != nil {
		return err
	}
	defer os.RemoveAll(tmpDir)

	// Get the temporary file location
	tmpFile := filepath.Join(tmpDir, filepath.Base(path))

	// Copy the original file to the tmpFile
	if err = copyFile(path, tmpFile); err != nil {
		return err
	}

	// Create the editor command
	cmd := exec.Command(editor, tmpFile)
	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	// Start the editor command and wait for it to finish
	if err = cmd.Start(); err != nil {
		return err
	}
	if err = cmd.Wait(); err != nil {
		return err
	}

	// Copy the tmp file back to the original file
	return copyFile(tmpFile, path)
}
```

This workflow assumes that the file being edited already exists, but of course you could modify it any number of ways. For example, you could use a template to populate the temporary file (similar to what git does for a commit message), or you could add more validation around input and output.

# Transaction Handling with Psycopg2

Databases are essential to most applications; however, database interaction is often overlooked by Python developers who use higher-level libraries like Django or SQLAlchemy. We use and love PostgreSQL with Psycopg2, but I recently realized that I didn’t have a good grasp on how exactly psycopg2 implements core database concepts, particularly transaction isolation and thread safety.
Here’s what the documentation says regarding transactions:

> Transactions are handled by the connection class. By default, the first time a command is sent to the database (using one of the cursors created by the connection), a new transaction is created. The following database commands will be executed in the context of the same transaction – not only the commands issued by the first cursor, but the ones issued by all the cursors created by the same connection. Should any command fail, the transaction will be aborted and no further command will be executed until a call to the rollback() method.

Transactions are therefore connection-specific. When you create a connection, you can create multiple cursors; the transaction begins when the first cursor issues an execute, and all commands executed by all cursors after that are part of the same transaction until commit or rollback. After either of those methods is called, the next transaction starts on the next execute call. This brings up a very important point:

> By default even a simple SELECT will start a transaction: in long-running programs, if no further action is taken, the session will remain “idle in transaction”, an undesirable condition for several reasons (locks are held by the session, tables bloat…). For long lived scripts, either make sure to terminate a transaction as soon as possible or use an autocommit connection.

This indicates that when working directly with psycopg2, understanding transactions is essential to writing stable scripts. This post therefore details my notes and techniques for working more effectively with PostgreSQL from Python.

## Database Preliminaries

In order to demonstrate the code in this post, we need a database. The classic database example taught to undergraduates is the bank account, so we’ll continue with that theme here! Sorry if this part is tedious; feel free to skip ahead.
In a file, schema.sql, I defined the following schema as DDL (data definition language):

```sql
DROP TABLE IF EXISTS users CASCADE;
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(255) UNIQUE,
    pin SMALLINT NOT NULL
);

DROP TYPE IF EXISTS account_type CASCADE;
CREATE TYPE account_type AS ENUM ('checking', 'savings');

DROP TABLE IF EXISTS accounts CASCADE;
CREATE TABLE accounts (
    id SERIAL PRIMARY KEY,
    type account_type,
    owner_id INTEGER NOT NULL,
    balance NUMERIC DEFAULT 0.0,
    CONSTRAINT positive_balance CHECK (balance >= 0),
    FOREIGN KEY (owner_id) REFERENCES users (id)
);

DROP TYPE IF EXISTS ledger_type CASCADE;
CREATE TYPE ledger_type AS ENUM ('credit', 'debit');

DROP TABLE IF EXISTS ledger;
CREATE TABLE ledger (
    id SERIAL PRIMARY KEY,
    account_id INTEGER NOT NULL,
    date DATE NOT NULL DEFAULT CURRENT_DATE,
    type ledger_type NOT NULL,
    amount NUMERIC NOT NULL,
    FOREIGN KEY (account_id) REFERENCES accounts (id)
);
```

This creates a simple database with three tables. The users table contains a PIN code for verification. Users can have one or more accounts, and accounts have the constraint that the balance can never fall below $0.00. We can also seed the database with some initial data:

```sql
INSERT INTO users (id, username, pin) VALUES
    (1, 'alice', 1234),
    (2, 'bob', 9999);

INSERT INTO accounts (type, owner_id, balance) VALUES
    ('checking', 1, 250.0),
    ('savings', 1, 5.00),
    ('checking', 2, 100.0),
    ('savings', 2, 2342.13);
```


Moving to Python code we can add some template code to allow us to connect to the database and execute the SQL in our file above:

```python
import os
import psycopg2 as pg

def connect(env="DATABASE_URL"):
    url = os.getenv(env)
    if not url:
        raise ValueError("no database url specified")
    return pg.connect(url)

def createdb(conn, schema="schema.sql"):
    with open(schema, 'r') as f:
        sql = f.read()

    try:
        with conn.cursor() as curs:
            curs.execute(sql)
        conn.commit()
    except Exception as e:
        conn.rollback()
        raise e
```

The connect function looks for the database connection string in the environment variable $DATABASE_URL. Because database configuration can contain passwords and network information, it is always best to store it in the environment or in a local, secure configuration file that can only be accessed by the process and is not checked in with the code. The connection string should look something like postgresql://user@localhost:5432/dbname.

The createdb function reads the SQL from the schema.sql file and executes it against the database. Note that this is why we have the DROP TABLE IF EXISTS statements: so we can guarantee we always start with a fresh database when we run this script. This function also gives us our first glance at transactions and database interaction with Python. Complying with PEP 249, we create a connection to the database, then create a cursor from the connection. Cursors manage the execution of SQL against the database as well as data retrieval. We execute the SQL in our schema file, committing the transaction if no exceptions are raised and rolling back if it fails. We will explore this more in the next section.

## Transaction Management

A transaction consists of one or more related operations that represent a single unit of work. For example, in the bank account example you might have a deposit transaction that executes queries to look up the account and verify the user, adds a record to a list of daily deposits, checks if the daily deposit limit has been reached, then modifies the account balance. All of these operations represent the steps required to perform a deposit.

The goal of a transaction is that when it completes, the database remains in a single consistent state. Consistency is often defined by invariants or constraints that describe, at a higher level, how the database should maintain information. From a programming perspective, if those constraints are violated, an exception is raised.
For example, the database has a positive_balance constraint: if the balance for an account goes below zero, an exception is raised. When this constraint is violated, the database must remain unchanged and all operations performed by the transaction must be rolled back. If the transaction was successful we can then commit the changes, which guarantees that the database has successfully applied our operation. So why do we need to manage transactions? Consider the following code:

```python
conn = connect()
curs = conn.cursor()

try:
    # Execute a command that will raise a constraint
    curs.execute("UPDATE accounts SET balance=%s", (-130.935,))
except Exception as e:
    print(e)  # Constraint exception

# Execute another command, but because of the previous exception:
curs = conn.cursor()
try:
    curs.execute("SELECT id, type FROM accounts WHERE owner_id=%s", (1,))
except pg.InternalError as e:
    print(e)
```

The first curs.execute triggers the constraint exception, which is caught and printed. However, the connection is now in an inconsistent state. When you try to execute the second query, a psycopg2.InternalError is raised: "current transaction is aborted, commands ignored until end of transaction block". In order to continue with the application, conn.rollback() needs to be called to end the transaction and start a new one.

NOTE: Using with conn.cursor() as curs: causes the same behavior; the context manager does not automatically clean up the state of the transaction.

This essentially means all transactions can be wrapped in a try block: if they conclude successfully they can be committed, but if they raise an exception they must be rolled back.
A basic decorator that does this is as follows:

```python
from functools import wraps

def transaction(func):
    @wraps(func)
    def inner(*args, **kwargs):
        conn = connect()
        try:
            func(conn, *args, **kwargs)
            conn.commit()
        except Exception as e:
            conn.rollback()
            log.error("{} error: {}".format(func.__name__, e))
        finally:
            conn.close()
    return inner
```

This decorator wraps the specified function, returning an inner function that injects a new connection as the first argument to the decorated function. If the decorated function raises an exception, the transaction is rolled back and the error is logged; otherwise it is committed. The decorator method is nice, but the connection injection can be a bit weird. An alternative is a context manager that ensures the connection is committed or rolled back in a similar fashion:

```python
from contextlib import contextmanager

@contextmanager
def transaction():
    conn = connect()
    try:
        yield conn
        conn.commit()
    except Exception as e:
        conn.rollback()
        log.error("db error: {}".format(e))
    finally:
        conn.close()
```

This allows you to write code using with as follows:

```python
with transaction() as conn:
    # do transaction
    ...
```

The context manager allows you to easily compose two transactions inside a single function, though of course this may be against the point. However, it is no problem to combine both the decorator and the context manager methods into two steps (more on this in isolation levels).

### ATM Application

So let’s talk about two specific transactions for an imaginary database application: deposit and withdraw. Each of these operations has several steps:

1. Validate the user with the associated PIN
2. Ensure the user owns the account being modified
3. Write a ledger record with the credit or debit being applied
4. On credit, ensure the daily deposit limit isn’t reached
5. Modify the balance of the account
6. Fetch the current balance to display to the user

Each transaction will perform 6-7 distinct SQL queries: SELECT, INSERT, and UPDATE.
If any of them fails, then the database should remain completely unchanged. Failure in this case means that an exception is raised, which is potentially the easiest thing to do when you have a stack of functions calling other functions. Let's look at deposit first:

@transaction
def deposit(conn, user, pin, account, amount):
    # Step 1: authenticate the user via pin and verify account ownership
    authenticate(conn, user, pin, account)

    # Step 2: add the ledger record with the credit
    ledger(conn, account, "credit", amount)

    # Step 3: update the account value by adding the amount
    update_balance(conn, account, amount)

    # Fetch the current balance in the account and log it
    record = "deposit ${:0.2f} to account {} | current balance: ${:0.2f}"
    log.info(record.format(amount, account, balance(conn, account)))

This function simply calls other functions, passing the transaction context (in this case a connection, along with the input details) to other functions which may or may not raise exceptions. Here are the two authentication methods:

def authenticate(conn, user, pin, account=None):
    """
    Returns True if the username is found and the PIN matches.
    """
    with conn.cursor() as curs:
        sql = "SELECT 1 AS authd FROM users WHERE username=%s AND pin=%s"
        curs.execute(sql, (user, pin))

        if curs.fetchone() is None:
            raise ValueError("could not validate user via PIN")

    if account:
        # Verify account ownership if account is provided
        verify_account(conn, user, account)

    return True

def verify_account(conn, user, account):
    """
    Verify that the account is held by the user.
    """
    with conn.cursor() as curs:
        sql = (
            "SELECT 1 AS verified FROM accounts a "
            "JOIN users u ON u.id = a.owner_id "
            "WHERE u.username=%s AND a.id=%s"
        )
        curs.execute(sql, (user, account))

        if curs.fetchone() is None:
            raise ValueError("account belonging to user not found")

    return True

The authenticate and verify_account functions basically look in the database to see if there is a record that matches the conditions: a user with a matching PIN in authenticate, and a (user, account_id) pair in verify_account. Both of these functions rely on the UNIQUE constraint in the database for usernames and account ids.

This example shows how the function call stack can get arbitrarily deep: verify_account is called by authenticate, which is called by deposit. By raising an exception at any point in the stack, the transaction proceeds no further, protecting us from harm later in the transaction. Note also that neither of these functions has an @transaction decorator; this is because they are expected to be called from within another transaction. They are independent operations, but they can also be called on their own inside a transaction created with the context manager. Next we insert a ledger record:

MAX_DEPOSIT_LIMIT = 1000.00

def ledger(conn, account, record, amount):
    """
    Add a ledger record with the amount being credited or debited.
    """
    # Perform the insert
    with conn.cursor() as curs:
        sql = "INSERT INTO ledger (account_id, type, amount) VALUES (%s, %s, %s)"
        curs.execute(sql, (account, record, amount))

    # If we are crediting the account, perform daily deposit verification
    if record == "credit":
        check_daily_deposit(conn, account)

def check_daily_deposit(conn, account):
    """
    Raise an exception if the deposit limit has been exceeded.
    """
    with conn.cursor() as curs:
        sql = (
            "SELECT amount FROM ledger "
            "WHERE date=now()::date AND type='credit' AND account_id=%s"
        )
        curs.execute(sql, (account,))

        total = sum(row[0] for row in curs.fetchall())
        if total > MAX_DEPOSIT_LIMIT:
            raise Exception("daily deposit limit has been exceeded!")

This is the first place that we modify the state of the database, by inserting a ledger record. If, when we check_daily_deposit, we discover that our deposit limit has been exceeded for the day, an exception is raised that rolls back the transaction, ensuring the ledger record is not accidentally stored on disk. Finally we update the account balance:

from decimal import Decimal

def update_balance(conn, account, amount):
    """
    Add the amount (or subtract if negative) to the account balance.
    """
    amount = Decimal(amount)
    with conn.cursor() as curs:
        current = balance(conn, account)
        sql = "UPDATE accounts SET balance=%s WHERE id=%s"
        curs.execute(sql, (current+amount, account))

def balance(conn, account):
    with conn.cursor() as curs:
        curs.execute("SELECT balance FROM accounts WHERE id=%s", (account,))
        return curs.fetchone()[0]

I'll have more to say on update_balance when we discuss isolation levels, but suffice it to say, this is another place where, if the transaction fails, we want to ensure that our account is not modified!
In order to complete the example, here is the withdraw transaction:

@transaction
def withdraw(conn, user, pin, account, amount):
    # Step 1: authenticate the user via pin and verify account ownership
    authenticate(conn, user, pin, account)

    # Step 2: add the ledger record with the debit
    ledger(conn, account, "debit", amount)

    # Step 3: update the account value by subtracting the amount
    update_balance(conn, account, amount * -1)

    # Fetch the current balance in the account and log it
    record = "withdraw ${:0.2f} from account {} | current balance: ${:0.2f}"
    log.info(record.format(amount, account, balance(conn, account)))

This is similar to deposit, but modifies the inputs to the various operations to decrease the account balance with a debit ledger record. We can run:

if __name__ == '__main__':
    conn = connect()
    createdb(conn)

    # Successful deposit
    deposit('alice', 1234, 1, 785.0)

    # Successful withdrawal
    withdraw('alice', 1234, 1, 230.0)

    # Unsuccessful deposit
    deposit('alice', 1234, 1, 489.0)

    # Successful deposit
    deposit('bob', 9999, 2, 220.23)

And we should see log records along these lines:

2017-12-06 20:01:00,086 deposit $785.00 to account 1 | current balance: $1035.00
2017-12-06 20:01:00,094 withdraw error: could not validate user via PIN
2017-12-06 20:01:00,103 withdraw $230.00 from account 1 | current balance: $805.00
2017-12-06 20:01:00,118 deposit error: daily deposit limit has been exceeded!
2017-12-06 20:01:00,130 deposit $220.23 to account 2 | current balance: $225.23

This should set a baseline for creating simple, easy-to-use transactions in Python. However, if you remember your undergraduate databases class, things get more interesting when two transactions occur at the same time. We'll explore that from a single process by looking at multi-threaded database connections.

## Threads

Let's consider how to run two transactions at the same time from within the same application.
The simplest way to do this is to use the threading library to execute transactions simultaneously. How do you achieve thread safety when accessing the database? Back to the docs:

> Connection objects are thread-safe: many threads can access the same database either using separate sessions and creating a connection per thread or using the same connection and creating separate cursors. In DB API 2.0 parlance, Psycopg is level 2 thread safe.

This means that every thread must have its own conn object (which we'll explore in the connection pools section). Any cursor created from the same connection object will be in the same transaction, no matter the thread. We also want to consider how transactions influence each other, and we'll take a look at that first by exploring isolation levels and session state.

### Session State

Let's say that Alice and Charlie have a joint account, under Alice's name. They both show up to ATMs at the same time; Alice tries to deposit $75 and then withdraw $25, while Charlie attempts to withdraw $300. We can simulate this with threads as follows:

import time
import random
import threading

def op1():
    time.sleep(random.random())
    withdraw('alice', 1234, 1, 300.0)

def op2():
    time.sleep(random.random())
    deposit('alice', 1234, 1, 75.0)
    withdraw('alice', 1234, 1, 25.0)

threads = [
    threading.Thread(target=op1),
    threading.Thread(target=op2),
]

for t in threads:
    t.start()

for t in threads:
    t.join()


Depending on the timing, one of two things can happen. Charlie can get rejected for not having enough money in the account, leaving the final balance at $300; or all transactions can succeed, leaving the final balance at $0. There are three transactions happening: two withdrawals and a deposit. Each of these transactions runs in isolation, meaning that it sees the database as it was when it started, plus any changes that it makes itself; so if Charlie's withdrawal and Alice's deposit happen simultaneously, Charlie will be rejected, since his transaction doesn't know about the deposit until it's finished. No matter what, the database will be left in a consistent state.
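Since the docs require a connection per thread, one common pattern is to cache each thread's connection in threading.local storage so it is opened lazily and never shared. Here is a minimal sketch; note that connect() is stood in by a counter (an assumption for illustration) so the example runs without a database, whereas real code would call psycopg2.connect:

```python
import itertools
import threading

# Stand-in for psycopg2.connect() so this sketch runs without a database;
# in real code this would open a new connection to Postgres.
_counter = itertools.count()

def connect():
    return next(_counter)

_local = threading.local()

def get_connection():
    # Create this thread's private connection on first use
    if not hasattr(_local, "conn"):
        _local.conn = connect()
    return _local.conn

seen = []

def worker():
    # Repeated calls in the same thread reuse the same connection
    assert get_connection() == get_connection()
    seen.append(get_connection())

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Three threads, three distinct connections
print(len(set(seen)))
```

Each thread gets its own connection object, satisfying psycopg2's level 2 thread safety without any explicit locking around connection creation.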

However, for performance reasons, you may want to modify the isolation level for a particular transaction. Possible levels are as follows:

1. READ UNCOMMITTED: lowest isolation level, transaction may read values that are not yet committed (and may never be committed).
2. READ COMMITTED: write locks are maintained but read locks are released after select, meaning two different values can be read in different parts of the transaction.
3. REPEATABLE READ: keep both read and write locks so multiple reads return same values but phantom reads can occur.
4. SERIALIZABLE: the highest isolation level: read, write, and range locks are maintained until the end of the transaction.
5. DEFAULT: set by the server configuration, not by Python; usually READ COMMITTED.
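In PostgreSQL these levels map directly onto the SET TRANSACTION statement (note that Postgres treats a request for READ UNCOMMITTED as READ COMMITTED); for example:

```sql
BEGIN;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- All statements in this transaction now read from a single snapshot,
-- so repeated reads of the balance return the same value.
SELECT balance FROM accounts WHERE id = 1;
COMMIT;
```

This is the same statement psycopg2 issues on your behalf when you set the isolation level on the connection's session.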

Note that as the isolation level increases, the number of locks being maintained also increases, which can severely impact performance if there is lock contention or deadlock. It is possible to set the isolation level on a per-transaction basis in order to improve the performance of all transactions happening concurrently. To do this we must modify the session parameters on the connection, which change the behavior of the transactions or statements that follow in that particular session. Additionally, we can set the session to readonly, which disallows writes except to temporary tables (for performance and security), and to deferrable.

Deferrability is very interesting in a transaction because it modifies how database constraints are checked. Non-deferrable transactions check constraints immediately after each statement is executed; given a constraint that balances must not be negative, UPDATE accounts SET balance=-5.45 would immediately raise an exception. Deferrable transactions, however, wait until the transaction is concluded before checking the constraints. This allows you to write multiple overlapping operations that may put the database into a correct state by the end of the transaction, but potentially not during the transaction (this also interacts with the performance of the various isolation levels).
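As a concrete illustration in raw SQL (assuming the constraint was declared DEFERRABLE; in PostgreSQL only unique, primary key, foreign key, and exclusion constraints can be deferred, not CHECK constraints):

```sql
BEGIN;
SET CONSTRAINTS ALL DEFERRED;
-- Rotate three account ids; each UPDATE on its own would violate the
-- unique constraint, but the set is consistent when checked at COMMIT.
UPDATE accounts SET id = 0 WHERE id = 1;
UPDATE accounts SET id = 1 WHERE id = 2;
UPDATE accounts SET id = 2 WHERE id = 0;
COMMIT;  -- deferred constraints are checked here
```

Without SET CONSTRAINTS ALL DEFERRED, the first UPDATE would fail immediately even though the transaction as a whole ends in a valid state.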

In order to change the session, we’ll use a context manager as we did before to modify the session for the transaction, then reset the session back to the defaults:

@contextmanager
def session(conn, isolation_level=None, readonly=None, deferrable=None):
    try:
        conn.set_session(
            isolation_level=isolation_level,
            readonly=readonly,
            deferrable=deferrable
        )
        yield conn
    finally:
        # Reset the session back to the server defaults; note that passing
        # None to set_session leaves a setting unchanged, so we pass "default"
        conn.set_session(
            isolation_level="default", readonly="default", deferrable="default"
        )


We can then use with to conduct transactions with different isolation levels:

with transaction() as conn:
    with session(conn, isolation_level="READ COMMITTED") as conn:
        # Do transaction


NOTE: There cannot be an ongoing transaction when the session is set; therefore it is more common for me to set the isolation level, readonly, and deferrable inside of the transaction decorator, rather than using two separate context managers as shown above. Frankly, it is also common to set these properties on a per-process basis rather than a per-transaction basis, in which case the session is set in connect.

### Connection Pools

Connections cannot be shared across threads. In the threading example above, if we remove the @transaction decorator and pass the same connection into both operations as follows:

conn = connect()

def op1():
    time.sleep(random.random())
    withdraw(conn, 'alice', 1234, 1, 300.0)

def op2():
    time.sleep(random.random())
    deposit(conn, 'alice', 1234, 1, 75.0)
    withdraw(conn, 'alice', 1234, 1, 25.0)


If the op1 withdraw fires first, the exception will cause all of the op2 statements to also fail, since they're in the same transaction. This essentially means that both op1 and op2 are in the same transaction, even though they are in different threads!

We've avoided this so far by creating a new connection every time a transaction runs. However, connecting to the database can be expensive, and in high-transaction workloads we may want to keep connections open while ensuring each is used by only one transaction at a time. The solution is a connection pool. We can modify our connect function as follows:

import os

from psycopg2.pool import ThreadedConnectionPool

def connect(env="DATABASE_URL", connections=2):
    """
    Connect to the database using an environment variable.
    """
    url = os.getenv(env)
    if not url:
        raise ValueError("no database url specified")

    minconns = connections
    maxconns = connections * 2
    return ThreadedConnectionPool(minconns, maxconns, url)


This creates a thread-safe connection pool that establishes at least 2 connections and will go up to a maximum of 4 connections on demand. In order to use the pool object in our transaction decorator, we will have to connect when the decorator is imported, creating a global pool object:

pool = connect()

@contextmanager
def transaction(name="transaction", **kwargs):
    # Get the session parameters from the kwargs
    options = {
        "isolation_level": kwargs.get("isolation_level", None),
        "readonly": kwargs.get("readonly", None),
        "deferrable": kwargs.get("deferrable", None),
    }

    try:
        conn = pool.getconn()
        conn.set_session(**options)
        yield conn
        conn.commit()
    except Exception as e:
        conn.rollback()
        log.error("{} error: {}".format(name, e))
    finally:
        conn.reset()
        pool.putconn(conn)


Using pool.getconn retrieves a connection from the pool; when we're done with it, pool.putconn releases the connection object back to the pool. Note that if all connections are already checked out, psycopg2's pool raises a PoolError rather than blocking, so the pool should be sized for the expected concurrency.

## Conclusion

This has been a ton of notes on more direct usage of psycopg2. Sorry I couldn’t write a more conclusive conclusion but it’s late and this post is now close to 4k words. Time to go get dinner!

## Notes

I used logging as the primary output to this application. The logging was set up as follows:

import logging

LOG_FORMAT = "%(asctime)s %(message)s"

logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
log = logging.getLogger('balance')


For the complete code, see this gist.

# Lock Diagnostics in Go

By now it's pretty clear that I've just had a bear of a time with locks and synchronization inside of multi-threaded environments with Go. Probably most gophers would simply tell me that I should share memory by communicating rather than communicate by sharing memory — and frankly I'm in that camp too. The issue is that:

• Mutexes can be more expressive than channels
• Channels are fairly heavyweight

So to be honest, there are situations where a mutex is a better choice than a channel. I believe that one of those situations is when dealing with replicated state machines … which is what I’ve been working on the past few months. The issue is that the state of the replica has to be consistent across a variety of events: timers and remote messages. The problem is that the timers and network traffic are all go routines, and there can be a lot of them running in the system at a time.

Of course you could simply create a channel and push event objects to it to serialize all events. The problem with that is that events generate other events that have to be ordered with respect to the parent event. For example, one event might require the generation of messages to be sent to remote replicas, which requires a per-remote state read that is variable. Said another way, the state can be read locked for all go routines operating at that time, but no write locks can be acquired. Hence the last post.

Things got complicated. Lock contention was a thing.

So I had to diagnose who was trying to acquire locks and when and why they were contending. For reference, the most common issues were:

1. A global read lock was being released before all sub read locks were finished.
2. A struct with an embedded RWMutex was then embedded by another object with only a Mutex but it still had RLock() methods as a result (or vice versa).
3. The wrong lock was being called on embedded structs.

The primary lesson I learned was this: when embedding synchronized objects, only embed the mutex on the child object. Hopefully that rule of thumb lasts.

I learned these lessons using a handy little diagnostic tool that this snippet is about. Basically I wanted to track who was acquiring locks and who was waiting on locks. I could then print out a report when I thought something was contending (e.g. on an Interrupt signal) and figure things out.

First step, figure out the name of the calling method:

// caller returns the name of the function that called the function
// which called the caller function.
func caller() string {
	pc, _, _, ok := runtime.Caller(2)
	details := runtime.FuncForPC(pc)
	if ok && details != nil {
		return details.Name()
	}
	return UnknownCaller
}


This handy little snippet uses the runtime package to detect the caller two steps above the caller() function in the stack. This allows you to call caller() inside of a function to get the name of the function that’s calling the function calling caller(). Confusing? Try this:

func outer() string {
	return inner()
}

func inner() string {
	return caller()
}


Calling outer() will return something like main.outer — the function that called the inner() function. Here is a runnable example.

With that in hand, we can simply create a map[string]int64 and increment the count by caller name before Lock() and decrement it after Unlock().

But … that’s actually a little more complicated than I let on!

The problem is that we definitely have multiple go routines calling locks on the lockable struct. However, if we simply try to access the map in the MutexD, then we can have a panic for concurrent map reads and writes. So now, I use the share memory by communicating technique and pass signals via an internal channel, which is read by a go routine ranging over it.
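My actual implementation lives in the repo linked below; here is a minimal, illustrative sketch of the channel-based idea (the MutexD internals shown are an assumption, not the exact code): Lock sends the caller's name over an internal channel, and a single goroutine owns the counts map, so the map is never accessed concurrently:

```go
package main

import (
	"fmt"
	"runtime"
	"sort"
	"strings"
	"sync"
)

// MutexD is a sync.Mutex that counts lock requests by caller name.
// A single goroutine owns the counts map; Lock sends caller names
// over a channel rather than touching the map directly.
type MutexD struct {
	sync.Mutex
	init   sync.Once
	calls  chan string
	report chan map[string]int64
}

func (m *MutexD) run() {
	m.calls = make(chan string)
	m.report = make(chan map[string]int64)
	go func() {
		counts := make(map[string]int64)
		for {
			select {
			case name := <-m.calls:
				counts[name]++
			case m.report <- counts:
			}
		}
	}()
}

// Lock records the calling function's name, then acquires the mutex.
func (m *MutexD) Lock() {
	m.init.Do(m.run)
	m.calls <- caller()
	m.Mutex.Lock()
}

// String renders the lock request report; only safe to read while no
// other goroutines are requesting locks.
func (m *MutexD) String() string {
	m.init.Do(m.run)
	lines := []string{}
	for name, n := range <-m.report {
		lines = append(lines, fmt.Sprintf("%d locks requested by %s", n, name))
	}
	sort.Strings(lines)
	return strings.Join(lines, "\n")
}

// caller returns the name of the function two frames up the stack.
func caller() string {
	if pc, _, _, ok := runtime.Caller(2); ok {
		if details := runtime.FuncForPC(pc); details != nil {
			return details.Name()
		}
	}
	return "unknown"
}

type StateMachine struct {
	MutexD
}

func (s *StateMachine) Alpha() {
	s.Lock()
	defer s.Unlock()
}

func (s *StateMachine) Bravo() {
	s.Lock()
	defer s.Unlock()
}

func main() {
	m := new(StateMachine)
	m.Alpha()
	m.Bravo()
	m.Bravo()
	fmt.Println(m.String())
}
```

Because the counting goroutine is the sole owner of the map, there is no possibility of a concurrent map read/write panic, no matter how many goroutines request locks.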

How to use it? Well do something like this:

type StateMachine struct {
	MutexD
}

func (s *StateMachine) Alpha() {
	s.Lock()
	defer s.Unlock()
	time.Sleep(1 * time.Second)
}

func (s *StateMachine) Bravo() {
	s.Lock()
	defer s.Unlock()
	time.Sleep(100 * time.Millisecond)
}

func main() {
	m := new(StateMachine)
	go m.Alpha()
	time.Sleep(100 * time.Millisecond)
	for i := 0; i < 2; i++ {
		go m.Bravo()
	}

	fmt.Println(m.MutexD.String())
}


You should see something like:

1 locks requested by main.(*StateMachine).Alpha
2 locks requested by main.(*StateMachine).Bravo


Obviously you can do the same thing for RWMutex objects, and it’s easy to swap them in and out of code by changing the package and adding or removing a “D”. My implementation is here: github.com/bbengfort/x/lock.

# Lock Queuing in Go

In Go, you can use sync.Mutex and sync.RWMutex objects to create thread-safe data structures in memory, as discussed in "Synchronizing Structs for Safe Concurrency in Go". When using the sync.RWMutex in Go, there are two kinds of locks: read locks and write locks. The basic difference is that many read locks can be acquired at the same time, but only one write lock can be acquired at a time.

This means that if a thread attempts to acquire a read lock on an object that is already read locked, then it will not block and it will acquire its own read lock. If a thread attempts to acquire a read or a write lock on a write locked object, then it will block until it is unlocked (as will a write lock acquisition on a read locked object).

Granting a lock can be prioritized depending on different policies for accesses. Priorities balance the trade-off between concurrency and starvation as follows:

1. Read-Preferring RW allows new read locks to be acquired as long as the lock is read-locked, forcing the write-lock acquirer to wait until there are no more read-locks. In high contention environments, this might lead to write-starvation.

2. Write-Preferring RW prevents a read-lock acquisition if a writer is queued and waiting for the lock. This reduces concurrency, because new read locks have to wait for the write lock, but prevents starvation.

So which of these does Go implement? According to the documentation:

If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking. This is to ensure that the lock eventually becomes available; a blocked Lock call excludes new readers from acquiring the lock. — godoc

My initial read of this made me think that Go implements write-preferring mutexes. However, this was not the behavior that I observed.

Consider the following locker:

var delay time.Duration
var started time.Time

// Locker holds values that are threadsafe
type Locker struct {
	sync.RWMutex
	value  uint64    // the current value of the locker
	access time.Time // time of the last access
}

// Write to the value of the locker in a threadsafe fashion.
func (l *Locker) Write(value uint64) {
	l.Lock()
	defer l.Unlock()

	// Arbitrary amount of work
	time.Sleep(delay)

	l.value = value
	l.access = time.Now()
	l.log("written")
}

// Read the value of the locker in a threadsafe fashion.
func (l *Locker) Read() uint64 {
	l.RLock()
	defer l.RUnlock()

	// Arbitrary amount of work
	time.Sleep(delay / 2)
	l.access = time.Now()
	l.log("read")
	return l.value
}

// Log the access (not thread-safe)
func (l *Locker) log(method string) {
	after := l.access.Sub(started)
	log.Printf(
		"%d %s after %s\n", l.value, method, after,
	)
}


This locker holds a value and logs all accesses to it after the start time. If we run a few threads to read and write to it we can see concurrent reads in action:

func main() {
	delay = 1 * time.Second
	started = time.Now()
	group := new(errgroup.Group)
	locker := new(Locker)

	// Straight forward: a write, three reads, and a write
	group.Go(func() error { locker.Write(42); return nil })
	group.Go(func() error { locker.Read(); return nil })
	group.Go(func() error { locker.Read(); return nil })
	group.Go(func() error { locker.Read(); return nil })
	group.Go(func() error { locker.Write(101); return nil })

	group.Wait()
}


The output is as follows:

$ go run locker.go
2017/09/08 12:26:32 101 written after 1.005058824s
2017/09/08 12:26:33 101 read after 1.50770225s
2017/09/08 12:26:33 101 read after 1.507769109s
2017/09/08 12:26:33 101 read after 1.50773587s
2017/09/08 12:26:34 42 written after 2.511968581s


Note that the last go routine, writing 101, actually managed to acquire the lock first, after which the three readers acquired the lock, and finally the first writer wrote 42. Now if we interleave the read and write access, adding a sleep between the kick-off of each go routine to ensure that the preceding thread has time to acquire the lock:

func main() {
	delay = 1 * time.Second
	started = time.Now()
	group := new(errgroup.Group)
	locker := new(Locker)

	// Interleave writes and reads, pausing between each kick-off
	group.Go(func() error { locker.Write(42); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Write(101); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Write(3); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Write(18); return nil })
	time.Sleep(10 * time.Millisecond)
	group.Go(func() error { locker.Read(); return nil })

	group.Wait()
}


We get the following output:

$ go run locker.go
2017/09/08 12:29:28 42 written after 1.000178155s
2017/09/08 12:29:28 42 read after 1.500703007s
2017/09/08 12:29:28 42 read after 1.500691088s
2017/09/08 12:29:28 42 read after 1.500756144s
2017/09/08 12:29:28 42 read after 1.500648159s
2017/09/08 12:29:28 42 read after 1.500762323s
2017/09/08 12:29:28 42 read after 1.500679533s
2017/09/08 12:29:28 42 read after 1.500795204s
2017/09/08 12:29:29 101 written after 2.500971593s
2017/09/08 12:29:30 3 written after 3.505325487s
2017/09/08 12:29:31 18 written after 4.50594131s


This suggests that the reads continue to acquire locks as long as the Locker is read locked, forcing the writes to happen at the end.

I found one Stack Overflow post: “Read preferring RW mutex lock in Golang” that seems to suggest that sync.RWMutex can implement both read and write preferred locking, but doesn’t really give an explanation about how external callers can implement it.

Finally consider the following:

func main() {
	delay = 1 * time.Second
	started = time.Now()
	group := new(errgroup.Group)
	locker := new(Locker)

	// Kick off four writes up front
	group.Go(func() error { locker.Write(42); return nil })
	group.Go(func() error { locker.Write(101); return nil })
	group.Go(func() error { locker.Write(3); return nil })
	group.Go(func() error { locker.Write(18); return nil })

	for i := 0; i < 22; i++ {
		group.Go(func() error {
			locker.Read()
			return nil
		})
		time.Sleep(delay / 4)
	}

	group.Wait()
}


Given the loop issuing 22 read locks that sleep only a quarter of the time of the write lock, we might expect that this code will issue 22 read locks then all the write locks will occur at the end (and if we put this in a forever loop, then the writes would never occur). However, the output of this is as follows:

2017/09/08 12:43:40 18 written after 1.004461829s
2017/09/08 12:43:40 18 read after 1.508343716s
2017/09/08 12:43:40 18 read after 1.50842899s
2017/09/08 12:43:40 18 read after 1.508362345s
2017/09/08 12:43:40 18 read after 1.508339659s
2017/09/08 12:43:40 18 read after 1.50852229s
2017/09/08 12:43:41 42 written after 2.513789339s
2017/09/08 12:43:42 42 read after 3.0163191s
2017/09/08 12:43:42 42 read after 3.016330534s
2017/09/08 12:43:42 42 read after 3.016355628s
2017/09/08 12:43:42 42 read after 3.016371381s
2017/09/08 12:43:42 42 read after 3.016316992s
2017/09/08 12:43:43 3 written after 4.017954589s
2017/09/08 12:43:43 3 read after 4.518495233s
2017/09/08 12:43:43 3 read after 4.518523255s
2017/09/08 12:43:43 3 read after 4.518537387s
2017/09/08 12:43:43 3 read after 4.518540397s
2017/09/08 12:43:43 3 read after 4.518543262s
2017/09/08 12:43:43 3 read after 4.51863128s
2017/09/08 12:43:44 101 written after 5.521872765s
2017/09/08 12:43:45 101 read after 6.023207828s
2017/09/08 12:43:45 101 read after 6.023225272s
2017/09/08 12:43:45 101 read after 6.023249529s
2017/09/08 12:43:45 101 read after 6.023190828s
2017/09/08 12:43:45 101 read after 6.023243032s
2017/09/08 12:43:45 101 read after 6.023190457s
2017/09/08 12:43:45 101 read after 6.04455716s
2017/09/08 12:43:45 101 read after 6.29457923s


What Go implements is actually something else: read locks can only be accumulated so long as the initial read lock is still held (the word "initial" being critical in the documentation). As soon as the first read lock is released, the queued write lock gets priority. Each initial read lock lasts approximately 500ms, which leaves enough time for four or five other goroutines to acquire read locks of their own; as soon as that first read lock completes, the waiting write is given priority.