Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ap method and a simple test #643

Merged
merged 14 commits into from
Oct 28, 2018
Merged

Add ap method and a simple test #643

merged 14 commits into from
Oct 28, 2018

Conversation

amsross
Copy link
Contributor

@amsross amsross commented Feb 28, 2018

This is a rehash of an unfinished part of #114. The implementation and examples there only work if all of the streams are synchronous.

I would still like to add some tests to validate the functionality.

per https://github.com/fantasyland/fantasy-land#apply

@amsross
Copy link
Contributor Author

amsross commented Feb 28, 2018

Still needs appropriate tests.

lib/index.js Outdated
return this.map(function (f) {
return m.fork().map(f);
})
.parallel(Infinity);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically I would use .merge() here because of m.fork(), but I feel like that is taking too much liberty with users' code since order of results is not guaranteed. I think that also violates the composition specification because the results will not necessarily be the same if the function or values are asynchronous (re: events).

@vqvu
Copy link
Collaborator

vqvu commented Mar 1, 2018

First of all, thanks for picking this up. I have a few comments.

The implementation and examples there only work if all of the streams are synchronous.

It's definitely possible to implement even this version of ap without requiring that the streams be synchronous. You just need the stream of functions to be finite (and ideally small). Basically, save all of the functions in an array, then apply them to the values. That said, I don't think this version of ap is a good fit for streams (see next).

I know that #114 says ap should "apply each of a stream of functions to each of a stream of values.", but that definition has always bothered me, since it doesn't handle infinite streams in an elegant way. For example, x.ap(_([f, g]) is the same as x.ap(_([f])) if x is an infinite stream, and x.ap(y) essentially leaks memory if y is infinite. Furthermore, it introduces a "bunching" behavior that seems suboptimal.

It turns out there are more than one implementation that will satisfy the fantasy-land spec.

  1. Apply all functions to all values (interleaving the functions).
  2. Apply all functions to all values (interleaving the value). I think this is what you've implemented.
  3. Apply the latest function to the latest value. This is what most.js does. Credits to them for the inspiration.

(1) is slightly better than (2), since the stream being interleaved needs to be saved, and the stream of functions is likely to be shorter, it slightly more optimal to interleave the functions. That said, they both have the same limitation of throwing away (some) timing data.

In contrast, (3) doesn't. To illustrate, consider the streams (the numbers in the first row are seconds since the start).

Value 1 2 3 4 5 6 7
a 1 2 3
b g1 g2

The outputs of the implementations look like this

u.ap(v) 1 2 3 4 5 6 7
Impl 1 g1(1) g2(1), g1(2), g2(2) g1(3), g2(3)
Impl 2 g1(1) g1(2) g1(3), g2(1), g2(2), g2(3), g3(1), g3(2), g3(3)
Impl 3 g1(1) g1(2) g2(1) g2(3)

Notice the bunchings at t=6 and t=7. Meanwhile, even though stream a emitted a value at t=5, implementations 1 and 2 didn't reflect that at all. In fact, if a were this different stream, the output of those implementations would not change while the output of implementation 3 would.

Value 1 2 3 4 5 6 7
a' 1 2 3

So I would say implementation 3 has going it for it the following things

  1. It preserves the most timing data (no bunching).
  2. It handles infinite streams gracefully (both a and b can be infinite) with O(1) memory.
  3. The interpretation of applying a time-varying function to a time-varying value is extremely elegant. It's not hard to think of a case where you'd need something like this, especially in the context of say, a UI. On the other hand, it's less clear why you would want to compute the cross product of a list of functions and a list of values.
  4. It matches another streams implementation.

Given the above, I would like to implement ap using implementation 3 unless you have a strong argument against it.

@amsross
Copy link
Contributor Author

amsross commented Mar 1, 2018

I'm still going over the differences in implementations, but I think I should note that using .parallel(Infinity) seems to have been a mistake on my part because it does lead to the loss of timing data and bunching as you pointed out. When I use this I typically implement it with .merge(), which has the results I expect.

I've seen the most.js implementation and I can certainly see the elegance of latestU(latestA), however I'm unfamiliar with a use case where that's desired. I'm going to chalk that up to my unfamiliarity with streams in UIs.

If ap is implemented with .merge() instead of .parallel(Infinity), the results with infinite streams u and a look like this for me (I stretched t out a bit for my own clarification):

Value 1 2 3 4 5 6 7 8 9 10 11
a 1 2 3 4
b g1 g2 g3
merge g1(1) g1(2) g1(3), g2(3) g1(4), g2(4), g3(4)
parallel g1(1) g1(2) g1(3) g1(4), g2(3), g2(4), g3(4)

