I have code that concurrently reads data from a stream, processes elements, and writes the data to another stream. Each of these individual operations may fail, in which case I want processing to halt and all resources to be released. I also want the caller to have access to the specific error which caused the process to halt.
Currently I do this with context.Context and a context.CancelCauseFunc, using the pattern below. I'm fairly certain that I am using context.Context in an unintended way: it seems strange to pass both the Context and its cancellation function to every function. I need the Context so that each function knows when to halt work, and I need the cancellation function so that any stage may choose to halt work. If I instead create a child context and cancellation function inside the Read/DoWork/Write functions, the error is not propagated to the caller, because canceling a child context does not cancel its parent.
Note: The IllegalNumber and IllegalString constants exist only so that the various failure states can be explored to understand the intended behavior of the code. With the values shown, no stage fails and the program prints 1 through 5. Changing IllegalString to "1", for instance, results in WriteObjects failing.
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"strconv"
	"sync"
)

const (
	IllegalNumber = 7
	IllegalString = "0"
)

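// ReadObjects emits the input values on the returned channel. It cancels the
// shared context with a cause if it reads IllegalNumber.
func ReadObjects(
	ctx context.Context,
	cancel context.CancelCauseFunc,
) <-chan int {
	// Mimic input data.
	readChannel := make(chan int)
	go func() {
		defer close(readChannel)
		for i := range 5 {
			select {
			case readChannel <- i:
			case <-ctx.Done():
				// Stop producing once the pipeline has been canceled.
				return
			}
		}
	}()

	result := make(chan int)
	go func() {
		defer close(result)
		for {
			select {
			case <-ctx.Done():
				return
			case i, ok := <-readChannel:
				if !ok {
					return
				}
				if i == IllegalNumber {
					cancel(errors.New("read illegal number"))
					return
				}
				// Select on the send as well, so that a canceled pipeline
				// cannot leave this goroutine blocked forever.
				select {
				case result <- i:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return result
}
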
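// DoWork increments each value from objs and emits it on the returned
// channel. It cancels the shared context with a cause if the result is
// IllegalNumber.
func DoWork(
	ctx context.Context,
	cancel context.CancelCauseFunc,
	objs <-chan int,
) <-chan int {
	result := make(chan int)
	go func() {
		defer close(result)
		for {
			select {
			case <-ctx.Done():
				return
			case i, ok := <-objs:
				if !ok {
					return
				}
				newI := i + 1
				if newI == IllegalNumber {
					cancel(errors.New("created illegal number"))
					return
				}
				// Select on the send as well, so that cancellation is
				// observed even after the writer has stopped reading.
				select {
				case result <- newI:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return result
}
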
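// WriteObjects prints each value from objs. It cancels the shared context
// with a cause if a value formats to IllegalString. The returned WaitGroup
// is done once the writer goroutine has exited.
func WriteObjects(
	ctx context.Context,
	cancel context.CancelCauseFunc,
	objs <-chan int,
) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Wait for either cancellation or the next value, rather than
			// blocking on the receive alone.
			select {
			case <-ctx.Done():
				return
			case i, ok := <-objs:
				if !ok {
					return
				}
				strI := strconv.Itoa(i)
				if strI == IllegalString {
					cancel(errors.New("tried to write illegal string"))
					return
				}
				fmt.Println(strI)
			}
		}
	}()
	return wg
}
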
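func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	// Release the context's resources on the success path. If a stage has
	// already canceled with a cause, this call does not overwrite it.
	defer cancel(nil)

	inObjects := ReadObjects(ctx, cancel)
	outObjects := DoWork(ctx, cancel, inObjects)
	writeWg := WriteObjects(ctx, cancel, outObjects)
	writeWg.Wait()

	// A non-nil ctx.Err() means some stage canceled the pipeline;
	// context.Cause reports the error that stage supplied.
	if ctx.Err() != nil {
		fmt.Println(context.Cause(ctx))
		os.Exit(1)
	}
}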
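One detail of the pattern above that I am relying on: when several stages call cancel, only the first cause is recorded, so the caller sees the error that actually halted the pipeline. A minimal sketch of that behavior, where firstCause is a hypothetical helper written only for this illustration and not part of my real code:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// firstCause (hypothetical, for illustration only) cancels one context twice
// with different errors and returns the cause the context recorded.
func firstCause() error {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errors.New("first failure"))
	// The context is already canceled, so this call does not replace the cause.
	cancel(errors.New("second failure"))
	return context.Cause(ctx)
}

func main() {
	fmt.Println(firstCause()) // prints "first failure"
}
```

This is why sharing a single CancelCauseFunc across all stages gives the caller one well-defined error, even if a second stage fails while the pipeline is shutting down.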
In reality these operations are multi-threaded and the channels are properly buffered, but that should be irrelevant to the anti-pattern I suspect I'm using.