You are correct that you should not use context this way. Context is for request-scoped values and cancellation.

I have encountered different approaches for handling errors in concurrent pipelines. One is to carry the error through the stages and log it at the end, so someone can check it later. What you want is different: you want to stop the whole pipeline when an error happens. And since this is a concurrent pipeline, multiple errors may happen at the same time.
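For contrast, the carry-the-error-along variant usually wraps the payload and the error in one value that travels through every stage. A rough illustration (the Result type here is only for illustration, it is not in your code):

// Result pairs a payload with any error produced by an earlier stage.
// Stages pass errored results straight through instead of processing them,
// and whoever consumes the final output logs the errors.
type Result struct {
    Payload Payload
    Err     error
}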

One way to stop the pipeline on error is to use an error channel. A sketch of the idea:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errCh := make(chan error)
inputCh := make(chan Payload)

// Set up the pipeline
stage1Output := NewPipelineStage1(ctx, inputCh, errCh)
stage2Output := NewPipelineStage2(ctx, stage1Output, errCh)
...

// Listen to the error channel
go func() {
    for err := range errCh {
        // Record err somewhere, then cancel the context. Cancelling stops
        // the pipeline, and it is safe to call cancel multiple times.
        cancel()
    }
}()

// Feed data
for _, data := range dataSource {
    select {
    case inputCh <- data:
    case <-ctx.Done(): // stop feeding if an error cancelled the context
        return
    }
}
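The sketch above only feeds the pipeline. You also need to drain the last stage's output, and close inputCh once the data source is exhausted so the stages can shut down on their own. A minimal way to do that (assuming stage2Output is the last stage, as in the sketch):

// Drain the final stage concurrently, otherwise the unbuffered channels
// back up and the feed loop above blocks.
go func() {
    for result := range stage2Output {
        // use result
    }
}()

// ... feed loop from above ...

// Closing the input lets stage 1 return and close its output, which in turn
// lets stage 2 return, and so on down the pipeline.
close(inputCh)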

A pipeline stage looks something like this (process is a stand-in for whatever work the stage actually does):

func NewPipelineStage1(ctx context.Context, input <-chan Payload, errCh chan<- error) <-chan Payload {
    outputCh := make(chan Payload)
    go func() {
        // Close the output when this stage stops, so the downstream stage
        // sees the end of its input and can stop too.
        defer close(outputCh)
        for {
            select {
            case data, ok := <-input:
                if !ok {
                    return // upstream closed its channel: no more input
                }
                result, err := process(data) // process: placeholder for this stage's work
                if err != nil {
                    errCh <- err
                    continue
                }
                select {
                case outputCh <- result:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return outputCh
}
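One loose end: errCh is never closed, so the error-listening goroutine above runs for the lifetime of the program. If you need it to exit, one option is to give every stage a shared sync.WaitGroup and close errCh once all stages have returned. The WaitGroup is an addition on top of the sketch, not something the stages above already carry:

var wg sync.WaitGroup

// Each NewPipelineStageN would call wg.Add(1) before starting its goroutine
// and defer wg.Done() inside it.
go func() {
    wg.Wait()    // every stage has returned, so nothing will send on errCh again
    close(errCh) // the for-range in the error listener then terminates
}()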
