Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Jan 9, 2013
1 parent 85827a3 commit 2da36fa
Show file tree
Hide file tree
Showing 41 changed files with 9,785 additions and 0 deletions.
22 changes: 22 additions & 0 deletions rxjava-core/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apply plugin: 'java'
apply plugin: 'eclipse'

dependencies {
compile 'org.slf4j:slf4j-api:1.7.0'
compile 'com.google.code.findbugs:jsr305:2.0.0'
provided 'junit:junit:4.10'
provided 'org.mockito:mockito-core:1.9.5'
compile 'org.codehaus.groovy:groovy:1.8.8'
compile 'org.jruby:jruby:1.7.2'
}

eclipse {
classpath {
//you can tweak the classpath of the Eclipse project by adding extra configurations:
plusConfigurations += configurations.provided

downloadSources = true
downloadJavadoc = true
}
}

5 changes: 5 additions & 0 deletions rxjava-core/src/main/java/org/rx/functions/Func0.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.rx.functions;

public interface Func0<R> {
public R call();
}
5 changes: 5 additions & 0 deletions rxjava-core/src/main/java/org/rx/functions/Func1.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.rx.functions;

public interface Func1<R, T1> {
public R call(T1 t1);
}
5 changes: 5 additions & 0 deletions rxjava-core/src/main/java/org/rx/functions/Func2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.rx.functions;

public interface Func2<R, T1, T2> {
public R call(T1 t1, T2 t2);
}
5 changes: 5 additions & 0 deletions rxjava-core/src/main/java/org/rx/functions/Func3.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.rx.functions;

public interface Func3<R, T1, T2, T3> {
public R call(T1 t1, T2 t2, T3 t3);
}
5 changes: 5 additions & 0 deletions rxjava-core/src/main/java/org/rx/functions/Func4.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.rx.functions;

public interface Func4<R, T1, T2, T3, T4> {
public R call(T1 t1, T2 t2, T3 t3, T4 t4);
}
5 changes: 5 additions & 0 deletions rxjava-core/src/main/java/org/rx/functions/FuncN.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.rx.functions;

public interface FuncN<R> {
public R call(Object... args);
}
202 changes: 202 additions & 0 deletions rxjava-core/src/main/java/org/rx/functions/Functions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package org.rx.functions;

import groovy.lang.Closure;

