diff --git a/.gitignore b/.gitignore index d25bdfe..8ed5784 100755 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,11 @@ **credential** **/builds/** -lambda_golang/landing -lambda_golang/stories -lambda_golang/landing_metadata -lambda_golang/story +lambda_golang/* +!lambda_golang/go.mod +!lambda_golang/go.sum +!lambda_golang/*/ +!lambda_golang/*/** venv # Binaries for programs and plugins diff --git a/cloud_environments/terraform.sh b/cloud_environments/terraform.sh index f4e6c14..8bf1757 100755 --- a/cloud_environments/terraform.sh +++ b/cloud_environments/terraform.sh @@ -22,9 +22,10 @@ set +o allexport if ( cd $GOLANG_SRC_DIR && \ go build ./cmd/landing && \ - go build ./cmd/landing_metadata && \ + go build ./cmd/landing_metadata_cronjob && \ go build ./cmd/stories && \ go build ./cmd/story && \ + go build ./cmd/stories_finalizer && \ cd $PYTHON_SRC_DIR && python -m compileall layer src ); then cd $DEPLOY_DIR @@ -37,7 +38,7 @@ if ( # https://github.com/terraform-aws-modules/terraform-aws-step-functions/issues/20 # terraform "$@" \ # -target=module.main.module.scraper_lambda \ - # -target=module.main.module.landing_parse_metadata_lambda + # -target=module.main.module.landing_metadata_cronjob_lambda terraform "$@" else diff --git a/cloud_module/dynamodb/table.tf b/cloud_module/dynamodb/table.tf index 2048b24..23a24de 100644 --- a/cloud_module/dynamodb/table.tf +++ b/cloud_module/dynamodb/table.tf @@ -1,12 +1,12 @@ resource "aws_ssm_parameter" "media_table" { name = "/app/media-literacy/table" type = "String" - value = aws_dynamodb_table.media_table.arn + value = "${aws_dynamodb_table.media_table.arn},${aws_dynamodb_table.media_table.id}" } // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/dynamodb_table#attributes-reference resource "aws_dynamodb_table" "media_table" { - name = "Mediatable" + name = "${title(replace("${var.project_alias}_${var.environment_name}", "-", "_"))}" billing_mode = "PROVISIONED" read_capacity = 20 write_capacity = 20 @@ -23,8 +23,12 @@ resource "aws_dynamodb_table" "media_table" { type = "S" } + attribute { + name = "s3Key" + type = "S" + } + // other fields - // S3 key // docType = {landing | story | landingMetadata | ...} // events @@ -58,6 +62,15 @@ resource "aws_dynamodb_table" "media_table" { non_key_attributes = ["s3Key"] } + global_secondary_index { + name = "s3KeyIndex" + hash_key = "s3Key" + range_key = "createdAt" + write_capacity = 10 + read_capacity = 10 + projection_type = "KEYS_ONLY" + } + tags = { Project = local.project_name Environment = var.environment_name diff --git a/cloud_module/pipeline/global_ssm.tf b/cloud_module/pipeline/global_ssm.tf index 3449bca..22f9862 100644 --- a/cloud_module/pipeline/global_ssm.tf +++ b/cloud_module/pipeline/global_ssm.tf @@ -9,4 +9,8 @@ data aws_ssm_parameter media_table { locals { newssite_economy_tokens = split(",", data.aws_ssm_parameter.newssite_economy.value) newssite_economy_alias = local.newssite_economy_tokens[2] + + _media_table_tokens = split(",", data.aws_ssm_parameter.media_table.value) + media_table_arn = local._media_table_tokens[0] + media_table_id = local._media_table_tokens[1] } diff --git a/cloud_module/pipeline/lambda.tf b/cloud_module/pipeline/lambda.tf index c94c24a..8946f13 100755 --- a/cloud_module/pipeline/lambda.tf +++ b/cloud_module/pipeline/lambda.tf @@ -59,7 +59,7 @@ module "step_function" { module "scraper_lambda" { source = "terraform-aws-modules/lambda/aws" create_function = true - function_name = "${local.project_name}-scraper-lambda" + function_name = "${local.project_name}-landing-lambda" description = "Lambda function for scraping" handler = "landing" runtime = "go1.x" @@ -82,6 +82,13 @@ module "scraper_lambda" { attach_policy_statements = true policy_statements = { + allow_db_query = { + effect = "Allow", + actions = [ + "dynamodb:PutItem" + ], + resources = [local.media_table_arn] + } s3_archive_bucket = { effect = "Allow", actions = [ @@ -98,6 +105,7 @@ module "scraper_lambda" { S3_ARCHIVE_BUCKET = data.aws_s3_bucket.archive.id NEWSSITE_ECONOMY = data.aws_ssm_parameter.newssite_economy.value + DYNAMODB_TABLE_ID = local.media_table_id } tags = { diff --git a/cloud_module/pipeline/landing_s3_trigger.tf b/cloud_module/pipeline/landing_s3_trigger.tf deleted file mode 100644 index ef0b96d..0000000 --- a/cloud_module/pipeline/landing_s3_trigger.tf +++ /dev/null @@ -1,38 +0,0 @@ -resource "aws_lambda_permission" "allow_bucket_trigger_by_landing" { - statement_id = "AllowExecutionFromS3Bucket" - action = "lambda:InvokeFunction" - function_name = module.landing_parse_metadata_lambda.lambda_function_arn - principal = "s3.amazonaws.com" - source_arn = data.aws_s3_bucket.archive.arn -} - -resource "aws_lambda_permission" "allow_bucket_trigger_by_landing_metadata" { - statement_id = "AllowExecutionFromS3Bucket" - action = "lambda:InvokeFunction" - function_name = module.stories_queue_consumer_lambda.lambda_function_arn - principal = "s3.amazonaws.com" - source_arn = data.aws_s3_bucket.archive.arn -} - -resource "aws_s3_bucket_notification" "bucket_notification" { - bucket = data.aws_s3_bucket.archive.id - - lambda_function { - lambda_function_arn = module.landing_parse_metadata_lambda.lambda_function_arn - events = ["s3:ObjectCreated:*"] - filter_prefix = "${local.newssite_economy_alias}/" - filter_suffix = "landing.html" - } - - lambda_function { - lambda_function_arn = module.stories_queue_consumer_lambda.lambda_function_arn - events = ["s3:ObjectCreated:*"] - filter_prefix = "${local.newssite_economy_alias}/" - filter_suffix = "/metadata.json" - } - - depends_on = [ - aws_lambda_permission.allow_bucket_trigger_by_landing, - aws_lambda_permission.allow_bucket_trigger_by_landing_metadata - ] -} diff --git a/cloud_module/pipeline/stories_sqs.tf b/cloud_module/pipeline/s3_triggers.tf similarity index 58% rename from cloud_module/pipeline/stories_sqs.tf rename to cloud_module/pipeline/s3_triggers.tf index f8fa669..33ac4a8 100644 --- a/cloud_module/pipeline/stories_sqs.tf +++ b/cloud_module/pipeline/s3_triggers.tf @@ -1,35 +1,31 @@ -module "stories_queue" { - source = "terraform-aws-modules/sqs/aws" - version = ">= 2.0, < 3.0" - - # SQS queue attributes: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_CreateQueue.html - - # FIFO queue should append suffix .fifo - name = "${local.project_name}-stories-queue" - - delay_seconds = 0 - - # so we can use per-message delay - fifo_queue = false - - # FIFO queue only - # content_based_deduplication = true - - visibility_timeout_seconds = 3600 +resource "aws_s3_bucket_notification" "bucket_notification" { + bucket = data.aws_s3_bucket.archive.id + + lambda_function { + lambda_function_arn = module.landing_metadata_s3_trigger_lambda.lambda_function_arn + events = ["s3:ObjectCreated:*"] + filter_prefix = "${local.newssite_economy_alias}/" + filter_suffix = "/metadata.json" + } - # enable long polling - receive_wait_time_seconds = 10 + depends_on = [ + aws_lambda_permission.allow_bucket_trigger_by_landing_metadata + ] +} - tags = { - Project = local.project_name - } +resource "aws_lambda_permission" "allow_bucket_trigger_by_landing_metadata" { + statement_id = "AllowExecutionFromS3Bucket" + action = "lambda:InvokeFunction" + function_name = module.landing_metadata_s3_trigger_lambda.lambda_function_arn + principal = "s3.amazonaws.com" + source_arn = data.aws_s3_bucket.archive.arn } -module "stories_queue_consumer_lambda" { +module "landing_metadata_s3_trigger_lambda" { source = "terraform-aws-modules/lambda/aws" create_function = true - function_name = "${local.project_name}-fetch-stories" + function_name = "${local.project_name}-stories-lambda" description = "Fetch ${local.project_name} stories; triggered by metadata.json creation" handler = "stories" runtime = "go1.x" @@ -62,30 +58,20 @@ module "stories_queue_consumer_lambda" { } EOF - # event source mapping for long polling - event_source_mapping = { - sqs = { - event_source_arn = module.stories_queue.this_sqs_queue_arn - batch_size = 1 - } - } - allowed_triggers = { - sqs = { - principal = "sqs.amazonaws.com" - source_arn = module.stories_queue.this_sqs_queue_arn - } - } attach_policy_statements = true policy_statements = { - pull_sqs = { + allow_db_put = { effect = "Allow", - actions = ["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes"], - resources = [module.stories_queue.this_sqs_queue_arn] + actions = [ + "dynamodb:UpdateItem", + ], + resources = [ + local.media_table_arn, + ] } s3_archive_bucket = { effect = "Allow", actions = [ - "s3:PutObject", "s3:GetObject" ], resources = [ @@ -107,8 +93,10 @@ EOF SLACK_WEBHOOK_URL = var.slack_post_webhook_url LOGLEVEL = "DEBUG" ENV = local.environment + DEBUG = "true" S3_ARCHIVE_BUCKET = data.aws_s3_bucket.archive.id + DYNAMODB_TABLE_ID = local.media_table_id SFN_ARN = module.batch_stories_sfn.state_machine_arn } diff --git a/cloud_module/pipeline/scheduler.tf b/cloud_module/pipeline/scheduler.tf index 6fcfd5b..bad68a3 100644 --- a/cloud_module/pipeline/scheduler.tf +++ b/cloud_module/pipeline/scheduler.tf @@ -60,3 +60,90 @@ data "aws_iam_policy_document" "scheduler" { } } } + + +resource "aws_cloudwatch_event_rule" "landing_metadata_scheduler" { + count = var.environment_name == "" ? 1 : 0 + + name = "${local.project_name}-schedule-start-metadata-for-landing" + # schedule experssion + # https://docs.aws.amazon.com/eventbridge/latest/userguide/scheduled-events.html + schedule_expression = "rate(1 hours)" + description = "Every hour to give courtesy to the website" +} + +resource "aws_cloudwatch_event_target" "landing_metadata_scheduler_event_target" { + count = var.environment_name == "" ? 1 : 0 + + target_id = "${local.project_name}-schedule-start-metadata-for-landing-event-target" + rule = aws_cloudwatch_event_rule.landing_metadata_scheduler.0.name + arn = module.landing_metadata_cronjob_lambda.lambda_function_arn +} + +module landing_metadata_cronjob_lambda { + source = "terraform-aws-modules/lambda/aws" + create_function = true + function_name = "${local.project_name}-landing-metadata-cronjob-lambda" + description = "Query landing pages in db; compute & archive their metadata" + handler = "landing_metadata_cronjob" + runtime = "go1.x" + + source_path = [{ + path = "${var.repo_dir}/lambda_golang/" + commands = ["${local.go_build_flags} go build ./cmd/landing_metadata_cronjob", ":zip"] + patterns = ["landing_metadata_cronjob"] + }] + + timeout = 900 + cloudwatch_logs_retention_in_days = 7 + + publish = true + + attach_policy_statements = true + policy_statements = { + allow_db_query = { + effect = "Allow", + actions = [ + "dynamodb:Query", + "dynamodb:UpdateItem", + ], + resources = [ + local.media_table_arn, + "${local.media_table_arn}/index/metadataIndex" + ] + } + s3_archive_bucket = { + effect = "Allow", + actions = [ + "s3:GetObject", + "s3:PutObject", + ], + resources = [ + "${data.aws_s3_bucket.archive.arn}/*", + ] + } + # enable getting 404 instead of 403 in case of not found + # https://stackoverflow.com/a/19808954/9814131 + s3_archive_bucket_check_404 = { + effect = "Allow", + actions = [ + "s3:ListBucket", + ], + resources = [ + "${data.aws_s3_bucket.archive.arn}", + ] + } + } + + environment_variables = { + SLACK_WEBHOOK_URL = var.slack_post_webhook_url + LOG_LEVEL = "DEBUG" + DEBUG = "true" + S3_ARCHIVE_BUCKET = data.aws_s3_bucket.archive.id + DYNAMODB_TABLE_ID = local.media_table_id + } + + tags = { + Project = local.project_name + } +} \ No newline at end of file diff --git a/cloud_module/pipeline/sfn_def/batch_stories_def.json b/cloud_module/pipeline/sfn_def/batch_stories_def.json index 76194b8..0c8124e 100644 --- a/cloud_module/pipeline/sfn_def/batch_stories_def.json +++ b/cloud_module/pipeline/sfn_def/batch_stories_def.json @@ -12,6 +12,7 @@ "Parameters": { "story.$": "$$.Map.Item.Value", "newsSiteAlias.$": "$.newsSiteAlias", + "landingPageUuid.$": "$.landingPageUuid", "landingPageTimeStamp.$": "$.landingPageTimeStamp" }, "Iterator": { @@ -32,6 +33,11 @@ } } }, + "Next": "Stories-Finalizer" + }, + "Stories-Finalizer": { + "Type":"Task", + "Resource": "${STORIES_FINALIZER_LAMBDA_ARN}", "End": true } } diff --git a/cloud_module/pipeline/stories_sfn.tf b/cloud_module/pipeline/stories_sfn.tf index 6f45d2c..2234b47 100644 --- a/cloud_module/pipeline/stories_sfn.tf +++ b/cloud_module/pipeline/stories_sfn.tf @@ -5,6 +5,7 @@ module batch_stories_sfn { definition = templatefile("${path.module}/sfn_def/batch_stories_def.json", { FETCH_STORY_LAMBDA_ARN = module.fetch_story_lambda.lambda_function_arn + STORIES_FINALIZER_LAMBDA_ARN = module.stories_finalizer_lambda.lambda_function_arn }) # allow step function to invoke other service @@ -14,7 +15,8 @@ module batch_stories_sfn { service_integrations = { lambda = { lambda = [ - module.fetch_story_lambda.lambda_function_arn + module.fetch_story_lambda.lambda_function_arn, + module.stories_finalizer_lambda.lambda_function_arn ] } } @@ -26,19 +28,18 @@ module batch_stories_sfn { } } - -module landing_parse_metadata_lambda { +module fetch_story_lambda { source = "terraform-aws-modules/lambda/aws" create_function = true - function_name = "${local.project_name}-batch-stories-fetch-parse" - description = "Scrape metadata from a landing page" - handler = "landing_metadata" + function_name = "${local.project_name}-story-lambda" + description = "Fetch and archive a story page" + handler = "story" runtime = "go1.x" source_path = [{ path = "${var.repo_dir}/lambda_golang/" - commands = ["${local.go_build_flags} go build ./cmd/landing_metadata", ":zip"] - patterns = ["landing_metadata"] + commands = ["${local.go_build_flags} go build ./cmd/story", ":zip"] + patterns = ["story"] }] timeout = 900 @@ -48,10 +49,14 @@ module landing_parse_metadata_lambda { attach_policy_statements = true policy_statements = { - pipeline_sqs = { + allow_db_put = { effect = "Allow", - actions = ["sqs:SendMessage", "sqs:GetQueueUrl"], - resources = [module.stories_queue.this_sqs_queue_arn] + actions = [ + "dynamodb:PutItem", + ], + resources = [ + local.media_table_arn, + ] } s3_archive_bucket = { effect = "Allow", @@ -77,14 +82,12 @@ module landing_parse_metadata_lambda { } environment_variables = { - STORIES_QUEUE_NAME = module.stories_queue.this_sqs_queue_name - SLACK_WEBHOOK_URL = var.slack_post_webhook_url LOG_LEVEL = "DEBUG" DEBUG = "true" S3_ARCHIVE_BUCKET = data.aws_s3_bucket.archive.id - NEWSSITE_ECONOMY = data.aws_ssm_parameter.newssite_economy.value + DYNAMODB_TABLE_ID = local.media_table_id } tags = { @@ -92,46 +95,35 @@ module landing_parse_metadata_lambda { } } -module fetch_story_lambda { +module "stories_finalizer_lambda" { source = "terraform-aws-modules/lambda/aws" create_function = true - function_name = "${local.project_name}-fetch-story" - description = "Fetch and archive a story page" - handler = "story" - runtime = "go1.x" + function_name = "${local.project_name}-stories-finalizer-lambda" + description = "Finalizer as last sfn step after all stories fetched" + handler = "stories_finalizer" + runtime = "go1.x" source_path = [{ path = "${var.repo_dir}/lambda_golang/" - commands = ["${local.go_build_flags} go build ./cmd/story", ":zip"] - patterns = ["story"] + commands = ["${local.go_build_flags} go build ./cmd/stories_finalizer", ":zip"] + patterns = ["stories_finalizer"] }] timeout = 900 cloudwatch_logs_retention_in_days = 7 - publish = true attach_policy_statements = true policy_statements = { - s3_archive_bucket = { + allow_db_put = { effect = "Allow", actions = [ - "s3:PutObject", - "s3:GetObject" + "dynamodb:Query", + "dynamodb:UpdateItem", ], resources = [ - "${data.aws_s3_bucket.archive.arn}/*", - ] - } - # enable getting 404 instead of 403 in case of not found - # https://stackoverflow.com/a/19808954/9814131 - s3_archive_bucket_check_404 = { - effect = "Allow", - actions = [ - "s3:ListBucket", - ], - resources = [ - "${data.aws_s3_bucket.archive.arn}", + local.media_table_arn, + "${local.media_table_arn}/index/s3KeyIndex" ] } } @@ -140,7 +132,7 @@ module fetch_story_lambda { SLACK_WEBHOOK_URL = var.slack_post_webhook_url LOG_LEVEL = "DEBUG" DEBUG = "true" - S3_ARCHIVE_BUCKET = data.aws_s3_bucket.archive.id + DYNAMODB_TABLE_ID = local.media_table_id } tags = { diff --git a/lambda_golang/cmd/landing/main.go b/lambda_golang/cmd/landing/main.go index ecdb4a4..a2091a5 100644 --- a/lambda_golang/cmd/landing/main.go +++ b/lambda_golang/cmd/landing/main.go @@ -4,13 +4,13 @@ import ( "context" "fmt" "strings" - "time" "github.com/aws/aws-lambda-go/lambda" "github.com/rivernews/GoTools" "github.com/rivernews/media-literacy/pkg/cloud" + "github.com/rivernews/media-literacy/pkg/common" "github.com/rivernews/media-literacy/pkg/newssite" ) @@ -30,7 +30,7 @@ type LambdaResponse struct { func HandleRequest(ctx context.Context, name LambdaEvent) (LambdaResponse, error) { newsSite := newssite.GetNewsSite("NEWSSITE_ECONOMY") - bodyText := newssite.Fetch(newsSite.LandingURL) + bodyText := common.Fetch(newsSite.LandingURL) GoTools.Logger("INFO", "In golang runtime now!\n\n```\n "+bodyText[:500]+"\n ...```\n End of message") // scraper @@ -57,10 +57,21 @@ func HandleRequest(ctx context.Context, name LambdaEvent) (LambdaResponse, error GoTools.Logger("INFO", successMessage) // S3 archive + landingPageS3Key := fmt.Sprintf("%s/daily-headlines/%s/landing.html", newsSite.Alias, common.Now()) cloud.Archive(cloud.ArchiveArgs{ BodyText: bodyText, - Key: fmt.Sprintf("%s/daily-headlines/%s/landing.html", newsSite.Alias, time.Now().Format(time.RFC3339)), + Key: landingPageS3Key, }) + out := cloud.DynamoDBPutItem(ctx, newssite.MediaTableItem{ + S3Key: landingPageS3Key, + DocType: newssite.DOCTYPE_LANDING, + Events: []newssite.MediaTableItemEvent{ + newssite.GetEventLandingPageFetched(newsSite.Alias, landingPageS3Key), + newssite.GetEventLandingMetadataRequested(landingPageS3Key), + }, + IsDocTypeWaitingForMetadata: newssite.DOCTYPE_LANDING, + }) + GoTools.Logger("DEBUG", fmt.Sprintf("```%s```\n", GoTools.AsJson(out))) return LambdaResponse{ OK: true, diff --git a/lambda_golang/cmd/landing_metadata_cronjob/main.go b/lambda_golang/cmd/landing_metadata_cronjob/main.go index c563bab..5f84d87 100644 --- a/lambda_golang/cmd/landing_metadata_cronjob/main.go +++ b/lambda_golang/cmd/landing_metadata_cronjob/main.go @@ -35,17 +35,13 @@ type LambdaResponse struct { Message string `json:"message:"` } -func HandleRequest(ctx context.Context, s3Event events.S3Event) (LambdaResponse, error) { - - // TODO: create a cronjob rule in tf - // TODO: point cronjob to this func - - // TODO: change from s3Event to fetch from dynamoDB (query) - +func HandleRequest(ctx context.Context, cronjobEvent events.CloudWatchEvent) (LambdaResponse, error) { GoTools.Logger("INFO", "Landing page metadata.json generator launched") - for _, record := range s3Event.Records { - landingPageS3Key := record.S3.Object.URLDecodedKey + items := newssite.DynamoDBQueryWaitingMetadata(ctx, newssite.DOCTYPE_LANDING) + + for _, landingItem := range *items { + landingPageS3Key := landingItem.S3Key GoTools.Logger("INFO", fmt.Sprintf("Captured landing page at %s", landingPageS3Key)) landingPageHtmlText := cloud.Pull(landingPageS3Key) @@ -53,21 +49,26 @@ func HandleRequest(ctx context.Context, s3Event events.S3Event) (LambdaResponse, metadataS3DirKeyTokens := landingPageS3KeyTokens[:len(landingPageS3KeyTokens)-1] metadataS3Key := fmt.Sprintf("%s/metadata.json", strings.Join(metadataS3DirKeyTokens, "/")) - result := newssite.GetStoriesFromEconomy(landingPageHtmlText) - metadataJSONString := GoTools.AsJson(result) + landingPageMetadata := newssite.GetStoriesFromEconomy(landingPageHtmlText) + landingPageMetadata.LandingPageS3Key = landingPageS3Key + landingPageMetadata.LandingPageUuid = landingItem.Uuid + landingPageMetadata.LandingPageCreatedAt = landingItem.CreatedAt + metadataJSONString := GoTools.AsJson(landingPageMetadata) cloud.Archive(cloud.ArchiveArgs{ BodyText: metadataJSONString, Key: metadataS3Key, FileTypeExtension: "json", }) + newssite.DynamoDBUpdateItemMarkAsMetadataComplete( + ctx, + landingItem.Uuid, + landingItem.CreatedAt, + newssite.GetEventLandingMetadataDone(metadataS3Key, landingPageS3Key), + ) bucket := GoTools.GetEnvVarHelper("S3_ARCHIVE_BUCKET") GoTools.Logger("INFO", fmt.Sprintf("Saved landing page metadata to s3://%s/%s", bucket, metadataS3Key)) - - // TODO: update `isDocTypeWaitingForMetadata` on landing page entry in db - - // (optional) TODO: add event "metadataGenFinish" on landing page entry in db } return LambdaResponse{ diff --git a/lambda_golang/cmd/landing_s3_trigger/main.go b/lambda_golang/cmd/landing_s3_trigger/main.go deleted file mode 100644 index be4c521..0000000 --- a/lambda_golang/cmd/landing_s3_trigger/main.go +++ /dev/null @@ -1,142 +0,0 @@ -// You must use `main` package for lambda -// https://stackoverflow.com/a/50701572/9814131 -package main - -import ( - "context" - "fmt" - - //"math" - - "github.com/aws/aws-lambda-go/events" // https://github.com/aws/aws-lambda-go/blob/main/events/README_S3.md - "github.com/aws/aws-lambda-go/lambda" - "github.com/rivernews/GoTools" - - //"github.com/aws/aws-sdk-go-v2/aws" - //"github.com/aws/aws-sdk-go-v2/config" - //"github.com/aws/aws-sdk-go-v2/service/sqs" - - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - - // local packages - "github.com/rivernews/media-literacy/pkg/cloud" -) - -func main() { - lambda.Start(HandleRequest) -} - -type StepFunctionInput struct { - LandingS3Key string `json:"landingS3Key"` -} - -type LambdaResponse struct { - OK bool `json:"OK:"` - Message string `json:"message:"` -} - -func HandleRequest(ctx context.Context, s3Event events.S3Event) (LambdaResponse, error) { - GoTools.Logger("INFO", "Landing page metadata.json generator launched") - - for _, record := range s3Event.Records { - landingPageS3Key := record.S3.Object.URLDecodedKey - GoTools.Logger("INFO", fmt.Sprintf("Captured landing page at %s", landingPageS3Key)) - - // TODO: push into dynamoDB instead - - cloud.DynamoDBPutItem(ctx, map[string]types.AttributeValue{ - "s3Key": &types.AttributeValueMemberS{Value: "12346"}, - "docType": &types.AttributeValueMemberS{Value: "John Doe"}, - "events": &types.AttributeValueMemberS{Value: "john@doe.io"}, - "isDocTypeWaitingForMetadata": &types.AttributeValueMemberS{Value: "TODO"}, - }) - - // TODO: let landing s3 trigger switch to point to this func - - // landingPageHtmlText := cloud.Pull(landingPageS3Key) - // landingPageS3KeyTokens := strings.Split(landingPageS3Key, "/") - // metadataS3DirKeyTokens := landingPageS3KeyTokens[:len(landingPageS3KeyTokens)-1] - // metadataS3Key := fmt.Sprintf("%s/metadata.json", strings.Join(metadataS3DirKeyTokens, "/")) - - // result := newssite.GetStoriesFromEconomy(landingPageHtmlText) - // metadataJSONString := GoTools.AsJson(result) - - // cloud.Archive(cloud.ArchiveArgs{ - // BodyText: metadataJSONString, - // Key: metadataS3Key, - // FileTypeExtension: "json", - // }) - - // bucket := GoTools.GetEnvVarHelper("S3_ARCHIVE_BUCKET") - // GoTools.Logger("INFO", fmt.Sprintf("Saved landing page metadata to s3://%s/%s", bucket, metadataS3Key)) - } - - return LambdaResponse{ - OK: true, - Message: "Done", - }, nil - - /* - - // e.g. 90 links in total - // chunk size := 30 - // chunk count = 3 - chunkSize := 30 - chunkCount := int(math.Ceil(float64(len(stories) / chunkSize))) - linkChunks := make([][]string, chunkCount) - - for i := 0; i < chunkCount; i++ { - linkChunk := make([]string, chunkSize) - for j := 0; j < chunkSize; j++ { - linkChunk[j] = stories[i*chunkSize+j].URL - } - linkChunks[i] = linkChunk - } - - GoTools.Logger("INFO", fmt.Sprintf("Pulled landing page content:\n ``` %s ``` \n ", landingPageHtmlText[:500])) - - for _, linkChunk := range linkChunks { - // send SQS - // refer to - // https://aws.github.io/aws-sdk-go-v2/docs/code-examples/sqs/sendmessage/ - queueName := GoTools.GetEnvVarHelper("STORIES_QUEUE_NAME") - awsConfig, configErr := config.LoadDefaultConfig(context.TODO()) - if configErr != nil { - GoTools.Logger("ERROR", "AWS shared configuration failed", configErr.Error()) - } - sqsClient := sqs.NewFromConfig(awsConfig) - getQueueResponse, getQueueError := sqsClient.GetQueueUrl(context.TODO(), &sqs.GetQueueUrlInput{QueueName: aws.String(queueName)}) - if getQueueError != nil { - GoTools.Logger("ERROR", fmt.Sprintf("Error getting queue URL: %s", getQueueError.Error())) - } - queueURL := getQueueResponse.QueueUrl - - res, err := sqsClient.SendMessage(context.TODO(), &sqs.SendMessageInput{ - // AWS required attributes - // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html - // Golang API Reference - // https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/sqs#SendMessageInput - - QueueUrl: queueURL, - MessageBody: aws.String(strings.Join(linkChunk, " ")), - - // Only FIFO queue can use `MessageGroupId` - // MessageGroupId: aws.String(fmt.Sprintf("%s-00", queueName)), - - // TODO: add randomized delay - DelaySeconds: 0, - }) - - if err != nil { - GoTools.Logger("ERROR", fmt.Sprintf("Error sending message: %s", err)) - } - GoTools.SimpleLogger("INFO", fmt.Sprintf("Message sent %s", *res.MessageId)) - } - - return LambdaResponse{ - OK: true, - Message: fmt.Sprintf("Sent %d messages OK", len(linkChunks)), - }, nil - - */ -} diff --git a/lambda_golang/cmd/stories/main.go b/lambda_golang/cmd/stories/main.go index 6c8743f..c46081f 100644 --- a/lambda_golang/cmd/stories/main.go +++ b/lambda_golang/cmd/stories/main.go @@ -33,26 +33,34 @@ var sfnClient *sfn.Client // SQS event // refer to https://github.com/aws/aws-lambda-go/blob/v1.26.0/events/README_SQS.md func HandleRequest(ctx context.Context, S3Event events.S3Event) (LambdaResponse, error) { - GoTools.Logger("INFO", "Fetch stories lambda launched ... triggered by metadata.json creation.") + GoTools.Logger("INFO", "Fetch stories lambda launched ... triggered by metadata.json creation, ready to batch fetch storeis in Sfn") for _, record := range S3Event.Records { GoTools.Logger("INFO", fmt.Sprintf("S3 event ``` %s ```\n ", GoTools.AsJson(record))) - - metadataS3KeyTokens := strings.Split(record.S3.Object.URLDecodedKey, "/") + metadataS3Key := record.S3.Object.URLDecodedKey + metadataS3KeyTokens := strings.Split(metadataS3Key, "/") newsSiteAlias := metadataS3KeyTokens[0] landingPageTimeStamp := metadataS3KeyTokens[len(metadataS3KeyTokens)-2] - metadataJSONString := cloud.Pull(record.S3.Object.URLDecodedKey) + metadataJSONString := cloud.Pull(metadataS3Key) var metadata newssite.LandingPageMetadata GoTools.FromJson([]byte(metadataJSONString), &metadata) GoTools.Logger("INFO", fmt.Sprintf("Test first story: %d:%d", len(metadata.Stories), len(metadata.UntitledStories))) // fire step function, input = {stories: [{}, {}, {}...], newsSiteAlias:"", landingPageTimeStamp:""} + var processStories []newssite.Topic + if GoTools.Debug { + processStories = metadata.Stories[:2] + } else { + processStories = metadata.Stories + } sfnInput := GoTools.AsJson(&newssite.StepFunctionInput{ - Stories: metadata.Stories, + Stories: processStories, NewsSiteAlias: newsSiteAlias, + LandingPageUuid: metadata.LandingPageUuid, + LandingPageS3Key: metadata.LandingPageS3Key, LandingPageTimeStamp: landingPageTimeStamp, }) executionName := strings.ReplaceAll(fmt.Sprintf("%s--%s", landingPageTimeStamp, time.Now().Format(time.RFC3339)), ":", "") @@ -67,6 +75,7 @@ func HandleRequest(ctx context.Context, S3Event events.S3Event) (LambdaResponse, StateMachineArn: aws.String(sfnArn), }, ) + newssite.DynamoDBUpdateItemAddEvent(ctx, metadata.LandingPageUuid, metadata.LandingPageCreatedAt, newssite.GetEventLandingStoriesRequested(metadataS3Key, len(processStories))) GoTools.Logger("INFO", fmt.Sprintf("Sfn output ``` %s ```\n", GoTools.AsJson(sfnOutput))) diff --git a/lambda_golang/cmd/stories_finalizer/main.go b/lambda_golang/cmd/stories_finalizer/main.go index 4d03b82..d61de0a 100644 --- a/lambda_golang/cmd/stories_finalizer/main.go +++ b/lambda_golang/cmd/stories_finalizer/main.go @@ -1 +1,44 @@ -// TODO: Sfn add a step after map - log event "FinishStoriesFetchingAll" into DDB landing page object pipelineEvents. \ No newline at end of file +package main + +import ( + "fmt" + + "github.com/aws/aws-lambda-go/lambda" + + "context" + + "github.com/rivernews/GoTools" + "github.com/rivernews/media-literacy/pkg/newssite" +) + +func main() { + lambda.Start(HandleRequest) +} + +type LambdaResponse struct { + OK bool `json:"OK:"` + Message string `json:"message:"` +} + +func HandleRequest(ctx context.Context, stepFunctionInput newssite.StepFunctionInput) (LambdaResponse, error) { + GoTools.Logger("INFO", "Stories finalizer launched") + + result := newssite.DynamoDBQueryByS3Key(ctx, stepFunctionInput.LandingPageS3Key) + + if len(*result) != 1 { + GoTools.Logger("ERROR", fmt.Sprintf( + "Stories finalizer expect exactly one landing page `%s`, but query resulted in `%d` items", + stepFunctionInput.LandingPageS3Key, + len(*result), + )) + } + + landingPageItem := (*result)[0] + + newssite.DynamoDBUpdateItemAddEvent(ctx, landingPageItem.Uuid, landingPageItem.CreatedAt, newssite.GetEventLandingStoriesFetched(stepFunctionInput.LandingPageS3Key)) + + return LambdaResponse{ + OK: true, + Message: "OK", + }, nil +} diff --git a/lambda_golang/cmd/story/main.go b/lambda_golang/cmd/story/main.go index 275c9c3..5b8fe18 100644 --- a/lambda_golang/cmd/story/main.go +++ b/lambda_golang/cmd/story/main.go @@ -12,6 +12,7 @@ import ( "github.com/rivernews/GoTools" "github.com/rivernews/media-literacy/pkg/cloud" + "github.com/rivernews/media-literacy/pkg/common" "github.com/rivernews/media-literacy/pkg/newssite" ) @@ -42,13 +43,28 @@ func HandleRequest(ctx context.Context, stepFunctionMapIterationInput newssite.S }) GoTools.Logger("INFO", fmt.Sprintf("IP=`%s` waited %d - %s", bytes.TrimSpace(responseBody), totalWait, stepFunctionMapIterationInput.Story.Name)) - - storyHtmlBodyText := newssite.Fetch(stepFunctionMapIterationInput.Story.URL) + storyS3Key := fmt.Sprintf("%s/stories/%s-%s/story.html", stepFunctionMapIterationInput.NewsSiteAlias, stepFunctionMapIterationInput.LandingPageTimeStamp, stepFunctionMapIterationInput.Story.Name) + storyHtmlBodyText := common.Fetch(stepFunctionMapIterationInput.Story.URL) cloud.Archive(cloud.ArchiveArgs{ BodyText: storyHtmlBodyText, - Key: fmt.Sprintf("%s/stories/%s-%s/story.html", stepFunctionMapIterationInput.NewsSiteAlias, stepFunctionMapIterationInput.LandingPageTimeStamp, stepFunctionMapIterationInput.Story.Name), + Key: storyS3Key, }) + cloud.DynamoDBPutItem( + ctx, + newssite.MediaTableItem{ + S3Key: storyS3Key, + DocType: newssite.DOCTYPE_STORY, + Events: []newssite.MediaTableItemEvent{ + newssite.GetEventStoryFetched( + stepFunctionMapIterationInput.Story.Name, + stepFunctionMapIterationInput.Story.URL, + ), + }, + IsDocTypeWaitingForMetadata: newssite.DOCTYPE_STORY, + }, + ) + return LambdaResponse{ OK: true, Message: "Story consumer fetch parse ok", diff --git a/lambda_golang/go.mod b/lambda_golang/go.mod index cf92983..1df9965 100644 --- a/lambda_golang/go.mod +++ b/lambda_golang/go.mod @@ -16,12 +16,14 @@ require ( require ( github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.23 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.17 // indirect + github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.20 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.17 // indirect ) require ( github.com/andybalholm/cascadia v1.2.0 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.4.0 // indirect + github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.0 github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2 // indirect github.com/aws/aws-sdk-go-v2/service/dynamodb v1.17.1 diff --git a/lambda_golang/go.sum b/lambda_golang/go.sum index fd128c6..b39d7fb 100644 --- a/lambda_golang/go.sum +++ b/lambda_golang/go.sum @@ -6,7 +6,6 @@ github.com/andybalholm/cascadia v1.2.0/go.mod h1:YCyR8vOZT9aZ1CHEd8ap0gMVm2aFgxB github.com/aws/aws-lambda-go v1.26.0 h1:6ujqBpYF7tdZcBvPIccs98SpeGfrt/UOVEiexfNIdHA= github.com/aws/aws-lambda-go v1.26.0/go.mod h1:jJmlefzPfGnckuHdXX7/80O3BvUUi12XOkbv4w9SGLU= github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= -github.com/aws/aws-sdk-go-v2 v1.16.15 h1:2sInOWGE4HV54R90Pj8QgqBBw3Qf1I0husqbqjPZzys= github.com/aws/aws-sdk-go-v2 v1.16.15/go.mod h1:SwiyXi/1zTUZ6KIAmLK5V5ll8SiURNUYOqTerZPaF9k= github.com/aws/aws-sdk-go-v2 v1.16.16 h1:M1fj4FE2lB4NzRb9Y0xdWsn2P0+2UHVxwKyOa4YJNjk= github.com/aws/aws-sdk-go-v2 v1.16.16/go.mod h1:SwiyXi/1zTUZ6KIAmLK5V5ll8SiURNUYOqTerZPaF9k= @@ -14,15 +13,15 @@ github.com/aws/aws-sdk-go-v2/config v1.7.0 h1:J2cZ7qe+3IpqBEXnHUrFrOjoB9BlsXg7j5 github.com/aws/aws-sdk-go-v2/config v1.7.0/go.mod h1:w9+nMZ7soXCe5nT46Ri354SNhXDQ6v+V5wqDjnZE+GY= github.com/aws/aws-sdk-go-v2/credentials v1.4.0 h1:kmvesfjY861FzlCU9mvAfe01D9aeXcG2ZuC+k9F2YLM= github.com/aws/aws-sdk-go-v2/credentials v1.4.0/go.mod h1:dgGR+Qq7Wjcd4AOAW5Rf5Tnv3+x7ed6kETXyS9WCuAY= +github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.0 h1:bKbdstt7+PzIRSIXZ11Yo8Qh8t0AHn6jEYUfsbVcLjE= +github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.0/go.mod h1:+CBJZMhsb1pTUcB/NTdS505bDX10xS4xnPMqDZj2Ptw= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.5.0 h1:OxTAgH8Y4BXHD6PGCJ8DHx2kaZPCQfSTqmDsdRZFezE= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.5.0/go.mod h1:CpNzHK9VEFUCknu50kkB8z58AH2B5DvPP7ea1LHve/Y= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.5.0 h1:zzH1xd1/PX7bFO4/BQwVQP+UcBfYieI1sMH9DA68xZY= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.5.0/go.mod h1:fPU0eFGnS47RyKHHs8BNcCKOm5oOA5xm0BlrZWsQT/A= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.22 h1:pE27/u2A7JlwICjOvONQDob8PToShRTkuiUE74ymVWg= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.22/go.mod h1:/vNv5Al0bpiF8YdX2Ov6Xy05VTiXsql94yUqJMYaj0w= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.23 h1:s4g/wnzMf+qepSNgTvaQQHNxyMLKSawNhKCPNy++2xY= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.23/go.mod h1:2DFxAQ9pfIRy0imBCJv+vZ2X6RKxves6fbnEuSry6b4= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.16 h1:L5LKGHHXOl4t7+5QZMTl38GIzSAq07XUTRtEquiHGMA= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.16/go.mod h1:62dsXI0BqTIGomDl8Hpm33dv0OntGaVblri3ZRParVQ= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.17 h1:/K482T5A3623WJgWT8w1yRAFK4RzGzEl7y39yhtn9eA= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.17/go.mod h1:pRwaTYCJemADaqCbUAxltMoHKata7hmB5PjEXeu0kfg= @@ -30,7 +29,8 @@ github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2 h1:d95cddM3yTm4qffj3P6EnP+TzX1S github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2/go.mod h1:BQV0agm+JEhqR+2RT5e1XTFIDcAAV0eW6z2trp+iduw= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.17.1 h1:1QpTkQIAaZpR387it1L+erjB5bStGFCJRvmXsodpPEU= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.17.1/go.mod h1:BZhn/C3z13ULTSstVi2Kymc62bgjFh/JwLO9Tm2OFYI= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.3.0 h1:gceOysEWNNwLd6cki65IMBZ4WAM0MwgBQq2n7kejoT8= +github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.20 h1:V9q4A0qnUfDsfivspY1LQRQTOG3Y9FLHvXIaTbcU7XM= +github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.20/go.mod h1:7qWU48SMzlrfOlNhHpazW3psFWlOIWrq4SmOr2/ESmk= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.3.0/go.mod h1:v8ygadNyATSm6elwJ/4gzJwcFhri9RqS8skgHKiwXPU= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.9 h1:Lh1AShsuIJTwMkoxVCAYPJgNG5H+eN6SmoUn8nOZ5wE= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.9/go.mod h1:a9j48l6yL5XINLHLcOKInjdvknN+vWqPBxqeIDw7ktw= diff --git a/lambda_golang/pkg/cloud/dynamodb_common.go b/lambda_golang/pkg/cloud/dynamodb_common.go index 5a36fdf..04267fd 100644 --- a/lambda_golang/pkg/cloud/dynamodb_common.go +++ b/lambda_golang/pkg/cloud/dynamodb_common.go @@ -5,9 +5,11 @@ import ( "sync" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/rivernews/GoTools" + "github.com/rivernews/media-literacy/pkg/common" "github.com/google/uuid" ) @@ -24,23 +26,31 @@ func SharedDynamoDBClient() *dynamodb.Client { return dynamoDBClient } -func DynamoDBPutItem(ctx context.Context, item map[string]types.AttributeValue) *dynamodb.PutItemOutput { - tableID := GoTools.GetEnvVarHelper("MEDIA_TABLE_ID") +func GetTableName() string { + tableID := GoTools.GetEnvVarHelper("DYNAMODB_TABLE_ID") if tableID == "" { - GoTools.Logger("ERROR", "MEDIA_TABLE_ID is required please set this env var") + GoTools.Logger("ERROR", "DYNAMODB_TABLE_ID is required please set this env var") + } + return tableID +} + +func DynamoDBPutItem(ctx context.Context, item any) *dynamodb.PutItemOutput { + dynamoDBItem, err := attributevalue.MarshalMap(item) + if err != nil { + GoTools.Logger("ERROR", err.Error()) } - if _, exist := item["uuid"]; !exist { - item["uuid"] = uuid.New().String() + if _, exist := dynamoDBItem["uuid"]; !exist { + dynamoDBItem["uuid"] = &types.AttributeValueMemberS{Value: uuid.New().String()} } - if _, exist := item["createdAt"]; !exist { - // TODO + if _, exist := dynamoDBItem["createdAt"]; !exist { + dynamoDBItem["createdAt"] = &types.AttributeValueMemberS{Value: common.Now()} } - out, err := dynamoDBClient.PutItem(ctx, &dynamodb.PutItemInput{ - TableName: aws.String(tableID), - Item: item, + out, err := SharedDynamoDBClient().PutItem(ctx, &dynamodb.PutItemInput{ + TableName: aws.String(GetTableName()), + Item: dynamoDBItem, }) if err != nil { diff --git a/lambda_golang/pkg/common/utilities.go b/lambda_golang/pkg/common/utilities.go new file mode 100644 index 0000000..cccef5d --- /dev/null +++ b/lambda_golang/pkg/common/utilities.go @@ -0,0 +1,37 @@ +package common + +import ( + "io" + "net/http" + "time" + + "github.com/rivernews/GoTools" + "golang.org/x/net/html/charset" +) + +func Fetch(url string) string { + resp, err := http.Get(url) + if err != nil { + // handle error + GoTools.Logger("ERROR", err.Error()) + } + defer resp.Body.Close() + + contentType := resp.Header.Get("Content-Type") // Optional, better guessing + GoTools.Logger("DEBUG", "ContentType is ", contentType) + utf8reader, err := charset.NewReader(resp.Body, contentType) + if err != nil { + GoTools.Logger("ERROR", err.Error()) + } + + body, err := io.ReadAll(utf8reader) + if err != nil { + // handle error + GoTools.Logger("ERROR", err.Error()) + } + return string(body) +} + +func Now() string { + return time.Now().Format(time.RFC3339) +} diff --git a/lambda_golang/pkg/newssite/economy.go b/lambda_golang/pkg/newssite/media.go similarity index 76% rename from lambda_golang/pkg/newssite/economy.go rename to lambda_golang/pkg/newssite/media.go index 86f08b5..0e801c1 100644 --- a/lambda_golang/pkg/newssite/economy.go +++ b/lambda_golang/pkg/newssite/media.go @@ -14,23 +14,29 @@ type Topic struct { } type LandingPageMetadata struct { - Stories []Topic `json:"stories"` - UntitledStories []Topic `json:"untitledstories"` + LandingPageS3Key string `json:"landingPageS3Key"` + LandingPageUuid string `json:"landingPageUuid"` + LandingPageCreatedAt string `json:"landingPageCreatedAt"` + Stories []Topic `json:"stories"` + UntitledStories []Topic `json:"untitledstories"` } type StepFunctionInput struct { Stories []Topic `json:"stories"` NewsSiteAlias string `json:"newsSiteAlias"` + LandingPageUuid string `json:"landingPageUuid"` + LandingPageS3Key string `json:"landingPageS3Key"` LandingPageTimeStamp string `json:"landingPageTimeStamp"` } type StepFunctionMapIterationInput struct { Story Topic `json:"story"` NewsSiteAlias string `json:"newsSiteAlias"` + LandingPageUuid string `json:"landingPageUuid"` LandingPageTimeStamp string `json:"landingPageTimeStamp"` } -func GetStoriesFromEconomy(body string) LandingPageMetadata { +func GetStoriesFromEconomy(body string) *LandingPageMetadata { // Load the HTML document doc, err := goquery.NewDocumentFromReader(strings.NewReader(body)) if err != nil { @@ -62,7 +68,7 @@ func GetStoriesFromEconomy(body string) LandingPageMetadata { }) GoTools.Logger("INFO", "Skipped due to empty title URLs:\n", emptyTitleURLs.String()) - return LandingPageMetadata{ + return &LandingPageMetadata{ Stories: topics, UntitledStories: untitledTopics, } diff --git a/lambda_golang/pkg/newssite/media_table.go b/lambda_golang/pkg/newssite/media_table.go new file mode 100644 index 0000000..b407b6f --- /dev/null +++ b/lambda_golang/pkg/newssite/media_table.go @@ -0,0 +1,214 @@ +package newssite + +import ( + "context" + "fmt" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/rivernews/GoTools" + "github.com/rivernews/media-literacy/pkg/cloud" + "github.com/rivernews/media-literacy/pkg/common" +) + +type EventName string + +const ( + // @`landing` ✅ 📩 (put in db) ✅ + //EVENT_LANDING_PAGE_REQUESTED EventName = "LANDING_PAGE_REQUESTED" + // -`landing_s3_trigger` + EVENT_LANDING_PAGE_FETCHED EventName = "LANDING_PAGE_FETCHED" + // @`landing_metadata` -> `landing_metadata_cronjob` (query db; store metadata) ✅ (cronjob trigger) ✅ + EVENT_LANDING_METADATA_REQUESTED EventName = "LANDING_METADATA_REQUESTED" + EVENT_LANDING_METADATA_DONE EventName = "LANDING_METADATA_DONE" + // `stories` (metadata triggers; sfn) ✅ + EVENT_LANDING_STORIES_REQUESTED EventName = "LANDING_STORIES_REQUESTED" + // `story` (sfn map; archive story) ✅ 📩 + //EVENT_STORY_REQUESTED EventName = "STORY_REQUESTED" + // random wait + EVENT_STORY_FETCHED EventName = "STORY_FETCHED" + // +`stories_finalizer` (sfn last step) ✅ + EVENT_LANDING_STORIES_FETCHED EventName = "LANDING_STORIES_FETCHED" +) + +// func GetEventLandingPageRequested(newsSiteAlias string, newsSiteURL string) MediaTableItemEvent { +// return MediaTableItemEvent{ +// EventName: EVENT_LANDING_PAGE_REQUESTED, +// Detail: fmt.Sprintf("Requested landing page for %s at %s", newsSiteAlias, newsSiteURL), +// EventTime: common.Now(), +// } +// } + +func GetEventLandingPageFetched(newsSiteAlias string, landingPageS3Key string) MediaTableItemEvent { + return MediaTableItemEvent{ + EventName: EVENT_LANDING_PAGE_FETCHED, + Detail: fmt.Sprintf("Fetched landing page for %s; stored at %s", newsSiteAlias, landingPageS3Key), + EventTime: common.Now(), + } +} + +func GetEventLandingMetadataRequested(landingPageS3Key string) MediaTableItemEvent { + return MediaTableItemEvent{ + EventName: EVENT_LANDING_METADATA_REQUESTED, + Detail: fmt.Sprintf("Requested metadata for landing page %s", landingPageS3Key), + EventTime: common.Now(), + } +} + +func GetEventLandingMetadataDone(metadataS3Key string, landingPageS3Key string) MediaTableItemEvent { + return MediaTableItemEvent{ + EventName: EVENT_LANDING_METADATA_DONE, + Detail: fmt.Sprintf("Metadata is computed and archived at `%s`; for landing page `%s`", metadataS3Key, landingPageS3Key), + EventTime: common.Now(), + } +} + +func GetEventLandingStoriesRequested(metadataS3Key string, storiesCount int) MediaTableItemEvent { + return MediaTableItemEvent{ + EventName: EVENT_LANDING_STORIES_REQUESTED, + Detail: fmt.Sprintf("Stories(%d) requested for landing page based on metadata %s", storiesCount, metadataS3Key), + EventTime: common.Now(), + } +} + +// if we want below, we need to PutItem w/o s3Key first +// then after fetched, UpdateItem - need to update s3Key as well! +// func GetEventStoryRequested(storyTitle string, storyURL string) MediaTableItemEvent { +// return MediaTableItemEvent{ +// EventName: EVENT_STORY_REQUESTED, +// Detail: fmt.Sprintf("Story %s requested %s", storyTitle, storyURL), +// EventTime: common.Now(), +// } +// } + +func GetEventStoryFetched(storyTitle string, storyURL string) MediaTableItemEvent { + return MediaTableItemEvent{ + EventName: EVENT_STORY_FETCHED, + Detail: fmt.Sprintf("Story %s fetched %s", storyTitle, storyURL), + EventTime: common.Now(), + } +} + +func GetEventLandingStoriesFetched(landingPageS3Key string) MediaTableItemEvent { + return MediaTableItemEvent{ + EventName: EVENT_LANDING_STORIES_FETCHED, + Detail: fmt.Sprintf("All stories fetched for landing page %s", landingPageS3Key), + EventTime: common.Now(), + } +} + +type MediaTableItemEvent struct { + EventName EventName `dynamodbav:"eventName" json:"eventName"` + Detail string `dynamodbav:"detail" json:"detail"` + EventTime string `dynamodbav:"eventTime" json:"eventTime"` +} + +type DocType string + +const ( + DOCTYPE_LANDING DocType = "LANDING" + DOCTYPE_STORY DocType = "STORY" +) + +type TableIndex string + +const ( + METADATA_INDEX TableIndex = "metadataIndex" + S3KEY_INDEX TableIndex = "s3KeyIndex" +) + +type MediaTableItem struct { + Uuid string `dynamodbav:"uuid,omitempty" json:"uuid,omitempty"` + CreatedAt string `dynamodbav:"createdAt,omitempty" json:"createdAt,omitempty"` + S3Key string `dynamodbav:"s3Key" json:"s3Key"` + DocType DocType `dynamodbav:"docType" json:"docType"` + Events []MediaTableItemEvent `dynamodbav:"events" json:"events"` + IsDocTypeWaitingForMetadata DocType `dynamodbav:"isDocTypeWaitingForMetadata,omitempty" json:"isDocTypeWaitingForMetadata,omitempty"` +} + +func DynamoDBQueryByS3Key(ctx context.Context, s3Key string) *[]MediaTableItem { + out, err := cloud.SharedDynamoDBClient().Query(ctx, &dynamodb.QueryInput{ + TableName: aws.String(cloud.GetTableName()), + IndexName: aws.String(string(S3KEY_INDEX)), + KeyConditionExpression: aws.String("s3Key = :s3Key and createdAt < :createdAt"), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":s3Key": &types.AttributeValueMemberS{Value: s3Key}, + ":createdAt": &types.AttributeValueMemberS{Value: common.Now()}, + }, + Limit: aws.Int32(10), + }) + if err != nil { + GoTools.Logger("ERROR", err.Error()) + } + + var results []MediaTableItem + attributevalue.UnmarshalListOfMaps(out.Items, &results) + + return &results +} + +func DynamoDBQueryWaitingMetadata(ctx context.Context, docType DocType) *[]MediaTableItem { + out, err := cloud.SharedDynamoDBClient().Query(ctx, &dynamodb.QueryInput{ + TableName: aws.String(cloud.GetTableName()), + IndexName: aws.String(string(METADATA_INDEX)), + KeyConditionExpression: aws.String("isDocTypeWaitingForMetadata = :isDocTypeWaitingForMetadata and createdAt < :createdAt"), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":isDocTypeWaitingForMetadata": &types.AttributeValueMemberS{Value: string(docType)}, + ":createdAt": &types.AttributeValueMemberS{Value: common.Now()}, + }, + Limit: aws.Int32(10), + }) + if err != nil { + GoTools.Logger("ERROR", err.Error()) + } + + var results []MediaTableItem + attributevalue.UnmarshalListOfMaps(out.Items, &results) + + return &results +} + +func DynamoDBUpdateItem(ctx context.Context, uuid string, createdAt string, event MediaTableItemEvent, isMarkMetadataComplete bool) *dynamodb.UpdateItemOutput { + dynamoDBItemEvent, err := attributevalue.MarshalMap(event) + if err != nil { + GoTools.Logger("ERROR", err.Error()) + } + updateItemInput := dynamodb.UpdateItemInput{ + TableName: aws.String(cloud.GetTableName()), + Key: map[string]types.AttributeValue{ + "uuid": &types.AttributeValueMemberS{Value: uuid}, + "createdAt": &types.AttributeValueMemberS{Value: createdAt}, + }, + // manual + // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Expressions.UpdateExpressions.html#Expressions.UpdateExpressions.ADD + UpdateExpression: aws.String(`SET events = list_append(events, :e)`), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":e": &types.AttributeValueMemberL{ + Value: []types.AttributeValue{ + &types.AttributeValueMemberM{Value: dynamoDBItemEvent}, + }, + }, + }, + } + if isMarkMetadataComplete { + *(updateItemInput.UpdateExpression) = *(updateItemInput.UpdateExpression) + ` REMOVE isDocTypeWaitingForMetadata` + } + + out, err := cloud.SharedDynamoDBClient().UpdateItem(ctx, &updateItemInput) + + if err != nil { + GoTools.Logger("ERROR", err.Error()) + } + + return out +} + +func DynamoDBUpdateItemAddEvent(ctx context.Context, uuid string, createdAt string, event MediaTableItemEvent) *dynamodb.UpdateItemOutput { + return DynamoDBUpdateItem(ctx, uuid, createdAt, event, false) +} + +func DynamoDBUpdateItemMarkAsMetadataComplete(ctx context.Context, uuid string, createdAt string, event MediaTableItemEvent) *dynamodb.UpdateItemOutput { + return DynamoDBUpdateItem(ctx, uuid, createdAt, event, true) +} diff --git a/lambda_golang/pkg/newssite/utilities.go b/lambda_golang/pkg/newssite/utilities.go index 27c2c9b..bb88b2f 100644 --- a/lambda_golang/pkg/newssite/utilities.go +++ b/lambda_golang/pkg/newssite/utilities.go @@ -1,12 +1,8 @@ package newssite import ( - "io" - "net/http" "strings" - "golang.org/x/net/html/charset" - "github.com/rivernews/GoTools" ) @@ -29,26 +25,3 @@ func GetNewsSite(envVar string) NewsSite { LandingURL: tokens[3], } } - -func Fetch(url string) string { - resp, err := http.Get(url) - if err != nil { - // handle error - GoTools.Logger("ERROR", err.Error()) - } - defer resp.Body.Close() - - contentType := resp.Header.Get("Content-Type") // Optional, better guessing - GoTools.Logger("DEBUG", "ContentType is ", contentType) - utf8reader, err := charset.NewReader(resp.Body, contentType) - if err != nil { - GoTools.Logger("ERROR", err.Error()) - } - - body, err := io.ReadAll(utf8reader) - if err != nil { - // handle error - GoTools.Logger("ERROR", err.Error()) - } - return string(body) -}