diff --git a/api/.golangci.yaml b/api/.golangci.yaml index 157e8c9..1b1522b 100644 --- a/api/.golangci.yaml +++ b/api/.golangci.yaml @@ -8,6 +8,8 @@ linters: - gci - exhaustivestruct - dupl + - funlen + - cyclop linters-settings: lll: line-length: 140 diff --git a/api/internal/graph/generated/generated.go b/api/internal/graph/generated/generated.go index cfbd193..0211683 100644 --- a/api/internal/graph/generated/generated.go +++ b/api/internal/graph/generated/generated.go @@ -46,6 +46,11 @@ type DirectiveRoot struct { } type ComplexityRoot struct { + AutocompleteResponse struct { + ID func(childComplexity int) int + Name func(childComplexity int) int + } + Entity struct { EntityType func(childComplexity int) int ID func(childComplexity int) int @@ -112,12 +117,13 @@ type ComplexityRoot struct { } Query struct { - Entities func(childComplexity int, entityType string, page *int, limit *int) int - Entity func(childComplexity int, id string) int - Explore func(childComplexity int) int - MediaItem func(childComplexity int, id string) int - MediaItems func(childComplexity int, page *int, limit *int) int - Search func(childComplexity int, q string, page *int, limit *int) int + Autocomplete func(childComplexity int, q string) int + Entities func(childComplexity int, entityType string, page *int, limit *int) int + Entity func(childComplexity int, id string) int + Explore func(childComplexity int) int + MediaItem func(childComplexity int, id string) int + MediaItems func(childComplexity int, page *int, limit *int) int + Search func(childComplexity int, q *string, id *string, page *int, limit *int) int } } @@ -134,7 +140,8 @@ type MutationResolver interface { type QueryResolver interface { MediaItem(ctx context.Context, id string) (*models.MediaItem, error) MediaItems(ctx context.Context, page *int, limit *int) (*models.MediaItemConnection, error) - Search(ctx context.Context, q string, page *int, limit *int) (*models.MediaItemConnection, error) + Search(ctx context.Context, q *string, id *string, page *int, limit *int) (*models.MediaItemConnection, error) + Autocomplete(ctx context.Context, q string) ([]*models.AutocompleteResponse, error) Explore(ctx context.Context) (*models.ExploreResponse, error) Entities(ctx context.Context, entityType string, page *int, limit *int) (*models.EntityItemConnection, error) Entity(ctx context.Context, id string) (*models.Entity, error) @@ -155,6 +162,20 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in _ = ec switch typeName + "." + field { + case "AutocompleteResponse.id": + if e.complexity.AutocompleteResponse.ID == nil { + break + } + + return e.complexity.AutocompleteResponse.ID(childComplexity), true + + case "AutocompleteResponse.name": + if e.complexity.AutocompleteResponse.Name == nil { + break + } + + return e.complexity.AutocompleteResponse.Name(childComplexity), true + case "Entity.entityType": if e.complexity.Entity.EntityType == nil { break @@ -436,6 +457,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Photo.IsoEquivalent(childComplexity), true + case "Query.autocomplete": + if e.complexity.Query.Autocomplete == nil { + break + } + + args, err := ec.field_Query_autocomplete_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Query.Autocomplete(childComplexity, args["q"].(string)), true + case "Query.entities": if e.complexity.Query.Entities == nil { break @@ -501,7 +534,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return 0, false } - return e.complexity.Query.Search(childComplexity, args["q"].(string), args["page"].(*int), args["limit"].(*int)), true + return e.complexity.Query.Search(childComplexity, args["q"].(*string), args["id"].(*string), args["page"].(*int), args["limit"].(*int)), true } return 0, false @@ -630,10 +663,16 @@ type EntityItemConnection { totalCount: Int! } +type AutocompleteResponse { + id: String! + name: String! +} + type Query { mediaItem(id: String!): MediaItem! mediaItems(page: Int, limit: Int): MediaItemConnection! - search(q: String!, page: Int, limit: Int): MediaItemConnection! + search(q: String, id: String, page: Int, limit: Int): MediaItemConnection! + autocomplete(q: String!): [AutocompleteResponse] explore: ExploreResponse! entities(entityType: String!, page: Int, limit: Int): EntityItemConnection! entity(id: String!): Entity! @@ -729,6 +768,21 @@ func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs return args, nil } +func (ec *executionContext) field_Query_autocomplete_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 string + if tmp, ok := rawArgs["q"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("q")) + arg0, err = ec.unmarshalNString2string(ctx, tmp) + if err != nil { + return nil, err + } + } + args["q"] = arg0 + return args, nil +} + func (ec *executionContext) field_Query_entities_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -819,33 +873,42 @@ func (ec *executionContext) field_Query_mediaItems_args(ctx context.Context, raw func (ec *executionContext) field_Query_search_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} - var arg0 string + var arg0 *string if tmp, ok := rawArgs["q"]; ok { ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("q")) - arg0, err = ec.unmarshalNString2string(ctx, tmp) + arg0, err = ec.unmarshalOString2ᚖstring(ctx, tmp) if err != nil { return nil, err } } args["q"] = arg0 - var arg1 *int + var arg1 *string + if tmp, ok := rawArgs["id"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("id")) + arg1, err = ec.unmarshalOString2ᚖstring(ctx, tmp) + if err != nil { + return nil, err + } + } + args["id"] = arg1 + var arg2 *int if tmp, ok := rawArgs["page"]; ok { ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("page")) - arg1, err = ec.unmarshalOInt2ᚖint(ctx, tmp) + arg2, err = ec.unmarshalOInt2ᚖint(ctx, tmp) if err != nil { return nil, err } } - args["page"] = arg1 - var arg2 *int + args["page"] = arg2 + var arg3 *int if tmp, ok := rawArgs["limit"]; ok { ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("limit")) - arg2, err = ec.unmarshalOInt2ᚖint(ctx, tmp) + arg3, err = ec.unmarshalOInt2ᚖint(ctx, tmp) if err != nil { return nil, err } } - args["limit"] = arg2 + args["limit"] = arg3 return args, nil } @@ -887,6 +950,76 @@ func (ec *executionContext) field___Type_fields_args(ctx context.Context, rawArg // region **************************** field.gotpl ***************************** +func (ec *executionContext) _AutocompleteResponse_id(ctx context.Context, field graphql.CollectedField, obj *models.AutocompleteResponse) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "AutocompleteResponse", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.ID, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) _AutocompleteResponse_name(ctx context.Context, field graphql.CollectedField, obj *models.AutocompleteResponse) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "AutocompleteResponse", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Name, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + func (ec *executionContext) _Entity_id(ctx context.Context, field graphql.CollectedField, obj *models.Entity) (ret graphql.Marshaler) { defer func() { if r := recover(); r != nil { @@ -2284,7 +2417,7 @@ func (ec *executionContext) _Query_search(ctx context.Context, field graphql.Col fc.Args = args resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.Query().Search(rctx, args["q"].(string), args["page"].(*int), args["limit"].(*int)) + return ec.resolvers.Query().Search(rctx, args["q"].(*string), args["id"].(*string), args["page"].(*int), args["limit"].(*int)) }) if err != nil { ec.Error(ctx, err) @@ -2301,6 +2434,45 @@ func (ec *executionContext) _Query_search(ctx context.Context, field graphql.Col return ec.marshalNMediaItemConnection2ᚖirisᚋapiᚋinternalᚋmodelsᚐMediaItemConnection(ctx, field.Selections, res) } +func (ec *executionContext) _Query_autocomplete(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "Query", + Field: field, + Args: nil, + IsMethod: true, + IsResolver: true, + } + + ctx = graphql.WithFieldContext(ctx, fc) + rawArgs := field.ArgumentMap(ec.Variables) + args, err := ec.field_Query_autocomplete_args(ctx, rawArgs) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + fc.Args = args + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Query().Autocomplete(rctx, args["q"].(string)) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.([]*models.AutocompleteResponse) + fc.Result = res + return ec.marshalOAutocompleteResponse2ᚕᚖirisᚋapiᚋinternalᚋmodelsᚐAutocompleteResponse(ctx, field.Selections, res) +} + func (ec *executionContext) _Query_explore(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { defer func() { if r := recover(); r != nil { @@ -3586,6 +3758,38 @@ func (ec *executionContext) ___Type_ofType(ctx context.Context, field graphql.Co // region **************************** object.gotpl **************************** +var autocompleteResponseImplementors = []string{"AutocompleteResponse"} + +func (ec *executionContext) _AutocompleteResponse(ctx context.Context, sel ast.SelectionSet, obj *models.AutocompleteResponse) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, autocompleteResponseImplementors) + + out := graphql.NewFieldSet(fields) + var invalids uint32 + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("AutocompleteResponse") + case "id": + out.Values[i] = ec._AutocompleteResponse_id(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "name": + out.Values[i] = ec._AutocompleteResponse_name(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch() + if invalids > 0 { + return graphql.Null + } + return out +} + var entityImplementors = []string{"Entity"} func (ec *executionContext) _Entity(ctx context.Context, sel ast.SelectionSet, obj *models.Entity) graphql.Marshaler { @@ -3990,6 +4194,17 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr } return res }) + case "autocomplete": + field := field + out.Concurrently(i, func() (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_autocomplete(ctx, field) + return res + }) case "explore": field := field out.Concurrently(i, func() (res graphql.Marshaler) { @@ -4681,6 +4896,53 @@ func (ec *executionContext) marshalN__TypeKind2string(ctx context.Context, sel a return res } +func (ec *executionContext) marshalOAutocompleteResponse2ᚕᚖirisᚋapiᚋinternalᚋmodelsᚐAutocompleteResponse(ctx context.Context, sel ast.SelectionSet, v []*models.AutocompleteResponse) graphql.Marshaler { + if v == nil { + return graphql.Null + } + ret := make(graphql.Array, len(v)) + var wg sync.WaitGroup + isLen1 := len(v) == 1 + if !isLen1 { + wg.Add(len(v)) + } + for i := range v { + i := i + fc := &graphql.FieldContext{ + Index: &i, + Result: &v[i], + } + ctx := graphql.WithFieldContext(ctx, fc) + f := func(i int) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + if !isLen1 { + defer wg.Done() + } + ret[i] = ec.marshalOAutocompleteResponse2ᚖirisᚋapiᚋinternalᚋmodelsᚐAutocompleteResponse(ctx, sel, v[i]) + } + if isLen1 { + f(i) + } else { + go f(i) + } + + } + wg.Wait() + return ret +} + +func (ec *executionContext) marshalOAutocompleteResponse2ᚖirisᚋapiᚋinternalᚋmodelsᚐAutocompleteResponse(ctx context.Context, sel ast.SelectionSet, v *models.AutocompleteResponse) graphql.Marshaler { + if v == nil { + return graphql.Null + } + return ec._AutocompleteResponse(ctx, sel, v) +} + func (ec *executionContext) unmarshalOBoolean2bool(ctx context.Context, v interface{}) (bool, error) { res, err := graphql.UnmarshalBoolean(v) return res, graphql.ErrorOnPath(ctx, err) diff --git a/api/internal/graph/resolvers/schema.resolvers.go b/api/internal/graph/resolvers/schema.resolvers.go index f5af8b7..404bad7 100644 --- a/api/internal/graph/resolvers/schema.resolvers.go +++ b/api/internal/graph/resolvers/schema.resolvers.go @@ -221,8 +221,121 @@ func (r *queryResolver) MediaItems(ctx context.Context, page *int, limit *int) ( }, nil } -func (r *queryResolver) Search(ctx context.Context, q string, page *int, limit *int) (*models.MediaItemConnection, error) { - return nil, nil +func (r *queryResolver) Search(ctx context.Context, q *string, id *string, page *int, limit *int) (*models.MediaItemConnection, error) { + defaultSearchLimit := 20 + defaultSearchPage := 1 + + if limit == nil { + limit = &defaultSearchLimit + } + + if page == nil { + page = &defaultSearchPage + } + + skip := int64(*limit * (*page - 1)) + itemsPerPage := int64(*limit) + + var entityIDs []primitive.ObjectID + + if q != nil { + cur, err := r.DB.Collection(models.ColEntity).Find(ctx, bson.D{ + {Key: "$text", Value: bson.D{ + {Key: "$search", Value: *q}, + {Key: "$caseSensitive", Value: false}, + }}}) + if err != nil { + return nil, err + } + + var entities []*models.Entity + if err = cur.All(ctx, &entities); err != nil { + return nil, err + } + + entityIDs = make([]primitive.ObjectID, len(entities)) + + for idx, entity := range entities { + oid, _ := primitive.ObjectIDFromHex(entity.ID) + entityIDs[idx] = oid + } + } else if id != nil { + entityIDs = make([]primitive.ObjectID, 1) + oid, _ := primitive.ObjectIDFromHex(*id) + entityIDs[0] = oid + } + + colQuery := bson.A{ + bson.D{{Key: "$match", Value: bson.D{ + {Key: "entities", Value: bson.D{{ + Key: "$in", Value: entityIDs, + }}}}}, + }, + bson.D{{Key: "$sort", Value: bson.D{{Key: "updatedAt", Value: -1}}}}, + bson.D{{Key: "$skip", Value: skip}}, + bson.D{{Key: "$limit", Value: itemsPerPage}}, + } + cntQuery := bson.A{ + bson.D{{Key: "$match", Value: bson.D{ + {Key: "entities", Value: bson.D{{ + Key: "$in", Value: entityIDs, + }}}}}, + }, + bson.D{{Key: "$count", Value: "count"}}, + } + facetStage := bson.D{{ + Key: "$facet", + Value: bson.D{{Key: "mediaItems", Value: colQuery}, {Key: "totalCount", Value: cntQuery}}, + }} + + cur, err := r.DB.Collection(models.ColMediaItems).Aggregate(ctx, mongo.Pipeline{facetStage}) + if err != nil { + return nil, err + } + + var result []*struct { + MediaItems []*models.MediaItem `bson:"mediaItems"` + TotalCount []*struct { + Count *int `bson:"count"` + } `bson:"totalCount"` + } + + if err = cur.All(ctx, &result); err != nil { + return nil, err + } + + totalCount := 0 + if len(result) != 0 && len(result[0].TotalCount) != 0 { + totalCount = *result[0].TotalCount[0].Count + } + + return &models.MediaItemConnection{ + TotalCount: totalCount, + Nodes: result[0].MediaItems, + }, nil +} + +func (r *queryResolver) Autocomplete(ctx context.Context, q string) ([]*models.AutocompleteResponse, error) { + cur, err := r.DB.Collection(models.ColEntity).Find(ctx, bson.D{ + {Key: "$text", Value: bson.D{ + {Key: "$search", Value: q}, + {Key: "$caseSensitive", Value: false}, + }}}) + if err != nil { + return nil, err + } + + var entities []*models.Entity + if err = cur.All(ctx, &entities); err != nil { + return nil, err + } + + matches := make([]*models.AutocompleteResponse, len(entities)) + for idx, entity := range entities { + matches[idx] = &models.AutocompleteResponse{ID: entity.ID, Name: entity.Name} + } + + return matches, nil } func (r *queryResolver) Explore(ctx context.Context) (*models.ExploreResponse, error) { diff --git a/api/internal/models/models_gen.go b/api/internal/models/models_gen.go index 701c0a5..9bead99 100644 --- a/api/internal/models/models_gen.go +++ b/api/internal/models/models_gen.go @@ -2,6 +2,11 @@ package models +type AutocompleteResponse struct { + ID string `json:"id"` + Name string `json:"name"` +} + type EntityItemConnection struct { Nodes []*Entity `json:"nodes"` TotalCount int `json:"totalCount"` diff --git a/api/schema.graphql b/api/schema.graphql index add6bc7..c0e0b32 100644 --- a/api/schema.graphql +++ b/api/schema.graphql @@ -61,10 +61,16 @@ type EntityItemConnection { totalCount: Int! } +type AutocompleteResponse { + id: String! + name: String! +} + type Query { mediaItem(id: String!): MediaItem! mediaItems(page: Int, limit: Int): MediaItemConnection! - search(q: String!, page: Int, limit: Int): MediaItemConnection! + search(q: String, id: String, page: Int, limit: Int): MediaItemConnection! + autocomplete(q: String!): [AutocompleteResponse] explore: ExploreResponse! entities(entityType: String!, page: Int, limit: Int): EntityItemConnection! entity(id: String!): Entity! diff --git a/docker-compose.yaml b/docker-compose.yaml index a0a3df8..38bbf86 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -48,6 +48,8 @@ services: command: 'mongod --port 5010' ports: - '5010:5010' + volumes: + - ./infra/mongodb/create_indexes.js:/docker-entrypoint-initdb.d/create_indexes.js:ro cdn-master: container_name: cdn-master image: chrislusf/seaweedfs diff --git a/infra/mongodb/create_indexes.js b/infra/mongodb/create_indexes.js new file mode 100644 index 0000000..ec7cc68 --- /dev/null +++ b/infra/mongodb/create_indexes.js @@ -0,0 +1 @@ +db.entities.createIndex({ name: "text" }); \ No newline at end of file diff --git a/worker/pipeline/things.py b/worker/pipeline/things.py index 2a77ba2..8279156 100644 --- a/worker/pipeline/things.py +++ b/worker/pipeline/things.py @@ -547,10 +547,16 @@ def get_inference_results(self, inference_type): print(f'error while making inference request, status code: {res.status_code}') return result_classes + def upsert_things(self, things): + """Upserts things for future usage""" + for thing in things: + self.db['things'].find_one_and_update({ 'name': thing }, {'$set': { 'name': thing }}, upsert=True, return_document=ReturnDocument.AFTER) + def upsert_entity(self, data): """Upserts things entity""" entity_oids = [] for cat_class in data: + cat_class = cat_class.replace('_', ' ') result = self.db['entities'].find_one_and_update( {'name': cat_class, 'entityType': 'things'}, {'$set': { 'name': cat_class, 'imageUrl': self.image_url }}, @@ -580,5 +586,6 @@ def process(self): if category not in content_categories: content_categories.append(category) + self.upsert_things(od_classes + ic_classes) entity_oids = self.upsert_entity(od_classes + ic_classes) self.update({ '$set': { 'contentCategories': content_categories }, '$addToSet': { 'entities': { '$each': entity_oids } } }) diff --git a/worker/worker.py b/worker/worker.py index 17e8d85..7345985 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -1,4 +1,5 @@ """Worker""" +import os import json import urllib.request import pika @@ -29,6 +30,7 @@ def message_callback(ch, method, ___, body): component.process() # manually acknowledge the message + os.remove(f'image-{oid}') ch.basic_ack(delivery_tag=method.delivery_tag) def start_consumers():