Skip to content

Commit

Permalink
support time to live
Browse files Browse the repository at this point in the history
alter tag/edge drop column, and the column to be delete is as TTL column
  • Loading branch information
ayyt committed Jun 11, 2019
1 parent 253a57a commit ce7d229
Show file tree
Hide file tree
Showing 27 changed files with 1,320 additions and 310 deletions.
8 changes: 7 additions & 1 deletion src/dataman/ResultSchemaProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ bool ResultSchemaProvider::ResultSchemaField::isValid() const {
*
**********************************/
ResultSchemaProvider::ResultSchemaProvider(Schema schema)
: columns_(std::move(schema.get_columns())) {
: columns_(std::move(schema.get_columns())),
schemaProp_(std::move(schema.get_schema_prop())) {
for (int64_t i = 0; i < static_cast<int64_t>(columns_.size()); i++) {
const std::string& name = columns_[i].get_name();
nameIndex_.emplace(std::make_pair(SpookyHashV2::Hash64(name.data(), name.size(), 0), i));
Expand Down Expand Up @@ -113,5 +114,10 @@ std::shared_ptr<const meta::SchemaProviderIf::Field> ResultSchemaProvider::field
return std::make_shared<ResultSchemaField>(&(columns_[index]));
}


const cpp2::SchemaProp ResultSchemaProvider::getProp() const {
return schemaProp_;
}

} // namespace nebula

4 changes: 4 additions & 0 deletions src/dataman/ResultSchemaProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class ResultSchemaProvider : public meta::SchemaProviderIf {
std::shared_ptr<const meta::SchemaProviderIf::Field> field(
const folly::StringPiece name) const override;

const cpp2::SchemaProp getProp() const override;

protected:
SchemaVer schemaVer_{0};

Expand All @@ -59,6 +61,8 @@ class ResultSchemaProvider : public meta::SchemaProviderIf {

// Default constructor, only used by SchemaWriter
explicit ResultSchemaProvider(SchemaVer ver = 0) : schemaVer_(ver) {}

cpp2::SchemaProp schemaProp_;
};

} // namespace nebula
Expand Down
64 changes: 50 additions & 14 deletions src/graph/AlterEdgeExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,18 @@ AlterEdgeExecutor::AlterEdgeExecutor(Sentence *sentence,


Status AlterEdgeExecutor::prepare() {
return checkIfGraphSpaceChosen();
}
auto status = checkIfGraphSpaceChosen();

if (!status.ok()) {
return status;
}

void AlterEdgeExecutor::execute() {
auto *mc = ectx()->getMetaClient();
auto *name = sentence_->name();
const auto& schemaOpts = sentence_->schemaOptList();
auto spaceId = ectx()->rctx()->session()->space();
const auto& schemaOpts = sentence_->getSchemaOpts();

std::vector<nebula::meta::cpp2::AlterSchemaItem> schemaItems;
for (auto& schemaOpt : schemaOpts) {
nebula::meta::cpp2::AlterSchemaItem schemaItem;
auto opType = schemaOpt->toType();
schemaItem.set_op(std::move(opType));
nebula::meta::cpp2::AlterSchemaOption schemaOption;
auto opType = schemaOpt->toOptionType();
schemaOption.set_type(std::move(opType));
const auto& specs = schemaOpt->columnSpecs();
nebula::cpp2::Schema schema;
for (auto& spec : specs) {
Expand All @@ -40,11 +37,50 @@ void AlterEdgeExecutor::execute() {
column.type.type = columnTypeToSupportedType(spec->type());
schema.columns.emplace_back(std::move(column));
}
schemaItem.set_schema(std::move(schema));
schemaItems.emplace_back(std::move(schemaItem));
schemaOption.set_schema(std::move(schema));
options_.emplace_back(std::move(schemaOption));
}

const auto& schemaProps = sentence_->getSchemaProps();

for (auto& schemaProp : schemaProps) {
nebula::meta::cpp2::AlterSchemaProp alterSchemaProp;
auto propType = schemaProp->getPropType();
StatusOr<int64_t> retInt;
StatusOr<std::string> retStr;
switch (propType) {
case SchemaPropItem::TTL_DURATION:
retInt = schemaProp->getTtlDuration();
if (!retInt.ok()) {
return retInt.status();
}
alterSchemaProp.set_value(folly::to<std::string>(retInt.value()));
break;
case SchemaPropItem::TTL_COL:
// The legality of the column name need be to check in meta
retStr = schemaProp->getTtlCol();
if (!retStr.ok()) {
return retStr.status();
}
alterSchemaProp.set_value(retStr.value());
break;
default:
return Status::Error("Property type not support");
}
alterSchemaProp.set_type(std::move(schemaProp->toPropType()));
schemaProps_.emplace_back(std::move(alterSchemaProp));
}

auto future = mc->alterEdgeSchema(spaceId, *name, std::move(schemaItems));
return Status::OK();
}


void AlterEdgeExecutor::execute() {
auto *mc = ectx()->getMetaClient();
auto *name = sentence_->name();
auto spaceId = ectx()->rctx()->session()->space();

auto future = mc->alterEdgeSchema(spaceId, *name, std::move(options_), std::move(schemaProps_));
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
Expand Down
4 changes: 3 additions & 1 deletion src/graph/AlterEdgeExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class AlterEdgeExecutor final : public Executor {
void execute() override;

private:
AlterEdgeSentence *sentence_{nullptr};
AlterEdgeSentence *sentence_{nullptr};
std::vector<nebula::meta::cpp2::AlterSchemaOption> options_;
std::vector<nebula::meta::cpp2::AlterSchemaProp> schemaProps_;
};

} // namespace graph
Expand Down
67 changes: 53 additions & 14 deletions src/graph/AlterTagExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,20 @@ AlterTagExecutor::AlterTagExecutor(Sentence *sentence,
sentence_ = static_cast<AlterTagSentence*>(sentence);
}


Status AlterTagExecutor::prepare() {
return checkIfGraphSpaceChosen();
}
auto status = checkIfGraphSpaceChosen();

void AlterTagExecutor::execute() {
auto *mc = ectx()->getMetaClient();
auto *name = sentence_->name();
const auto& schemaOpts = sentence_->schemaOptList();
auto spaceId = ectx()->rctx()->session()->space();
if (!status.ok()) {
return status;
}

const auto& schemaOpts = sentence_->getSchemaOpts();

std::vector<nebula::meta::cpp2::AlterSchemaItem> schemaItems;
for (auto& schemaOpt : schemaOpts) {
nebula::meta::cpp2::AlterSchemaItem schemaItem;
auto opType = schemaOpt->toType();
schemaItem.set_op(std::move(opType));
nebula::meta::cpp2::AlterSchemaOption schemaOption;
auto opType = schemaOpt->toOptionType();
schemaOption.set_type(std::move(opType));
const auto& specs = schemaOpt->columnSpecs();
nebula::cpp2::Schema schema;
for (auto& spec : specs) {
Expand All @@ -38,11 +37,51 @@ void AlterTagExecutor::execute() {
column.type.type = columnTypeToSupportedType(spec->type());
schema.columns.emplace_back(std::move(column));
}
schemaItem.set_schema(std::move(schema));
schemaItems.emplace_back(std::move(schemaItem));
schemaOption.set_schema(std::move(schema));
options_.emplace_back(std::move(schemaOption));
}

auto future = mc->alterTagSchema(spaceId, *name, std::move(schemaItems));
const auto& schemaProps = sentence_->getSchemaProps();

for (auto& schemaProp : schemaProps) {
nebula::meta::cpp2::AlterSchemaProp alterSchemaProp;
auto propType = schemaProp->getPropType();
StatusOr<int64_t> retInt;
StatusOr<std::string> retStr;
switch (propType) {
case SchemaPropItem::TTL_DURATION:
retInt = schemaProp->getTtlDuration();
if (!retInt.ok()) {
return retInt.status();
}
alterSchemaProp.set_value(folly::to<std::string>(retInt.value()));
break;
case SchemaPropItem::TTL_COL:
// The legality of the column name need be to check in meta
retStr = schemaProp->getTtlCol();
if (!retStr.ok()) {
return retStr.status();
}
alterSchemaProp.set_value(retStr.value());
break;
default:
return Status::Error("Property type not support");
}
alterSchemaProp.set_type(std::move(schemaProp->toPropType()));
schemaProps_.emplace_back(std::move(alterSchemaProp));
}

return Status::OK();
}


void AlterTagExecutor::execute() {
auto *mc = ectx()->getMetaClient();
auto *name = sentence_->name();
auto spaceId = ectx()->rctx()->session()->space();


auto future = mc->alterTagSchema(spaceId, *name, std::move(options_), std::move(schemaProps_));
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
Expand Down
4 changes: 3 additions & 1 deletion src/graph/AlterTagExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class AlterTagExecutor final : public Executor {
void execute() override;

private:
AlterTagSentence *sentence_{nullptr};
AlterTagSentence *sentence_{nullptr};
std::vector<nebula::meta::cpp2::AlterSchemaOption> options_;
std::vector<nebula::meta::cpp2::AlterSchemaProp> schemaProps_;
};

} // namespace graph
Expand Down
89 changes: 80 additions & 9 deletions src/graph/CreateEdgeExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,96 @@ CreateEdgeExecutor::CreateEdgeExecutor(Sentence *sentence,


Status CreateEdgeExecutor::prepare() {
return checkIfGraphSpaceChosen();
}
auto status = checkIfGraphSpaceChosen();

if (!status.ok()) {
return status;
}

void CreateEdgeExecutor::execute() {
auto *mc = ectx()->getMetaClient();
auto *name = sentence_->name();
const auto& specs = sentence_->columnSpecs();
auto spaceId = ectx()->rctx()->session()->space();
const auto& schemaProps = sentence_->getSchemaProps();

nebula::cpp2::Schema schema;
for (auto& spec : specs) {
nebula::cpp2::ColumnDef column;
column.name = *spec->name();
column.type.type = columnTypeToSupportedType(spec->type());
schema.columns.emplace_back(std::move(column));
schema_.columns.emplace_back(std::move(column));
}

// set schema_.schema_prop default
// 0 means infinity
schema_.schema_prop.set_ttl_duration(0);
schema_.schema_prop.set_ttl_col("");

if (schemaProps.size() != 0) {
for (auto& schemaProp : schemaProps) {
switch (schemaProp->getPropType()) {
case SchemaPropItem::TTL_DURATION:
status = setTTLDuration(schemaProp);
if (!status.ok()) {
return status;
}
break;
case SchemaPropItem::TTL_COL:
status = setTTLCol(schemaProp);
if (!status.ok()) {
return status;
}
break;
}
}
if (schema_.schema_prop.ttl_duration != 0) {
// Disable implicit TTL mode
if (schema_.schema_prop.ttl_col.empty()) {
return Status::Error("implicit ttl_col not support");
}

if (schema_.schema_prop.ttl_duration < 0) {
schema_.schema_prop.set_ttl_duration(0);
}
}
}

return Status::OK();
}


Status CreateEdgeExecutor::setTTLDuration(SchemaPropItem* schemaProp) {
auto ret = schemaProp->getTtlDuration();
if (!ret.ok()) {
return ret.status();
}

auto ttlDuration = ret.value();
schema_.schema_prop.set_ttl_duration(ttlDuration);
return Status::OK();
}


Status CreateEdgeExecutor::setTTLCol(SchemaPropItem* schemaProp) {
auto ret = schemaProp->getTtlCol();
if (!ret.ok()) {
return ret.status();
}

auto ttlColName = ret.value();
// check the legality of the ttl column name
for (auto& col : schema_.columns) {
if (col.name == ttlColName) {
schema_.schema_prop.set_ttl_col(ttlColName);
return Status::OK();
}
}
return Status::Error("Ttl column name not exist in columns");
}


void CreateEdgeExecutor::execute() {
auto *mc = ectx()->getMetaClient();
auto *name = sentence_->name();
auto spaceId = ectx()->rctx()->session()->space();

auto future = mc->createEdgeSchema(spaceId, *name, schema);
auto future = mc->createEdgeSchema(spaceId, *name, schema_);
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
Expand Down
5 changes: 5 additions & 0 deletions src/graph/CreateEdgeExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ class CreateEdgeExecutor final : public Executor {

void execute() override;

Status setTTLDuration(SchemaPropItem* schemaProp);

Status setTTLCol(SchemaPropItem* schemaProp);

private:
CreateEdgeSentence *sentence_{nullptr};
nebula::cpp2::Schema schema_;
};

} // namespace graph
Expand Down
Loading

0 comments on commit ce7d229

Please sign in to comment.