import org.jruby.Ruby;
import org.jruby.RubyProc;
import org.jruby.javasupport.JavaEmbedUtils;
import org.jruby.runtime.builtin.IRubyObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Functions {

private static final Logger logger = LoggerFactory.getLogger(Functions.class);

/**
* Utility method for determining the type of closure/function and executing it.
*
* @param closure
* @param args
*/
@SuppressWarnings("unchecked")
public static <R> R execute(Object closure, Object... args) {
// if we have a tracer then log the start
long startTime = -1;
if (tracer != null && tracer.isTraceEnabled()) {
try {
startTime = System.nanoTime();
tracer.traceStart(closure, args);
} catch (Exception e) {
logger.warn("Failed to trace log.", e);
}
}
// perform controller logic to determine what type of function we received and execute it
try {
if (closure == null) {
throw new RuntimeException("closure is null. Can't send arguments to null closure.");
}
if (closure instanceof Closure) {
/* handle Groovy */
return (R) ((Closure<?>) closure).call(args);
} else if (closure instanceof RubyProc) {
// handle JRuby
RubyProc rubyProc = ((RubyProc) closure);
Ruby ruby = rubyProc.getRuntime();
IRubyObject rubyArgs[] = new IRubyObject[args.length];
for (int i = 0; i < args.length; i++) {
rubyArgs[i] = JavaEmbedUtils.javaToRuby(ruby, args[i]);
}
return (R) rubyProc.getBlock().call(ruby.getCurrentContext(), rubyArgs);
} else if (closure instanceof Func0) {
Func0<R> f = (Func0<R>) closure;
if (args.length != 0) {
throw new RuntimeException("The closure was Func0 and expected no arguments, but we received: " + args.length);
}
return (R) f.call();
} else if (closure instanceof Func1) {
Func1<R, Object> f = (Func1<R, Object>) closure;
if (args.length != 1) {
throw new RuntimeException("The closure was Func1 and expected 1 argument, but we received: " + args.length);
}
return f.call(args[0]);
} else if (closure instanceof Func2) {
Func2<R, Object, Object> f = (Func2<R, Object, Object>) closure;
if (args.length != 2) {
throw new RuntimeException("The closure was Func2 and expected 2 arguments, but we received: " + args.length);
}
return f.call(args[0], args[1]);
} else if (closure instanceof Func3) {
Func3<R, Object, Object, Object> f = (Func3<R, Object, Object, Object>) closure;
if (args.length != 3) {
throw new RuntimeException("The closure was Func3 and expected 3 arguments, but we received: " + args.length);
}
return (R) f.call(args[0], args[1], args[2]);
} else if (closure instanceof Func4) {
Func4<R, Object, Object, Object, Object> f = (Func4<R, Object, Object, Object, Object>) closure;
if (args.length != 1) {
throw new RuntimeException("The closure was Func4 and expected 4 arguments, but we received: " + args.length);
}
return f.call(args[0], args[1], args[2], args[3]);
} else if (closure instanceof FuncN) {
FuncN<R> f = (FuncN<R>) closure;
return f.call(args);
} else {
throw new RuntimeException("Unsupported closure type: " + closure.getClass().getSimpleName());
}
} finally {
// if we have a tracer then log the end
if (tracer != null && tracer.isTraceEnabled()) {
try {
tracer.traceEnd(startTime, System.nanoTime(), closure, args);
} catch (Exception e) {
logger.warn("Failed to trace log.", e);
}
}
}
}

public static <R, T0> FuncN<R> fromFunc(final Func1<R, T0> f) {
return new FuncN<R>() {

/**
* If it can't cast to this it should throw an exception as that means code is using this wrong.
* <p>
* We unfortunately need FuncN to be Object and this is a bridge between typed and non-typed hence this being unchecked
*/
@SuppressWarnings("unchecked")
@Override
public R call(Object... args) {
if (args.length == 0) {
return f.call(null);
} else {
return f.call((T0) args[0]);
}
}

};
}

public static <R, T0, T1> FuncN<R> fromFunc(final Func2<R, T0, T1> f) {
return new FuncN<R>() {

/**
* If it can't cast to this it should throw an exception as that means code is using this wrong.
* <p>
* We unfortunately need FuncN to be Object and this is a bridge between typed and non-typed hence this being unchecked
*/
@SuppressWarnings("unchecked")
@Override
public R call(Object... args) {
if (args.length < 2) {
throw new RuntimeException("Func2 expecting 2 arguments.");
}
return f.call((T0) args[0], (T1) args[1]);
}

};
}

public static <R, T0, T1, T2> FuncN<R> fromFunc(final Func3<R, T0, T1, T2> f) {
return new FuncN<R>() {

/**
* If it can't cast to this it should throw an exception as that means code is using this wrong.
* <p>
* We unfortunately need FuncN to be Object and this is a bridge between typed and non-typed hence this being unchecked
*/
@SuppressWarnings("unchecked")
@Override
public R call(Object... args) {
if (args.length < 3) {
throw new RuntimeException("Func3 expecting 3 arguments.");
}
return f.call((T0) args[0], (T1) args[1], (T2) args[2]);
}

};
}

public static <R, T0, T1, T2, T3> FuncN<R> fromFunc(final Func4<R, T0, T1, T2, T3> f) {
return new FuncN<R>() {

/**
* If it can't cast to this it should throw an exception as that means code is using this wrong.
* <p>
* We unfortunately need FuncN to be Object and this is a bridge between typed and non-typed hence this being unchecked
*/
@SuppressWarnings("unchecked")
@Override
public R call(Object... args) {
if (args.length < 4) {
throw new RuntimeException("Func4 expecting 4 arguments.");
}
return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3]);
}

};
}

private static volatile FunctionTraceLogger tracer = null;

