Skip to content

Commit

Permalink
Add support for workload identity federation and consolidate GCP/BigQ…
Browse files Browse the repository at this point in the history
…uery APIs into separate classes (#120)

* Add support for workload identity federation and consolidate GCP/BigQuery APIs into separate classes

* Add httparty GEM for calling APIs

* Add spec for DfE::Analytics::AzureFederatedAuth class

* Update desc for azure_federated_auth to clarify new BQ APIs also being used

* Change: expiry time -> expire time to match field returned by GCP API

* Convert date time string to class before date time arithmetic

* Change authorisation -> authorization when calling GCP APIs
  • Loading branch information
asatwal authored Apr 30, 2024
1 parent 12c184d commit 656807c
Show file tree
Hide file tree
Showing 17 changed files with 994 additions and 205 deletions.
5 changes: 5 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ PATH
specs:
dfe-analytics (1.12.5)
google-cloud-bigquery (~> 1.38)
httparty (~> 0.21)
request_store_rails (~> 2)

GEM
Expand Down Expand Up @@ -171,6 +172,9 @@ GEM
os (>= 0.9, < 2.0)
signet (>= 0.16, < 2.a)
hashdiff (1.0.1)
httparty (0.21.0)
mini_mime (>= 1.0.0)
multi_xml (>= 0.5.2)
httpclient (2.8.3)
i18n (1.14.1)
concurrent-ruby (~> 1.0)
Expand Down Expand Up @@ -203,6 +207,7 @@ GEM
mini_portile2 (2.8.2)
minitest (5.18.1)
multi_json (1.15.0)
multi_xml (0.6.0)
net-imap (0.3.6)
date
net-protocol
Expand Down
29 changes: 27 additions & 2 deletions config/locales/en.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ en:
entity_table_checks_enabled:
description: |
Whether to run entity table checksum job.
default: false
default: false
rack_page_cached:
description: |
A proc which will be called with the rack env, and which should
Expand All @@ -72,4 +72,29 @@ en:
Schedule a maintenance window during which no events are streamed to BigQuery
in the format of '22-01-2024 19:30..22-01-2024 20:30' (UTC).
default: ENV['BIGQUERY_MAINTENANCE_WINDOW']

azure_federated_auth:
description: |
Whether to use azure workload identity federation for authentication
instead of the BigQuery API JSON Key. Note that this will also
use a new version of the BigQuery streaming APIs.
default: false
azure_client_id:
description: |
Client Id of the app in azure
default: ENV['AZURE_CLIENT_ID']
azure_token_path:
description: |
Path of the token file used for getting a token from Azure AD
default: ENV['AZURE_FEDERATED_TOKEN_FILE']
azure_scope:
description: |
Azure audience scope
default: api://AzureADTokenExchange/.default
gcp_scope:
description: |
Google cloud scope
default: https://www.googleapis.com/auth/cloud-platform
google_cloud_credentials:
description: |
Google generated cloud credentials file
default: ENV['GOOGLE_CLOUD_CREDENTIALS']
1 change: 1 addition & 0 deletions dfe-analytics.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Gem::Specification.new do |spec|
end

spec.add_dependency 'google-cloud-bigquery', '~> 1.38'
spec.add_dependency 'httparty', '~> 0.21'
spec.add_dependency 'request_store_rails', '~> 2'

spec.add_development_dependency 'appraisal'
Expand Down
43 changes: 20 additions & 23 deletions lib/dfe/analytics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

require 'request_store_rails'
require 'i18n'
require 'httparty'
require 'google/cloud/bigquery'
require 'dfe/analytics/event_schema'
require 'dfe/analytics/fields'
require 'dfe/analytics/entities'
Expand All @@ -23,34 +25,14 @@
require 'dfe/analytics/middleware/request_identity'
require 'dfe/analytics/middleware/send_cached_page_request_event'
require 'dfe/analytics/railtie'
require 'dfe/analytics/big_query_api'
require 'dfe/analytics/big_query_legacy_api'
require 'dfe/analytics/azure_federated_auth'

module DfE
module Analytics
class ConfigurationError < StandardError; end

def self.events_client
@events_client ||= begin
require 'google/cloud/bigquery'

missing_config = %i[
bigquery_project_id
bigquery_table_name
bigquery_dataset
bigquery_api_json_key
].select { |val| config.send(val).nil? }

raise(ConfigurationError, "DfE::Analytics: missing required config values: #{missing_config.join(', ')}") if missing_config.any?

Google::Cloud::Bigquery.new(
project: config.bigquery_project_id,
credentials: JSON.parse(config.bigquery_api_json_key),
retries: config.bigquery_retries,
timeout: config.bigquery_timeout
).dataset(config.bigquery_dataset, skip_lookup: true)
.table(config.bigquery_table_name, skip_lookup: true)
end
end

def self.config
configurables = %i[
log_only
Expand All @@ -69,6 +51,12 @@ def self.config
entity_table_checks_enabled
rack_page_cached
bigquery_maintenance_window
azure_federated_auth
azure_client_id
azure_token_path
azure_scope
gcp_scope
google_cloud_credentials
]

@config ||= Struct.new(*configurables).new
Expand All @@ -93,6 +81,15 @@ def self.configure
config.entity_table_checks_enabled ||= false
config.rack_page_cached ||= proc { |_rack_env| false }
config.bigquery_maintenance_window ||= ENV.fetch('BIGQUERY_MAINTENANCE_WINDOW', nil)
config.azure_federated_auth ||= false

return unless config.azure_federated_auth

config.azure_client_id ||= ENV.fetch('AZURE_CLIENT_ID', nil)
config.azure_token_path ||= ENV.fetch('AZURE_FEDERATED_TOKEN_FILE', nil)
config.google_cloud_credentials ||= JSON.parse(ENV.fetch('GOOGLE_CLOUD_CREDENTIALS', '{}')).deep_symbolize_keys
config.azure_scope ||= DfE::Analytics::AzureFederatedAuth::DEFAULT_AZURE_SCOPE
config.gcp_scope ||= DfE::Analytics::AzureFederatedAuth::DEFAULT_GCP_SCOPE
end

def self.initialize!
Expand Down
96 changes: 96 additions & 0 deletions lib/dfe/analytics/azure_federated_auth.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# frozen_string_literal: true

require 'googleauth'

module DfE
  module Analytics
    # Azure client for workload identity federation with GCP using OAuth
    #
    # Chains three token requests to turn an Azure AD federated credential into
    # Google Cloud credentials, using endpoints and identifiers taken from
    # DfE::Analytics.config (including the parsed GOOGLE_CLOUD_CREDENTIALS
    # external-account JSON):
    #   1. azure_access_token                 - Azure AD token via client assertion
    #   2. azure_google_exchange_access_token - Google STS token exchange
    #   3. google_access_token                - service account impersonation
    class AzureFederatedAuth
      DEFAULT_AZURE_SCOPE = 'api://AzureADTokenExchange/.default'
      DEFAULT_GCP_SCOPE = 'https://www.googleapis.com/auth/cloud-platform'
      # Treat tokens as expired slightly early so a token is not used mid-request.
      ACCESS_TOKEN_EXPIRE_TIME_LEEWAY = 10.seconds

      # Returns cached Google credentials, re-running the full token chain only
      # when nothing is cached yet or the cached credentials have expired.
      # NOTE(review): memoisation is not synchronised - confirm callers are
      # single-threaded or can tolerate a duplicate token fetch.
      def self.gcp_client_credentials
        return @gcp_client_credentials if @gcp_client_credentials && !@gcp_client_credentials.expired?

        azure_token = azure_access_token

        azure_google_exchange_token = azure_google_exchange_access_token(azure_token)

        google_token, expire_time = google_access_token(azure_google_exchange_token)

        # expire_time arrives as a string from the GCP API; convert to a
        # date/time class before doing arithmetic on it.
        expire_time_with_leeway = expire_time.to_datetime - ACCESS_TOKEN_EXPIRE_TIME_LEEWAY

        @gcp_client_credentials = Google::Auth::UserRefreshCredentials.new(access_token: google_token, expires_at: expire_time_with_leeway)
      end

      # Step 1: fetch an Azure AD access token using the client-credentials
      # grant, with a client assertion read from the mounted token file.
      # Logs and raises Error on any non-success HTTP response.
      def self.azure_access_token
        aad_token_request_body = {
          grant_type: 'client_credentials',
          client_id: DfE::Analytics.config.azure_client_id,
          scope: DfE::Analytics.config.azure_scope,
          client_assertion_type: 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
          client_assertion: File.read(DfE::Analytics.config.azure_token_path)
        }

        # NOTE(review): OAuth token endpoints normally expect POST; confirm the
        # configured credential_source url really accepts GET with a body.
        azure_token_response =
          HTTParty.get(DfE::Analytics.config.google_cloud_credentials[:credential_source][:url], body: aad_token_request_body)

        unless azure_token_response.success?
          error_message = "Error calling azure token API: status: #{azure_token_response.code} body: #{azure_token_response.body}"

          Rails.logger.error error_message

          raise Error, error_message
        end

        azure_token_response.parsed_response['access_token']
      end

      # Step 2: exchange the Azure AD token for a Google token at the STS
      # token-exchange endpoint named in the external-account credentials JSON.
      # Logs and raises Error on any non-success HTTP response.
      def self.azure_google_exchange_access_token(azure_token)
        request_body = {
          grant_type: 'urn:ietf:params:oauth:grant-type:token-exchange',
          audience: DfE::Analytics.config.google_cloud_credentials[:audience],
          scope: DfE::Analytics.config.gcp_scope,
          requested_token_type: 'urn:ietf:params:oauth:token-type:access_token',
          subject_token: azure_token,
          subject_token_type: DfE::Analytics.config.google_cloud_credentials[:subject_token_type]
        }

        exchange_token_response = HTTParty.post(DfE::Analytics.config.google_cloud_credentials[:token_url], body: request_body)

        unless exchange_token_response.success?
          error_message = "Error calling google exchange token API: status: #{exchange_token_response.code} body: #{exchange_token_response.body}"

          Rails.logger.error error_message

          raise Error, error_message
        end

        exchange_token_response.parsed_response['access_token']
      end

      # Step 3: impersonate the GCP service account using the exchange token and
      # return [access_token, expire_time] from the impersonation API response.
      # Logs and raises Error on any non-success HTTP response.
      def self.google_access_token(azure_google_exchange_token)
        google_token_response = HTTParty.post(
          DfE::Analytics.config.google_cloud_credentials[:service_account_impersonation_url],
          headers: { 'Authorization' => "Bearer #{azure_google_exchange_token}" },
          body: { scope: DfE::Analytics.config.gcp_scope }
        )

        unless google_token_response.success?
          error_message = "Error calling google token API: status: #{google_token_response.code} body: #{google_token_response.body}"

          Rails.logger.error error_message

          raise Error, error_message
        end

        parsed_response = google_token_response.parsed_response

        # Field names are camelCase in this API's JSON response.
        [parsed_response['accessToken'], parsed_response['expireTime']]
      end

      class Error < StandardError; end
    end
  end
end
71 changes: 71 additions & 0 deletions lib/dfe/analytics/big_query_api.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# frozen_string_literal: true

module DfE
  module Analytics
    # BigQuery client for use with workload identity federation.
    #
    # Streams events via the Google::Apis::BigqueryV2 service, authorised with
    # short-lived credentials from DfE::Analytics::AzureFederatedAuth instead of
    # a fixed JSON API key (see BigQueryLegacyApi for the key-based client).
    class BigQueryApi
      # Memoised BigQuery service; federated credentials are re-attached on
      # every call so an expired token is transparently replaced.
      #
      # @raise [ConfigurationError] if any required config value is blank
      def self.events_client
        @events_client ||= begin
          # Check for missing config items - otherwise may get obscure api errors
          missing_config = %i[
            bigquery_project_id
            bigquery_table_name
            bigquery_dataset
            azure_client_id
            azure_token_path
            azure_scope
            gcp_scope
            google_cloud_credentials
          ].select { |val| DfE::Analytics.config.send(val).blank? }

          raise(ConfigurationError, "DfE::Analytics: missing required config values: #{missing_config.join(', ')}") if missing_config.any?

          Google::Apis::BigqueryV2::BigqueryService.new
        end

        @events_client.authorization = DfE::Analytics::AzureFederatedAuth.gcp_client_credentials
        @events_client
      end

      # Streams a batch of events to the configured BigQuery table.
      # Invalid rows are skipped server-side; any per-row insert errors are
      # logged (with each event in the batch, for triage) and raised.
      #
      # @param events [Array<Hash>] event payloads to insert as rows
      # @raise [SendEventsError] if the API reports any insert errors
      def self.insert(events)
        rows = events.map { |event| { json: event } }
        data_request = Google::Apis::BigqueryV2::InsertAllTableDataRequest.new(rows: rows, skip_invalid_rows: true)
        options = Google::Apis::RequestOptions.new(retries: DfE::Analytics.config.bigquery_retries)

        response =
          events_client.insert_all_table_data(
            DfE::Analytics.config.bigquery_project_id,
            DfE::Analytics.config.bigquery_dataset,
            DfE::Analytics.config.bigquery_table_name,
            data_request,
            options: options
          )

        return if response.insert_errors.blank?

        event_count = events.length
        error_message = error_message_for(response)

        Rails.logger.error(error_message)

        events.each.with_index(1) do |event, index|
          Rails.logger.info("DfE::Analytics possible error processing event (#{index}/#{event_count}): #{event.inspect}")
        end

        raise SendEventsError, error_message
      end

      # Builds one multi-line message describing every insert error in the
      # response (row index, joined error messages, raw insert_error).
      def self.error_message_for(response)
        message =
          response
          .insert_errors
          .map { |insert_error| "index: #{insert_error.index} error: #{insert_error.errors.map(&:message).join(' ')} insert_error: #{insert_error}" }
          .join("\n")

        "DfE::Analytics BigQuery API insert error for #{response.insert_errors.length} event(s):\n#{message}"
      end

      class ConfigurationError < StandardError; end
      class SendEventsError < StandardError; end
    end
  end
end
60 changes: 60 additions & 0 deletions lib/dfe/analytics/big_query_legacy_api.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# frozen_string_literal: true

module DfE
  module Analytics
    # For use with legacy authentication with fixed api key
    class BigQueryLegacyApi
      class ConfigurationError < StandardError; end
      class SendEventsError < StandardError; end

      class << self
        # Memoised handle on the configured BigQuery table, authenticated with
        # the fixed JSON API key from config.
        #
        # Raises ConfigurationError when any required setting is blank.
        def events_client
          @events_client ||= begin
            # Check for missing config items - otherwise may get obscure api errors
            blank_keys =
              %i[bigquery_project_id bigquery_table_name bigquery_dataset bigquery_api_json_key]
              .reject { |key| DfE::Analytics.config.send(key).present? }

            raise(ConfigurationError, "DfE::Analytics: missing required config values: #{blank_keys.join(', ')}") if blank_keys.any?

            settings = DfE::Analytics.config

            Google::Cloud::Bigquery
              .new(
                project: settings.bigquery_project_id,
                credentials: JSON.parse(settings.bigquery_api_json_key),
                retries: settings.bigquery_retries,
                timeout: settings.bigquery_timeout
              )
              .dataset(settings.bigquery_dataset, skip_lookup: true)
              .table(settings.bigquery_table_name, skip_lookup: true)
          end
        end

        # Streams events to BigQuery; on any failed row, logs the full error
        # summary plus every event in the batch, then raises SendEventsError.
        def insert(events)
          response = events_client.insert(events, ignore_unknown: true)

          return if response.success?

          failure_message = error_message_for(response)
          Rails.logger.error(failure_message)

          total = events.length
          events.each_with_index do |event, idx|
            Rails.logger.info("DfE::Analytics possible error processing event (#{idx + 1}/#{total}): #{event.inspect}")
          end

          raise SendEventsError, failure_message
        end

        # Summarises every failed row of the insert response into a single
        # multi-line message (row index, errors, raw row).
        def error_message_for(response)
          row_descriptions = response.error_rows.map do |row|
            "index: #{response.index_for(row)} error: #{response.errors_for(row)} error_row: #{row}"
          end

          "DfE::Analytics BigQuery API insert error for #{response.error_rows.length} event(s): response error count: #{response.error_count}\n#{row_descriptions.compact.join("\n")}"
        end
      end
    end
  end
end
Loading

0 comments on commit 656807c

Please sign in to comment.