Skip to content

Commit

Permalink
Add support for workload identity federation and consolidate GCP/BigQ…
Browse files Browse the repository at this point in the history
…uery APIs into separate classes
  • Loading branch information
asatwal committed Mar 18, 2024
1 parent e55b9ff commit 6b1eba7
Show file tree
Hide file tree
Showing 14 changed files with 601 additions and 205 deletions.
28 changes: 26 additions & 2 deletions config/locales/en.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ en:
entity_table_checks_enabled:
description: |
Whether to run entity table checksum job.
default: false
default: false
rack_page_cached:
description: |
A proc which will be called with the rack env, and which should
Expand All @@ -72,4 +72,28 @@ en:
Schedule a maintenance window during which no events are streamed to BigQuery
in the format of '22-01-2024 19:30..22-01-2024 20:30' (UTC).
default: ENV['BIGQUERY_MAINTENANCE_WINDOW']

azure_federated_auth:
description: |
Whether to use azure workload identity federation for authentication
instead of the BigQuery API JSON Key
default: false
azure_client_id:
description: |
Client Id of the app in azure
default: ENV['AZURE_CLIENT_ID']
azure_token_path:
description: |
Path of the token file used to obtain a token from Azure AD
default: ENV['AZURE_FEDERATED_TOKEN_FILE']
azure_scope:
description: |
Azure audience scope
default: api://AzureADTokenExchange/.default
gcp_scope:
description: |
Google cloud scope
default: https://www.googleapis.com/auth/cloud-platform
google_cloud_credentials:
description: |
Google generated cloud credentials file
default: ENV['GOOGLE_CLOUD_CREDENTIALS']
46 changes: 23 additions & 23 deletions lib/dfe/analytics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'request_store_rails'
require 'i18n'
require 'google/cloud/bigquery'
require 'dfe/analytics/event_schema'
require 'dfe/analytics/fields'
require 'dfe/analytics/entities'
Expand All @@ -23,34 +24,14 @@
require 'dfe/analytics/middleware/request_identity'
require 'dfe/analytics/middleware/send_cached_page_request_event'
require 'dfe/analytics/railtie'
require 'dfe/analytics/big_query_api'
require 'dfe/analytics/big_query_legacy_api'
require 'dfe/analytics/azure_federated_auth'

module DfE
module Analytics
class ConfigurationError < StandardError; end

# Memoised BigQuery client scoped to the configured events table.
#
# Validates that every mandatory BigQuery setting is present before
# constructing the client, so a misconfigured deployment fails fast with
# a clear message instead of an obscure API error later on.
#
# @return [Google::Cloud::Bigquery::Table] the configured events table
# @raise [ConfigurationError] if any required config value is nil
def self.events_client
  @events_client ||= begin
    require 'google/cloud/bigquery'

    required_keys = %i[
      bigquery_project_id
      bigquery_table_name
      bigquery_dataset
      bigquery_api_json_key
    ]
    missing = required_keys.select { |key| config.send(key).nil? }

    raise(ConfigurationError, "DfE::Analytics: missing required config values: #{missing.join(', ')}") if missing.any?

    bigquery = Google::Cloud::Bigquery.new(
      project: config.bigquery_project_id,
      credentials: JSON.parse(config.bigquery_api_json_key),
      retries: config.bigquery_retries,
      timeout: config.bigquery_timeout
    )

    # skip_lookup avoids a network round-trip to verify the dataset/table exist.
    bigquery
      .dataset(config.bigquery_dataset, skip_lookup: true)
      .table(config.bigquery_table_name, skip_lookup: true)
  end
end

def self.config
configurables = %i[
log_only
Expand All @@ -69,6 +50,12 @@ def self.config
entity_table_checks_enabled
rack_page_cached
bigquery_maintenance_window
azure_federated_auth
azure_client_id
azure_token_path
azure_scope
gcp_scope
google_cloud_credentials
]

@config ||= Struct.new(*configurables).new
Expand All @@ -93,6 +80,19 @@ def self.configure
config.entity_table_checks_enabled ||= false
config.rack_page_cached ||= proc { |_rack_env| false }
config.bigquery_maintenance_window ||= ENV.fetch('BIGQUERY_MAINTENANCE_WINDOW', nil)
config.azure_federated_auth ||= false

return unless config.azure_federated_auth

