From 5f03439f0c19680649790dd068056bc86e80cf03 Mon Sep 17 00:00:00 2001
From: Amir Tuval
Date: Tue, 7 Oct 2014 23:07:06 +0300
Subject: [PATCH] initial structure, copied from vertica examples

---
 .gitignore                             |   1 +
 AggregateFunctions/Average.cpp         | 156 +++++++++++++++++++++++++
 AggregateFunctions/Concatenate.cpp     | 127 ++++++++++++++++++++
 AggregateFunctions/CountOccurences.cpp | 117 +++++++++++++++++++
 makefile                               |  50 ++++++++
 5 files changed, 451 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 AggregateFunctions/Average.cpp
 create mode 100644 AggregateFunctions/Concatenate.cpp
 create mode 100644 AggregateFunctions/CountOccurences.cpp
 create mode 100644 makefile

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..d163863
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+build/
\ No newline at end of file
diff --git a/AggregateFunctions/Average.cpp b/AggregateFunctions/Average.cpp
new file mode 100644
index 0000000..c1c16fb
--- /dev/null
+++ b/AggregateFunctions/Average.cpp
@@ -0,0 +1,156 @@
+/* Copyright (c) 2005 - 2012 Vertica, an HP company -*- C++ -*- */
+/*
+ * Description: Example User Defined Aggregate Function: Average
+ *
+ */
+
+#include "Vertica.h"
+#include <time.h>
+#include <sstream>
+#include <iostream>
+
+using namespace Vertica;
+using namespace std;
+
+
+
+/****
+ * Example implementation of Average: intermediate is a 2 part type: running
+ * sum and count.
+ ***/
+class Average : public AggregateFunction
+{
+    virtual void initAggregate(ServerInterface &srvInterface,
+                               IntermediateAggs &aggs)
+    {
+        try {
+            VNumeric &sum = aggs.getNumericRef(0);
+            sum.setZero();
+
+            vint &cnt = aggs.getIntRef(1);
+            cnt = 0;
+        } catch(exception& e) {
+            // Standard exception. Quit.
+            vt_report_error(0, "Exception while initializing intermediate aggregates: [%s]", e.what());
+        }
+    }
+    // Add every non-NULL input value to the running sum and bump the count.
+    void aggregate(ServerInterface &srvInterface,
+                   BlockReader &argReader,
+                   IntermediateAggs &aggs)
+    {
+        try {
+            VNumeric &sum = aggs.getNumericRef(0);
+            vint &cnt = aggs.getIntRef(1);
+
+            do {
+                const VNumeric &input = argReader.getNumericRef(0);
+                if (!input.isNull()) {
+                    sum.accumulate(&input);
+                    cnt++;
+                }
+            } while (argReader.next());
+        } catch(exception& e) {
+            // Standard exception. Quit.
+            vt_report_error(0, "Exception while processing aggregate: [%s]", e.what());
+        }
+    }
+    // Fold the sum and count of every other intermediate aggregate into this one.
+    virtual void combine(ServerInterface &srvInterface,
+                         IntermediateAggs &aggs,
+                         MultipleIntermediateAggs &aggsOther)
+    {
+        try {
+            VNumeric &mySum = aggs.getNumericRef(0);
+            vint &myCount = aggs.getIntRef(1);
+
+            // Combine all the other intermediate aggregates
+            do {
+                const VNumeric &otherSum = aggsOther.getNumericRef(0);
+                const vint &otherCount = aggsOther.getIntRef(1);
+
+                // Do the actual accumulation
+                mySum.accumulate(&otherSum);
+                myCount += otherCount;
+
+            } while (aggsOther.next());
+        } catch(exception& e) {
+            // Standard exception. Quit.
+            vt_report_error(0, "Exception while combining intermediate aggregates: [%s]", e.what());
+        }
+    }
+    // Write sum / count to the output, or zero when the count is zero.
+    virtual void terminate(ServerInterface &srvInterface,
+                           BlockWriter &resWriter,
+                           IntermediateAggs &aggs)
+    {
+        try {
+            // Metadata about the type (to allow creation)
+            const VerticaType &numtype = aggs.getTypeMetaData().getColumnType(0);
+            const VNumeric &sum = aggs.getNumericRef(0);
+
+            // Get the count as a numeric by making a local numeric
+            // Scratch words come from the server allocator: full getMaxSize() bytes, no manual free
+            uint64* tmp = (uint64*)srvInterface.allocator->alloc(numtype.getMaxSize());
+            VNumeric cnt(tmp, numtype.getNumericPrecision(), numtype.getNumericScale());
+            cnt.copy(aggs.getIntRef(1)); // convert to numeric!
+
+            VNumeric &out = resWriter.getNumericRef();
+            if (cnt.isZero())
+                out.setZero();
+            else
+                out.div(&sum, &cnt);
+        } catch(exception& e) {
+            // Standard exception. Quit.
+            vt_report_error(0, "Exception while computing aggregate output: [%s]", e.what());
+        }
+    }
+
+    InlineAggregate()
+};
+
+
+/*
+ * This class provides the meta-data associated with the aggregate function
+ * shown above, as well as a way of instantiating objects of the class.
+ */
+class AverageFactory : public AggregateFunctionFactory
+{
+    virtual void getPrototype(ServerInterface &srvInterface,
+                              ColumnTypes &argTypes,
+                              ColumnTypes &returnType)
+    {
+        argTypes.addNumeric();
+        returnType.addNumeric();
+    }
+
+    // Provide return type length/scale/precision information (given the input
+    // type length/scale/precision), as well as column names
+    virtual void getReturnType(ServerInterface &srvInterface,
+                               const SizedColumnTypes &inputTypes,
+                               SizedColumnTypes &outputTypes)
+    {
+        int int_part = inputTypes.getColumnType(0).getNumericPrecision();
+        int frac_part = inputTypes.getColumnType(0).getNumericScale();
+        outputTypes.addNumeric(int_part+frac_part, frac_part);
+    }
+
+    virtual void getIntermediateTypes(ServerInterface &srvInterface,
+                                      const SizedColumnTypes &inputTypes,
+                                      SizedColumnTypes
+                                      &intermediateTypeMetaData)
+    {
+        int int_part = inputTypes.getColumnType(0).getNumericIntegral();
+        int frac_part = inputTypes.getColumnType(0).getNumericFractional();
+        intermediateTypeMetaData.addNumeric(int_part+frac_part, frac_part); // intermediate sum
+        intermediateTypeMetaData.addInt(); // count of items
+    }
+
+    // Create an instance of the AggregateFunction
+    virtual AggregateFunction *createAggregateFunction(ServerInterface &srvInterface)
+    { return vt_createFuncObject<Average>(srvInterface.allocator); }
+
+};
+
+RegisterFactory(AverageFactory);
+
diff --git a/AggregateFunctions/Concatenate.cpp b/AggregateFunctions/Concatenate.cpp
new file mode 100644
index 0000000..b3e27dd
--- /dev/null
+++ b/AggregateFunctions/Concatenate.cpp
@@ -0,0 +1,127 @@
+/* Copyright (c) 2005 - 2012 Vertica, an HP company -*- C++ -*- */
+/*
+ * Description: Example User Defined Aggregate Function: Concatenate strings
+ *
+ */
+
+#include "Vertica.h"
+#include <time.h>
+#include <sstream>
+#include <iostream>
+
+using namespace Vertica;
+using namespace std;
+
+
+/**
+ * User Defined Aggregate Function concatenate that takes in strings and concatenates
+ * them together. Right now, the max length of the resulting string is ten times the
+ * maximum length of the input string.
+ */
+class Concatenate : public AggregateFunction
+{
+    // Start every intermediate aggregate from the empty string.
+    virtual void initAggregate(ServerInterface &srvInterface, IntermediateAggs &aggs)
+    {
+        try {
+            VString &concat = aggs.getStringRef(0);
+            concat.copy("");
+        } catch(exception& e) {
+            // Standard exception. Quit.
+            vt_report_error(0, "Exception while initializing intermediate aggregates: [%s]", e.what());
+        }
+    }
+    // Append non-NULL inputs; stop at the first one that would exceed the intermediate length.
+    void aggregate(ServerInterface &srvInterface,
+                   BlockReader &argReader,
+                   IntermediateAggs &aggs)
+    {
+        try {
+            VString &concat = aggs.getStringRef(0);
+            string word = concat.str();
+            uint32 maxSize = aggs.getTypeMetaData().getColumnType(0).getStringLength();
+            do {
+                const VString &input = argReader.getStringRef(0);
+
+                if (!input.isNull()) {
+                    if ((word.length() + input.length()) > maxSize) break;
+                    word.append(input.str());
+                }
+            } while (argReader.next());
+            concat.copy(word);
+        } catch(exception& e) {
+            // Standard exception. Quit.
+            vt_report_error(0, "Exception while processing aggregate: [%s]", e.what());
+        }
+    }
+    // Append each other partial result that still fits; ones that would overflow are skipped.
+    virtual void combine(ServerInterface &srvInterface,
+                         IntermediateAggs &aggs,
+                         MultipleIntermediateAggs &aggsOther)
+    {
+        try {
+            uint32 maxSize = aggs.getTypeMetaData().getColumnType(0).getStringLength();
+            VString myConcat = aggs.getStringRef(0);
+
+            do {
+                const VString otherConcat = aggsOther.getStringRef(0);
+                if ((myConcat.length() + otherConcat.length()) <= maxSize) {
+                    string word = myConcat.str();
+                    word.append(otherConcat.str());
+                    myConcat.copy(word);
+                }
+            } while (aggsOther.next());
+        } catch(exception& e) {
+            // Standard exception. Quit.
+            vt_report_error(0, "Exception while combining intermediate aggregates: [%s]", e.what());
+        }
+    }
+    // Copy the accumulated string to the output.
+    virtual void terminate(ServerInterface &srvInterface,
+                           BlockWriter &resWriter,
+                           IntermediateAggs &aggs)
+    {
+        try {
+            const VString &concat = aggs.getStringRef(0);
+            VString &result = resWriter.getStringRef();
+
+            result.copy(&concat);
+        } catch(exception& e) {
+            // Standard exception. Quit.
+            vt_report_error(0, "Exception while computing aggregate output: [%s]", e.what());
+        }
+    }
+
+    InlineAggregate()
+};
+
+
+class ConcatenateFactory : public AggregateFunctionFactory
+{
+    virtual void getIntermediateTypes(ServerInterface &srvInterface, const SizedColumnTypes &inputTypes, SizedColumnTypes &intermediateTypeMetaData)
+    {
+        int input_len = inputTypes.getColumnType(0).getStringLength();
+        intermediateTypeMetaData.addVarchar(input_len*10);
+    }
+
+    virtual void getPrototype(ServerInterface &srvInterface, ColumnTypes &argTypes, ColumnTypes &returnType)
+    {
+        argTypes.addVarchar();
+        returnType.addVarchar();
+    }
+
+    virtual void getReturnType(ServerInterface &srvInterface,
+                               const SizedColumnTypes &inputTypes,
+                               SizedColumnTypes &outputTypes)
+    {
+        int input_len = inputTypes.getColumnType(0).getStringLength();
+        outputTypes.addVarchar(input_len*10);
+    }
+
+    virtual AggregateFunction *createAggregateFunction(ServerInterface &srvInterface)
+    { return vt_createFuncObject<Concatenate>(srvInterface.allocator); }
+
+};
+
+RegisterFactory(ConcatenateFactory);
+
diff --git a/AggregateFunctions/CountOccurences.cpp b/AggregateFunctions/CountOccurences.cpp
new file mode 100644
index 0000000..ad9a62c
--- /dev/null
+++ b/AggregateFunctions/CountOccurences.cpp
@@ -0,0 +1,117 @@
+/* Copyright (c) 2005 - 2012 Vertica, an HP company -*- C++ -*- */
+/*
+ * Description: Example User Defined Aggregate Function: CountOccurences
+ *
+ * Create Date: Nov 01, 2012
+ */
+
+#include "Vertica.h"
+#include <time.h>
+#include <sstream>
+#include <iostream>
+
+using namespace Vertica;
+using namespace std;
+
+/**
+ * User Defined Aggregate Function CountOccurences that takes in floats and counts the
+ * occurrences of a number specified by the parameter n
+ */
+class CountOccurences : public AggregateFunction
+{
+    // Store the parameter "n" as the number to match and reset the counter to zero.
+    virtual void initAggregate(ServerInterface &srvInterface,
+                               IntermediateAggs &aggs)
+    {
+        //get the value of n for the parameters
+        ParamReader paramReader = srvInterface.getParamReader();
+        vfloat n = paramReader.getFloatRef("n");
+        vfloat &num = aggs.getFloatRef(0);
+        num = n;
+        vint &cnt = aggs.getIntRef(1);
+        cnt = 0;
+    }
+    // Count the input values that compare equal (==) to the stored number.
+    void aggregate(ServerInterface &srvInterface,
+                   BlockReader &arg_reader,
+                   IntermediateAggs &aggs)
+    {
+        vint &cnt = aggs.getIntRef(1);
+        vfloat &num = aggs.getFloatRef(0);
+        do {
+            const vfloat &input = arg_reader.getFloatRef(0);
+
+            if (num==input) {
+                cnt++;
+            }
+        } while (arg_reader.next());
+
+    }
+    // Add the counts of all the other intermediate aggregates to this one.
+    virtual void combine(ServerInterface &srvInterface,
+                         IntermediateAggs &aggs,
+                         MultipleIntermediateAggs &aggs_other)
+    {
+        vint &myCount = aggs.getIntRef(1);
+
+        // Combine all the other intermediate aggregates
+        do {
+            const vint &otherCount = aggs_other.getIntRef(1);
+            // Do the actual accumulation
+            myCount += otherCount;
+        } while (aggs_other.next());
+
+    }
+    // Write the accumulated count to the output.
+    virtual void terminate(ServerInterface &srvInterface,
+                           BlockWriter &res_writer,
+                           IntermediateAggs &aggs)
+    {
+        res_writer.setInt(aggs.getIntRef(1));
+    }
+
+    InlineAggregate()
+};
+
+
+/*
+ * This class provides the meta-data associated with the function
+ * shown above, as well as a way of instantiating objects of the class.
+ */
+class CountOccurencesFactory : public AggregateFunctionFactory
+{
+    virtual void getPrototype(ServerInterface &srvInterface,
+                              ColumnTypes &argTypes,
+                              ColumnTypes &returnType)
+    {
+        argTypes.addFloat();
+        returnType.addInt();
+    }
+
+    virtual void getReturnType(ServerInterface &srvInterface,
+                               const SizedColumnTypes &input_types,
+                               SizedColumnTypes &output_types)
+    {
+        output_types.addInt();
+    }
+    virtual void getParameterType(ServerInterface &srvInterface,
+                                  SizedColumnTypes &parameterTypes)
+    {
+        parameterTypes.addFloat("n");
+    }
+    virtual void getIntermediateTypes(ServerInterface &srvInterface,
+                                      const SizedColumnTypes &input_types,
+                                      SizedColumnTypes
+                                      &intermediateTypeMetaData)
+    {
+        intermediateTypeMetaData.addFloat(); // the number to be counted
+        intermediateTypeMetaData.addInt(); // count of items
+    }
+
+    virtual AggregateFunction *createAggregateFunction(ServerInterface &srvInterface)
+    { return vt_createFuncObject<CountOccurences>(srvInterface.allocator); }
+
+};
+
+RegisterFactory(CountOccurencesFactory);
+
diff --git a/makefile b/makefile
new file mode 100644
index 0000000..9f35fe9
--- /dev/null
+++ b/makefile
@@ -0,0 +1,50 @@
+############################
+# Vertica Analytic Database
+#
+# Makefile to build example user defined functions
+#
+# To run under valgrind:
+#   make RUN_VALGRIND=1 run
+#
+# Copyright (c) 2005 - 2012 Vertica, an HP company
+############################
+
+## Set to the location of the SDK installation
+SDK_HOME?=/opt/vertica/sdk
+
+CXX?=g++
+CXXFLAGS:=$(CXXFLAGS) -I $(SDK_HOME)/include -g -Wall -Wno-unused-value -shared -fPIC
+
+ifdef OPTIMIZE
+## UDLs should be compiled with compiler optimizations in release builds
+CXXFLAGS:=$(CXXFLAGS) -O3
+endif
+
+## Set to the desired destination directory for .so output files
+BUILD_DIR?=$(abspath build)
+
+## Set to a valid temporary directory
+TMPDIR?=/tmp
+
+## Set to the paths of the third-party include directories (not referenced by the rules below)
+BOOST_INCLUDE ?= /usr/include
+CURL_INCLUDE ?= /usr/include
+ZLIB_INCLUDE ?= /usr/include
+BZIP_INCLUDE ?= /usr/include
+
+all: AggregateFunctions
+
+$(BUILD_DIR)/.exists:
+	test -d $(BUILD_DIR) || mkdir -p $(BUILD_DIR)
+	touch $(BUILD_DIR)/.exists
+
+###
+# Aggregate Functions
+###
+AggregateFunctions: $(BUILD_DIR)/AggregateFunctions.so
+
+$(BUILD_DIR)/AggregateFunctions.so: AggregateFunctions/*.cpp $(SDK_HOME)/include/Vertica.cpp $(SDK_HOME)/include/BuildInfo.h $(BUILD_DIR)/.exists
+	$(CXX) $(CXXFLAGS) -o $@ AggregateFunctions/*.cpp $(SDK_HOME)/include/Vertica.cpp
+
+clean:
+	rm -f $(BUILD_DIR)/*.so