Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
Added another MergeSequence test
Browse files Browse the repository at this point in the history
  • Loading branch information
jroper committed Jun 24, 2020
1 parent d30117d commit c556561
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 24 deletions.
6 changes: 6 additions & 0 deletions node-support/src/cloudevents.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
* limitations under the License.
*/

/**
* CloudEvent data.
*
* @interface module:cloudstate.CloudEvent
* @property {string} specversion The CloudEvent spec version
*/
function toCloudevent(metadata) {
return {
get specversion() {
Expand Down
92 changes: 88 additions & 4 deletions node-support/src/metadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,51 @@
* limitations under the License.
*/

function valueFromEntry(entry) {
if (entry.bytesValue !== undefined) {
return entry.bytesValue;
} else {
return entry.stringValue;
}
}

/**
* A metadata value. Can either be a string or a buffer.
*
* @typedef module:cloudstate.MetadataValue
* @type {string|Buffer}
*/

/**
* @classdesc Cloudstate metadata.
*
* Metadata is treated as case insensitive on lookup, and case sensitive on set. Multiple values per key are supported,
* setting a value will add it to the current values for that key. You should delete first if you wish to replace a
* value.
*
* Values can either by strings or byte buffers. If a non string or byte buffer value is set, it will be converted to
* a string using toString.
*
* @interface module:cloudstate.Metadata
* @param {array} entries The list of entries
*/
function Metadata(entries) {
/**
* The metadata expressed as an object.
*
* The object keys are case insensitive, ie, `metadata.foo` and `metadata.Foo` both return the same value. If there
* are multiple values for a given key, the first one set for that key will be returned. Setting a value will add it
* to the list of existing values for that key.
*
* @name module:cloudstate.Metadata#getMap
* @type {Object<String, module:cloudstate.MetadataValue>}
*/
this.getMap = new Proxy({}, {
get: (target, key) => {
for (const idx in entries) {
const entry = entries[idx];
if (key.toLowerCase() === entry.key.toLowerCase()) {
return entry.value;
return valueFromEntry(entry);
}
}
},
Expand Down Expand Up @@ -50,21 +88,54 @@ function Metadata(entries) {
}
});

/**
* Get all the values for the given key.
*
* The key is case insensitive.
*
* @function module:cloudstate.Metadata#get
* @param {string} key The key to get.
* @returns {Array<module:cloudstate.MetadataValue>} All the values, or an empty array if no values exist for the key.
*/
this.get = key => {
const values = [];
entries.forEach(entry => {
if (key.toLowerCase() === entry.key.toLowerCase()) {
values.push(entry.value);
values.push(valueFromEntry(entry));
}
});
return values;
};


/**
* Set a given key value.
*
* This will append the key value to the metadata, it won't replace any existing values for existing keys.
*
* @function module:cloudstate.Metadata#set
* @param {string} key The key to set.
* @param {module:cloudstate.MetadataValue} value The value to set.
*/
this.set = (key, value) => {
entries.push({key, value})
const entry = {key};
if (typeof value === "string") {
entry.stringValue = value;
} else if (Buffer.isBuffer(value)) {
entry.bytesValue = value;
} else {
entry.stringValue = value.toString();
}
entries.push(entry);
};

/**
* Delete all values with the given key.
*
* The key is case insensitive.
*
* @function module:cloudstate.Metadata#delete
* @param {string} key The key to delete.
*/
this.delete = key => {
let idx = 0;
while (idx < entries.length) {
Expand All @@ -77,6 +148,14 @@ function Metadata(entries) {
}
};

/**
* Whether there exists a metadata value for the given key.
*
* The key is case insensitive.
*
* @function module:cloudstate.Metadata#has
* @param {string} key The key to check.
*/
this.has = key => {
for (const idx in entries) {
const entry = entries[idx];
Expand All @@ -86,6 +165,11 @@ function Metadata(entries) {
}
};

/**
* Clear the metadata.
*
* @function module:cloudstate.Metadata#clear
*/
this.clear = () => {
entries.splice(0, entries.length);
};
Expand Down
6 changes: 3 additions & 3 deletions proxy/core/src/main/scala/io/cloudstate/proxy/Serve.scala
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ object Serve {
val handler = rpcMethodSerializers(req.uri.path)

val metadata = Metadata(req.headers.map { header =>
MetadataEntry(header.name(), header.value)
MetadataEntry(header.name(), MetadataEntry.Value.StringValue(header.value))
})

val reader = GrpcProtocolNative.newReader(Codecs.detect(req).get)
Expand All @@ -264,8 +264,8 @@ object Serve {
(Metadata.defaultInstance, Source(other).concat(rest))
}

val headers = metadata.entries.map {
case MetadataEntry(key, value, _) => RawHeader(key, value)
val headers = metadata.entries.collect {
case MetadataEntry(key, MetadataEntry.Value.StringValue(value), _) => RawHeader(key, value)
}.toList

(headers, replies.via(handler.processReplies))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,17 +323,18 @@ object EventingManager {
}

private def cloudEventToMetadata(cloudEvent: CloudEvent): Metadata = {
import MetadataEntry.Value.StringValue
// We use the HTTP binary mode transcoding rules
val builder = Seq.newBuilder[MetadataEntry]
builder += MetadataEntry("ce-id", cloudEvent.id)
builder += MetadataEntry("ce-source", cloudEvent.source)
builder += MetadataEntry("ce-specversion", cloudEvent.specversion)
builder += MetadataEntry("ce-type", cloudEvent.`type`)
builder += MetadataEntry("Content-Type", cloudEvent.datacontenttype)
builder += MetadataEntry("ce-id", StringValue(cloudEvent.id))
builder += MetadataEntry("ce-source", StringValue(cloudEvent.source))
builder += MetadataEntry("ce-specversion", StringValue(cloudEvent.specversion))
builder += MetadataEntry("ce-type", StringValue(cloudEvent.`type`))
builder += MetadataEntry("Content-Type", StringValue(cloudEvent.datacontenttype))

cloudEvent.dataschema.foreach(v => builder += MetadataEntry("ce-dataschema", v))
cloudEvent.subject.foreach(v => builder += MetadataEntry("ce-subject", v))
cloudEvent.time.foreach(v => builder += MetadataEntry("ce-time", v.toString))
cloudEvent.dataschema.foreach(v => builder += MetadataEntry("ce-dataschema", StringValue(v)))
cloudEvent.subject.foreach(v => builder += MetadataEntry("ce-subject", StringValue(v)))
cloudEvent.time.foreach(v => builder += MetadataEntry("ce-time", StringValue(v.toString)))

Metadata(builder.result())
}
Expand Down Expand Up @@ -421,7 +422,9 @@ object EventingManager {
val metadata = maybeMetadata
.getOrElse(Metadata.defaultInstance)
.entries
.map(e => e.key -> e.value)
.collect {
case MetadataEntry(key, MetadataEntry.Value.StringValue(value), _) => key -> value
}
.toMap

val (ceType, contentType, bytes) = payload.typeUrl match {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.cloudstate.proxy.streams

import akka.stream.{Attributes, Inlet, Outlet, UniformFanInShape}
import akka.NotUsed
import akka.stream.{Attributes, Graph, Inlet, Outlet, UniformFanInShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.collection.mutable
Expand All @@ -10,9 +11,13 @@ object MergeSequence {
private case class Pushed[T](in: Inlet[T], sequence: Long, elem: T)

private implicit def ordering[T]: Ordering[Pushed[T]] = Ordering.by[Pushed[T], Long](_.sequence).reverse

def apply[T](inputPorts: Int = 2)(extractSequence: T => Long): Graph[UniformFanInShape[T, T], NotUsed] =
new MergeSequence(inputPorts)(extractSequence)
}

class MergeSequence[T](inputPorts: Int)(extractSequence: T => Long) extends GraphStage[UniformFanInShape[T, T]] {
final class MergeSequence[T](val inputPorts: Int)(extractSequence: T => Long)
extends GraphStage[UniformFanInShape[T, T]] {
private val in: IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeSequence.in" + i))
private val out: Outlet[T] = Outlet("MergeSequence.out")
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.cloudstate.proxy.streams

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.SourceShape
import akka.stream.scaladsl.{GraphDSL, Sink, Source}
import akka.testkit.TestKit
import org.scalatest.{AsyncWordSpecLike, Matchers}
Expand All @@ -11,8 +12,6 @@ import scala.concurrent.Future

class MergeSequenceSpec extends TestKit(ActorSystem("MergeSequenceSpec")) with AsyncWordSpecLike with Matchers {

private implicit val mat = ActorMaterializer()

"MergeSequence" should {
"merge interleaved streams" in {
merge(
Expand Down Expand Up @@ -65,19 +64,34 @@ class MergeSequenceSpec extends TestKit(ActorSystem("MergeSequenceSpec")) with A
)
}

"propagate errors" in recoverToSucceededIf[SpecificError] {
mergeSources(
Source(List(0L, 3L)),
Source(List(1L, 2L)).flatMapConcat {
case 1L => Source.single(1L)
case 2L => Source.failed(SpecificError())
}
)
}

}

private def merge(seqs: immutable.Seq[Long]*): Future[immutable.Seq[Long]] =
mergeSources(seqs.map(Source(_)): _*)

private def mergeSources(sources: Source[Long, NotUsed]*): Future[immutable.Seq[Long]] =
Source
.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val merge = builder.add(new MergeSequence[Long](seqs.size)(identity))
seqs.foreach { seq =>
Source(seq) ~> merge
val merge = builder.add(new MergeSequence[Long](sources.size)(identity))
sources.foreach { source =>
source ~> merge
}

SourceShape(merge.out)
})
.runWith(Sink.seq)

private case class SpecificError() extends Exception

}

0 comments on commit c556561

Please sign in to comment.