public static interface FunctionTraceLogger {
public boolean isTraceEnabled();

public void traceStart(Object closure, Object... args);

/**
*
* @param start
* nanoTime
* @param end
* nanoTime
* @param closure
* @param args
*/
public void traceEnd(long start, long end, Object closure, Object... args);
}

public static void registerTraceLogger(FunctionTraceLogger tracer) {
Functions.tracer = tracer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.rx.operations;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.concurrent.ThreadSafe;

import org.rx.reactive.IDisposable;


/**
* Thread-safe wrapper around WatchableSubscription that ensures unsubscribe can be called only once.
*/
@ThreadSafe
/* package */class AtomicWatchableSubscription implements IDisposable {

private AtomicReference<IDisposable> actualSubscription = new AtomicReference<IDisposable>();
private AtomicBoolean unsubscribed = new AtomicBoolean(false);

public AtomicWatchableSubscription() {

}

public AtomicWatchableSubscription(IDisposable actualSubscription) {
this.actualSubscription.set(actualSubscription);
}

/**
* Set the actual subscription once it exists (if it wasn't available when constructed)
*
* @param actualSubscription
* @throws IllegalStateException
* if trying to set more than once (or use this method after setting via constructor)
*/
public AtomicWatchableSubscription setActual(IDisposable actualSubscription) {
if (!this.actualSubscription.compareAndSet(null, actualSubscription)) {
throw new IllegalStateException("Can not set subscription more than once.");
}
return this;
}

@Override
public void unsubscribe() {
// get the real thing and set to null in an atomic operation so we will only ever call unsubscribe once
IDisposable actual = actualSubscription.getAndSet(null);
// if it's not null we will unsubscribe
if (actual != null) {
actual.unsubscribe();
unsubscribed.set(true);
}
}

public boolean isUnsubscribed() {
return unsubscribed.get();
}
}
69 changes: 69 additions & 0 deletions rxjava-core/src/main/java/org/rx/operations/AtomicWatcher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.rx.operations;

import javax.annotation.concurrent.ThreadSafe;

import org.rx.reactive.IObserver;

/**
* A thread-safe Watcher for transitioning states in operators.
* <p>
* Allows both single-threaded and multi-threaded execution controlled by the following FastProperty:
* <li>reactive.watcher.multithreaded.enabled [Default: false]</li>
* <p>
* Single-threaded Execution rules are:
* <ul>
* <li>Allow only single-threaded, synchronous, ordered execution of onNext, onCompleted, onError</li>
* <li>Once an onComplete or onError are performed, no further calls can be executed</li>
* <li>If unsubscribe is called, this means we call completed() and don't allow any further onNext calls.</li>
* </ul>
* <p>
* Multi-threaded Execution rules are:
* <ul>
* <li>Allows multiple threads to perform onNext concurrently</li>
* <li>When an onComplete, onError or unsubscribe request is received, block until all current onNext calls are completed</li>
* <li>When an unsubscribe is received, block until all current onNext are completed</li>
* <li>Once an onComplete or onError are performed, no further calls can be executed</li>
* <li>If unsubscribe is called, this means we call completed() and don't allow any further onNext calls.</li>
* </ul>
*
* @param <T>
*/
@ThreadSafe
/* package */class AtomicWatcher<T> implements IObserver<T> {

/** Allow changing between forcing single or allowing multi-threaded execution of onNext */
private static boolean allowMultiThreaded = true;
static {
String v = System.getProperty("rx.onNext.multithreaded.enabled");
if (v != null) {
// if we have a property set then we'll use it
allowMultiThreaded = Boolean.parseBoolean(v);
}
}

private final IObserver<T> watcher;

public AtomicWatcher(IObserver<T> watcher, AtomicWatchableSubscription subscription) {
if (allowMultiThreaded) {
this.watcher = new AtomicWatcherMultiThreaded<T>(watcher, subscription);
} else {
this.watcher = new AtomicWatcherSingleThreaded<T>(watcher, subscription);
}
}

@Override
public void onCompleted() {
watcher.onCompleted();
}

@Override
public void onError(Exception e) {
watcher.onError(e);
}

@Override
public void onNext(T args) {
watcher.onNext(args);
}

}
Loading

0 comments on commit 2da36fa

Please sign in to comment.