Skip to content

Commit

Permalink
[Reactive] Fluent conversion via compose() & to() (#1592)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Mar 31, 2020
1 parent cf5780c commit 49dd04d
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -799,4 +799,40 @@ default <U> Multi<T> retryWhen(
Objects.requireNonNull(whenFunction, "whenFunction is null");
return new MultiRetry<>(this, whenFunction);
}

/**
* Apply the given {@code composer} function to the current {@code Multi} instance and
* return a{@code Multi} wrapping the returned {@link Flow.Publisher} of this function.
* <p>
* Note that the {@code composer} function is executed upon calling this method
* immediately and not when the resulting sequence gets subscribed to.
* </p>
* @param composer the function that receives the current {@code Multi} instance and
* should return a {@code Flow.Publisher} to be wrapped into a
* {@code Multie} to be returned by the method
* @param <U> the output element type
* @return Multi
* @throws NullPointerException if {@code composer} is {@code null}
*/
@SuppressWarnings("unchecked")
default <U> Multi<U> compose(Function<? super Multi<T>, ? extends Flow.Publisher<? extends U>> composer) {
return from((Flow.Publisher<U>) to(composer));
}

/**
* Apply the given {@code converter} function to the current {@code Multi} instance
* and return the value returned by this function.
* <p>
* Note that the {@code converter} function is executed upon calling this method
* immediately and not when the resulting sequence gets subscribed to.
* </p>
* @param converter the function that receives the current {@code Multi} instance and
* should return a value to be returned by the method
* @param <U> the output type
* @return the value returned by the function
* @throws NullPointerException if {@code converter} is {@code null}
*/
default <U> U to(Function<? super Multi<T>, ? extends U> converter) {
return converter.apply(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -516,4 +516,39 @@ default <U> Single<T> retryWhen(
Objects.requireNonNull(whenFunction, "whenFunction is null");
return new SingleRetry<>(this, whenFunction);
}

/**
* Apply the given {@code composer} function to the current {@code Single} instance and
* return the {@code Single} returned by this function.
* <p>
* Note that the {@code composer} function is executed upon calling this method
* immediately and not when the resulting sequence gets subscribed to.
* </p>
* @param composer the function that receives the current {@code Single} instance and
* should return a {@code Single} to be returned by the method
* @param <U> the output element type
* @return Single
* @throws NullPointerException if {@code composer} is {@code null}
*/
@SuppressWarnings("unchecked")
default <U> Single<U> compose(Function<? super Single<T>, ? extends Single<? extends U>> composer) {
return (Single<U>) to(composer);
}

/**
* Apply the given {@code converter} function to the current {@code Single} instance
* and return the value returned by this function.
* <p>
* Note that the {@code converter} function is executed upon calling this method
* immediately and not when the resulting sequence gets subscribed to.
* </p>
* @param converter the function that receives the current {@code Single} instance and
* should return a value to be returned by the method
* @param <U> the output type
* @return the value returned by the function
* @throws NullPointerException if {@code converter} is {@code null}
*/
default <U> U to(Function<? super Single<T>, ? extends U> converter) {
return converter.apply(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.common.reactive;

import org.testng.annotations.Test;

import java.util.function.Function;

public class MultiComposeTest {

@Test
public void compose() {
TestSubscriber<String> ts = new TestSubscriber<>(Long.MAX_VALUE);

Function<Multi<Integer>, Multi<String>> function =
upstream -> upstream.map(Object::toString);

Multi.just(1)
.compose(function)
.subscribe(ts);

ts.assertResult("1");
}

@Test(expectedExceptions = NullPointerException.class)
public void composeNull() {
Multi.just(1).compose(null);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void composeThrows() {
Multi.just(1)
.compose(s -> { throw new IllegalArgumentException(); });
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.common.reactive;

import org.testng.annotations.Test;

import java.util.function.Function;

public class MultiToTest {

@Test
public void compose() {
TestSubscriber<String> ts = new TestSubscriber<>(Long.MAX_VALUE);

Function<Multi<Integer>, Multi<String>> function =
upstream -> upstream.map(Object::toString);

Multi.just(1)
.to(function)
.subscribe(ts);

ts.assertResult("1");
}

@Test(expectedExceptions = NullPointerException.class)
public void composeNull() {
Multi.just(1).to(null);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void composeThrows() {
Multi.just(1)
.to(s -> { throw new IllegalArgumentException(); });
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.common.reactive;

import org.testng.annotations.Test;

import java.util.function.Function;

public class SingleComposeTest {

@Test
public void compose() {
TestSubscriber<String> ts = new TestSubscriber<>(Long.MAX_VALUE);

Function<Single<Integer>, Single<String>> function =
upstream -> upstream.map(Object::toString);

Single.just(1)
.compose(function)
.subscribe(ts);

ts.assertResult("1");
}

@Test(expectedExceptions = NullPointerException.class)
public void composeNull() {
Single.just(1).compose(null);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void composeThrows() {
Single.just(1)
.compose(s -> { throw new IllegalArgumentException(); });
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.helidon.common.reactive;

import org.testng.annotations.Test;

import java.util.function.Function;

public class SingleToTest {

@Test
public void compose() {
TestSubscriber<String> ts = new TestSubscriber<>(Long.MAX_VALUE);

Function<Single<Integer>, Single<String>> function =
upstream -> upstream.map(Object::toString);

Single.just(1)
.to(function)
.subscribe(ts);

ts.assertResult("1");
}

@Test(expectedExceptions = NullPointerException.class)
public void composeNull() {
Single.just(1).to(null);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void composeThrows() {
Single.just(1)
.to(s -> { throw new IllegalArgumentException(); });
}
}

0 comments on commit 49dd04d

Please sign in to comment.