Skip to main content
added 3 characters in body
Source Link
  • I am making a system, where different clients issue queries
  • A query is resolved by issuing a set of subqueries
  • I have an invalidation-worker, which gets notified when subqueries go stale
  • I making a system, where different clients issue queries
  • A query is resolved by issuing a set of subqueries
  • I have an invalidation-worker, which gets notified when subqueries go stale
  • I am making a system, where different clients issue queries
  • A query is resolved by issuing a set of subqueries
  • I have an invalidation-worker, which gets notified when subqueries go stale
edited title
Link

Client <> Query Subscriptions<> Subquery Bookkeeping

Source Link

Client <> Query Subscriptions

Context

  • I making a system, where different clients issue queries
  • A query is resolved by issuing a set of subqueries
  • I have an invalidation-worker, which gets notified when subqueries go stale

Goal

  • When a subquery goes stale, I want to notify the clients which have made this subquery

Solution

To do this, I am thinking of keeping a mapping. Here's a rough solution you can play with in the REPL:

(ns play
  (:require [clojure.core.async :as a :refer [go <! go-loop >!]]))

(def recordings (atom {}))

(defn record-subquery! [client-id query-n subquery-n]
  (swap! recordings update subquery-n
         (fn [prev]
           (let [prev (or prev #{})]
             (conj prev [client-id query-n])))))

(defn go-subquery [client-id query-n subquery-n]
  (go
    (<! (a/timeout (rand-int 2000)))
    (record-subquery! client-id query-n subquery-n)
    {:client-id client-id
     :query-n query-n
     :subquery-n subquery-n}))

(defn go-query [client-id query-n]
  (go
    (let [subquery-ns (range query-n (+ query-n 5))]
      {:client-id client-id
       :query-n query-n
       :subqueries (->> subquery-ns
                        (map (partial go-subquery client-id query-n))
                        a/merge
                        (a/into [])
                        <!)})))

(comment
  (go (prn (<! (go-query :a 1)))))

(def client-chans {:a (a/chan)
                   :b (a/chan)})

(defn client-worker [client-id query-chan]
  (go-loop []
    (when-some [q (<! query-chan)]
      (prn (format "queried id = %s q = %s" client-id (<! (go-query client-id q))))
      (recur))))

(def invalidation-chan (a/chan))

(defn invalidation-broadcaster []
  (go (loop []
        (<! (a/timeout 1500))
        (when (>! invalidation-chan (rand-int 10))
          (recur)))))

(defn invalidation-worker [chan]
  (go-loop []
    (when-some [sq-id (<! chan)]
      (let [subs (->> sq-id (@recordings))]
        (prn (format "invalidating sq-id = %s subs = %s" sq-id subs))
        (doseq [[client-id query-n] subs]
          (>! (client-id client-chans) query-n))
        (recur)))))

(comment
  (do (client-worker :a (:a client-chans))
      (client-worker :b (:b client-chans))
      (invalidation-worker invalidation-chan)
      (invalidation-broadcaster))

  (a/close! invalidation-chan)
  (go (>! (:a client-chans) 1)))

Problem with the solution

I am a bit sad that record-subquery! is nested under go-subquery. This makes go-query stateful. I do it though to avoid the following race condition:

T0: go-query starts 
T1: subquery-1 completes 
T2: subquery-1 is invalidated 
T3: subquery-2 completes 
T4: go-query completes

In this scenario, we would miss the T2 update.

Would you do this differently?