config.azure_client_id ||= ENV.fetch('AZURE_CLIENT_ID', nil)
config.azure_token_path ||= ENV.fetch('AZURE_FEDERATED_TOKEN_FILE', nil)
config.google_cloud_credentials ||= ENV.fetch('GOOGLE_CLOUD_CREDENTIALS', nil)
config.azure_scope ||= DfE::Analytics::AzureFederatedAuth::DEFAULT_AZURE_SCOPE
config.gcp_scope ||= DfE::Analytics::AzureFederatedAuth::DEFAULT_GCP_SCOPE

return unless config.google_cloud_credentials.present?

config.google_cloud_credentials = JSON.parse(config.google_cloud_credentials, object_class: Struct)
end

def self.initialize!
Expand Down
93 changes: 93 additions & 0 deletions lib/dfe/analytics/azure_federated_auth.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# frozen_string_literal: true

require 'googleauth'

module DfE
  module Analytics
    # Azure client for workload identity federation with GCP using OAuth.
    #
    # Obtains short-lived Google credentials without a static JSON key by
    # chaining three token requests:
    #   1. fetch an Azure AD access token using the federated token file,
    #   2. exchange it at Google's STS token URL for an exchange token,
    #   3. use the exchange token to impersonate the GCP service account
    #      and obtain the final access token.
    class AzureFederatedAuth
      DEFAULT_AZURE_SCOPE = 'api://AzureADTokenExchange/.default'
      DEFAULT_GCP_SCOPE = 'https://www.googleapis.com/auth/cloud-platform'

      # Returns cached Google credentials, refreshing the full token chain
      # only once the cached credentials have expired.
      #
      # @return [Google::Auth::UserRefreshCredentials]
      def self.gcp_client_credentials
        return @gcp_client_credentials if @gcp_client_credentials && !@gcp_client_credentials.expired?

        azure_token = azure_access_token

        azure_google_exchange_token = azure_google_exchange_access_token(azure_token)

        google_token, expiry_time = google_access_token(azure_google_exchange_token)

        @gcp_client_credentials = Google::Auth::UserRefreshCredentials.new(access_token: google_token, expires_at: expiry_time)
      end

      # Step 1: client-credentials request to Azure AD, authenticated with
      # the federated token file mounted into the workload.
      #
      # @return [String] the Azure AD access token
      # @raise [AzureFederatedAuthError] on a non-success HTTP response
      def self.azure_access_token
        aad_token_request_body = {
          grant_type: 'client_credentials',
          client_id: DfE::Analytics.config.azure_client_id,
          scope: DfE::Analytics.config.azure_scope,
          client_assertion_type: 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
          client_assertion: File.read(DfE::Analytics.config.azure_token_path)
        }

        azure_token_response =
          HTTParty.get(DfE::Analytics.config.google_cloud_credentials.credential_source.url, body: aad_token_request_body)

        unless azure_token_response.success?
          error_message = "Error calling azure token API: status: #{azure_token_response.code} body: #{azure_token_response.body}"

          Rails.logger.error error_message

          raise AzureFederatedAuthError, error_message
        end

        azure_token_response.parsed_response['access_token']
      end

      # Step 2: RFC 8693 token exchange at Google's STS endpoint, swapping
      # the Azure token for a Google exchange token.
      #
      # @param azure_token [String] token from .azure_access_token
      # @return [String] the Google exchange access token
      # @raise [AzureFederatedAuthError] on a non-success HTTP response
      def self.azure_google_exchange_access_token(azure_token)
        request_body = {
          # NOTE: fixed from the garbled 'urn:ietf:p1Garams:...' - the STS
          # endpoint only accepts the RFC 8693 grant type URN.
          grant_type: 'urn:ietf:params:oauth:grant-type:token-exchange',
          audience: DfE::Analytics.config.google_cloud_credentials.audience,
          scope: DfE::Analytics.config.gcp_scope,
          requested_token_type: 'urn:ietf:params:oauth:token-type:access_token',
          subject_token: azure_token,
          subject_token_type: DfE::Analytics.config.google_cloud_credentials.subject_token_type
        }

        # Fixed: was `http_client.post`, but no http_client is defined on
        # this class - HTTParty is used for all requests (see above).
        exchange_token_response = HTTParty.post(DfE::Analytics.config.google_cloud_credentials.token_url, body: request_body)

        unless exchange_token_response.success?
          error_message = "Error calling google exchange token API: status: #{exchange_token_response.code} body: #{exchange_token_response.body}"

          Rails.logger.error error_message

          raise AzureFederatedAuthError, error_message
        end

        exchange_token_response.parsed_response['access_token']
      end

      # Step 3: service-account impersonation, returning the final Google
      # access token plus its expiry time.
      #
      # @param azure_google_exchange_token [String] token from step 2
      # @return [Array(String, String)] access token and expiry time
      # @raise [AzureFederatedAuthError] on a non-success HTTP response
      def self.google_access_token(azure_google_exchange_token)
        google_token_response = HTTParty.post(
          DfE::Analytics.config.google_cloud_credentials.service_account_impersonation_url,
          headers: { 'Authorization' => "Bearer #{azure_google_exchange_token}" },
          body: { scope: DfE::Analytics.config.gcp_scope }
        )

        unless google_token_response.success?
          error_message = "Error calling google token API: status: #{google_token_response.code} body: #{google_token_response.body}"

          Rails.logger.error error_message

          raise AzureFederatedAuthError, error_message
        end

        parsed_response = google_token_response.parsed_response

        # The impersonation API returns camelCase keys.
        [parsed_response['accessToken'], parsed_response['expiryTime']]
      end
    end

    class AzureFederatedAuthError < StandardError; end
  end
