Skip to content

Stream this.push and spread operator does not work #26582

@akaNightmare

Description

@akaNightmare
  • Version: v11.10.0
  • Platform: Darwin Ivans-MacBook-Pro.local 18.2.0 Darwin Kernel Version 18.2.0: Thu Dec 20 20:46:53 PST 2018; root:xnu-4903.241.1~1/RELEASE_X86_64 x86_64

Hello, I have written a simple Transform stream to getting some extended user info:

import { Transform } from 'stream';
import { getUsersPublicInfo } from '../../user/services';

const USERS_BATCH_SIZE = 100;

/**
 * Class (extended from Transform stream)
 * for getting minimum user info for diff
 * types of reports. Returns `id`, `email`
 * and `name` in a stream as an object.
 */
export class FillUserTransform extends Transform {
    constructor(options) {
        super({
            readableObjectMode: true,
            writableObjectMode: true,
            ...(options || {}),
        });
        this._userIds = [];
    }

    /**
     * @param {{user_id: string}} chunk
     * @param {string} encoding
     * @param {Function} cb
     * @return {*}
     * @private
     */
    _transform(chunk, encoding, cb) {
        this._userIds.push(chunk.user_id);
        if (this._userIds.length >= USERS_BATCH_SIZE) {
            this._writeUserInfo(cb);
        } else {
            return void cb();
        }
    }

    _flush(cb) {
        if (this._userIds.length > 0) {
            this._writeUserInfo(cb);
        } else {
            return void cb();
        }
    }

    /**
     * @param {Function} cb
     * @return {*}
     * @private
     */
    _writeUserInfo(cb) {
        this.pause();
        getUsersPublicInfo(this._userIds)
            .then(users => {
                if (users.length > 0) {
                    this.push(...users);
                }
                this._userIds.length = 0;
                this.resume();
                cb();
            })
            .catch(err => {
                this._userIds.length = 0;
                this.resume();
                cb(err);
            });
    }
}

but this.push(...users); does not work as expected, replaced spread operator with forEach - works

users.forEach(user => {
    this.push(user);
});

Metadata

Metadata

Assignees

No one assigned

    Labels

    invalidIssues and PRs that are invalid.streamIssues and PRs related to the stream subsystem.

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions