Context
- I am building a system in which different clients issue queries
- A query is resolved by issuing a set of subqueries
- I have an invalidation-worker, which gets notified when subqueries go stale
Goal
- When a subquery goes stale, I want to notify the clients that issued it
Solution
To do this, I am thinking of keeping a mapping from each subquery to the clients (and queries) that issued it. Here's a rough solution you can play with in the REPL:
(ns play
  (:require [clojure.core.async :as a :refer [go <! go-loop >!]]))

;; subquery-n -> set of [client-id query-n] pairs that issued it
(def recordings (atom {}))

(defn record-subquery! [client-id query-n subquery-n]
  (swap! recordings update subquery-n
         (fnil conj #{}) [client-id query-n]))
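
(defn go-subquery [client-id query-n subquery-n]
  (go
    ;; simulate subquery latency
    (<! (a/timeout (rand-int 2000)))
    ;; record as soon as the subquery completes, not when the whole query does
    (record-subquery! client-id query-n subquery-n)
    {:client-id client-id
     :query-n query-n
     :subquery-n subquery-n}))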

(defn go-query [client-id query-n]
  (go
    (let [subquery-ns (range query-n (+ query-n 5))]
      {:client-id client-id
       :query-n query-n
       :subqueries (->> subquery-ns
                        (map (partial go-subquery client-id query-n))
                        a/merge
                        (a/into [])
                        <!)})))

(comment
  (go (prn (<! (go-query :a 1)))))

(def client-chans {:a (a/chan)
                   :b (a/chan)})

(defn client-worker [client-id query-chan]
  (go-loop []
    (when-some [q (<! query-chan)]
      (prn (format "queried id = %s q = %s" client-id (<! (go-query client-id q))))
      (recur))))
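
(def invalidation-chan (a/chan))

;; simulates staleness: every 1.5 s a random subquery id is reported stale
(defn invalidation-broadcaster []
  (go-loop []
    (<! (a/timeout 1500))
    ;; >! returns false once the channel is closed, which ends the loop
    (when (>! invalidation-chan (rand-int 10))
      (recur))))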

(defn invalidation-worker [chan]
  (go-loop []
    (when-some [sq-id (<! chan)]
      (let [subscribers (get @recordings sq-id)]
        (prn (format "invalidating sq-id = %s subs = %s" sq-id subscribers))
        ;; re-issue each affected query on the owning client's channel
        (doseq [[client-id query-n] subscribers]
          (>! (client-id client-chans) query-n))
        (recur)))))

(comment
  ;; start the workers
  (do (client-worker :a (:a client-chans))
      (client-worker :b (:b client-chans))
      (invalidation-worker invalidation-chan)
      (invalidation-broadcaster))
  ;; stop the broadcaster and the invalidation worker
  (a/close! invalidation-chan)
  ;; issue query 1 as client :a
  (go (>! (:a client-chans) 1)))
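To make the shape of the mapping concrete, here is a minimal sketch without core.async. It assumes the same [client-id query-n] pairs as the code above; `record`, `subscribers`, and `example` are hypothetical names used only for illustration.

```clojure
;; Hypothetical pure helper: add a [client-id query-n] pair under a subquery id.
(defn record [recordings client-id query-n subquery-n]
  (update recordings subquery-n (fnil conj #{}) [client-id query-n]))

;; Hypothetical pure helper: who should be notified when subquery-n goes stale?
(defn subscribers [recordings subquery-n]
  (get recordings subquery-n #{}))

;; Client :a's query 1 used subqueries 1 and 2; client :b's query 2 used subquery 2.
(def example
  (-> {}
      (record :a 1 1)
      (record :a 1 2)
      (record :b 2 2)))

(comment
  example                  ;; => {1 #{[:a 1]}, 2 #{[:a 1] [:b 2]}}
  (subscribers example 2)  ;; => #{[:a 1] [:b 2]}
  (subscribers example 9)) ;; => #{}
```

So when subquery 2 goes stale, both clients are found with a single lookup, and a subquery nobody issued yields an empty set.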
Problem with the solution
I am a bit sad that record-subquery! is called from inside go-subquery, since this makes go-query stateful. I do it, though, to avoid the following race condition:
T0: go-query starts
T1: subquery-1 completes
T2: subquery-1 is invalidated
T3: subquery-2 completes
T4: go-query completes
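In this scenario, if go-query recorded its subqueries only once it completed at T4, the invalidation at T2 would find no subscribers for subquery-1, and we would miss the T2 update.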
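The timeline above can be replayed deterministically with plain atoms. This is only a sketch under assumptions: `replay` and the `:eager` / `:deferred` modes are hypothetical names, and `:deferred` stands in for a go-query that records its subqueries only after it completes.

```clojure
;; Hypothetical replay of the T0..T4 timeline under two recording strategies.
(defn replay [mode]
  (let [recordings (atom {})   ; subquery-n -> #{[client-id query-n]}
        notified   (atom [])   ; [client-id query-n] pairs that got notified
        record!    (fn [sq] (swap! recordings update sq (fnil conj #{}) [:a 1]))
        invalidate (fn [sq] (swap! notified into (get @recordings sq)))]
    ;; T0: go-query starts
    ;; T1: subquery-1 completes
    (when (= mode :eager) (record! 1))
    ;; T2: subquery-1 is invalidated
    (invalidate 1)
    ;; T3: subquery-2 completes
    (when (= mode :eager) (record! 2))
    ;; T4: go-query completes
    (when (= mode :deferred) (record! 1) (record! 2))
    @notified))

(comment
  (replay :eager)     ;; => [[:a 1]]  the T2 invalidation reaches client :a
  (replay :deferred)) ;; => []        the T2 invalidation is lost
```

Recording eagerly, as go-subquery does, is what closes the window between T1 and T4.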
Would you do this differently?