Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

finagle-redis: Add support for Redis Geo commands #628

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ cache:
- $HOME/.sbt/boot/scala-$TRAVIS_SCALA_VERSION
- $HOME/.dodo

addons:
apt:
packages:
- redis-server

before_cache:
# Tricks to avoid unnecessary cache updates
- find $HOME/.ivy2 -name "ivydata-*.properties" -delete
Expand Down
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Runtime Behavior Changes
* finagle-netty4: `Netty4ClientEngineFactory` and `Netty4ServerEngineFactory` now
validate loaded certificates in all cases to ensure that the current date
range is within the validity range specified in the certificate. ``PHAB_ID=D88664``
* finagle-redis: Support Geo index operation with GEOADD, GEODIST and other commands.

Deprecations
~~~~~~~~~~~~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ trait NormalCommands
with SortedSetCommands
with ListCommands
with SetCommands
with GeoCommands
with BtreeSortedSetCommands
with TopologyCommands
with HyperLogLogCommands
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package com.twitter.finagle.redis

import com.twitter.finagle.redis.protocol._
import com.twitter.io.Buf
import com.twitter.io.Buf.Utf8
import com.twitter.util.Future

private[redis] trait GeoCommands {
self: BaseClient =>

/**
* Adds the specified geospatial items (latitude, longitude, name) to the specified `key`.
*
* Redis 3.2.0 or later supports this command.
* For detailed API spec, see:
* [[https://redis.io/commands/geoadd]]
*
* @return Future of the number of elements added to the sorted set
*/
def geoAdd(key: Buf, values: GeoElement*): Future[Long] =
doRequest(GeoAdd(key, values: _*)) {
case IntegerReply(n) => Future.value(n)
}

/**
* Get valid Geohash strings representing the position of elements.
*
* Redis 3.2.0 or later supports this command.
* For detailed API spec, see:
* [[https://redis.io/commands/geohash]]
*
* @return Future of an array where each element is the Geohash corresponding
* to each member, or `None` if member is missing.
*/
def geoHash(key: Buf, members: Buf*): Future[Seq[Option[Buf]]] =
doRequest(GeoHash(key, members: _*)) {
case EmptyMBulkReply => Future.Nil
case MBulkReply(hashes) => Future.value(hashes.map {
case EmptyBulkReply => None
case BulkReply(hashBuf) => Some(hashBuf)
})
}

/**
* Get the positions (longitude,latitude) of all the specified members
* of the geospatial index represented by the sorted set at `key`.
*
* Redis 3.2.0 or later supports this command.
* For detailed API spec, see:
* [[https://redis.io/commands/geopos]]
*
* @return Future of an array where each element is a tuple representing
* longitude and latitude (x,y) of each member, or `None` if member is missing.
*/
def geoPos(key: Buf, members: Buf*): Future[Seq[Option[(Double, Double)]]] = {
doRequest(GeoPos(key, members: _*)) {
case EmptyMBulkReply => Future.Nil
case MBulkReply(positions) => Future.value(positions.map {
case MBulkReply(Seq(longitude, latitude)) => (longitude, latitude) match {
case (BulkReply(lon), BulkReply(lat)) => unapplyCoordinate(lon,lat)
}
case _ => None
})
}
}

private val geoDistResultHandler: PartialFunction[Reply, Future[Option[Double]]] = {
case EmptyBulkReply => Future.None
case BulkReply(buf) => Future.value(Utf8.unapply(buf).map(_.toDouble))
}

/**
* Get the distance between two members in the geospatial index represented by the sorted set.
*
* Redis 3.2.0 or later supports this command.
* For detailed API spec, see:
* [[https://redis.io/commands/geodist]]
*
* @return Future of the distance in the specified unit,
* or `None` if one or both the elements are missing.
*/
def geoDist(key: Buf, member1: Buf, member2: Buf, unit: Option[GeoUnit] = None): Future[Option[Double]] = {
doRequest(GeoDist(key, member1, member2, unit))(geoDistResultHandler)
}

/**
* Get the members of a sorted set which are within the borders of the area specified
* with the center location * and the maximum distance from the center (the radius).
*
* Redis 3.2.0 or later supports this command.
* For detailed API spec, see:
* [[https://redis.io/commands/georadius]]
*
* @return Future of an array where each member represents element.
* Each element additionally contains coordinate, distance and geohash if options are specified
*/
def geoRadius(key: Buf,
longitude: Double,
latitude: Double,
radius: Double,
unit: GeoUnit,
withCoord: Boolean = false,
withDist: Boolean = false,
withHash: Boolean = false,
count: Option[Int] = None,
sort: Option[Sort] = None,
store: Option[Buf] = None,
storeDist: Option[Buf] = None
): Future[Seq[Option[GeoRadiusResult]]] = {
doRequest(GeoRadius(key,
longitude,
latitude,
radius,
unit,
withCoord = withCoord,
withDist = withDist,
withHash = withHash,
count = count,
sort = sort,
store = store,
storeDist = storeDist
))(handleGeoRadiusResponse)
}

/**
* Get the members of a sorted set which are within the borders of the area specified
* with the center location of a `member`.
*
* Redis 3.2.0 or later supports this command.
* For detailed API spec, see:
* [[https://redis.io/commands/georadiusbymember]]
*
* @return Future of an array where each member represents element.
* Each element additionally contains coordinate, distance and geohash if options are specified
*
*/
def geoRadiusByMember(key: Buf,
member: Buf,
radius: Double,
unit: GeoUnit,
withCoord: Boolean = false,
withDist: Boolean = false,
withHash: Boolean = false,
count: Option[Int] = None,
sort: Option[Sort] = None,
store: Option[Buf] = None,
storeDist: Option[Buf] = None
): Future[Seq[Option[GeoRadiusResult]]] = {
doRequest(GeoRadiusByMember(key,
member,
radius,
unit,
withCoord = withCoord,
withDist = withDist,
withHash = withHash,
count = count,
sort = sort,
store = store,
storeDist = storeDist
))(handleGeoRadiusResponse)
}

private val geoRadiusResultParser: PartialFunction[Reply, Option[GeoRadiusResult]] = {
case EmptyBulkReply => None
case BulkReply(member) => Some(GeoRadiusResult(member))
case MBulkReply(attrs) => attrs match {
case BulkReply(member) :: rest =>
Some((rest map {
case IntegerReply(hash) => (r: GeoRadiusResult) => r.copy(hash = Some(hash.toInt))
case BulkReply(dist) => (r: GeoRadiusResult) => r.copy(dist = Buf.Utf8.unapply(dist).map(_.toDouble))
case MBulkReply(BulkReply(lon) :: BulkReply(lat) :: Nil) =>
(r: GeoRadiusResult) => r.copy(coord = unapplyCoordinate(lon,lat))
}).foldRight(GeoRadiusResult(member))((m, r) => m(r)))
}
}

private val handleGeoRadiusResponse: PartialFunction[Reply, Future[Seq[Option[GeoRadiusResult]]]] = {
case EmptyBulkReply => Future.Nil
case EmptyMBulkReply => Future.Nil
case MBulkReply(replies) => Future.value(replies.map(geoRadiusResultParser))
}

private def unapplyCoordinate(longitude: Buf, latitude: Buf): Option[(Double, Double)] = for {
lon <- Buf.Utf8.unapply(longitude)
lat <- Buf.Utf8.unapply(latitude)
} yield (lon.toDouble, lat.toDouble)
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,14 @@ object Command {
val HVALS = Buf.Utf8("HVALS")
val HSTRLEN = Buf.Utf8("HSTRLEN")

// Geo Commands
val GEOADD = Buf.Utf8("GEOADD")
val GEOHASH = Buf.Utf8("GEOHASH")
val GEOPOS = Buf.Utf8("GEOPOS")
val GEODIST = Buf.Utf8("GEODIST")
val GEORADIUS = Buf.Utf8("GEORADIUS")
val GEORADIUSBYMEMBER = Buf.Utf8("GEORADIUSBYMEMBER")

// Command Arguments
val WITHSCORES = Buf.Utf8("WITHSCORES")
val LIMIT = Buf.Utf8("LIMIT")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.twitter.finagle.redis.protocol

import com.twitter.io.Buf
import com.twitter.io.Buf.Utf8

case class GeoElement(longitude: Double, latitude: Double, member: Buf)

case class GeoAdd(key: Buf, members: GeoElement*)
extends StrictKeyCommand {
override def name: Buf = Command.GEOADD

override def body: Seq[Buf] = key +: members.flatMap {
e => Seq(Utf8(e.longitude.toString), Utf8(e.latitude.toString), e.member)
}
}

case class GeoHash(key: Buf, members: Buf*) extends StrictKeyCommand {
override def name: Buf = Command.GEOHASH

override def body: Seq[Buf] = key +: members
}

case class GeoPos(key: Buf, members: Buf*) extends StrictKeyCommand {
override def name: Buf = Command.GEOPOS

override def body: Seq[Buf] = key +: members
}

case class GeoDist(key: Buf, member1: Buf, member2: Buf, unit: Option[GeoUnit] = None) extends StrictKeyCommand {
override def name: Buf = Command.GEODIST

override def body: Seq[Buf] = Seq(key, member1, member2) ++ unit.map(_.toBuf).toSeq
}

/** Object to declare the order of result set returned from Redis' geospatial command. */
sealed abstract class Sort(val notation: Buf)

object Sort {
/** Ascending */
case object Asc extends Sort(Utf8("ASC"))
/** Descending */
case object Desc extends Sort(Utf8("DESC"))
}

/**
* Represents value returned from `GEORADIUS` and `GEORADIUSBYMEMBER` command.
* This object always contains the name of the member. It also contains coordinate,
* distance and geohash if command is invoked with options.
*/
case class GeoRadiusResult(member: Buf,
coord: Option[(Double, Double)] = None,
dist: Option[Double] = None,
hash: Option[Int] = None)

sealed trait GeoRadiusBase {
protected val withCoord: Boolean
protected val withDist: Boolean
protected val withHash: Boolean
protected val count: Option[Int]
protected val sort: Option[Sort]
protected val store: Option[Buf]
protected val storeDist: Option[Buf]

protected lazy val coordArg: Seq[Buf] = if (withCoord) Seq(Utf8("WITHCOORD")) else Nil
protected lazy val distArg: Seq[Buf] = if (withDist) Seq(Utf8("WITHDIST")) else Nil
protected lazy val hashArg: Seq[Buf] = if (withHash) Seq(Utf8("WITHHASH")) else Nil
protected lazy val countArg: Seq[Buf] = count.filter(_ > 0)
.map(c => Seq(Utf8("COUNT"), Utf8(c.toString))) getOrElse Nil
protected lazy val sortArg: Seq[Buf] = sort.map(_.notation).toSeq
protected lazy val storeArg: Seq[Buf] = store.toSeq
protected lazy val storeDistArg: Seq[Buf] = storeDist.toSeq

protected lazy val optionalArgs: Seq[Buf] =
coordArg ++ distArg ++ hashArg ++ countArg ++ sortArg ++ storeArg ++ storeDistArg
}

case class GeoRadius(key: Buf,
longitude: Double,
latitude: Double,
radius: Double,
unit: GeoUnit,
withCoord: Boolean = false,
withDist: Boolean = false,
withHash: Boolean = false,
count: Option[Int] = None,
sort: Option[Sort] = None,
store: Option[Buf] = None,
storeDist: Option[Buf] = None
)
extends StrictKeyCommand with GeoRadiusBase {
override def name: Buf = Command.GEORADIUS

override def body: Seq[Buf] = {
Seq(key, Utf8(longitude.toString), Utf8(latitude.toString), Utf8(radius.toString), unit.toBuf) ++ optionalArgs
}
}

case class GeoRadiusByMember(key: Buf, member: Buf, radius: Double, unit: GeoUnit,
withCoord: Boolean = false,
withDist: Boolean = false,
withHash: Boolean = false,
count: Option[Int] = None,
sort: Option[Sort] = None,
store: Option[Buf] = None,
storeDist: Option[Buf] = None
) extends StrictKeyCommand with StrictMemberCommand with GeoRadiusBase {
override def name: Buf = Command.GEORADIUSBYMEMBER

override def body: Seq[Buf] = Seq(key, member, Utf8(radius.toString), unit.toBuf) ++ optionalArgs
}

/**
* Represents unit of geospatial distance. Implementation object of this trait is passed to
* Redis' geospatial commands.
*/
sealed abstract class GeoUnit(str: String) {
lazy val toBuf: Buf = Utf8(str)
}

object GeoUnit {

/** Meter */
case object Meter extends GeoUnit("m")

/** Kilometer */
case object Kilometer extends GeoUnit("km")

/** Mile */
case object Mile extends GeoUnit("mi")

/** Feet */
case object Feet extends GeoUnit("ft")

}
Loading