Skip to content

Commit

Permalink
Cleanup (#23)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Cleaned up the logic of supported types
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 9766676 commit fc1f71c
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.api.examples;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.RequestHandler;

public class VoidFunction implements RequestHandler<String, Void> {
@Override
public Void handleRequest(String input, Context context) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.io.*;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
Expand All @@ -38,20 +39,18 @@
*/
public class JavaInstance {
private static final Logger log = LoggerFactory.getLogger(JavaInstance.class);
enum SupportedTypes {
INTEGER,
STRING,
LONG,
DOUBLE,
BYTE,
SHORT,
FLOAT,
MAP,
LIST
}
private final List<Type> supportedInputTypes = Arrays.asList(
Integer.TYPE,
Double.TYPE,
Long.TYPE,
String.class,
Short.TYPE,
Byte.TYPE,
Float.TYPE,
Map.class,
List.class
);
private ContextImpl context;
private SupportedTypes inputType;
private SupportedTypes outputType;
private RequestHandler requestHandler;
private RawRequestHandler rawRequestHandler;
private ExecutorService executorService;
Expand Down Expand Up @@ -95,30 +94,12 @@ public JavaInstance(JavaInstanceConfig config, Object object) {

private void computeInputAndOutputTypes() {
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(RequestHandler.class, requestHandler.getClass());
inputType = computeSupportedType(typeArgs[0]);
outputType = computeSupportedType(typeArgs[1]);
verifySupportedType(typeArgs[0], false);
verifySupportedType(typeArgs[1], true);
}

private SupportedTypes computeSupportedType(Type type) {
if (type.equals(Integer.TYPE)) {
return SupportedTypes.INTEGER;
} else if (type.equals(Double.TYPE)) {
return SupportedTypes.DOUBLE;
} else if (type.equals(Long.TYPE)) {
return SupportedTypes.LONG;
} else if (type.equals(String.class)) {
return SupportedTypes.STRING;
} else if (type.equals(Short.TYPE)) {
return SupportedTypes.SHORT;
} else if (type.equals(Byte.TYPE)) {
return SupportedTypes.BYTE;
} else if (type.equals(Float.TYPE)) {
return SupportedTypes.FLOAT;
} else if (type.equals(Map.class)) {
return SupportedTypes.MAP;
} else if (type.equals(List.class)) {
return SupportedTypes.LIST;
} else {
private void verifySupportedType(Type type, boolean allowVoid) {
if (!(supportedInputTypes.contains(type) || (allowVoid && !type.equals(Void.TYPE)))) {
throw new RuntimeException("Non Basic types not yet supported: " + type);
}
}
Expand Down

0 comments on commit fc1f71c

Please sign in to comment.