Skip to main content
4 of 4
added 12 characters in body
rose
  • 325
  • 1
  • 6

Avoid worker pools

The loadFiles method spawns runtime.NumCPU()-2 goroutines to act as workers in a pool. This is a pattern you are probably familiar with from other languages where threads are OS threads, and so it's better for efficiency if each thread can be scheduled on a separate CPU core. However, in go, goroutines are lightweight and already distributed across OS threads, so it makes less sense to limit the number of threads in this way. In particular if some of the worker goroutines are blocked waiting on I/O, some CPU cores will be sitting idle.

There are good reasons to limit the number of concurrent goroutines, usually related to file descriptor limit and memory usage. However using the number of CPUs to limit the number of goroutines is a code smell.

I would recommend a concurrency pattern we saw in a previous question, namely limiting the number of active goroutines using a semaphore. Then loadFiles would look something like

func (r *clientRepo) loadFiles(ctx context.Context, path string, spn log.Span) error {
    // Load list of files as before.
    files := ...

    g, ctx := errgroup.WithContext(ctx)
    // sem acts as a semaphore to limit the number of concurrent goroutines.
    sem := make(chan struct{}, 100)
    for _, file := range files {
        select {
        case <-ctx.Done():
            break
        case sem <- struct{}{}:
        }
        file := file
        g.Go(func() error {
            defer func() { <-sem }()
            return r.read(spn, file.fileName, bucket)
        })
    }

    if err := g.Wait(); err != nil {
        return err
    }
    spn.Info("Finished loading all files. Total duration: ", time.Since(start))
    return nil
}

A few other minor comments

Pass context as a parameter

Both loadFiles and read should take context as a parameter instead of calling context.Background. This gives the flexibility to add a timeout or cancellation later.

Avoid nested maps.

Nested maps can require a lot of allocations. In read, consider making productCatalog a flat map with a struct key type.

In fact, you could consider inserting directly into the concurrent map without constructing a separate map locally. Then read would look something like

func (r *clientRepo) read(ctx context.Context, spn log.Span, file string, bucket string) error {
    // Initialize the reader as before.
    pr := ...

    for {
        rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
        if err != nil {
            return err
        }
        if len(rows) <= 0 {
            break
        }

        byteSlice, err := json.Marshal(rows)
        if err != nil {
            return err
        }
        var productRows []ClientProduct
        err = json.Unmarshal(byteSlice, &productRows)
        if err != nil {
            return err
        }

        for i := range productRows {
            // Going with the idea that Convert returns
            // a CustomerProduct.
            flatProduct, err := r.Convert(spn, productRows[i])
            if err != nil {
                return err
            }
            if flatProduct.StatusCode == definitions.DONE {
                continue
            }
            r.products.Set(strconv.Itoa(flatProduct.ProductId, 10), flatProduct)
            for _, catalogId := range flatProduct.Catalogs {
                catalogValue := strconv.FormatInt(int64(catalogId), 10)
                r.productCatalog.Upsert(catalogValue, flatProduct.ProductId, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
                    productID := newValue.(int64)
                    if valueInMap == nil {
                        return []int64{productID}
                    }
                    oldIDs := valueInMap.([]int64)

                    for _, id := range oldIDs {
                        if id == productID {
                            // Already exists, don't add duplicates.
                            return oldIDs
                        }
                    }
                    return append(oldIDs, productID)
                })
            }
        }
    }
    return nil
}

This uses a linear scan to check for duplicate product IDs, so it will be faster than allocating a map if the number of IDs is small. Up to you to test the performance of this suggestion.

Avoid "getters"

The GetProductMap and GetProductCatalogMap methods are unnecessary boilerplate. Just make products and productCatalog exported fields.

rose
  • 325
  • 1
  • 6