Skip to content

Commit

Permalink
initial structure, copied from vertica examples
Browse files Browse the repository at this point in the history
  • Loading branch information
amirtuval committed Oct 7, 2014
0 parents commit 5f03439
Show file tree
Hide file tree
Showing 5 changed files with 451 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
build/
156 changes: 156 additions & 0 deletions AggregateFunctions/Average.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/* Copyright (c) 2005 - 2012 Vertica, an HP company -*- C++ -*- */
/*
* Description: Example User Defined Aggregate Function: Average
*
*/

#include "Vertica.h"
#include <time.h>
#include <sstream>
#include <iostream>

using namespace Vertica;
using namespace std;



/****
* Example implementation of Average: intermediate is a 2 part type: running
* sum and count.
***/
class Average : public AggregateFunction
{
virtual void initAggregate(ServerInterface &srvInterface,
IntermediateAggs &aggs)
{
try {
VNumeric &sum = aggs.getNumericRef(0);
sum.setZero();

vint &cnt = aggs.getIntRef(1);
cnt = 0;
} catch(exception& e) {
// Standard exception. Quit.
vt_report_error(0, "Exception while initializing intermediate aggregates: [%s]", e.what());
}
}

void aggregate(ServerInterface &srvInterface,
BlockReader &argReader,
IntermediateAggs &aggs)
{
try {
VNumeric &sum = aggs.getNumericRef(0);
vint &cnt = aggs.getIntRef(1);

do {
const VNumeric &input = argReader.getNumericRef(0);
if (!input.isNull()) {
sum.accumulate(&input);
cnt++;
}
} while (argReader.next());
} catch(exception& e) {
// Standard exception. Quit.
vt_report_error(0, "Exception while processing aggregate: [%s]", e.what());
}
}

virtual void combine(ServerInterface &srvInterface,
IntermediateAggs &aggs,
MultipleIntermediateAggs &aggsOther)
{
try {
VNumeric &mySum = aggs.getNumericRef(0);
vint &myCount = aggs.getIntRef(1);

// Combine all the other intermediate aggregates
do {
const VNumeric &otherSum = aggsOther.getNumericRef(0);
const vint &otherCount = aggsOther.getIntRef(1);

// Do the actual accumulation
mySum.accumulate(&otherSum);
myCount += otherCount;

} while (aggsOther.next());
} catch(exception& e) {
// Standard exception. Quit.
vt_report_error(0, "Exception while combining intermediate aggregates: [%s]", e.what());
}
}

virtual void terminate(ServerInterface &srvInterface,
BlockWriter &resWriter,
IntermediateAggs &aggs)
{
try {
// Metadata about the type (to allow creation)
const VerticaType &numtype = aggs.getTypeMetaData().getColumnType(0);
const VNumeric &sum = aggs.getNumericRef(0);

// Get the count as a numeric by making a local numeric
//uint64 tmp[sum.getMaxSize() / sizeof(uint64)];
uint64* tmp = (uint64*)malloc(numtype.getMaxSize() / sizeof(uint64));
VNumeric cnt(tmp, numtype.getNumericPrecision(), numtype.getNumericScale());
cnt.copy(aggs.getIntRef(1)); // convert to numeric!

VNumeric &out = resWriter.getNumericRef();
if (cnt.isZero())
out.setZero();
else
out.div(&sum, &cnt);
} catch(exception& e) {
// Standard exception. Quit.
vt_report_error(0, "Exception while computing aggregate output: [%s]", e.what());
}
}

InlineAggregate()
};


/*
* This class provides the meta-data associated with the aggregate function
* shown above, as well as a way of instantiating objects of the class.
*/
class AverageFactory : public AggregateFunctionFactory
{
virtual void getPrototype(ServerInterface &srvfloaterface,
ColumnTypes &argTypes,
ColumnTypes &returnType)
{
argTypes.addNumeric();
returnType.addNumeric();
}

// Provide return type length/scale/precision information (given the input
// type length/scale/precision), as well as column names
virtual void getReturnType(ServerInterface &srvfloaterface,
const SizedColumnTypes &inputTypes,
SizedColumnTypes &outputTypes)
{
int int_part = inputTypes.getColumnType(0).getNumericPrecision();
int frac_part = inputTypes.getColumnType(0).getNumericScale();
outputTypes.addNumeric(int_part+frac_part, frac_part);
}

virtual void getIntermediateTypes(ServerInterface &srvInterface,
const SizedColumnTypes &inputTypes,
SizedColumnTypes
&intermediateTypeMetaData)
{
int int_part = inputTypes.getColumnType(0).getNumericIntegral();
int frac_part = inputTypes.getColumnType(0).getNumericFractional();
intermediateTypeMetaData.addNumeric(int_part+frac_part, frac_part); // intermediate sum
intermediateTypeMetaData.addInt(); // count of items
}

// Create an instance of the AggregateFunction
virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface)
{ return vt_createFuncObject<Average>(srvfloaterface.allocator); }

};

