Skip to content

Commit

Permalink
Merge pull request #1327 from akarnokd/MoreJoinPatterns
Browse files Browse the repository at this point in the history
Join patterns extension for 4..9 and N arity joins.
  • Loading branch information
benjchristensen committed Jun 12, 2014
2 parents 56d76bb + 0e2e73f commit 8326b76
Show file tree
Hide file tree
Showing 36 changed files with 3,333 additions and 264 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public abstract class ActivePlan0 {
protected final Map<JoinObserver, JoinObserver> joinObservers = new HashMap<JoinObserver, JoinObserver>();

public abstract void match();
protected abstract void match();

protected void addJoinObserver(JoinObserver joinObserver) {
joinObservers.put(joinObserver, joinObserver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@
/**
* Represents an active plan.
*/
public class ActivePlan1<T1> extends ActivePlan0 {
public final class ActivePlan1<T1> extends ActivePlan0 {
private final Action1<T1> onNext;
private final Action0 onCompleted;
private final JoinObserver1<T1> first;
private final JoinObserver1<T1> jo1;

public ActivePlan1(JoinObserver1<T1> first, Action1<T1> onNext, Action0 onCompleted) {
ActivePlan1(JoinObserver1<T1> jo1, Action1<T1> onNext, Action0 onCompleted) {
this.onNext = onNext;
this.onCompleted = onCompleted;
this.first = first;
addJoinObserver(first);
this.jo1 = jo1;
addJoinObserver(jo1);
}

@Override
public void match() {
if (!first.queue().isEmpty()) {
Notification<T1> n1 = first.queue().peek();
protected void match() {
if (!jo1.queue().isEmpty()) {
Notification<T1> n1 = jo1.queue().peek();
if (n1.isOnCompleted()) {
onCompleted.call();
} else {
Expand Down
24 changes: 12 additions & 12 deletions rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan2.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,26 @@
/**
* Represents an active plan.
*/
public class ActivePlan2<T1, T2> extends ActivePlan0 {
public final class ActivePlan2<T1, T2> extends ActivePlan0 {
private final Action2<T1, T2> onNext;
private final Action0 onCompleted;
private final JoinObserver1<T1> first;
private final JoinObserver1<T2> second;
private final JoinObserver1<T1> jo1;
private final JoinObserver1<T2> jo2;

public ActivePlan2(JoinObserver1<T1> first, JoinObserver1<T2> second, Action2<T1, T2> onNext, Action0 onCompleted) {
ActivePlan2(JoinObserver1<T1> jo1, JoinObserver1<T2> jo2, Action2<T1, T2> onNext, Action0 onCompleted) {
this.onNext = onNext;
this.onCompleted = onCompleted;
this.first = first;
this.second = second;
addJoinObserver(first);
addJoinObserver(second);
this.jo1 = jo1;
this.jo2 = jo2;
addJoinObserver(jo1);
addJoinObserver(jo2);
}

@Override
public void match() {
if (!first.queue().isEmpty() && !second.queue().isEmpty()) {
Notification<T1> n1 = first.queue().peek();
Notification<T2> n2 = second.queue().peek();
protected void match() {
if (!jo1.queue().isEmpty() && !jo2.queue().isEmpty()) {
Notification<T1> n1 = jo1.queue().peek();
Notification<T2> n2 = jo2.queue().peek();

if (n1.isOnCompleted() || n2.isOnCompleted()) {
onCompleted.call();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
/**
* Represents an active plan.
*/
public class ActivePlan3<T1, T2, T3> extends ActivePlan0 {
public final class ActivePlan3<T1, T2, T3> extends ActivePlan0 {
private final Action3<T1, T2, T3> onNext;
private final Action0 onCompleted;
private final JoinObserver1<T1> first;
private final JoinObserver1<T2> second;
private final JoinObserver1<T3> third;

public ActivePlan3(JoinObserver1<T1> first,
ActivePlan3(JoinObserver1<T1> first,
JoinObserver1<T2> second,
JoinObserver1<T3> third,
Action3<T1, T2, T3> onNext,
Expand All @@ -45,7 +45,7 @@ public ActivePlan3(JoinObserver1<T1> first,
}

@Override
public void match() {
protected void match() {
if (!first.queue().isEmpty()
&& !second.queue().isEmpty()
&& !third.queue().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.joins;

import rx.Notification;
import rx.functions.Action0;
import rx.functions.Action4;

/**
* Represents an active plan.
*/
public final class ActivePlan4<T1, T2, T3, T4> extends ActivePlan0 {
private final Action4<T1, T2, T3, T4> onNext;
private final Action0 onCompleted;
private final JoinObserver1<T1> jo1;
private final JoinObserver1<T2> jo2;
private final JoinObserver1<T3> jo3;
private final JoinObserver1<T4> jo4;

ActivePlan4(
JoinObserver1<T1> jo1,
JoinObserver1<T2> jo2,
JoinObserver1<T3> jo3,
JoinObserver1<T4> jo4,
Action4<T1, T2, T3, T4> onNext,
Action0 onCompleted) {
this.onNext = onNext;
this.onCompleted = onCompleted;
this.jo1 = jo1;
this.jo2 = jo2;
this.jo3 = jo3;
this.jo4 = jo4;
addJoinObserver(jo1);
addJoinObserver(jo2);
addJoinObserver(jo3);
addJoinObserver(jo4);
}

@Override
protected void match() {
if (!jo1.queue().isEmpty()
&& !jo2.queue().isEmpty()
&& !jo3.queue().isEmpty()
&& !jo4.queue().isEmpty()) {
Notification<T1> n1 = jo1.queue().peek();
Notification<T2> n2 = jo2.queue().peek();
Notification<T3> n3 = jo3.queue().peek();
Notification<T4> n4 = jo4.queue().peek();

if (n1.isOnCompleted()
|| n2.isOnCompleted()
|| n3.isOnCompleted()
|| n4.isOnCompleted()) {
onCompleted.call();
} else {
dequeue();
onNext.call(n1.getValue(), n2.getValue(), n3.getValue(), n4.getValue());
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.joins;

import rx.Notification;
import rx.functions.Action0;
import rx.functions.Action5;

/**
* Represents an active plan.
*/
public final class ActivePlan5<T1, T2, T3, T4, T5> extends ActivePlan0 {
private final Action5<T1, T2, T3, T4, T5> onNext;
private final Action0 onCompleted;
private final JoinObserver1<T1> jo1;
private final JoinObserver1<T2> jo2;
private final JoinObserver1<T3> jo3;
private final JoinObserver1<T4> jo4;
private final JoinObserver1<T5> jo5;

ActivePlan5(
JoinObserver1<T1> jo1,
JoinObserver1<T2> jo2,
JoinObserver1<T3> jo3,
JoinObserver1<T4> jo4,
JoinObserver1<T5> jo5,
Action5<T1, T2, T3, T4, T5> onNext,
Action0 onCompleted) {
this.onNext = onNext;
this.onCompleted = onCompleted;
this.jo1 = jo1;
this.jo2 = jo2;
this.jo3 = jo3;
this.jo4 = jo4;
this.jo5 = jo5;
addJoinObserver(jo1);
addJoinObserver(jo2);
addJoinObserver(jo3);
addJoinObserver(jo4);
addJoinObserver(jo5);
}

@Override
protected void match() {
if (!jo1.queue().isEmpty()
&& !jo2.queue().isEmpty()
&& !jo3.queue().isEmpty()
&& !jo4.queue().isEmpty()
&& !jo5.queue().isEmpty()
) {
Notification<T1> n1 = jo1.queue().peek();
Notification<T2> n2 = jo2.queue().peek();
Notification<T3> n3 = jo3.queue().peek();
Notification<T4> n4 = jo4.queue().peek();
Notification<T5> n5 = jo5.queue().peek();

if (n1.isOnCompleted()
|| n2.isOnCompleted()
|| n3.isOnCompleted()
|| n4.isOnCompleted()
|| n5.isOnCompleted()
) {
onCompleted.call();
} else {
dequeue();
onNext.call(
n1.getValue(),
n2.getValue(),
n3.getValue(),
n4.getValue(),
n5.getValue()
);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.joins;

import rx.Notification;
import rx.functions.Action0;
import rx.functions.Action6;

/**
* Represents an active plan.
*/
public final class ActivePlan6<T1, T2, T3, T4, T5, T6> extends ActivePlan0 {
private final Action6<T1, T2, T3, T4, T5, T6> onNext;
private final Action0 onCompleted;
private final JoinObserver1<T1> jo1;
private final JoinObserver1<T2> jo2;
private final JoinObserver1<T3> jo3;
private final JoinObserver1<T4> jo4;
private final JoinObserver1<T5> jo5;
private final JoinObserver1<T6> jo6;

ActivePlan6(
JoinObserver1<T1> jo1,
JoinObserver1<T2> jo2,
JoinObserver1<T3> jo3,
JoinObserver1<T4> jo4,
JoinObserver1<T5> jo5,
JoinObserver1<T6> jo6,
Action6<T1, T2, T3, T4, T5, T6> onNext,
Action0 onCompleted) {
this.onNext = onNext;
this.onCompleted = onCompleted;
this.jo1 = jo1;
this.jo2 = jo2;
this.jo3 = jo3;
this.jo4 = jo4;
this.jo5 = jo5;
this.jo6 = jo6;
addJoinObserver(jo1);
addJoinObserver(jo2);
addJoinObserver(jo3);
addJoinObserver(jo4);
addJoinObserver(jo5);
addJoinObserver(jo6);
}

@Override
protected void match() {
if (!jo1.queue().isEmpty()
&& !jo2.queue().isEmpty()
&& !jo3.queue().isEmpty()
&& !jo4.queue().isEmpty()
&& !jo5.queue().isEmpty()
&& !jo6.queue().isEmpty()
) {
Notification<T1> n1 = jo1.queue().peek();
Notification<T2> n2 = jo2.queue().peek();
Notification<T3> n3 = jo3.queue().peek();
Notification<T4> n4 = jo4.queue().peek();
Notification<T5> n5 = jo5.queue().peek();
Notification<T6> n6 = jo6.queue().peek();

if (n1.isOnCompleted()
|| n2.isOnCompleted()
|| n3.isOnCompleted()
|| n4.isOnCompleted()
|| n5.isOnCompleted()
|| n6.isOnCompleted()
) {
onCompleted.call();
} else {
dequeue();
onNext.call(
n1.getValue(),
n2.getValue(),
n3.getValue(),
n4.getValue(),
n5.getValue(),
n6.getValue()
);
}
}
}

}
Loading

0 comments on commit 8326b76

Please sign in to comment.