end
71 changes: 71 additions & 0 deletions lib/dfe/analytics/big_query_api.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# frozen_string_literal: true

module DfE
  module Analytics
    # For use with workload identity federation.
    #
    # Streams events to BigQuery via the low-level google-api-client
    # (Google::Apis::BigqueryV2), authorised with short-lived federated
    # credentials from DfE::Analytics::AzureFederatedAuth.
    class BigQueryApi
      # Memoised BigQuery service with fresh federated credentials applied
      # on every call (the credentials object handles its own expiry).
      #
      # @return [Google::Apis::BigqueryV2::BigqueryService]
      # @raise [ConfigurationError] if any required config value is nil
      def self.events_client
        @events_client ||= begin
          missing_config = %i[
            bigquery_project_id
            bigquery_table_name
            bigquery_dataset
            azure_client_id
            azure_token_path
            azure_scope
            gcp_scope
            google_cloud_credentials
          ].select { |val| DfE::Analytics.config.send(val).nil? }

          raise(ConfigurationError, "DfE::Analytics: missing required config values: #{missing_config.join(', ')}") if missing_config.any?

          Google::Apis::BigqueryV2::BigqueryService.new
        end

        # Fixed: the accessor on Google::Apis services is `authorization`
        # (American spelling); `authorisation=` raises NoMethodError.
        @events_client.authorization = DfE::Analytics::AzureFederatedAuth.gcp_client_credentials
        @events_client
      end

      # Inserts a batch of events into the configured BigQuery table.
      # Invalid rows are skipped server-side; any insert errors are logged
      # (with every event in the batch, since the API does not say which
      # event failed) and re-raised as SendEventsError.
      #
      # @param events [Array<Hash>] serialised analytics events
      # @raise [SendEventsError] if the response contains insert errors
      def self.insert(events)
        rows = events.map { |event| { json: event } }
        data_request = Google::Apis::BigqueryV2::InsertAllTableDataRequest.new(rows: rows, skip_invalid_rows: true)
        options = Google::Apis::RequestOptions.new(retries: DfE::Analytics.config.bigquery_retries)

        response =
          events_client.insert_all_table_data(
            DfE::Analytics.config.bigquery_project_id,
            DfE::Analytics.config.bigquery_dataset,
            DfE::Analytics.config.bigquery_table_name,
            data_request,
            options: options
          )

        return unless response.insert_errors.present?

        event_count = events.length
        error_message = error_message_for(response)

        Rails.logger.error(error_message)

        events.each.with_index(1) do |event, index|
          Rails.logger.info("DfE::Analytics possible error processing event (#{index}/#{event_count}): #{event.inspect}")
        end

        raise SendEventsError, error_message
      end

      # Builds a single multi-line error message from the response's
      # per-row insert errors.
      #
      # @param response [Google::Apis::BigqueryV2::InsertAllTableDataResponse]
      # @return [String]
      def self.error_message_for(response)
        message =
          response
          .insert_errors
          .map { |insert_error| "index: #{insert_error.index} error: #{insert_error.errors.map(&:message).join(' ')} insert_error: #{insert_error}" }
          .compact.join("\n")

        "DfE::Analytics BigQuery API insert error for #{response.insert_errors.length} event(s):\n#{message}"
      end

      class ConfigurationError < StandardError; end
      class SendEventsError < StandardError; end
    end
  end
