Skip to content

Commit

Permalink
Merge pull request #4 from SeanCheatham/python-temporary-function
Browse files Browse the repository at this point in the history
Python temporary function
  • Loading branch information
SeanCheatham authored Aug 11, 2024
2 parents b77f76b + c25f1b9 commit 8c075a1
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 76 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,19 @@ Smash some code into your keyboard, click submit, and watch as Chainless magical
- Ethereum
- Apparatus

Support for other chains will be added over time. If you'd like to request one, feel free to file an [Issue](https://github.com/SeanCheatham/chainless/issues). Bonus points if you provide some reference material to assist with the integration. Bonus bonus points if you file a PR with an implementation!

## Supported Languages
**Temporary Functions**
- JavaScript (GraalVM)
- Python (GraalVM)

**Persistent Functions**
- JavaScript/TypeScript (NodeJS)
- Java/Scala/Kotlin (JVM)

Support for other languages will be added over time. If you'd like to request one, feel free to file an [Issue](https://github.com/SeanCheatham/chainless/issues). Bonus points if you provide some reference material to assist with the integration. Bonus bonus points if you file a PR with an implementation!

## FAQ
- Is Chainless AI-native?
- No.
Expand Down
3 changes: 2 additions & 1 deletion backend/core/src/main/scala/chainless/ApiServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class ApiServer(
.map(request =>
Response(
body = operator
.retroact(request.code)(request.timestampMs, request.chains)
.retroact(request.code, request.language)(request.timestampMs, request.chains)
.map(_.asJson)
.map(_.noSpaces)
.mergeHaltL(keepAliveTick(2.seconds))
Expand All @@ -245,6 +245,7 @@ class ApiServer(
body = operator
.live(
request.code,
request.language,
FunctionState(request.chainStates.getOrElse(Map.empty), request.state.getOrElse(Json.Null))
)(request.chains)
.map(_.asJson)
Expand Down
63 changes: 32 additions & 31 deletions backend/core/src/main/scala/chainless/ChainlessMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,39 +75,40 @@ object ChainlessMain extends ResourceApp.Forever {
functionInvocationsDb: FunctionInvocationsDb[F],
blocksDb: BlocksDb[F]
) =
Ref
.of[F, Boolean](false)
.toResource
.flatMap(canceledRef =>
NonEmptyChain
.fromChainUnsafe(Chain.fromSeq(List.tabulate(args.runnerCount)(i => i + args.runnerApiBindPortStart)))
.traverse(port =>
Files[F]
.tempDirectory(Some(Path(args.sharedTmpDir)), port.toString, None)
.flatMap(localCodeCache =>
DockerDriver.make[F](
s"http://chainless:$port",
objectStore,
localCodeCache
Files[F].createDirectories(Path(args.sharedTmpDir)).toResource >>
Ref
.of[F, Boolean](false)
.toResource
.flatMap(canceledRef =>
NonEmptyChain
.fromChainUnsafe(Chain.fromSeq(List.tabulate(args.runnerCount)(i => i + args.runnerApiBindPortStart)))
.traverse(port =>
Files[F]
.tempDirectory(Some(Path(args.sharedTmpDir)), port.toString, None)
.flatMap(localCodeCache =>
DockerDriver.make[F](
s"http://chainless:$port",
objectStore,
localCodeCache
)
)
)
.flatMap(dockerDriver =>
JobProcessor.make[F](
dockerDriver,
functionsDb,
functionInvocationsDb,
blocksDb,
objectStore,
canceledRef
.flatMap(dockerDriver =>
JobProcessor.make[F](
dockerDriver,
functionsDb,
functionInvocationsDb,
blocksDb,
objectStore,
canceledRef
)
)
)
.flatTap(jobProcessor =>
new RunnerHttpServer(jobProcessor.nextTask, jobProcessor.completeTask).serve("0.0.0.0", port)
)
)
.flatMap(MultiJobProcessor.make[F])
.flatTap(_ => Resource.onFinalize(canceledRef.set(true)))
)
.flatTap(jobProcessor =>
new RunnerHttpServer(jobProcessor.nextTask, jobProcessor.completeTask).serve("0.0.0.0", port)
)
)
.flatMap(MultiJobProcessor.make[F])
.flatTap(_ => Resource.onFinalize(canceledRef.set(true)))
)
}

@AppName("Chainless")
Expand Down
3 changes: 2 additions & 1 deletion backend/core/src/main/scala/chainless/models/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import scala.concurrent.duration.*
package object models:
case class InitRequest(code: String, config: Json)

case class RetroactRequest(code: String, timestampMs: Long, chains: NonEmptyChain[Chain])
case class RetroactRequest(code: String, timestampMs: Long, chains: NonEmptyChain[Chain], language: String = "js")

case class StreamRequest(
code: String,
language: String = "js",
chainStates: Option[Map[String, String]],
state: Option[Json],
chains: NonEmptyChain[Chain]
Expand Down
44 changes: 25 additions & 19 deletions backend/core/src/main/scala/chainless/runner/temporary/Runner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,25 @@ import cats.NonEmptyParallel
import chainless.models.{*, given}
import io.circe.syntax.*
import io.circe.{Json, JsonNumber, JsonObject}
import org.graalvm.polyglot.proxy.{ProxyArray, ProxyObject}
import org.graalvm.polyglot.proxy.*
import org.graalvm.polyglot.{Context, Value}

import java.util.concurrent.Executors
import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
import scala.jdk.CollectionConverters.*

/** A running instance of a temporary function. Applies each block to the current function.
* @tparam F
*/
trait Runner[F[_]]:
def applyBlock(stateWithChains: FunctionState, blockWithChain: BlockWithChain): F[FunctionState]

object LocalGraalRunner:
import GraalSupport.*
def make[F[_]: Async: NonEmptyParallel](code: String): Resource[F, Runner[F]] =
def make[F[_]: Async: NonEmptyParallel](code: String, language: String): Resource[F, Runner[F]] =
GraalSupport
.makeContext[F]
.evalMap((ec, context) => ec.evalSync(context.eval("js", code)).map((ec, context, _)))
.evalMap((ec, context) => ec.evalSync(context.eval(language, code)).map((ec, context, _)))
.map((ec, context, compiled) =>
new Runner[F]:
given Context = context
Expand All @@ -39,9 +39,17 @@ object LocalGraalRunner:
.guarantee(Async[F].cede)
.flatMap((stateWithChainsJson, blockWithChainJson) =>
ec.evalSync {
val result = compiled.execute(stateWithChainsJson.asValue, blockWithChainJson.asValue)
val json = result.asJson
json
if (language == "js") {
val result = compiled.execute(stateWithChainsJson.asValue, blockWithChainJson.asValue)
val json = result.asJson
json
} else {
val result = context.getPolyglotBindings
.getMember("apply_block")
.execute(stateWithChainsJson.asValue, blockWithChainJson.asValue)
val json = result.asJson
json
}
}
)
.guarantee(Async[F].cede)
Expand Down Expand Up @@ -70,8 +78,8 @@ object GraalSupport:
)
.flatMap(executor =>
Resource
.make(executor.eval(Sync[F].delay(Context.create())))(context =>
executor.eval(Sync[F].blocking(context.close()))
.make(executor.eval(Sync[F].delay(Context.newBuilder("js", "python").allowAllAccess(true).build())))(
context => executor.eval(Sync[F].blocking(context.close()))
)
.tupleLeft(executor)
)
Expand Down Expand Up @@ -119,11 +127,11 @@ object GraalSupport:
}

def onObject(value: JsonObject): Value = {
val map = new java.util.HashMap[String, AnyRef](value.size)
val map = new java.util.HashMap[Object, Object](value.size)
value.toMap.foreach { case (key, value) =>
map.put(key, value.asValue)
}
context.asValue(ProxyObject.fromMap(map))
context.asValue(ProxyHashMap.from(map))
}
}

Expand All @@ -144,19 +152,19 @@ object GraalSupport:
k.asString() -> v.asJson
}.toSeq*
)
else if (value.hasMembers)
else if (value.hasMembers) {
Json.obj(
value.asScalaMapIterator.map { case (k, v) =>
k.asString() -> v.asJson
}.toSeq*
value.getMemberKeys.asScala.map(key => key -> value.getMember(key).asJson).toSeq*
)
else throw new MatchError(value)
} else throw new MatchError(value)

@tailrec
def asScalaIterator(using context: Context): Iterator[Value] =
if (value.isIterator) {
Iterator.unfold(value)(i => Option.when(i.hasIteratorNextElement)(i.getIteratorNextElement -> i))
} else value.getIterator.asScalaIterator

@tailrec
def asScalaMapIterator(using context: Context): Iterator[(Value, Value)] =
if (value.isIterator) {
Iterator.unfold(value)(i =>
Expand All @@ -166,8 +174,6 @@ object GraalSupport:
(arr.getArrayElement(0) -> arr.getArrayElement(1)) -> i
}
)
} else if (value.hasMembers)
value.getMemberKeys.asScala.iterator.map(key => context.asValue(key) -> value.getMember(key))
else value.getHashEntriesIterator.asScalaMapIterator
} else value.getHashEntriesIterator.asScalaMapIterator

extension (json: Json) def asValue(using context: Context): Value = json.foldWith(jsonFolder)
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ class RunnerOperator[F[_]: Async: NonEmptyParallel](
given Logger[F] = Slf4jLogger.getLoggerFromName("Operator")

def retroact(
code: String
code: String,
language: String
)(timestampMs: Long, chains: NonEmptyChain[Chain]): Stream[F, FunctionState] =
Stream
.resource(LocalGraalRunner.make[F](code))
.resource(LocalGraalRunner.make[F](code, language))
.flatMap(runner =>
blocksDb
.blocksAfterTimestamp(chains)(timestampMs)
Expand All @@ -52,11 +53,11 @@ class RunnerOperator[F[_]: Async: NonEmptyParallel](
.map(_._2)
)

def live(code: String, stateWithChains: FunctionState)(
def live(code: String, language: String, stateWithChains: FunctionState)(
chains: NonEmptyChain[Chain]
): Stream[F, FunctionState] =
Stream
.resource(LocalGraalRunner.make[F](code))
.resource(LocalGraalRunner.make[F](code, language))
.flatMap(runner =>
newBlocks
.filter(meta => chains.contains(meta.chain))
Expand Down
3 changes: 2 additions & 1 deletion backend/project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ object Dependencies {

val graalVM = Seq(
"org.graalvm.polyglot" % "polyglot" % "24.0.2",
"org.graalvm.polyglot" % "js-community" % "24.0.2"
"org.graalvm.polyglot" % "js-community" % "24.0.2",
"org.graalvm.polyglot" % "python-community" % "24.0.2"
)

val http4s = Seq(
Expand Down
23 changes: 20 additions & 3 deletions docs/docs/temporary-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,43 @@ These functions only runs while the connection is open. Once closed, the functi
### Prepare
Open the [Chainless App](http://localhost:42069). Then, click the button in the bottom-right corner to expand the menu. Select "Create Temporary Function".

At this page, you can write your function code as a single JavaScript file. Temporary functions may not include additional dependencies, so they are limited in functionality. Edit the code box to implement your function.
At this page, you can write your function code as a single JavaScript or Python file. Temporary functions may not include additional dependencies, so they are limited in functionality. Edit the code box to implement your function.

### Code
The code is generally structured in the following manner:
**JS**
```js
(function(functionState, blockWithMeta) {
let state = { ...functionState.state }
// ...
return state
})
```
*Note: The entire function must be wrapped in (parenthesis).*

**Python**
```python
import polyglot
@polyglot.export_value
def apply_block(stateWithChains, blockWithMeta):
if(stateWithChains["state"] is not None):
state = stateWithChains["state"]
# Modify state
return state
else:
state = {}
# Initialize state
return state
```
*Note: The "polyglot" statements are required*
*Note: Python functionality is in early-preview*

This function accepts a function state and a new block, and produces a new state. It is invoked over-and-over again by the backend as new blocks arrive.

The `functionState` object contains `.state` and `.chainStates` fields. `.state` is the JSON result of the previous invocation of this function. `.chainStates` is a key-value mapping from "chain name" to "block ID".

The `blockWithMeta` object contains `.meta` and `.block` fields. `.meta` includes information like `.chain`, `.blockId`, and `.height`. `.block` contains the raw JSON-encoded data of the block.

The entire function must be wrapped in (parenthesis).

### Historical Blocks
You can optionally select a `Start Time` from which blocks should be retroactively applied. For example,
you can instruct the function to apply all blocks from yesterday before applying new blocks.
Expand Down
12 changes: 8 additions & 4 deletions frontend/lib/http/api_client.dart
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import 'dart:convert';
import 'dart:async';
import 'package:chainless_frontend/models/models.dart';
import 'package:flutter/foundation.dart';
import 'package:http/http.dart' as http;
import 'package:rxdart/rxdart.dart';
import './http_client.dart';

class PublicApiClient {
final String baseAddress;

static const defaultBaseAddress = "/api";
static const defaultBaseAddress =
kDebugMode ? "http://localhost:42069/api" : "/api";

PublicApiClient({this.baseAddress = defaultBaseAddress});

Expand Down Expand Up @@ -139,11 +141,12 @@ class PublicApiClient {
}

Stream<FunctionState> retroact(
String code, DateTime timestamp, List<String> chains) {
String code, String language, DateTime timestamp, List<String> chains) {
final client = makeHttpClient();
Future<Stream<List<int>>> call() async {
final body = {
"code": code,
"language": language,
"timestampMs": timestamp.millisecondsSinceEpoch,
"chains": chains,
};
Expand Down Expand Up @@ -175,12 +178,13 @@ class PublicApiClient {
.doOnError((_, __) => client.close());
}

Stream<FunctionState> streamed(
String code, FunctionState stateWithChains, List<String> chains) {
Stream<FunctionState> streamed(String code, String language,
FunctionState stateWithChains, List<String> chains) {
final client = makeHttpClient();
Future<Stream<List<int>>> call() async {
final body = {
"code": code,
"language": language,
"chainStates": stateWithChains.chainStates,
"state": stateWithChains.state,
"chains": chains,
Expand Down
Loading

0 comments on commit 8c075a1

Please sign in to comment.