You can see that the bunching becomes less pronounced and the timing is (more) preserved. This feels to me more like a half-step between Impl 2 and Impl 3 where all functions are applied to the most recent values.

I will take a stab at implementing something more like 3 so I can get a better perspective on the differences in the use cases I am familiar with.

Also, thanks so much for your thoughtful and informative response above!

@amsross
Copy link
Contributor Author

amsross commented Mar 1, 2018

Just for reference, my typical use case for this is to represent closures across a set of possible permutations of inputs. A closure could be operations on a database and the permutations could be the possible input values and possible database connection configurations. In these cases u is typically synchronous and finite, but there are times when it is both asynchronous and infinite.

const asyncFiniteStreamOfDbConnections = h.of(db => connect(db))
  .ap(syncFiniteStreamOfDbConfigs)

const asyncInfiniteStreamOfDocuments = h('msg', eventEmitter)

// store each message in ALL of the provided dbs, not just the latest
h.of(db => document => db.store(document))
  .ap(asyncFiniteStreamOfDbConnections)
  .ap(asyncInfiniteStreamOfDocuments)

In these cases I expect ap to be usable as in liftA2(f, [1, 2], [3, 4]) where the calls are:

  • f(1, 3)
  • f(1, 4)
  • f(2, 3)
  • f(2, 4)

@amsross
Copy link
Contributor Author

amsross commented Mar 1, 2018

Here's a rough alternative implementation that maybe you could weigh in on for comparison. It appears to me to behave similarly to most.js:

const ap = a => u => h([
  a.map(a => ({ a })),
  u.map(u => ({ u })),
]).merge()
  .scan1((x, y) => Object.assign({}, x, y))
  .filter(({ a, u }) => a && u)
  .map(({ a, u }) => u(a))

The timing and results:

Value 1 2 3 4 5 6 7 8 9 10
a 1 2 3 4
u g1 g2 g3
events g1(1) g1(2) g2(2) g2(3) g2(4), g3(4)

@amsross
Copy link
Contributor Author

amsross commented Oct 22, 2018

Sorry for letting this sleep here for so long.

@vqvu I'd be interested in your opinion on this implementation using higher-level highland functionality as well as what tests would be necessary to really prove this out.

lib/index.js Outdated Show resolved Hide resolved
lib/index.js Show resolved Hide resolved
lib/index.js Outdated Show resolved Hide resolved
@vqvu
Copy link
Collaborator

vqvu commented Oct 24, 2018

Thanks for picking this back up! LGTM on the implementation. And sorry for not following through on this. It looks like I was the one who dropped the ball.

Re: tests

  1. I think we'll want to add a test that validates the Fantasy Land spec. Specifically, that

    v.ap(u.ap(a.map(f => g => x => f(g(x))))) is equivalent to v.ap(u).ap(a)

  2. I think the reflect timing of value and function arrival test should also validate the timing aspect of things. Use a fake clock, increment it slowly, and verify that the values are being emitted at the right time. There's an example in the ratelimit tests.

Other than that, your tests look fine to me.

@vqvu
Copy link
Collaborator

vqvu commented Oct 24, 2018

Looks like a transitive dependency of ours no longer works with node 0.10, and that's why Travis is complaining. I've locked nodeunit to version 0.11.2 as a workaround.

@amsross
Copy link
Contributor Author

amsross commented Oct 24, 2018

Resolved #643 (comment) in 98f1b7d and 0fe6fec

@amsross
Copy link
Contributor Author

amsross commented Oct 24, 2018

Also, I'm unsure what to update in https://github.com/caolan/highland/blob/master/CHANGELOG.md

Does this go under a 2.x version, or a 3.00-beta.x heading? Will this change appear in both?

lib/index.js Outdated Show resolved Hide resolved
lib/index.js Outdated Show resolved Hide resolved
lib/index.js Outdated Show resolved Hide resolved
test/test.js Show resolved Hide resolved
@vqvu
Copy link
Collaborator

vqvu commented Oct 25, 2018

Does this go under a 2.x version, or a 3.00-beta.x heading? Will this change appear in both?

This will go under a new 3.0.0-beta.7 heading. It won't appear in 2.x unless someone backports it to the 2.x branch, and I'm trying to keep new transforms from being added in 2.x (bugfix-only for that branch).

@amsross
Copy link
Contributor Author

amsross commented Oct 25, 2018

I've addressed the requested changes.

I also updated CHANGELOG.md to include the addition of the ap method and package.json to reflect the resulting version bump.

I can revert these if you have a different method for making these changes.

Copy link
Collaborator

@vqvu vqvu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating the CHANGELOG.md. You don't need to update package.json since our release script does this automatically.

package.json Outdated Show resolved Hide resolved
@vqvu vqvu merged commit 26d396f into caolan:master Oct 28, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants