Clark Kampfe

Useful Stuff in Clojure - Databases and State

2018-06-12

In the first piece in this series, we looked at how to turn your Clojure project into an artifact that you can deploy.

Keeping things useful, we're going to take a look at the basis for many applications: databases.

In this arena, Clojure is lucky. Being a hosted language on the JVM, Clojure inherits a ton of useful, mature database functionality that you and I get to use to do real stuff.

Let's create another project:

$ boot -d boot/new new -t app -n database

This again creates a Clojure project with the Boot build tool, using the standard app template.

Open up the build.boot file in your project's directory, and add the dependencies for java.jdbc, hikari-cp, sqlite-jdbc, and mount so the top part of your file looks like this:

(boot.core/set-env! :resource-paths #{"resources" "src"}
                    :source-paths   #{"test"}
                    :dependencies   '[[org.clojure/clojure "1.9.0"]
                                      [org.clojure/java.jdbc "0.7.6"]
                                      [hikari-cp "2.4.0"]
                                      [org.xerial/sqlite-jdbc "3.23.1"]
                                      [mount "0.1.12"]
                                      [adzerk/boot-test "RELEASE" :scope "test"]])

We're using Sqlite here, but everything we're doing works with Postres, Mysql, etc. java.jdbc is Clojure's interface to the Java database connection machinery. hikari is for database connection pooling and configuration. mount is for state management, which I'll get to after the next section.

Pop open core.clj, and add the necessary requires to your namespace:

(ns database.core
  (:require [clojure.java.jdbc :as jdbc]
            [hikari-cp.core :as h]
            [mount.core :refer [defstate]])
  (:gen-class))

Managing State

Ok so what's this mount thing?

Mount is a library managing state. That's pretty ambiguous, so what does it really mean? Most applications out there aren't just doing math. They interact with the outside world, either through disk IO, network, the screen, or other peripherals.

Often times, the way we interact with these services is stateful. That is, they maintain a persistent handle to the target service that is long-lived and changes over time.

In our case (and in many database-backed applications) the application will set up a connection to the database that is active for the life of the application. The database may change over time with various inserts, updates, and table modifications, but the identity of the database - the connection - stays the same, or rarely changes. Other scenarios could be a connection to something like Redis, RabbitMQ, Kafka, etc. You want to be able to refer to these long-lived entities, and to be able to independently stop and start your application's connections to them.

To contrast this with something that is not stateful, your application may have some domain logic that is "pure": it takes in a value, performs some calculations (for example, it may do some math), and then returns another value. There is no persistent handle, and the function will return the same value for the same arguments, every time.

Mount is how our application is going to handle starting and stopping its connection to our Sqlite database. Do it like this:

(def datasource-options {:jdbc-url "jdbc:sqlite:db/database.db"
                         :maximum-pool-size 1})

(defstate datasource
  :start (h/make-datasource datasource-options)
  :stop (h/close-datasource datasource))

Notice first that we defined some configuration in datasource-options that tells our application how to reach the database. In our case, this is Sqlite, so it's simple and just points to a file. This is pure data that could come from anywhere, like a config file. It is inlined here for convenience and clarity.

Next is the interesting part. Mount gives us a lot of state management functionality. Among that functionality is the ability to stop and start our stateful components as we wish. To do that, we have to provide Mount with instructions for what it should do to start and stop those specific components.

Here, we define out database connection as datasource using defstate. We tell mount how to start using :start and :stop key-value pairs, providing the "action" as the value to its respective key. When you tell Mount to start, it will run the :start action, and :stop when you tell it to stop. (Aside: the values you provide to defstate are raw expressions, because defstate wraps them in functions)

You can use this pattern to manage connections to queues, start webservers, initialize job schedulers, etc. If you want to see how else you can use Mount, I recommend you check out the documentation. For now, let's continue on to using our database.

clojure.java.jdbc

Let's have a our database do some stuff. If you know SQL, or even if you don't this should be relatively straightforward. This is not a SQL tutorial, so this part does not focus on advanced SQL.

Let's assume that our program receives events every so often, stores those events in the database, and compute some simple aggregates of the events and store those too.

Now is where we break out the jdbc interface we pulled in at the beginning:

(defn run-statement! [s]
  (jdbc/with-db-connection [conn {:datasource datasource}]
    (let [result (jdbc/execute! conn s)]
      result)))

and

(run-statement! [(str "CREATE TABLE IF NOT EXISTS events ("
                      "id INTEGER PRIMARY KEY,"
                      "number INTEGER,"
                      "timestamp TEXT DEFAULT CURRENT_TIMESTAMP"
                      ")")])

With these two bits of code, we're defining first a function to take some DDL and run it, and secondly calling that function with our CREATE TABLE SQL.

First, a short digression for a note about an important Clojure pattern: the function uses a common idiom, emdodied as with-db-connection, using it to check out a database connection, conn (described in {:datasource datasource}), using that conn in the body of the expression (for us, jdbc/execute!), returning a result, and then deallocating the conn that we acquired.

When programming Clojure, it will be someone common to see various forms of with-* provided by the core language or various libraries. with-open is by far the most common, but others exist, like the ones in this article. The general pattern they take looks like this:

(with-something [resource-name resource-initialization-expression]
  (do-stuff-with-resource resource-name))

where the resource-initialization-expression will give us some resource that we bind to resource-name, which we then use in the body of the expression somehow.

Back to the task at hand.

We've created a table. Let's add some functions to insert data and query it.

These aren't much different, they follow the same pattern, but use different functions from clojure.java.jdbc:

(defn run-query! [q]
  (jdbc/with-db-connection [conn {:datasource datasource}]
    (let [rows (jdbc/query conn q)]
      rows)))

(defn insert! [table-name rows]
  (jdbc/with-db-connection [conn {:datasource datasource}]
    (let [result (jdbc/insert-multi! conn table-name rows)]
      result)))
      
(defn track-average! []
  (jdbc/with-db-transaction [conn {:datasource datasource}]
    (let [average (->> (jdbc/query conn ["SELECT AVG(events.number) as average FROM events"])
                       first
                       :average)]
      (->> (jdbc/insert! conn :events_averages {:average average})
           (map #(get % (keyword "last_insert_rowid()")))
           first))))

(insert! :events [{:number 4820402}])
;; returns: ({:last_insert_rowid() 4})
      
(run-query! ["SELECT * FROM events"])
;; returns: ({:id 1, :number 4820402})

(track-average!)
;; returns: an integer representing the ID of the row it just inserted

The last function is probably the least unlike the others. It explicitly creates a database transaction, and runs two queries within that transaction. The first is a read query which finds the average of the number field on the events table. It then inserts that average value into a different, new table, the events_averages table.

The events_averages table doesn't exist, so let's create it.

(run-statement! [(str "CREATE TABLE IF NOT EXISTS events_averages ("
                      "id INTEGER PRIMARY KEY,"
                      "average REAL,"
                      "timestamp TEXT DEFAULT CURRENT_TIMESTAMP"
                      ")")])

No different than the other table, really. We could even reuse our our existing run-statement!.

That's about it for this one. With the above, you can set up stateful components and use JDBC-compatible databases with clojure.java.jdbc. There isn't a whole lot more to it, honestly.

Super party bonus time!

Following are two functions that generate fake events and feeds them to the rest of the application. This isn't strictly related to using clojure.java.jdbc or mount, but it's a good example of a quasi-real use case for concurrency in Clojure. I've inlined comments with explanations for what each expression does.

;; given a function,
;; start a new thread running that function that
;; will not prevent the JVM from shutting down
(defn start-daemon-thread [f]
  (doto (Thread. ^Runnable f)
    (.setDaemon true)
    (.start)))

;; our main function
(defn -main
  "I don't do a whole lot ... yet."
  [& args]

  ;; start our stateful components,
  ;; which at this point is just the db connection
  (mount.core/start)

  ;; create a queue with a fixed size of 15.
  ;; threads will publish "log lines" to this queue,
  ;; and another thread will print those "log lines"
  (let [logging-queue (java.util.concurrent.ArrayBlockingQueue. 15)]

    ;; create the `events_averages` table, just like we did above
    (run-statement! [(str "CREATE TABLE IF NOT EXISTS events_averages ("
                          "id INTEGER PRIMARY KEY,"
                          "average REAL,"
                          "timestamp TEXT DEFAULT CURRENT_TIMESTAMP"
                          ")")])

    ;; create the `events` table, just like we did above
    (run-statement! [(str "CREATE TABLE IF NOT EXISTS events ("
                          "id INTEGER PRIMARY KEY,"
                          "number INTEGER,"
                          "timestamp TEXT DEFAULT CURRENT_TIMESTAMP"
                          ")")])

    ;; start a new thread
    ;; that will consume events from `logging-queue`,
    ;; and print those events preceded by a timestamp
    (start-daemon-thread
     (fn []
       (while true
         (println (str "[" (java.time.Instant/now) "]")
                  (.take logging-queue)))))

    ;; start a new thread,
    ;; that every 5 seconds will run the `track-average!` query,
    ;; printing the data it previously inserted
    (start-daemon-thread
     (fn []
       (while true
         (let [id (track-average!)]
           (doseq [row (query! ["SELECT * FROM events_averages WHERE id = ?" id])]
             (.put logging-queue (str "AVERAGE EVENT NUMBER: " (:average row)))))
         (Thread/sleep 5000))))

    ;; start a new thread that will sleep a random amount of time
    ;; (between 1 and 5 seconds), inserting a random number to the `events` table,
    ;; sending that random number off to be logged
    (start-daemon-thread
     (fn []
       (while true
         (let [random-number (rand-int 60000)]
           (insert! :events [{:number random-number}])
           (.put logging-queue (str "received and inserted: " random-number)))
         (Thread/sleep (+ (rand-int 4000) 1000)))))

    ;; start a new thread that every 5 seconds will
    ;; count the number of rows in the `events` table,
    ;; and send that number off to be logged
    (start-daemon-thread
     (fn []
       (while true
         (doseq [row (query! ["SELECT COUNT(*) as count FROM events"])]
           (.put logging-queue (str "EVENTS COUNT: " (:count row))))
         (Thread/sleep 5000)))))

  ;; block the main thread for 30 seconds so we don't exit after starting the other threads
  (Thread/sleep 30000))

Running it looks like this:

clark$> java -jar target/database-0.1.0-SNAPSHOT-standalone.jar 
[2018-06-12T22:14:43.046Z] received and inserted: 39884
[2018-06-12T22:14:43.074Z] EVENTS COUNT: 12
[2018-06-12T22:14:43.075Z] AVERAGE EVENT NUMBER: 29521.5
[2018-06-12T22:14:43.079Z] received and inserted: 50832
[2018-06-12T22:14:47.592Z] EVENTS COUNT: 13
[2018-06-12T22:14:48.081Z] AVERAGE EVENT NUMBER: 31160.76923076923
[2018-06-12T22:14:48.084Z] received and inserted: 57947
[2018-06-12T22:14:52.522Z] EVENTS COUNT: 14
[2018-06-12T22:14:53.082Z] AVERAGE EVENT NUMBER: 33074.07142857143
[2018-06-12T22:14:53.092Z] received and inserted: 32677
[2018-06-12T22:14:56.197Z] EVENTS COUNT: 15
[2018-06-12T22:14:58.085Z] AVERAGE EVENT NUMBER: 33047.6
[2018-06-12T22:14:58.100Z] received and inserted: 41916
[2018-06-12T22:15:01.074Z] EVENTS COUNT: 16
[2018-06-12T22:15:03.088Z] AVERAGE EVENT NUMBER: 33601.875
[2018-06-12T22:15:03.107Z] received and inserted: 40567
[2018-06-12T22:15:04.161Z] received and inserted: 33285
[2018-06-12T22:15:05.472Z] received and inserted: 33913
[2018-06-12T22:15:06.915Z] EVENTS COUNT: 19
[2018-06-12T22:15:08.091Z] AVERAGE EVENT NUMBER: 33968.15789473684
[2018-06-12T22:15:08.115Z] received and inserted: 58119

github twitter rss