RegisterFactory(AverageFactory);

127 changes: 127 additions & 0 deletions AggregateFunctions/Concatenate.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/* Copyright (c) 2005 - 2012 Vertica, an HP company -*- C++ -*- */
/*
* Description: Example User Defined Aggregate Function: Concatenate strings
*
*/

#include "Vertica.h"
#include <time.h>
#include <sstream>
#include <iostream>

using namespace Vertica;
using namespace std;


/**
* User Defined Aggregate Function concatenate that takes in strings and concatenates
* them together. Right now, the max length of the resulting string is ten times the
* maximum length of the input string.
*/
class Concatenate : public AggregateFunction
{

virtual void initAggregate(ServerInterface &srvInterface, IntermediateAggs &aggs)
{
try {
VString &concat = aggs.getStringRef(0);
concat.copy("");
} catch(exception& e) {
// Standard exception. Quit.
vt_report_error(0, "Exception while initializing intermediate aggregates: [%s]", e.what());
}
}

void aggregate(ServerInterface &srvInterface,
BlockReader &argReader,
IntermediateAggs &aggs)
{
try {
VString &concat = aggs.getStringRef(0);
string word = concat.str();
uint32 maxSize = aggs.getTypeMetaData().getColumnType(0).getStringLength();
do {
const VString &input = argReader.getStringRef(0);

if (!input.isNull()) {
if ((word.length() + input.length()) > maxSize) break;
word.append(input.str());
}
} while (argReader.next());
concat.copy(word);
} catch(exception& e) {
// Standard exception. Quit.
vt_report_error(0, "Exception while processing aggregate: [%s]", e.what());
}
}

virtual void combine(ServerInterface &srvInterface,
IntermediateAggs &aggs,
MultipleIntermediateAggs &aggsOther)
{
try {
uint32 maxSize = aggs.getTypeMetaData().getColumnType(0).getStringLength();
VString myConcat = aggs.getStringRef(0);

do {
const VString otherConcat = aggsOther.getStringRef(0);
if ((myConcat.length() + otherConcat.length()) <= maxSize) {
string word = myConcat.str();
word.append(otherConcat.str());
myConcat.copy(word);
}
} while (aggsOther.next());
} catch(exception& e) {
// Standard exception. Quit.
vt_report_error(0, "Exception while combining intermediate aggregates: [%s]", e.what());
}
}

virtual void terminate(ServerInterface &srvInterface,
BlockWriter &resWriter,
IntermediateAggs &aggs)
{
try {
const VString &concat = aggs.getStringRef(0);
VString &result = resWriter.getStringRef();

result.copy(&concat);
} catch(exception& e) {
// Standard exception. Quit.
vt_report_error(0, "Exception while computing aggregate output: [%s]", e.what());
}
}

InlineAggregate()
};


class ConcatenateFactory : public AggregateFunctionFactory
{
virtual void getIntermediateTypes(ServerInterface &srvInterface, const SizedColumnTypes &inputTypes, SizedColumnTypes &intermediateTypeMetaData)
{
int input_len = inputTypes.getColumnType(0).getStringLength();
intermediateTypeMetaData.addVarchar(input_len*10);
}

virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType)
{
argTypes.addVarchar();
returnType.addVarchar();
}

virtual void getReturnType(ServerInterface &srvfloaterface,
const SizedColumnTypes &inputTypes,
SizedColumnTypes &outputTypes)
{
int input_len = inputTypes.getColumnType(0).getStringLength();
outputTypes.addVarchar(input_len*10);
}

virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface)
{ return vt_createFuncObject<Concatenate>(srvfloaterface.allocator); }

};

RegisterFactory(ConcatenateFactory);

Loading

0 comments on commit 5f03439

Please sign in to comment.