I am trying to read a bunch of parquet files in parallel from an S3 bucket. After reading all these files, I populate my products and productCatalog concurrent maps. This happens during server startup, and then I have getter methods GetProductMap and GetProductCatalogMap that return these maps, which will be used by the main application threads.
My getter methods will be called by a lot of application threads concurrently. The idea is to populate these maps during server startup (and then periodically from a background thread using a ticker) and access them via the getters from the main application threads. I want the writes to be atomic, so that once a write happens it is immediately visible to the reader threads.
// clientRepo loads product data from parquet files stored in S3 and holds it
// in two concurrent maps that application goroutines read through
// GetProductMap and GetProductCatalogMap.
type clientRepo struct {
    // s3Client lists objects (loadFiles) and supplies the session config used
    // to open each parquet file (read).
    s3Client       *awss3.S3Client
    // NOTE(review): deltaChan, done, err and wg are not used by loadFiles or
    // read; presumably they belong to the periodic background refresh — confirm.
    deltaChan      chan string
    done           chan struct{}
    err            chan error
    wg             sync.WaitGroup
    // cfg supplies DeltaWorkers (parallelism passed to the parquet reader) and
    // RowsToRead (batch size per ReadByNumber call).
    cfg            *ParquetReaderConfig
    // products maps a product ID (decimal string) to its
    // *definitions.CustomerProduct.
    products       *cmap.ConcurrentMap
    // productCatalog maps a catalog ID (decimal string) to an []int64 of the
    // distinct product IDs that belong to that catalog.
    productCatalog *cmap.ConcurrentMap
}
// fileChannel is the unit of work loadFiles hands to its reader goroutines.
type fileChannel struct {
    // fileName is an object name as returned by ListObjects; it is passed to
    // read together with the bucket.
    fileName string
    // index is the file's position in the ListObjects result.
    // NOTE(review): set by loadFiles but never read by the visible code.
    index    int
}
Below is my loadFiles method, which, given a path, finds all the files I need to read in parallel. I am using errgroup here to communicate error states across goroutines. The idea is very simple: find all the files in the S3 bucket, read them in parallel, populate my internal maps, and then use those internal maps to populate my concurrent maps.
// loadFiles lists every parquet object under the S3 path and reads them in
// parallel with a bounded pool of worker goroutines. Each worker calls read,
// which merges the file's rows into the shared products and productCatalog maps.
//
// The first worker error cancels the errgroup context. That stops the producer
// from handing out more files and makes the remaining workers exit. The first
// error is returned. A nil return means every listed file was read.
func (r *clientRepo) loadFiles(path string, spn log.Span) error {
	bucket, key, err := awss3.ParseS3Path(path)
	if err != nil {
		return err
	}
	files, err := r.s3Client.ListObjects(bucket, key, ParquetFileExtension)
	if err != nil {
		return err
	}
	spn.Infof("Loading files from %s. Total files: %d", path, len(files))
	start := time.Now()

	// runtime.NumCPU()-2 is <= 0 on hosts/containers with one or two CPUs.
	// With zero workers nobody receives from fileChan, the context is never
	// cancelled, and the producer below blocks forever. Always run at least
	// one worker, and never more workers than there are files.
	workers := runtime.NumCPU() - 2
	if workers > len(files) {
		workers = len(files)
	}
	if workers < 1 {
		workers = 1
	}

	fileChan := make(chan fileChannel)
	g, ctx := errgroup.WithContext(context.Background())
	for i := 0; i < workers; i++ {
		workerNum := i
		g.Go(func() error {
			for file := range fileChan {
				// Check for cancellation before starting the (expensive) S3
				// read. When both cases of the producer's select are ready it
				// may still hand out a file after a sibling worker has failed.
				if err := ctx.Err(); err != nil {
					spn.Infof("worker %d context error in worker: %s", workerNum, err.Error())
					return err
				}
				if err := r.read(spn, file.fileName, bucket); err != nil {
					spn.Infof("worker %d failed to process %s : %s", workerNum, file.fileName, err.Error())
					return err
				}
			}
			spn.Infof("worker %d processed all work on channel", workerNum)
			return nil
		})
	}

	// Feed files to the workers until they are exhausted or a worker fails.
	// fileChan is closed in both cases so the workers' range loops terminate.
	func() {
		for idx, file := range files {
			select {
			case fileChan <- fileChannel{fileName: file, index: idx}:
			case <-ctx.Done():
				return
			}
		}
	}()
	close(fileChan)

	if err := g.Wait(); err != nil {
		return err
	}
	spn.Info("Finished loading all files. Total duration: ", time.Since(start))
	return nil
}
Here is the read method, which reads each file, deserializes it into ClientProduct structs, and then iterates over them to populate my internal maps. From those internal maps, I then populate my concurrent maps. I am not sure whether I need to do it this way. An alternative would be to collect all this data in a channel and populate the maps from there, but that could increase the memory footprint by a lot, which is why I went with this design.
// read loads one parquet file from S3 and converts each row to a
// definitions.CustomerProduct. It merges the results into the shared maps:
//
//   - r.products:       product ID (decimal string) -> *definitions.CustomerProduct
//   - r.productCatalog: catalog ID (decimal string) -> []int64 of the distinct
//     product IDs seen for that catalog across all files loaded so far
//
// Rows whose StatusCode is definitions.DONE are skipped, and a file with no
// rows is skipped without error. Rows are first accumulated in file-local maps,
// so each shared-map key is written once per file rather than once per row.
// Every catalog upsert stores a freshly allocated slice, so a slice already
// handed to a reader is never mutated.
//
// NOTE(review): results are published file by file. Readers can therefore
// observe a partially loaded state, and keys that disappear from S3 are never
// removed on a periodic reload. Building fresh maps and swapping them in
// atomically would address both.
func (r *clientRepo) read(spn log.Span, file string, bucket string) error {
	fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
	if err != nil {
		return errs.Wrap(err)
	}
	defer xio.CloseIgnoringErrors(fr)

	pr, err := reader.NewParquetReader(fr, nil, int64(r.cfg.DeltaWorkers))
	if err != nil {
		return errs.Wrap(err)
	}
	// Release the reader's resources on every return path, including the
	// 0-row early return and the error returns below.
	// NOTE(review): assumes xitongsys/parquet-go, whose ReadStop closes the
	// file handles the reader opens internally; without it they leak per file.
	defer pr.ReadStop()

	if pr.GetNumRows() == 0 {
		spn.Infof("Skipping %s due to 0 rows", file)
		return nil
	}

	products := make(map[string]*definitions.CustomerProduct)
	// catalog key -> set of product IDs; struct{} values take no space.
	productCatalog := make(map[string]map[int64]struct{})
	for {
		rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
		if err != nil {
			return errs.Wrap(err)
		}
		if len(rows) == 0 {
			break
		}
		// The JSON round trip converts the rows returned by ReadByNumber into
		// ClientProduct values.
		byteSlice, err := json.Marshal(rows)
		if err != nil {
			return errs.Wrap(err)
		}
		var productRows []ClientProduct
		if err := json.Unmarshal(byteSlice, &productRows); err != nil {
			return errs.Wrap(err)
		}
		for i := range productRows {
			// Declared inside the loop so every stored pointer is distinct.
			var flatProduct definitions.CustomerProduct
			if err := r.Convert(spn, &productRows[i], &flatProduct); err != nil {
				return errs.Wrap(err)
			}
			if flatProduct.StatusCode == definitions.DONE {
				continue
			}
			products[strconv.FormatInt(flatProduct.ProductId, 10)] = &flatProduct
			for _, catalogID := range flatProduct.Catalogs {
				catalogKey := strconv.FormatInt(int64(catalogID), 10)
				set, ok := productCatalog[catalogKey]
				if !ok {
					set = make(map[int64]struct{})
					productCatalog[catalogKey] = set
				}
				set[flatProduct.ProductId] = struct{}{}
			}
		}
	}

	for k, v := range products {
		r.products.Set(k, v)
	}
	for k, set := range productCatalog {
		r.productCatalog.Upsert(k, set, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
			ids := newValue.(map[int64]struct{})
			if exists {
				// Fold the IDs already published for this catalog into the
				// file-local set so duplicates collapse. The set is owned by
				// this call and discarded afterwards, so mutating it is safe.
				for _, id := range valueInMap.([]int64) {
					ids[id] = struct{}{}
				}
			}
			merged := make([]int64, 0, len(ids))
			for id := range ids {
				merged = append(merged, id)
			}
			return merged
		})
	}
	return nil
}
And these are my getter methods, which will be called by the main application threads:
// GetProductMap returns the shared product map, keyed by product ID (decimal
// string) with *definitions.CustomerProduct values. The pointer refers to the
// live map rather than a copy, so callers see entries added by later loads and
// should treat the map as read-only.
func (r *clientRepo) GetProductMap() *cmap.ConcurrentMap {
	live := r.products
	return live
}
// GetProductCatalogMap returns the shared catalog map, keyed by catalog ID
// (decimal string) with []int64 values holding the distinct product IDs in that
// catalog. The pointer refers to the live map rather than a copy, so callers
// see entries added by later loads and should treat the map as read-only.
func (r *clientRepo) GetProductCatalogMap() *cmap.ConcurrentMap {
	live := r.productCatalog
	return live
}
Note:
My products map has the productId as the key and the flatProduct as the value.
My productCatalog map has the catalogId as the key and a unique list of productIds as the value.
Here is the concurrent map I am using - https://github.com/orcaman/concurrent-map And here is the Upsert method I am using - https://github.com/orcaman/concurrent-map/blob/master/concurrent_map.go#L56
I am using this parquet library to read all these S3 files.
Problem Statement
I am looking for ideas on whether anything can be improved in the above design or in the way I populate my maps. I am asking for a code review to find anything that could improve performance or reduce the memory footprint.
