Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support external udaf function #2825

Merged
merged 31 commits into from
Mar 22, 2023
Merged
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c18bc2c
feat: support external udaf
dl239 Nov 24, 2022
011c229
refact: update type_ir_builder
dl239 Nov 25, 2022
392b404
refact: refact
dl239 Nov 29, 2022
360438b
Merge branch 'main' of github.com:dl239/OpenMLDB into feat/udaf
dl239 Nov 29, 2022
098e5a7
feat: add node
dl239 Nov 29, 2022
e0030c7
feat: build udaf call
dl239 Nov 30, 2022
66df39d
feat: add udaf registry
dl239 Nov 30, 2022
a98d7b3
feat: support drop
dl239 Dec 7, 2022
e823ad3
Merge branch 'main' of github.com:dl239/OpenMLDB into feat/udaf
dl239 Dec 7, 2022
aed213e
refact: rm unused code
dl239 Dec 7, 2022
539bf6f
refact: rm unused code
dl239 Dec 7, 2022
d480f1c
docs: add udaf docs
dl239 Dec 8, 2022
2d2b319
docs: update
dl239 Dec 8, 2022
2bce9b6
feat: support null args
dl239 Dec 9, 2022
be68dbf
Merge branch 'feat/udaf' of github.com:dl239/OpenMLDB into feat/udaf
dl239 Dec 9, 2022
4f50ac8
fix: fix test case
dl239 Dec 12, 2022
7f9e01c
feat: return result in arg if return_nullable is true
dl239 Dec 12, 2022
5ce768c
docs: update the docs
dl239 Dec 12, 2022
c7413cb
fix: fix test case
dl239 Dec 13, 2022
b89eeb1
Merge branch 'feat/udaf' of github.com:dl239/OpenMLDB into feat/udaf
dl239 Dec 13, 2022
c31b2d8
Merge branch 'main' of github.com:dl239/OpenMLDB into feat/udaf
dl239 Dec 13, 2022
5f99997
merge main
dl239 Dec 20, 2022
1483c9c
Merge branch 'feat/udaf' of github.com:dl239/OpenMLDB into feat/udaf
dl239 Dec 20, 2022
05a1b35
merge main
dl239 Mar 20, 2023
5e1a8c5
fix: fix compile
dl239 Mar 20, 2023
f583a41
fix: fix ExternalUdfUtil
dl239 Mar 20, 2023
497e527
fix: fix comment
dl239 Mar 21, 2023
cede5bd
fix: fix comment
dl239 Mar 21, 2023
d3719f8
fix: fix style
dl239 Mar 21, 2023
b2b8000
fix: fix compile
dl239 Mar 21, 2023
926d158
fix: fix compile
dl239 Mar 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refact: refact
dl239 committed Nov 29, 2022
commit 392b4040bb3fc61bfb027b31edda9933c28d897c
2 changes: 0 additions & 2 deletions hybridse/src/udf/udf.cc
Original file line number Diff line number Diff line change
@@ -84,8 +84,6 @@ void unhex(StringRef *str, StringRef *output, bool* is_null) {
if (a <= 'F' && a >= 'A') { return a - 'A' + 10; }
if (a <= 'f' && a >= 'a') { return a - 'a' + 10; }
if (a <= '9' && a >= '0') { return a - '0'; }
// cannot reach here
return 0;
};

