How to spread records in a pipeline to multiple readers #4471

Open
2 tasks done
obones opened this issue Aug 29, 2024 · 4 comments

obones commented Aug 29, 2024

Node.js Version

v18.20.4

NPM Version

v10.7.0

Operating System

Windows/Linux

Subsystem

stream

Description

Hello,

I currently have the following simple stream pipeline:

source --> transform --> destination

As expected, the transform stream processes one item at a time from the source, and this does what I need it to do.
The function given to the transform is async because it awaits a call to an HTTPS API, and that call takes longer than it takes the source to produce the next record.
To reduce the total processing time, I would like to split the record processing across multiple consumers like so:

         /--  reader 1  --\
        /---  reader 2  ---\
source --       ...        -- destination
        \--- reader N-1 ---/
         \--  reader N  --/
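For comparison, the sequential setup described above can be sketched as follows. This is a minimal sketch, not the code from this issue: `callApi` is a hypothetical stand-in for the HTTPS request (it sleeps for 20 ms and doubles the record), and the in-flight counter only exists to show that a `Transform` does not receive the next record until its callback has run, so at most one request is ever pending.

```javascript
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');
const { setTimeout: sleep } = require('node:timers/promises');

async function runSequential(records) {
  let inFlight = 0;
  let maxInFlight = 0;

  // Hypothetical stand-in for the HTTPS API call: waits 20 ms, then doubles the record.
  async function callApi(record) {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    try {
      await sleep(20);
      return record * 2;
    } finally {
      inFlight--;
    }
  }

  const results = [];
  await pipeline(
    Readable.from(records),
    new Transform({
      objectMode: true,
      // Node does not call transform() again until callback() runs,
      // so each record waits for the previous API call to finish.
      transform(record, _encoding, callback) {
        callApi(record).then((out) => callback(null, out), callback);
      },
    }),
    new Writable({
      objectMode: true,
      write(out, _encoding, callback) {
        results.push(out);
        callback();
      },
    }),
  );
  return { results, maxInFlight };
}

runSequential([1, 2, 3, 4]).then(({ results, maxInFlight }) => {
  console.log(results, maxInFlight); // maxInFlight stays at 1
});
```

Total time therefore grows with the number of records times the API latency, which is what the fan-out in the diagram is meant to avoid.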

I have read question #2707, but it describes a case where each record leaving a stream is duplicated to multiple downstream streams. I don't want duplication, just somewhat "parallel" execution: each record should be handled by exactly one reader.

Is this something that is possible within the stream ecosystem?
If yes, how would you suggest that I write it?

Thanks for your help

Minimal Reproduction

No response

Output

No response

Before You Submit

  • I have looked for issues that already exist before submitting this
  • My issue follows the guidelines in the README file, and follows the 'How to ask a good question' guide at https://stackoverflow.com/help/how-to-ask
@avivkeller
Member

@nodejs/streams

@benjamingr
Member

@redyetidev please don't ping teams for issues in the help repo :)

@obones look into .map on streams and specifically its concurrency parameter. The example in the docs (multiple readers) shows what you need.
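A minimal sketch of that suggestion, assuming Node.js 18, where `readable.map()` and `readable.toArray()` exist but are still marked experimental. `callApi` and `runWithMap` are hypothetical names; `callApi` stands in for the HTTPS request. `map` passes `{ signal }` as the second argument to the mapper (documented as aborted if the stream is destroyed), and the sketch forwards it to the timer. Up to `concurrency` calls run at once, yet results still come out in input order.

```javascript
const { Readable } = require('node:stream');
const { setTimeout: sleep } = require('node:timers/promises');

async function runWithMap(records, concurrency) {
  let inFlight = 0;
  let maxInFlight = 0;

  // Hypothetical stand-in for the HTTPS API call; the signal provided by
  // map() is forwarded so a pending call can be cancelled.
  async function callApi(record, { signal }) {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    try {
      await sleep(20, undefined, { signal });
      return record * 2;
    } finally {
      inFlight--;
    }
  }

  // Up to `concurrency` callApi calls run at once; output keeps input order.
  const results = await Readable.from(records)
    .map(callApi, { concurrency })
    .toArray();
  return { results, maxInFlight };
}

runWithMap([1, 2, 3, 4, 5, 6, 7, 8], 4).then(({ results, maxInFlight }) => {
  console.log(results, maxInFlight);
});
```

In the pipeline from the question, assuming `source` is a Readable, `source.map(callApi, { concurrency: 4 })` would take the place of the transform stage passed to `pipeline()`.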

@obones
Author

obones commented Aug 30, 2024

Thanks, I had not found this among all the information in that module's documentation.
However, it is marked as "experimental", and the accompanying text worries me:

removal may occur in any future release

If I'm to start using it and it disappears in the next Node version, that would not be practical.

@ronag
Member

ronag commented Aug 30, 2024

You could also do it yourself:

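// Requires the third-party p-queue package (ESM only): npm install p-queue
import { pipeline } from 'node:stream/promises'
import PQueue from 'p-queue'

// Minimal condition variable: wait() resolves once condition() is true,
// and notify()/notifyOne() wake waiters so they re-check it.
class ConditionVariable {
  #condition
  #resume = []

  constructor(condition) {
    this.#condition = condition
  }

  // Wake every waiter.
  notify() {
    for (const resolve of this.#resume.splice(0)) {
      resolve()
    }
  }

  // Wake the oldest waiter, if there is one.
  notifyOne() {
    const resolve = this.#resume.shift()
    resolve?.()
  }

  async wait() {
    while (!this.#condition()) {
      await new Promise((resolve) => {
        this.#resume.push(resolve)
      })
    }
  }
}

// Returns a pipeline stage that runs fn on up to `concurrency` records at once
// and emits results in completion order, not input order. Each emitted item is
// a settled-result object, { status: 'fulfilled', value } or
// { status: 'rejected', reason }, so the destination must unwrap it.
function parallelOutOfOrder (fn, { concurrency }) {
  return async function * (source, { signal }) {
    const queue = new PQueue({ concurrency })
    const buffer = []

    let error = null
    let ended = false

    const readController = new ConditionVariable(() => buffer.length > 0 || error || ended)
    const pumpController = new ConditionVariable(() => buffer.length === 0)

    const pump = async () => {
      try {
        for await (const value of source) {
          queue.add(async () => {
            let ret
            try {
              ret = { status: 'fulfilled', value: await fn(value, { signal }) }
            } catch (err) {
              ret = { status: 'rejected', reason: err }
            }
            // Backpressure: hold the result until the reader has drained the buffer.
            await pumpController.wait()
            buffer.push(ret)
            readController.notify()
          })

          // Resolves once no task is waiting for a slot, so reading from
          // source pauses while all `concurrency` slots are busy.
          await queue.onEmpty()
        }

        // Let in-flight tasks finish so their results are not dropped.
        await queue.onIdle()
        ended = true
        readController.notify()
      } catch (err) {
        error = err
        readController.notify()
      }
    }

    pump()

    while (true) {
      signal?.throwIfAborted()

      if (buffer.length > 0) {
        yield * buffer.splice(0)
      } else if (error) {
        throw error
      } else if (ended) {
        break
      } else {
        pumpController.notifyOne()
        await readController.wait()
      }
    }
  }
}

// Usage; src, dst, fn and concurrency are placeholders for your own values.
await pipeline(
  src,
  parallelOutOfOrder(fn, { concurrency }),
  dst
)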
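If neither the experimental `.map` nor the p-queue dependency is acceptable, the same out-of-order idea can be sketched without any dependency by keeping a `Set` of in-flight promises and using `Promise.race` to pick whichever finishes first. This is a swapped-in technique, not the code above: `parallelUnordered`, `callApi` and `runUnordered` are hypothetical names, and unlike the p-queue version it yields plain results and rethrows the first failure instead of wrapping each outcome in a settled-result object.

```javascript
const { Readable, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');
const { setTimeout: sleep } = require('node:timers/promises');

// Hypothetical helper (not part of Node.js or p-queue): a pipeline stage that
// keeps up to `concurrency` calls to fn in flight and yields each result as
// soon as it settles, i.e. in completion order. A rejection from fn is
// rethrown, which makes pipeline() reject.
function parallelUnordered(fn, { concurrency }) {
  return async function* (source, { signal }) {
    const inFlight = new Set();

    // Start fn(value). The tracking promise never rejects (it resolves to a
    // settled record), so a failure cannot become an unhandled rejection.
    const launch = (value) => {
      const tracked = Promise.resolve()
        .then(() => fn(value, { signal }))
        .then(
          (result) => ({ tracked, ok: true, result }),
          (error) => ({ tracked, ok: false, error }),
        );
      inFlight.add(tracked);
    };

    // Wait for whichever call finishes first and hand back its result.
    const settleOne = async () => {
      const settled = await Promise.race(inFlight);
      inFlight.delete(settled.tracked);
      if (!settled.ok) throw settled.error;
      return settled.result;
    };

    for await (const value of source) {
      launch(value);
      // At the limit: emit one result before pulling the next record.
      if (inFlight.size >= concurrency) yield await settleOne();
    }
    // Source exhausted: drain whatever is still running.
    while (inFlight.size > 0) yield await settleOne();
  };
}

async function runUnordered(records, concurrency) {
  let active = 0;
  let maxActive = 0;

  // Hypothetical stand-in for the HTTPS call; lower-numbered records take longer.
  async function callApi(record) {
    active++;
    maxActive = Math.max(maxActive, active);
    try {
      await sleep(10 * (records.length - record + 1));
      return record * 2;
    } finally {
      active--;
    }
  }

  const results = [];
  await pipeline(
    Readable.from(records),
    parallelUnordered(callApi, { concurrency }),
    new Writable({
      objectMode: true,
      write(result, _encoding, callback) {
        results.push(result);
        callback();
      },
    }),
  );
  return { results, maxActive };
}

runUnordered([1, 2, 3, 4, 5, 6], 3).then(({ results, maxActive }) => {
  console.log(results, maxActive);
});
```

Because results are emitted as they complete, the printed array will usually not follow the input order; when the destination needs input order, `.map` is the better fit.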
