Skip to content
This repository has been archived by the owner on Dec 27, 2019. It is now read-only.

Do not emit 'close' event after finish with query #52

Open
btd opened this issue Mar 4, 2019 · 12 comments · May be fixed by #53
Open

Do not emit 'close' event after finish with query #52

btd opened this issue Mar 4, 2019 · 12 comments · May be fixed by #53

Comments

@btd
Copy link

btd commented Mar 4, 2019

We found that one of our apps could not use node11 because of breaking change. I created bug in nodejs repo. And node maintainers said it is bug in this module instead. Stream should not emit 'close' before 'end'

@mcollina
Copy link

mcollina commented Mar 4, 2019

I’m willing to send a PR to fix this if its
welcomed.

@btd
Copy link
Author

btd commented Mar 4, 2019

@mcollina just did. Thank you again.

@matthieusieben
Copy link

matthieusieben commented Dec 4, 2019

The implementation of the brianc/node-pg-query-stream is broken. For example, the stream doc states:

Errors While Reading

Errors occurring during processing of the readable._read() must be propagated through the readable.destroy(err) method. Throwing an Error from within readable._read() or manually emitting an 'error' event results in undefined behavior.

Here is a correct implementation:

const { Readable } = require('stream')
const Cursor = require('pg-cursor')

class PgQueryStream extends Readable {
  constructor(text, values, { rowMode = undefined, types = undefined, batchSize = 100 } = {}) {
    // https://nodejs.org/api/stream.html#stream_new_stream_readable_options
    super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
    this.cursor = new Cursor(text, values, { rowMode, types })

    this._reading = false
    this._callbacks = []

    // delegate Submittable callbacks to cursor
    this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
    this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor)
    this.handlePortalSuspended = this.cursor.handlePortalSuspended.bind(this.cursor)
    this.handleCommandComplete = this.cursor.handleCommandComplete.bind(this.cursor)
    this.handleReadyForQuery = this.cursor.handleReadyForQuery.bind(this.cursor)
    this.handleError = this.cursor.handleError.bind(this.cursor)
  }

  submit(connection) {
    this.cursor.submit(connection)
  }

  close(callback) {
    if (this.destroyed) {
      if (callback) setImmediate(callback)
    } else {
      if (callback) this.once('close', callback)
      this.destroy()
    }
  }

  _close() {
    this.cursor.close(() => {
      let cb
      while ((cb = this._callbacks.pop())) cb()
    })
  }

  _destroy(_err, callback) {
    this._callbacks.push(callback)
    if (!this._reading) {
      this._close()
    }
  }

  // https://nodejs.org/api/stream.html#stream_readable_read_size_1
  _read(size) {
    // Prevent _destroy() from closing while reading
    this._reading = true

    this.cursor.read(size, (err, rows, result) => {
      this._reading = false

      if (this.destroyed) {
        // Destroyed while reading?
        this._close()
      } else if (err) {
        // https://nodejs.org/api/stream.html#stream_errors_while_reading
        this.destroy(err)
      } else {
        for (const row of rows) this.push(row)
        if (rows.length < size) this.push(null)
      }
    })
  }
}

module.exports = PgQueryStream

@btd
Copy link
Author

btd commented Dec 6, 2019

@matthieusieben i would suggest to use PR instead of you comment. And it would be good to have new tests.

@btd
Copy link
Author

btd commented Dec 6, 2019

Also @matthieusieben how it is possible to have multiple destroy callbacks?

@matthieusieben
Copy link

I don't think there should be more than one callback. This implementation is safe and works wether there is zero, one or more callbacks added

@matthieusieben
Copy link

I didn't create a pr because I didn't want to bother writing tests & co. I just shared my implementation. I leave it to someone else to do it.

@matthieusieben
Copy link

If you wish, just create a pr with _callback being a nullable value instead of an array.

@btd
Copy link
Author

btd commented Dec 6, 2019 via email

@brianc
Copy link
Owner

brianc commented Dec 20, 2019

I'm going to port this repo over to the pg monorepo. After that I'll rewrite this module & it will be tested under all supported versions of node using the proper version of pg in all cases, and should make fixing this and adding tests much more straight forward...sorry for the hassle!

@matthieusieben
Copy link

If you do this, please start from scratch so that we get a proper implementation of the stream interface, without keeping the legacy from this package!

@brianc
Copy link
Owner

brianc commented Dec 20, 2019

Will try my best! If you're interested in supporting my continued work please consider sponsoring me!

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants