'use strict' const AbortController = globalThis.AbortController || require('abort-controller').AbortController const { codes: { ERR_INVALID_ARG_VALUE, ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE }, AbortError } = require('../../ours/errors') const { validateAbortSignal, validateInteger, validateObject } = require('../validators') const kWeakHandler = require('../../ours/primordials').Symbol('kWeak') const kResistStopPropagation = require('../../ours/primordials').Symbol('kResistStopPropagation') const { finished } = require('./end-of-stream') const staticCompose = require('./compose') const { addAbortSignalNoValidate } = require('./add-abort-signal') const { isWritable, isNodeStream } = require('./utils') const { deprecate } = require('../../ours/util') const { ArrayPrototypePush, Boolean, MathFloor, Number, NumberIsNaN, Promise, PromiseReject, PromiseResolve, PromisePrototypeThen, Symbol } = require('../../ours/primordials') const kEmpty = Symbol('kEmpty') const kEof = Symbol('kEof') function compose(stream, options) { if (options != null) { validateObject(options, 'options') } if ((options === null || options === undefined ? undefined : options.signal) != null) { validateAbortSignal(options.signal, 'options.signal') } if (isNodeStream(stream) && !isWritable(stream)) { throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable') } const composedStream = staticCompose(this, stream) if (options !== null && options !== undefined && options.signal) { // Not validating as we already validated before addAbortSignalNoValidate(options.signal, composedStream) } return composedStream } function map(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn) } if (options != null) { validateObject(options, 'options') } if ((options === null || options === undefined ? undefined : options.signal) != null) { validateAbortSignal(options.signal, 'options.signal') } let concurrency = 1 if ((options === null || options === undefined ? undefined : options.concurrency) != null) { concurrency = MathFloor(options.concurrency) } let highWaterMark = concurrency - 1 if ((options === null || options === undefined ? undefined : options.highWaterMark) != null) { highWaterMark = MathFloor(options.highWaterMark) } validateInteger(concurrency, 'options.concurrency', 1) validateInteger(highWaterMark, 'options.highWaterMark', 0) highWaterMark += concurrency return async function* map() { const signal = require('../../ours/util').AbortSignalAny( [options === null || options === undefined ? undefined : options.signal].filter(Boolean) ) const stream = this const queue = [] const signalOpt = { signal } let next let resume let done = false let cnt = 0 function onCatch() { done = true afterItemProcessed() } function afterItemProcessed() { cnt -= 1 maybeResume() } function maybeResume() { if (resume && !done && cnt < concurrency && queue.length < highWaterMark) { resume() resume = null } } async function pump() { try { for await (let val of stream) { if (done) { return } if (signal.aborted) { throw new AbortError() } try { val = fn(val, signalOpt) if (val === kEmpty) { continue } val = PromiseResolve(val) } catch (err) { val = PromiseReject(err) } cnt += 1 PromisePrototypeThen(val, afterItemProcessed, onCatch) queue.push(val) if (next) { next() next = null } if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) { await new Promise((resolve) => { resume = resolve }) } } queue.push(kEof) } catch (err) { const val = PromiseReject(err) PromisePrototypeThen(val, afterItemProcessed, onCatch) queue.push(val) } finally { done = true if (next) { next() next = null } } } pump() try { while (true) { while (queue.length > 0) { const val = await queue[0] if (val === kEof) { return } if (signal.aborted) { throw new AbortError() } if (val !== kEmpty) { yield val } queue.shift() maybeResume() } await new Promise((resolve) => { next = resolve }) } } finally { done = true if (resume) { resume() resume = null } } }.call(this) } function asIndexedPairs(options = undefined) { if (options != null) { validateObject(options, 'options') } if ((options === null || options === undefined ? undefined : options.signal) != null) { validateAbortSignal(options.signal, 'options.signal') } return async function* asIndexedPairs() { let index = 0 for await (const val of this) { var _options$signal if ( options !== null && options !== undefined && (_options$signal = options.signal) !== null && _options$signal !== undefined && _options$signal.aborted ) { throw new AbortError({ cause: options.signal.reason }) } yield [index++, val] } }.call(this) } async function some(fn, options = undefined) { for await (const unused of filter.call(this, fn, options)) { return true } return false } async function every(fn, options = undefined) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn) } // https://en.wikipedia.org/wiki/De_Morgan%27s_laws return !(await some.call( this, async (...args) => { return !(await fn(...args)) }, options )) } async function find(fn, options) { for await (const result of filter.call(this, fn, options)) { return result } return undefined } async function forEach(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn) } async function forEachFn(value, options) { await fn(value, options) return kEmpty } // eslint-disable-next-line no-unused-vars for await (const unused of map.call(this, forEachFn, options)); } function filter(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn) } async function filterFn(value, options) { if (await fn(value, options)) { return value } return kEmpty } return map.call(this, filterFn, options) } // Specific to provide better error to reduce since the argument is only // missing if the stream has no items in it - but the code is still appropriate class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS { constructor() { super('reduce') this.message = 'Reduce of an empty stream requires an initial value' } } async function reduce(reducer, initialValue, options) { var _options$signal2 if (typeof reducer !== 'function') { throw new ERR_INVALID_ARG_TYPE('reducer', ['Function', 'AsyncFunction'], reducer) } if (options != null) { validateObject(options, 'options') } if ((options === null || options === undefined ? undefined : options.signal) != null) { validateAbortSignal(options.signal, 'options.signal') } let hasInitialValue = arguments.length > 1 if ( options !== null && options !== undefined && (_options$signal2 = options.signal) !== null && _options$signal2 !== undefined && _options$signal2.aborted ) { const err = new AbortError(undefined, { cause: options.signal.reason }) this.once('error', () => {}) // The error is already propagated await finished(this.destroy(err)) throw err } const ac = new AbortController() const signal = ac.signal if (options !== null && options !== undefined && options.signal) { const opts = { once: true, [kWeakHandler]: this, [kResistStopPropagation]: true } options.signal.addEventListener('abort', () => ac.abort(), opts) } let gotAnyItemFromStream = false try { for await (const value of this) { var _options$signal3 gotAnyItemFromStream = true if ( options !== null && options !== undefined && (_options$signal3 = options.signal) !== null && _options$signal3 !== undefined && _options$signal3.aborted ) { throw new AbortError() } if (!hasInitialValue) { initialValue = value hasInitialValue = true } else { initialValue = await reducer(initialValue, value, { signal }) } } if (!gotAnyItemFromStream && !hasInitialValue) { throw new ReduceAwareErrMissingArgs() } } finally { ac.abort() } return initialValue } async function toArray(options) { if (options != null) { validateObject(options, 'options') } if ((options === null || options === undefined ? undefined : options.signal) != null) { validateAbortSignal(options.signal, 'options.signal') } const result = [] for await (const val of this) { var _options$signal4 if ( options !== null && options !== undefined && (_options$signal4 = options.signal) !== null && _options$signal4 !== undefined && _options$signal4.aborted ) { throw new AbortError(undefined, { cause: options.signal.reason }) } ArrayPrototypePush(result, val) } return result } function flatMap(fn, options) { const values = map.call(this, fn, options) return async function* flatMap() { for await (const val of values) { yield* val } }.call(this) } function toIntegerOrInfinity(number) { // We coerce here to align with the spec // https://github.com/tc39/proposal-iterator-helpers/issues/169 number = Number(number) if (NumberIsNaN(number)) { return 0 } if (number < 0) { throw new ERR_OUT_OF_RANGE('number', '>= 0', number) } return number } function drop(number, options = undefined) { if (options != null) { validateObject(options, 'options') } if ((options === null || options === undefined ? undefined : options.signal) != null) { validateAbortSignal(options.signal, 'options.signal') } number = toIntegerOrInfinity(number) return async function* drop() { var _options$signal5 if ( options !== null && options !== undefined && (_options$signal5 = options.signal) !== null && _options$signal5 !== undefined && _options$signal5.aborted ) { throw new AbortError() } for await (const val of this) { var _options$signal6 if ( options !== null && options !== undefined && (_options$signal6 = options.signal) !== null && _options$signal6 !== undefined && _options$signal6.aborted ) { throw new AbortError() } if (number-- <= 0) { yield val } } }.call(this) } function take(number, options = undefined) { if (options != null) { validateObject(options, 'options') } if ((options === null || options === undefined ? undefined : options.signal) != null) { validateAbortSignal(options.signal, 'options.signal') } number = toIntegerOrInfinity(number) return async function* take() { var _options$signal7 if ( options !== null && options !== undefined && (_options$signal7 = options.signal) !== null && _options$signal7 !== undefined && _options$signal7.aborted ) { throw new AbortError() } for await (const val of this) { var _options$signal8 if ( options !== null && options !== undefined && (_options$signal8 = options.signal) !== null && _options$signal8 !== undefined && _options$signal8.aborted ) { throw new AbortError() } if (number-- > 0) { yield val } // Don't get another item from iterator in case we reached the end if (number <= 0) { return } } }.call(this) } module.exports.streamReturningOperators = { asIndexedPairs: deprecate(asIndexedPairs, 'readable.asIndexedPairs will be removed in a future version.'), drop, filter, flatMap, map, take, compose } module.exports.promiseReturningOperators = { every, forEach, reduce, toArray, some, find }