diff --git a/src/test/java/io/reactivex/tck/CompletableAndThenPublisherTckTest.java b/src/test/java/io/reactivex/tck/CompletableAndThenPublisherTckTest.java new file mode 100644 index 0000000000..5a4c5f450c --- /dev/null +++ b/src/test/java/io/reactivex/tck/CompletableAndThenPublisherTckTest.java @@ -0,0 +1,30 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.*; + +@Test +public class CompletableAndThenPublisherTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Completable.complete().hide().andThen(Flowable.range(0, (int)elements)) + ; + } +} diff --git a/src/test/java/io/reactivex/tck/MaybeFlatMapPublisherTckTest.java b/src/test/java/io/reactivex/tck/MaybeFlatMapPublisherTckTest.java new file mode 100644 index 0000000000..a05af6e197 --- /dev/null +++ b/src/test/java/io/reactivex/tck/MaybeFlatMapPublisherTckTest.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@Test +public class MaybeFlatMapPublisherTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Maybe.just(1).hide().flatMapPublisher(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.range(0, (int)elements); + } + }) + ; + } +} diff --git a/src/test/java/io/reactivex/tck/MulticastProcessorAsPublisherTckTest.java b/src/test/java/io/reactivex/tck/MulticastProcessorAsPublisherTckTest.java new file mode 100644 index 0000000000..01f4710c58 --- /dev/null +++ b/src/test/java/io/reactivex/tck/MulticastProcessorAsPublisherTckTest.java @@ -0,0 +1,63 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.processors.MulticastProcessor; +import io.reactivex.schedulers.Schedulers; + +@Test +public class MulticastProcessorAsPublisherTckTest extends BaseTck { + + public MulticastProcessorAsPublisherTckTest() { + super(100); + } + + @Override + public Publisher createPublisher(final long elements) { + final MulticastProcessor mp = MulticastProcessor.create(); + mp.start(); + + Schedulers.io().scheduleDirect(new Runnable() { + @Override + public void run() { + long start = System.currentTimeMillis(); + while (!mp.hasSubscribers()) { + try { + Thread.sleep(1); + } catch (InterruptedException ex) { + return; + } + + if (System.currentTimeMillis() - start > 200) { + return; + } + } + + for (int i = 0; i < elements; i++) { + while (!mp.offer(i)) { + Thread.yield(); + if (System.currentTimeMillis() - start > 1000) { + return; + } + } + } + mp.onComplete(); + } + }); + return mp; + } +} diff --git a/src/test/java/io/reactivex/tck/MulticastProcessorRefCountedTckTest.java b/src/test/java/io/reactivex/tck/MulticastProcessorRefCountedTckTest.java new file mode 100644 index 0000000000..24a3b8d09f --- /dev/null +++ b/src/test/java/io/reactivex/tck/MulticastProcessorRefCountedTckTest.java @@ -0,0 +1,65 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import java.util.concurrent.*; + +import org.reactivestreams.*; +import org.reactivestreams.tck.*; +import org.testng.annotations.Test; + +import io.reactivex.exceptions.TestException; +import io.reactivex.processors.*; + +@Test +public class MulticastProcessorRefCountedTckTest extends IdentityProcessorVerification { + + public MulticastProcessorRefCountedTckTest() { + super(new TestEnvironment(50)); + } + + @Override + public Processor createIdentityProcessor(int bufferSize) { + MulticastProcessor mp = MulticastProcessor.create(true); + return mp; + } + + @Override + public Publisher createFailedPublisher() { + MulticastProcessor mp = MulticastProcessor.create(); + mp.start(); + mp.onError(new TestException()); + return mp; + } + + @Override + public ExecutorService publisherExecutorService() { + return Executors.newCachedThreadPool(); + } + + @Override + public Integer createElement(int element) { + return element; + } + + @Override + public long maxSupportedSubscribers() { + return 1; + } + + @Override + public long maxElementsFromPublisher() { + return 1024; + } +} diff --git a/src/test/java/io/reactivex/tck/MulticastProcessorTckTest.java b/src/test/java/io/reactivex/tck/MulticastProcessorTckTest.java new file mode 100644 index 0000000000..53dd476e6f --- /dev/null +++ b/src/test/java/io/reactivex/tck/MulticastProcessorTckTest.java @@ -0,0 +1,65 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import java.util.concurrent.*; + +import org.reactivestreams.*; +import org.reactivestreams.tck.*; +import org.testng.annotations.Test; + +import io.reactivex.exceptions.TestException; +import io.reactivex.processors.*; + +@Test +public class MulticastProcessorTckTest extends IdentityProcessorVerification { + + public MulticastProcessorTckTest() { + super(new TestEnvironment(50)); + } + + @Override + public Processor createIdentityProcessor(int bufferSize) { + MulticastProcessor mp = MulticastProcessor.create(); + return new RefCountProcessor(mp); + } + + @Override + public Publisher createFailedPublisher() { + MulticastProcessor mp = MulticastProcessor.create(); + mp.start(); + mp.onError(new TestException()); + return mp; + } + + @Override + public ExecutorService publisherExecutorService() { + return Executors.newCachedThreadPool(); + } + + @Override + public Integer createElement(int element) { + return element; + } + + @Override + public long maxSupportedSubscribers() { + return 1; + } + + @Override + public long maxElementsFromPublisher() { + return 1024; + } +} diff --git a/src/test/java/io/reactivex/tck/SingleFlatMapFlowableTckTest.java b/src/test/java/io/reactivex/tck/SingleFlatMapFlowableTckTest.java new file mode 100644 index 0000000000..009bd5e50f --- /dev/null +++ b/src/test/java/io/reactivex/tck/SingleFlatMapFlowableTckTest.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@Test +public class SingleFlatMapFlowableTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Single.just(1).hide().flatMapPublisher(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.range(0, (int)elements); + } + }) + ; + } +}