Context
- I making a system, where different clients issue queries
- A query is resolved by issuing a set of subqueries
- I have an invalidation-worker, which gets notified when subqueries go stale
Goal
- When a subquery goes stale, I want to notify the clients which have made this subquery
Solution
To do this, I am thinking of keeping a mapping. Here's a rough solution you can play with in the REPL:
(ns play
  (:require [clojure.core.async :as a :refer [go <! go-loop >!]]))
(def recordings (atom {}))
(defn record-subquery! [client-id query-n subquery-n]
  (swap! recordings update subquery-n
         (fn [prev]
           (let [prev (or prev #{})]
             (conj prev [client-id query-n])))))
(defn go-subquery [client-id query-n subquery-n]
  (go
    (<! (a/timeout (rand-int 2000)))
    (record-subquery! client-id query-n subquery-n)
    {:client-id client-id
     :query-n query-n
     :subquery-n subquery-n}))
(defn go-query [client-id query-n]
  (go
    (let [subquery-ns (range query-n (+ query-n 5))]
      {:client-id client-id
       :query-n query-n
       :subqueries (->> subquery-ns
                        (map (partial go-subquery client-id query-n))
                        a/merge
                        (a/into [])
                        <!)})))
(comment
  (go (prn (<! (go-query :a 1)))))
(def client-chans {:a (a/chan)
                   :b (a/chan)})
(defn client-worker [client-id query-chan]
  (go-loop []
    (when-some [q (<! query-chan)]
      (prn (format "queried id = %s q = %s" client-id (<! (go-query client-id q))))
      (recur))))
(def invalidation-chan (a/chan))
(defn invalidation-broadcaster []
  (go (loop []
        (<! (a/timeout 1500))
        (when (>! invalidation-chan (rand-int 10))
          (recur)))))
(defn invalidation-worker [chan]
  (go-loop []
    (when-some [sq-id (<! chan)]
      (let [subs (->> sq-id (@recordings))]
        (prn (format "invalidating sq-id = %s subs = %s" sq-id subs))
        (doseq [[client-id query-n] subs]
          (>! (client-id client-chans) query-n))
        (recur)))))
(comment
  (do (client-worker :a (:a client-chans))
      (client-worker :b (:b client-chans))
      (invalidation-worker invalidation-chan)
      (invalidation-broadcaster))
  (a/close! invalidation-chan)
  (go (>! (:a client-chans) 1)))
Problem with the solution
I am a bit sad that record-subquery! is nested under go-subquery. This makes go-query stateful. I do it though to avoid the following race condition:
T0: go-query starts 
T1: subquery-1 completes 
T2: subquery-1 is invalidated 
T3: subquery-2 completes 
T4: go-query completes
In this scenario, we would miss the T2 update.
Would you do this differently?
 
                