Skip to content

Commit

Permalink
feat(store): add observable proposal interop to store
Browse files Browse the repository at this point in the history
- Adds dependency on `symbol-observable` to pull in `Symbol.observable`
- Adds `Symbol.observable` method to the store that returns a generic observable
- Adds comprehensive tests to ensure interoperability. (rxjs 5 was used for a simple integration test, and
  is a dev only dependency)

closes #1631
  • Loading branch information
benlesh authored and gaearon committed Apr 19, 2016
1 parent f02e825 commit 64551cb
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 2 deletions.
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
"dependencies": {
"lodash": "^4.2.1",
"lodash-es": "^4.2.1",
"loose-envify": "^1.1.0"
"loose-envify": "^1.1.0",
"symbol-observable": "^0.2.1"
},
"devDependencies": {
"babel-cli": "^6.3.15",
Expand Down Expand Up @@ -101,6 +102,7 @@
"isparta": "^4.0.0",
"mocha": "^2.2.5",
"rimraf": "^2.3.4",
"rxjs": "^5.0.0-beta.6",
"typescript": "^1.8.0",
"typescript-definition-tester": "0.0.4",
"webpack": "^1.9.6"
Expand Down
47 changes: 46 additions & 1 deletion src/createStore.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import isPlainObject from 'lodash/isPlainObject'
import $$observable from 'symbol-observable'

/**
* These are private action types reserved by Redux.
Expand Down Expand Up @@ -198,6 +199,49 @@ export default function createStore(reducer, initialState, enhancer) {
dispatch({ type: ActionTypes.INIT })
}

/**
* Interoperability point for observable/reactive libraries.
* @returns {observable} A minimal observable of state changes.
* For more information, see the observable proposal:
* https://github.com/zenparsing/es-observable
*/
function observable() {
var outerSubscribe = subscribe
return {
/**
* The minimal observable subscription method.
* @param {Object} observer Any object that can be used as an observer.
* The observer object should have a `next` method.
* @returns {subscription} An object with an `unsubscribe` method that can
* be used to unsubscribe the observable from the store, and prevent further
* emission of values from the observable.
*/
subscribe(observer) {
if (typeof observer !== 'object') {
throw new TypeError('Expected observer to be an object')
}

var observeState = () => {
if (observer.next) {
observer.next(getState())
}
}

// send initial state to observer
observeState()

// send subsequent states to observer
var unsubscribe = outerSubscribe(observeState)

// return an unsubscribable
return { unsubscribe }
},
[$$observable]() {
return this
}
}
}

// When a store is created, an "INIT" action is dispatched so that every
// reducer returns their initial state. This effectively populates
// the initial state tree.
Expand All @@ -207,6 +251,7 @@ export default function createStore(reducer, initialState, enhancer) {
dispatch,
subscribe,
getState,
replaceReducer
replaceReducer,
[$$observable]: observable
}
}
116 changes: 116 additions & 0 deletions test/createStore.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import expect from 'expect'
import { createStore, combineReducers } from '../src/index'
import { addTodo, dispatchInMiddle, throwError, unknownAction } from './helpers/actionCreators'
import * as reducers from './helpers/reducers'
import * as Rx from 'rxjs'
import $$observable from 'symbol-observable'

describe('createStore', () => {
it('exposes the public API', () => {
Expand Down Expand Up @@ -610,4 +612,118 @@ describe('createStore', () => {
store.subscribe(undefined)
).toThrow()
})

describe('Symbol.observable interop point', () => {
it('should exist', () => {
const store = createStore(() => {})
expect(typeof store[$$observable]).toBe('function')
})

describe('returned value', () => {
it('should be subscribable', () => {
const store = createStore(() => {})
const obs = store[$$observable]()
expect(typeof obs.subscribe).toBe('function')
})

it('should throw a TypeError if an observer object is not supplied to subscribe', () => {
const store = createStore(() => {})
const obs = store[$$observable]()

expect(function () {
obs.subscribe()
}).toThrow()

expect(function () {
obs.subscribe(() => {})
}).toThrow()

expect(function () {
obs.subscribe({})
}).toNotThrow()
})

it('should return a subscription object when subscribed', () => {
const store = createStore(() => {})
const obs = store[$$observable]()
const sub = obs.subscribe({})
expect(typeof sub.unsubscribe).toBe('function')
})
})

it('should pass an integration test with no unsubscribe', () => {
function foo(state = 0, action) {
return action.type === 'foo' ? 1 : state
}

function bar(state = 0, action) {
return action.type === 'bar' ? 2 : state
}

const store = createStore(combineReducers({ foo, bar }))
const observable = store[$$observable]()
const results = []

observable.subscribe({
next(state) {
results.push(state)
}
})

store.dispatch({ type: 'foo' })
store.dispatch({ type: 'bar' })

expect(results).toEqual([ { foo: 0, bar: 0 }, { foo: 1, bar: 0 }, { foo: 1, bar: 2 } ])
})

it('should pass an integration test with an unsubscribe', () => {
function foo(state = 0, action) {
return action.type === 'foo' ? 1 : state
}

function bar(state = 0, action) {
return action.type === 'bar' ? 2 : state
}

const store = createStore(combineReducers({ foo, bar }))
const observable = store[$$observable]()
const results = []

const sub = observable.subscribe({
next(state) {
results.push(state)
}
})

store.dispatch({ type: 'foo' })
sub.unsubscribe()
store.dispatch({ type: 'bar' })

expect(results).toEqual([ { foo: 0, bar: 0 }, { foo: 1, bar: 0 } ])
})

it('should pass an integration test with a common library (RxJS)', () => {
function foo(state = 0, action) {
return action.type === 'foo' ? 1 : state
}

function bar(state = 0, action) {
return action.type === 'bar' ? 2 : state
}

const store = createStore(combineReducers({ foo, bar }))
const observable = Rx.Observable.from(store)
const results = []

const sub = observable
.map(state => ({ fromRx: true, ...state }))
.subscribe(state => results.push(state))

store.dispatch({ type: 'foo' })
sub.unsubscribe()
store.dispatch({ type: 'bar' })

expect(results).toEqual([ { foo: 0, bar: 0, fromRx: true }, { foo: 1, bar: 0, fromRx: true } ])
})
})
})

0 comments on commit 64551cb

Please sign in to comment.