From da428548f8610d42421f760fa9366dba8e73b679 Mon Sep 17 00:00:00 2001 From: Kunshang Ji Date: Fri, 26 Feb 2021 15:21:35 +0800 Subject: [PATCH] replace intel-arrow with apache-arrow v3.0.0 --- HCFS-based-cache/pom.xml | 4 ++-- .../cachedfs/cacheUtil/PlasmaCacheManager.java | 17 +++++++++++------ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/HCFS-based-cache/pom.xml b/HCFS-based-cache/pom.xml index 0344000ff..2c0630549 100644 --- a/HCFS-based-cache/pom.xml +++ b/HCFS-based-cache/pom.xml @@ -42,9 +42,9 @@ ${hadoop.version} - com.intel.arrow + org.apache.arrow arrow-plasma - 0.17.0 + 3.0.0 redis.clients diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/PlasmaCacheManager.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/PlasmaCacheManager.java index 4b0046b13..72ce8c8eb 100644 --- a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/PlasmaCacheManager.java +++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/PlasmaCacheManager.java @@ -17,8 +17,11 @@ package com.intel.oap.fs.hadoop.cachedfs.cacheutil; +import java.nio.ByteBuffer; + import org.apache.arrow.plasma.PlasmaClient; import org.apache.arrow.plasma.exceptions.*; +import sun.nio.ch.DirectBuffer; public class PlasmaCacheManager implements CacheManager { @@ -41,11 +44,13 @@ public void put(ObjectId id) { public FiberCache get(ObjectId id) { // TODO: what if get an unsealed object? Let's throw an exception here, // higher level should catch this exception and do some fall back. - try { - // TODO: should not return a ArrowFiberCache directly - return new SimpleFiberCache(client.getObjAsByteBuffer(id.toByteArray(), -1, false)); - } catch(PlasmaGetException e) { - throw new CacheManagerException("Plasma exception:" + e.getMessage()); + // TODO: should not return a ArrowFiberCache directly + ByteBuffer bb = client.getObjAsByteBuffer(id.toByteArray(), -1, false); + // get api may return a nullptr which means get an invalid value. + if(((DirectBuffer)bb).address() != 0) { + return new SimpleFiberCache(bb); + } else { + throw new CacheManagerException("Plasma get an invalid value."); } } @@ -67,7 +72,7 @@ public FiberCache create(ObjectId id, Long length) { if (length > Integer.MAX_VALUE) { throw new ArithmeticException("Can't create $length bytes Object"); } - return new SimpleFiberCache(client.create(id.toByteArray(), length.intValue())); + return new SimpleFiberCache(client.create(id.toByteArray(), length.intValue(), null)); } catch (DuplicateObjectException | PlasmaOutOfMemoryException e) { throw new CacheManagerException("Plasma exception:" + e.getMessage()); }