if (!*is_null) { // every character is valid hex character
4 changes: 1 addition & 3 deletions hybridse/src/udf/udf_library.h
Original file line number Diff line number Diff line change
@@ -51,7 +51,6 @@ class UdafRegistry;
class CompositeRegistry;
class UdfResolveContext;

template <typename T>
class ArgSignatureTable;

template <template <typename> typename FTemplate>
@@ -101,8 +100,7 @@ class UdfLibrary {

bool HasFunction(const std::string& name) const;

std::shared_ptr<ArgSignatureTable<std::shared_ptr<UdfRegistry>>> FindAll(
const std::string& name) const;
std::shared_ptr<ArgSignatureTable> FindAll(const std::string& name) const;

bool IsUdaf(const std::string& name, size_t args) const;
bool IsUdaf(const std::string& name) const;
149 changes: 149 additions & 0 deletions hybridse/src/udf/udf_registry.cc
Original file line number Diff line number Diff line change
@@ -27,6 +27,146 @@ using ::hybridse::common::kCodegenError;
namespace hybridse {
namespace udf {

// Convenience overload: gathers the argument types exposed by `ctx`
// (arg_type(0) .. arg_type(arg_size() - 1)) into a vector and delegates to
// the type-vector overload of Find(). `res`, `signature` and `variadic_pos`
// are passed through untouched.
Status ArgSignatureTable::Find(UdfResolveContext* ctx, std::shared_ptr<UdfRegistry>* res, std::string* signature,
                               int* variadic_pos) {
    std::vector<const node::TypeNode*> collected_types;
    size_t idx = 0;
    while (idx < ctx->arg_size()) {
        collected_types.push_back(ctx->arg_type(idx));
        ++idx;
    }
    return Find(collected_types, res, signature, variadic_pos);
}

// Looks up the registered entry whose signature best fits `arg_types`.
//
// A nullptr in a registered signature is a placeholder that accepts any
// argument type; a nullptr in `arg_types` is only rendered as "?" in the
// error message.
//
// Outputs (written only on success):
//   *res          - registry stored for the selected entry
//   *signature    - key of the selected entry in the table
//   *variadic_pos - -1 when a fixed-arity entry was selected, otherwise the
//                   number of declared (non-variadic) arguments of the
//                   selected variadic entry
//
// Returns Status::OK() on success, or a common::kCodegenError status carrying
// the rendered argument list when nothing matches.
Status ArgSignatureTable::Find(const std::vector<const node::TypeNode*>& arg_types, std::shared_ptr<UdfRegistry>* res,
                               std::string* signature, int* variadic_pos) {
    // Human-readable argument list; only used for the failure message.
    std::stringstream ss;
    for (size_t i = 0; i < arg_types.size(); ++i) {
        auto type_node = arg_types[i];
        if (type_node == nullptr) {
            ss << "?";
        } else {
            ss << type_node->GetName();
        }
        if (i + 1 < arg_types.size()) {
            ss << ", ";
        }
    }

    // There are four match conditions:
    // (1) explicit match without placeholders
    // (2) explicit match with placeholders
    // (3) variadic match without placeholders
    // (4) variadic match with placeholders
    // The priority is (1) > (2) > (3) > (4)
    TableType::iterator placeholder_match_iter = table_.end();
    TableType::iterator variadic_placeholder_match_iter = table_.end();
    TableType::iterator variadic_match_iter = table_.end();
    int variadic_match_pos = -1;
    int variadic_placeholder_match_pos = -1;

    for (auto iter = table_.begin(); iter != table_.end(); ++iter) {
        auto& def_item = iter->second;
        auto& def_arg_types = def_item.arg_types;
        if (def_item.is_variadic) {
            // variadic match: only the declared leading arguments are compared
            bool match = true;
            bool placeholder_match = false;
            int non_variadic_arg_num = def_arg_types.size();
            if (arg_types.size() < static_cast<size_t>(non_variadic_arg_num)) {
                continue;
            }
            for (int j = 0; j < non_variadic_arg_num; ++j) {
                if (def_arg_types[j] == nullptr) {  // any arg
                    placeholder_match = true;
                    match = false;
                } else if (!node::TypeEquals(def_arg_types[j], arg_types[j])) {
                    placeholder_match = false;
                    match = false;
                    break;
                }
            }
            if (match) {
                // Prefer the variadic entry with the longest declared prefix.
                if (variadic_match_pos < non_variadic_arg_num) {
                    // Fix: record condition (3) in its own slot. The previous
                    // code stored it in placeholder_match_iter, which made a
                    // variadic match masquerade as condition (2), reported
                    // *variadic_pos as -1 and left the variadic branch below
                    // unreachable.
                    variadic_match_iter = iter;
                    variadic_match_pos = non_variadic_arg_num;
                }
            } else if (placeholder_match) {
                if (variadic_placeholder_match_pos < non_variadic_arg_num) {
                    variadic_placeholder_match_iter = iter;
                    variadic_placeholder_match_pos = non_variadic_arg_num;
                }
            }
        } else if (arg_types.size() == def_arg_types.size()) {
            // explicit match: same arity, compare every position
            bool match = true;
            bool placeholder_match = false;
            for (size_t j = 0; j < arg_types.size(); ++j) {
                if (def_arg_types[j] == nullptr) {
                    placeholder_match = true;
                    match = false;
                } else if (!node::TypeEquals(def_arg_types[j], arg_types[j])) {
                    placeholder_match = false;
                    match = false;
                    break;
                }
            }
            if (match) {
                // Condition (1) has top priority, so stop searching.
                *variadic_pos = -1;
                *signature = iter->first;
                *res = def_item.value;
                return Status::OK();
            } else if (placeholder_match) {
                // Later entries overwrite earlier ones for condition (2).
                placeholder_match_iter = iter;
            }
        }
    }

    // No exact explicit match: fall back in priority order (2), (3), (4).
    if (placeholder_match_iter != table_.end()) {
        *variadic_pos = -1;
        *signature = placeholder_match_iter->first;
        *res = placeholder_match_iter->second.value;
        return Status::OK();
    } else if (variadic_match_iter != table_.end()) {
        *variadic_pos = variadic_match_pos;
        *signature = variadic_match_iter->first;
        *res = variadic_match_iter->second.value;
        return Status::OK();
    } else if (variadic_placeholder_match_iter != table_.end()) {
        *variadic_pos = variadic_placeholder_match_pos;
        *signature = variadic_placeholder_match_iter->first;
        *res = variadic_placeholder_match_iter->second.value;
        return Status::OK();
    } else {
        return Status(common::kCodegenError, "Resolve udf signature failure: <" + ss.str() + ">");
    }
}

// Adds `t` to the table under a key derived from `args`.
//
// The key is the comma-separated list of argument type names, with "?"
// standing in for a nullptr (placeholder) argument. Registering a second
// entry that produces the same key is rejected through CHECK_TRUE with
// common::kCodegenError; otherwise Status::OK() is returned.
Status ArgSignatureTable::Register(const std::vector<const node::TypeNode*>& args,
                                   bool is_variadic, const std::shared_ptr<UdfRegistry>& t) {
    std::stringstream key_builder;
    for (size_t pos = 0; pos < args.size(); ++pos) {
        // Separator goes between elements, never before the first one.
        if (pos != 0) {
            key_builder << ", ";
        }
        const node::TypeNode* arg_type = args[pos];
        if (arg_type != nullptr) {
            key_builder << arg_type->GetName();
        } else {
            key_builder << "?";
        }
    }

    std::string key = key_builder.str();
    auto existing = table_.find(key);
    CHECK_TRUE(existing == table_.end(), common::kCodegenError,
               "Duplicate signature: ", key);

    // `existing` equals table_.end() here and is passed as the insertion hint.
    table_.insert(existing, {key, DefItem(t, args, is_variadic)});
    return Status::OK();
}

// Returns the signature text for this context's arguments, as produced by
// the namespace-level hybridse::udf::GetArgSignature() applied to args_.
const std::string UdfResolveContext::GetArgSignature() const {
    const std::string arg_signature = hybridse::udf::GetArgSignature(args_);
    return arg_signature;
}
@@ -122,6 +262,15 @@ Status DynamicUdfRegistry::ResolveFunction(UdfResolveContext* ctx,
return Status::OK();
}

// Resolves this registry to the external UDAF definition it was built with.
//
// `ctx` is not read by this implementation. On success `*result` is set to
// extern_def_ and Status::OK() is returned.
Status DynamicUdafRegistry::ResolveFunction(UdfResolveContext* ctx,
node::FnDefNode** result) {
// Reject a definition that has no return type. NOTE(review): CHECK_TRUE is
// presumably an early-return macro yielding a kCodegenError status with the
// message below - confirm against its definition.
CHECK_TRUE(extern_def_->ret_type() != nullptr, kCodegenError,
"No return type specified for ", extern_def_->GetName());
// Debug-level trace of which definition the name resolved to.
DLOG(INFO) << "Resolve udaf \"" << name() << "\" -> " << extern_def_->GetFlatString();
// Hand back the stored pointer itself; no copy of the definition is made here.
*result = extern_def_;
return Status::OK();
}

Status SimpleUdfRegistry::ResolveFunction(UdfResolveContext* ctx,
node::FnDefNode** result) {
*result = fn_def_;
166 changes: 21 additions & 145 deletions hybridse/src/udf/udf_registry.h
Original file line number Diff line number Diff line change
@@ -109,154 +109,21 @@ class UdfRegistry {
std::string doc_;
};

template <typename T>
class ArgSignatureTable {
public:
Status Find(UdfResolveContext* ctx, T* res, std::string* signature,
int* variadic_pos) {
std::vector<const node::TypeNode*> arg_types;
for (size_t i = 0; i < ctx->arg_size(); ++i) {
arg_types.push_back(ctx->arg_type(i));
}
return Find(arg_types, res, signature, variadic_pos);
}

Status Find(const std::vector<const node::TypeNode*>& arg_types, T* res,
std::string* signature, int* variadic_pos) {
std::stringstream ss;
for (size_t i = 0; i < arg_types.size(); ++i) {
auto type_node = arg_types[i];
if (type_node == nullptr) {
ss << "?";
} else {
ss << type_node->GetName();
}
if (i < arg_types.size() - 1) {
ss << ", ";
}
}

// There are four match conditions:
// (1) explicit match without placeholders
// (2) explicit match with placeholders
// (3) variadic match without placeholders
// (4) variadic match with placeholders
// The priority is (1) > (2) > (3) > (4)
typename TableType::iterator placeholder_match_iter = table_.end();
typename TableType::iterator variadic_placeholder_match_iter =
table_.end();
typename TableType::iterator variadic_match_iter = table_.end();
int variadic_match_pos = -1;
int variadic_placeholder_match_pos = -1;

for (auto iter = table_.begin(); iter != table_.end(); ++iter) {
auto& def_item = iter->second;
auto& def_arg_types = def_item.arg_types;
if (def_item.is_variadic) {
// variadic match
bool match = true;
bool placeholder_match = false;
int non_variadic_arg_num = def_arg_types.size();
if (arg_types.size() <
static_cast<size_t>(non_variadic_arg_num)) {
continue;
}
for (int j = 0; j < non_variadic_arg_num; ++j) {
if (def_arg_types[j] == nullptr) { // any arg
placeholder_match = true;
match = false;
} else if (!node::TypeEquals(def_arg_types[j],
arg_types[j])) {
placeholder_match = false;
match = false;
break;
}
}
if (match) {
if (variadic_match_pos < non_variadic_arg_num) {
placeholder_match_iter = iter;
variadic_match_pos = non_variadic_arg_num;
}
} else if (placeholder_match) {
if (variadic_placeholder_match_pos < non_variadic_arg_num) {
variadic_placeholder_match_iter = iter;
variadic_placeholder_match_pos = non_variadic_arg_num;
}
}
Status Find(UdfResolveContext* ctx, std::shared_ptr<UdfRegistry>* res, std::string* signature, int* variadic_pos);

} else if (arg_types.size() == def_arg_types.size()) {
// explicit match
bool match = true;
bool placeholder_match = false;
for (size_t j = 0; j < arg_types.size(); ++j) {
if (def_arg_types[j] == nullptr) {
placeholder_match = true;
match = false;
} else if (!node::TypeEquals(def_arg_types[j],
arg_types[j])) {
placeholder_match = false;
match = false;
break;
}
}
if (match) {
*variadic_pos = -1;
*signature = iter->first;
*res = def_item.value;
return Status::OK();
} else if (placeholder_match) {
placeholder_match_iter = iter;
}
}
}

if (placeholder_match_iter != table_.end()) {
*variadic_pos = -1;
*signature = placeholder_match_iter->first;
*res = placeholder_match_iter->second.value;
return Status::OK();
} else if (variadic_match_iter != table_.end()) {
*variadic_pos = variadic_match_pos;
*signature = variadic_match_iter->first;
*res = variadic_match_iter->second.value;
return Status::OK();
} else if (variadic_placeholder_match_iter != table_.end()) {
*variadic_pos = variadic_placeholder_match_pos;
*signature = variadic_placeholder_match_iter->first;
*res = variadic_placeholder_match_iter->second.value;
return Status::OK();
} else {
return Status(common::kCodegenError,
"Resolve udf signature failure: <" + ss.str() + ">");
}
}
Status Find(const std::vector<const node::TypeNode*>& arg_types, std::shared_ptr<UdfRegistry>* res,
std::string* signature, int* variadic_pos);

Status Register(const std::vector<const node::TypeNode*>& args,
bool is_variadic, const T& t) {
std::stringstream ss;
for (size_t i = 0; i < args.size(); ++i) {
if (args[i] == nullptr) {
ss << "?";
} else {
ss << args[i]->GetName();
}
if (i < args.size() - 1) {
ss << ", ";
}
}
std::string key = ss.str();
auto iter = table_.find(key);
CHECK_TRUE(iter == table_.end(), common::kCodegenError,
"Duplicate signature: ", key);
table_.insert(iter, {key, DefItem(t, args, is_variadic)});
return Status::OK();
}
bool is_variadic, const std::shared_ptr<UdfRegistry>& t);

struct DefItem {
T value;
std::shared_ptr<UdfRegistry> value;
std::vector<const node::TypeNode*> arg_types;
bool is_variadic;
DefItem(const T& value,
DefItem(const std::shared_ptr<UdfRegistry>& value,
const std::vector<const node::TypeNode*>& arg_types,
bool is_variadic)
: value(value), arg_types(arg_types), is_variadic(is_variadic) {}
@@ -272,7 +139,7 @@ class ArgSignatureTable {

struct UdfLibraryEntry {
// argument matching table
ArgSignatureTable<std::shared_ptr<UdfRegistry>> signature_table;
ArgSignatureTable signature_table;

// record whether is udaf for specified argument num
std::unordered_set<size_t> udaf_arg_nums;
@@ -364,7 +231,7 @@ class UdfRegistryHelper {

void SetDoc(const std::string& doc) {
doc_ = doc;
for (auto reg : registries_) {
for (auto& reg : registries_) {
reg->SetDoc(doc);
}
}
@@ -872,17 +739,26 @@ class ExternalFuncRegistry : public UdfRegistry {

class DynamicUdfRegistry : public UdfRegistry {
public:
explicit DynamicUdfRegistry(const std::string& name,
node::DynamicUdfFnDefNode* extern_def)
explicit DynamicUdfRegistry(const std::string& name, node::DynamicUdfFnDefNode* extern_def)
: UdfRegistry(name), extern_def_(extern_def) {}

Status ResolveFunction(UdfResolveContext* ctx,
node::FnDefNode** result) override;
Status ResolveFunction(UdfResolveContext* ctx, node::FnDefNode** result) override;

private:
node::DynamicUdfFnDefNode* extern_def_;
};

// Registry entry for a dynamically registered (external) UDAF.
//
// Wraps a node::DynamicUdafFnDefNode pointer supplied at construction and
// exposes it through the UdfRegistry::ResolveFunction interface. The pointer
// is stored as-is; this class declares no destructor of its own.
class DynamicUdafRegistry : public UdfRegistry {
 public:
    // `name` is forwarded to the UdfRegistry base; `extern_def` is the
    // definition later handed out by ResolveFunction().
    explicit DynamicUdafRegistry(const std::string& name,
                                 node::DynamicUdafFnDefNode* extern_def)
        : UdfRegistry(name),
          extern_def_(extern_def) {}

    // Overrides UdfRegistry::ResolveFunction; defined out of line.
    Status ResolveFunction(UdfResolveContext* ctx,
                           node::FnDefNode** result) override;

 private:
    // External UDAF definition this registry resolves to.
    node::DynamicUdafFnDefNode* extern_def_;
};

template <bool A, bool B>
struct ConditionAnd {
static const bool value = false;
14 changes: 10 additions & 4 deletions include/base/string_ref.h
Original file line number Diff line number Diff line change
@@ -28,12 +28,16 @@ namespace base {

struct StringRef {
StringRef() : size_(0), data_(nullptr) {}
StringRef(std::nullptr_t) : size_(0), data_(nullptr) {}
StringRef(std::nullptr_t) : size_(0), data_(nullptr) {} // NOLINT

StringRef(const char* str) : size_(strlen(str)), data_(str) {}
StringRef(const char* str) // NOLINT
: size_(strlen(str)), data_(str) {}
StringRef(uint32_t size, const char* data) : size_(size), data_(data) {}

StringRef(const std::string& str) : size_(str.size()), data_(str.data()) {}
StringRef(const std::string& str) // NOLINT
: size_(str.size()), data_(str.data()) {}

~StringRef() {}

const inline bool IsNull() const { return nullptr == data_; }
const std::string ToString() const {
@@ -48,9 +52,11 @@ struct StringRef {
if (data_ == nullptr) {
return "NULL";
}

std::string out("\"");
out.append(data_, size_);
out.append("\"");

return out;
}

@@ -61,7 +67,7 @@ struct StringRef {
if (a.size_ < b.size_) {
r = -1;
} else if (a.size_ > b.size_) {
r = 1;
r = +1;
}
}
return r;