Skip to content

Commit

Permalink
Decode avro cloud event (kubeflow#2929)
Browse files: browse the repository at this point in the history
* Decode avro cloud event

Signed-off-by: Sivanantham Chinnaiyan <sivanantham.chinnaiyan@ideas2it.com>

* Decode binary cloud event

Signed-off-by: Sivanantham Chinnaiyan <sivanantham.chinnaiyan@ideas2it.com>

* Assert headers for avro test

Signed-off-by: Sivanantham Chinnaiyan <sivanantham.chinnaiyan@ideas2it.com>

---------

Signed-off-by: Sivanantham Chinnaiyan <sivanantham.chinnaiyan@ideas2it.com>
  • Loading branch information
sivanantha321 authored Jun 4, 2023
1 parent 281b316 commit 6b5afd7
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 21 deletions.
3 changes: 1 addition & 2 deletions docs/samples/kafka/image_transformer/image_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import boto3
import cv2
from cloudevents.http import CloudEvent

import kserve
from kserve import InferRequest, InferResponse
Expand All @@ -43,7 +42,7 @@ def __init__(self, name: str, predictor_host: str):
self.predictor_host = predictor_host
self._key = None

async def preprocess(self, inputs: Union[Dict, CloudEvent, InferRequest],
async def preprocess(self, inputs: Union[Dict, InferRequest],
headers: Dict[str, str] = None) -> Union[Dict, InferRequest]:
logging.info("Received inputs %s", inputs)
if inputs['EventName'] == 's3:ObjectCreated:Put':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import requests
import numpy as np
from cloudevents.http import CloudEvent

import kserve
from kserve import InferRequest, InferResponse, InferInput
Expand Down Expand Up @@ -127,7 +126,7 @@ def buildPredictRequest(self, inputs, features) -> Dict:

return request

def preprocess(self, inputs: Union[Dict, CloudEvent, InferRequest],
def preprocess(self, inputs: Union[Dict, InferRequest],
headers: Dict[str, str] = None) -> Union[Dict, InferRequest]:
"""Pre-process activity of the driver input data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import kserve
from typing import Dict, Union
import numpy as np
from cloudevents.http import CloudEvent

from kserve import InferRequest, InferResponse
from kserve.protocol.grpc.grpc_predict_v2_pb2 import ModelInferRequest, ModelInferResponse
Expand Down Expand Up @@ -42,7 +41,7 @@ def __init__(self, name: str, predictor_host: str):
self.model_name = "bert_tf_v2_large_fp16_128_v2"
self.triton_client = None

def preprocess(self, inputs: Union[Dict, CloudEvent, InferRequest],
def preprocess(self, inputs: Union[Dict, InferRequest],
headers: Dict[str, str] = None) -> Union[Dict, InferRequest]:
self.doc_tokens = data_processing.convert_doc_tokens(self.short_paragraph_text)
self.features = data_processing.convert_examples_to_features(self.doc_tokens, inputs["instances"][0],
Expand Down
4 changes: 2 additions & 2 deletions python/kserve/kserve/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,14 @@ def get_output_types(self) -> List[Dict]:
# return [{ "name": "", "datatype": "INT32", "shape": [1,5], }]
return []

async def preprocess(self, payload: Union[Dict, CloudEvent, InferRequest],
async def preprocess(self, payload: Union[Dict, InferRequest],
headers: Dict[str, str] = None) -> Union[Dict, InferRequest]:
"""`preprocess` handler can be overridden for data or feature transformation.
The default implementation decodes to Dict if it is a binary CloudEvent
or gets the data field from a structured CloudEvent.
Args:
payload (Dict|CloudEvent|InferRequest): Body of the request, v2 endpoints pass InferRequest.
payload (Dict|InferRequest): Body of the request, v2 endpoints pass InferRequest.
headers (Dict): Request headers.
Returns:
Expand Down
8 changes: 3 additions & 5 deletions python/kserve/kserve/protocol/dataplane.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,10 @@ def decode_cloudevent(self, body) -> Tuple[Union[Dict, InferRequest], Dict]:
decoded_body = body
attributes = {}
if isinstance(body, CloudEvent):
# Try to decode and parse JSON UTF-8 if possible, otherwise
# just pass the CloudEvent data on to the predict function.
# This is for the cases that CloudEvent encoding is protobuf, avro etc.
attributes = body._get_attributes()
decoded_body = body.get_data()
try:
decoded_body = orjson.loads(body.data.decode('UTF-8'))
attributes = body._get_attributes()
decoded_body = orjson.loads(decoded_body.decode('UTF-8'))
except (orjson.JSONDecodeError, UnicodeDecodeError) as e:
# If decoding or parsing failed, check if it was supposed to be JSON UTF-8
if "content-type" in body._attributes and \
Expand Down
14 changes: 6 additions & 8 deletions python/kserve/test/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import json
import os
Expand All @@ -34,7 +35,6 @@
from kserve.protocol.infer_type import InferRequest
from kserve.utils.utils import get_predict_input, get_predict_response


test_avsc_schema = '''
{
"namespace": "example.avro",
Expand Down Expand Up @@ -146,13 +146,11 @@ def _parserequest(self, request):
return record1

def preprocess(self, request, headers: Dict[str, str] = None):
if isinstance(request, CloudEvent):
attributes = request._attributes
assert attributes["specversion"] == "1.0"
assert attributes["source"] == "https://example.com/event-producer"
assert attributes["type"] == "com.example.sampletype1"
assert attributes["content-type"] == "application/avro"
return self._parserequest(request.data)
assert headers["ce-specversion"] == "1.0"
assert headers["ce-source"] == "https://example.com/event-producer"
assert headers["ce-type"] == "com.example.sampletype1"
assert headers["ce-content-type"] == "application/avro"
return self._parserequest(request)

async def predict(self, request, headers=None):
return {"predictions": [[request['name'], request['favorite_number'], request['favorite_color']]]}
Expand Down

0 comments on commit 6b5afd7

Please sign in to comment.