Data pipeline that handles errors and cancellations

I have code that concurrently reads data from a stream, processes elements, and writes the data to another stream. Each of these individual operations may fail, in which case I want processing to halt and all resources to be released. I also want the caller to have access to the specific error which caused the process to halt.

To do this, I currently use context.Context together with a context.CancelCauseFunc, following the pattern in the full program below. I'm fairly certain that I am using context.Context in an unintended way: it seems strange to pass both the Context and its cancellation function to every function. I need the Context so that each function knows when to halt work, and I need the cancellation function so that any stage can halt the whole pipeline and record why. If I instead create a child context and cancellation function inside the ReadObjects/DoWork/WriteObjects functions, the error is not propagated to the caller.
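To isolate that last point, here is a minimal sketch that is separate from the pipeline itself. The names stage, errStage and causes are hypothetical and exist only for this illustration. As far as I understand the context package, cancellation flows from parent to child only, so a cause set on a child created inside a function never reaches the caller's context:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errStage is a hypothetical error standing in for a failure inside a stage.
var errStage = errors.New("stage failed")

// stage is a hypothetical pipeline step. It derives its own child context
// and cancels that child with a cause, rather than using a cancel function
// handed in by the caller. It returns the child so it can be inspected.
func stage(parent context.Context) context.Context {
	child, cancel := context.WithCancelCause(parent)
	cancel(errStage)
	return child
}

// causes runs stage under a cancellable parent and returns the cause
// recorded on the child and on the parent, in that order.
func causes() (childCause, parentCause error) {
	parent, cancelParent := context.WithCancelCause(context.Background())
	defer cancelParent(nil) // release the parent's resources on return

	child := stage(parent)
	return context.Cause(child), context.Cause(parent)
}

func main() {
	childCause, parentCause := causes()
	fmt.Println("child cause:", childCause)   // the stage's error
	fmt.Println("parent cause:", parentCause) // nil: the parent was never cancelled
}
```

This is why the program below hands the caller's own cancellation function to every stage instead.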

Note: The IllegalNumber and IllegalString constants are meant to allow exploring various failure states of the logic to understand the intended behavior of the code. For instance, changing IllegalString to "1" results in WriteObjects failing.

package main

import (
    "context"
    "errors"
    "fmt"
    "os"
    "strconv"
    "sync"
)

const (
    IllegalNumber = 7
    IllegalString = "0"
)

func ReadObjects(
    ctx context.Context,
    cancel context.CancelCauseFunc,
) <-chan int {
    // Mimic input data.
    readChannel := make(chan int)
    go func() {
        for i := range 5 {
            readChannel <- i
        }
        close(readChannel)
    }()

    result := make(chan int)
    go func() {
        defer close(result)
        for {
            select {
            case <-ctx.Done():
                return
            default:
                i, ok := <-readChannel
                if !ok {
                    return
                }
                if i == IllegalNumber {
                    cancel(errors.New("read illegal number"))
                    return
                }
                result <- i
            }
        }
    }()

    return result
}

func DoWork(
    ctx context.Context,
    cancel context.CancelCauseFunc,
    objs <-chan int,
) <-chan int {
    result := make(chan int)

    go func() {
        defer close(result)
        for {
            select {
            case <-ctx.Done():
                return
            default:
                i, ok := <-objs
                if !ok {
                    return
                }
                newI := i + 1
                if newI == IllegalNumber {
                    cancel(errors.New("created illegal number"))
                    return
                }
                result <- newI
            }
        }
    }()

    return result
}

func WriteObjects(
    ctx context.Context,
    cancel context.CancelCauseFunc,
    objs <-chan int,
) *sync.WaitGroup {
    wg := &sync.WaitGroup{}
    wg.Add(1)

    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            default:
                i, ok := <-objs
                if !ok {
                    return
                }
                strI := strconv.Itoa(i)
                if strI == IllegalString {
                    cancel(errors.New("tried to write illegal string"))
                    return
                }
                fmt.Println(strI)
            }
        }
    }()
    return wg
}

func main() {
    ctx := context.Background()
    ctx, cancel := context.WithCancelCause(ctx)

    inObjects := ReadObjects(ctx, cancel)
    outObjects := DoWork(ctx, cancel, inObjects)
    writeWg := WriteObjects(ctx, cancel, outObjects)
    writeWg.Wait()

    if ctx.Err() != nil {
        fmt.Println(context.Cause(ctx))
        os.Exit(1)
    }
}
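
Because all three stages share one cancellation function, more than one of them could call it. The following minimal sketch, again separate from the pipeline and using the hypothetical names errFirst, errSecond and firstCauseWins, shows the behaviour I am relying on: as I read the context documentation, only the first call records a cause, and later calls have no effect.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Hypothetical errors standing in for failures in two different stages.
var (
	errFirst  = errors.New("first failure")
	errSecond = errors.New("second failure")
)

// firstCauseWins cancels one shared context twice with different causes
// and returns the cause that the context ends up reporting.
func firstCauseWins() error {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errFirst)
	cancel(errSecond) // no effect: the context is already cancelled
	return context.Cause(ctx)
}

func main() {
	fmt.Println(firstCauseWins()) // prints "first failure"
}
```

So the caller sees the error from whichever stage cancelled first, which is the error that actually halted processing.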

In reality, these operations are multi-threaded and the channels are properly buffered, but that should be irrelevant to the anti-pattern I'm sure I'm using.