end
60 changes: 60 additions & 0 deletions lib/dfe/analytics/big_query_legacy_api.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# frozen_string_literal: true

module DfE
  module Analytics
    # For use with legacy authentication with fixed api key.
    #
    # Streams events to BigQuery through the google-cloud-bigquery gem,
    # authenticated with a static JSON service-account key.
    class BigQueryLegacyApi
      # Memoised BigQuery client scoped to the configured events table.
      #
      # @return [Google::Cloud::Bigquery::Table]
      # @raise [ConfigurationError] if any required config value is blank
      def self.events_client
        @events_client ||= begin
          # Check for missing config items - otherwise may get obscure api errors
          required_keys = %i[
            bigquery_project_id
            bigquery_table_name
            bigquery_dataset
            bigquery_api_json_key
          ]
          missing = required_keys.select { |key| DfE::Analytics.config.send(key).blank? }

          raise(ConfigurationError, "DfE::Analytics: missing required config values: #{missing.join(', ')}") if missing.any?

          bigquery = Google::Cloud::Bigquery.new(
            project: DfE::Analytics.config.bigquery_project_id,
            credentials: JSON.parse(DfE::Analytics.config.bigquery_api_json_key),
            retries: DfE::Analytics.config.bigquery_retries,
            timeout: DfE::Analytics.config.bigquery_timeout
          )

          # skip_lookup avoids a network round-trip checking existence.
          dataset = bigquery.dataset(DfE::Analytics.config.bigquery_dataset, skip_lookup: true)
          dataset.table(DfE::Analytics.config.bigquery_table_name, skip_lookup: true)
        end
      end

      # Inserts a batch of events, ignoring unknown fields. On failure the
      # whole batch is logged (the API does not identify the failing event)
      # and a SendEventsError is raised.
      #
      # @param events [Array<Hash>] serialised analytics events
      # @raise [SendEventsError] if the insert response is not successful
      def self.insert(events)
        response = events_client.insert(events, ignore_unknown: true)

        return if response.success?

        message = error_message_for(response)
        Rails.logger.error(message)

        total = events.length
        events.each.with_index(1) do |event, position|
          Rails.logger.info("DfE::Analytics possible error processing event (#{position}/#{total}): #{event.inspect}")
        end

        raise SendEventsError, message
      end

      # Builds a single multi-line error message from the response's
      # per-row errors.
      #
      # @param response [Google::Cloud::Bigquery::InsertResponse]
      # @return [String]
      def self.error_message_for(response)
        details = response.error_rows.map do |error_row|
          "index: #{response.index_for(error_row)} error: #{response.errors_for(error_row)} error_row: #{error_row}"
        end
        joined_details = details.compact.join("\n")

        "DfE::Analytics BigQuery API insert error for #{response.error_rows.length} event(s): response error count: #{response.error_count}\n#{joined_details}"
      end

      class ConfigurationError < StandardError; end
      class SendEventsError < StandardError; end
    end
  end
end
28 changes: 1 addition & 27 deletions lib/dfe/analytics/send_events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,9 @@ def perform(events)
.each { |event| Rails.logger.info("DfE::Analytics processing: #{event.inspect}") }
end

response = DfE::Analytics.events_client.insert(events, ignore_unknown: true)

unless response.success?
event_count = events.length
error_message = error_message_for(response)

Rails.logger.error(error_message)

events.each.with_index(1) do |event, index|
Rails.logger.info("DfE::Analytics possible error processing event (#{index}/#{event_count}): #{event.inspect}")
end

raise SendEventsError, error_message
end
DfE::Analytics.config.azure_federated_auth ? DfE::Analytics::BigQueryApi.insert(events) : DfE::Analytics::BigQueryLegacyApi.insert(events)
end
end

def error_message_for(resp)
message =
resp
.error_rows
.map { |row| "index: #{resp.index_for(row)} error: #{resp.errors_for(row)} error_row: #{row}" }
.compact.join("\n")

"DfE::Analytics BigQuery API insert error for #{resp.error_rows.length} event(s): response error count: #{resp.error_count}\n#{message}"
end
end

class SendEventsError < StandardError
end
end
end
Loading

0 comments on commit 6b1eba7

Please sign in to comment.