
Add support for workload identity federation and consolidate GCP/BigQuery APIs into separate classes #120

Merged (7 commits) on Apr 30, 2024
5 changes: 5 additions & 0 deletions Gemfile.lock
@@ -3,6 +3,7 @@ PATH
specs:
dfe-analytics (1.12.5)
google-cloud-bigquery (~> 1.38)
httparty (~> 0.21)
request_store_rails (~> 2)

GEM
@@ -171,6 +172,9 @@ GEM
os (>= 0.9, < 2.0)
signet (>= 0.16, < 2.a)
hashdiff (1.0.1)
httparty (0.21.0)
mini_mime (>= 1.0.0)
multi_xml (>= 0.5.2)
httpclient (2.8.3)
i18n (1.14.1)
concurrent-ruby (~> 1.0)
@@ -203,6 +207,7 @@ GEM
mini_portile2 (2.8.2)
minitest (5.18.1)
multi_json (1.15.0)
multi_xml (0.6.0)
net-imap (0.3.6)
date
net-protocol
29 changes: 27 additions & 2 deletions config/locales/en.yml
@@ -60,7 +60,7 @@ en:
entity_table_checks_enabled:
description: |
Whether to run entity table checksum job.
default: false
rack_page_cached:
description: |
A proc which will be called with the rack env, and which should
@@ -72,4 +72,29 @@ en:
Schedule a maintenance window during which no events are streamed to BigQuery
in the format of '22-01-2024 19:30..22-01-2024 20:30' (UTC).
default: ENV['BIGQUERY_MAINTENANCE_WINDOW']

azure_federated_auth:
description: |
Whether to use Azure workload identity federation for authentication
instead of the BigQuery API JSON key. Note that this will also
use a new version of the BigQuery streaming APIs.
default: false

Contributor: @asatwal does this also switch between APIs? If so could we add that to the description?

Collaborator (Author): Yes, this does switch APIs, as the old APIs don't work with the new WIF auth. Good point, I'll add it to this description.
azure_client_id:
description: |
Client ID of the app in Azure
default: ENV['AZURE_CLIENT_ID']
azure_token_path:
description: |
Path of the token file used for getting a token from Azure AD
default: ENV['AZURE_FEDERATED_TOKEN_FILE']
azure_scope:
description: |
Azure audience scope
default: api://AzureADTokenExchange/.default
gcp_scope:
description: |
Google cloud scope
default: https://www.googleapis.com/auth/cloud-platform
google_cloud_credentials:
description: |
Google-generated cloud credentials file for workload identity federation
default: ENV['GOOGLE_CLOUD_CREDENTIALS']
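
Taken together, these locale entries describe the new configuration surface. Below is a minimal initializer sketch, assuming the gem's usual DfE::Analytics.configure block; all values shown are placeholders:

# config/initializers/dfe_analytics.rb - illustrative sketch only, values are placeholders
DfE::Analytics.configure do |config|
  # Existing BigQuery destination settings
  config.bigquery_project_id = 'my-gcp-project'
  config.bigquery_dataset    = 'events_dataset'
  config.bigquery_table_name = 'events'

  # Opt in to workload identity federation; this also switches the gem to the
  # new BigQuery streaming API (see the azure_federated_auth description above)
  config.azure_federated_auth = true

  # These fall back to ENV['AZURE_CLIENT_ID'], ENV['AZURE_FEDERATED_TOKEN_FILE'] and
  # ENV['GOOGLE_CLOUD_CREDENTIALS'] when not set explicitly
  # config.azure_client_id  = '00000000-0000-0000-0000-000000000000'
  # config.azure_token_path = '/var/run/secrets/azure/tokens/azure-identity-token'
end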
1 change: 1 addition & 0 deletions dfe-analytics.gemspec
@@ -24,6 +24,7 @@ Gem::Specification.new do |spec|
end

spec.add_dependency 'google-cloud-bigquery', '~> 1.38'
spec.add_dependency 'httparty', '~> 0.21'
spec.add_dependency 'request_store_rails', '~> 2'

spec.add_development_dependency 'appraisal'
43 changes: 20 additions & 23 deletions lib/dfe/analytics.rb
@@ -2,6 +2,8 @@

require 'request_store_rails'
require 'i18n'
require 'httparty'
require 'google/cloud/bigquery'
require 'dfe/analytics/event_schema'
require 'dfe/analytics/fields'
require 'dfe/analytics/entities'
@@ -23,34 +25,14 @@
require 'dfe/analytics/middleware/request_identity'
require 'dfe/analytics/middleware/send_cached_page_request_event'
require 'dfe/analytics/railtie'
require 'dfe/analytics/big_query_api'
require 'dfe/analytics/big_query_legacy_api'
require 'dfe/analytics/azure_federated_auth'

module DfE
module Analytics
class ConfigurationError < StandardError; end

def self.events_client
@events_client ||= begin
require 'google/cloud/bigquery'

missing_config = %i[
bigquery_project_id
bigquery_table_name
bigquery_dataset
bigquery_api_json_key
].select { |val| config.send(val).nil? }

raise(ConfigurationError, "DfE::Analytics: missing required config values: #{missing_config.join(', ')}") if missing_config.any?

Google::Cloud::Bigquery.new(
project: config.bigquery_project_id,
credentials: JSON.parse(config.bigquery_api_json_key),
retries: config.bigquery_retries,
timeout: config.bigquery_timeout
).dataset(config.bigquery_dataset, skip_lookup: true)
.table(config.bigquery_table_name, skip_lookup: true)
end
end

def self.config
configurables = %i[
log_only
@@ -69,6 +51,12 @@ def self.config
entity_table_checks_enabled
rack_page_cached
bigquery_maintenance_window
azure_federated_auth
azure_client_id
azure_token_path
azure_scope
gcp_scope
google_cloud_credentials
]

@config ||= Struct.new(*configurables).new
@@ -93,6 +81,15 @@ def self.configure
config.entity_table_checks_enabled ||= false
config.rack_page_cached ||= proc { |_rack_env| false }
config.bigquery_maintenance_window ||= ENV.fetch('BIGQUERY_MAINTENANCE_WINDOW', nil)
config.azure_federated_auth ||= false

return unless config.azure_federated_auth

config.azure_client_id ||= ENV.fetch('AZURE_CLIENT_ID', nil)
config.azure_token_path ||= ENV.fetch('AZURE_FEDERATED_TOKEN_FILE', nil)
config.google_cloud_credentials ||= JSON.parse(ENV.fetch('GOOGLE_CLOUD_CREDENTIALS', '{}')).deep_symbolize_keys
config.azure_scope ||= DfE::Analytics::AzureFederatedAuth::DEFAULT_AZURE_SCOPE
config.gcp_scope ||= DfE::Analytics::AzureFederatedAuth::DEFAULT_GCP_SCOPE
end

def self.initialize!
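
The configure changes above only read the Azure settings when azure_federated_auth is enabled, falling back to environment variables. A sketch of the variables a deployment might set (names come from the code above; the values and token path are placeholders):

# Environment variables read by DfE::Analytics.configure when azure_federated_auth is true.
# Values are placeholders; with Azure workload identity the client id and token file are
# usually injected into the running container.
ENV['AZURE_CLIENT_ID']            ||= '00000000-0000-0000-0000-000000000000'
ENV['AZURE_FEDERATED_TOKEN_FILE'] ||= '/var/run/secrets/azure/tokens/azure-identity-token'
ENV['GOOGLE_CLOUD_CREDENTIALS']   ||= File.read('config/gcp_wif_credentials.json') # hypothetical path

Note that GOOGLE_CLOUD_CREDENTIALS is parsed with JSON.parse and deep_symbolize_keys, so it must contain the credential configuration JSON itself rather than a path to a file.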
96 changes: 96 additions & 0 deletions lib/dfe/analytics/azure_federated_auth.rb
@@ -0,0 +1,96 @@
# frozen_string_literal: true

require 'googleauth'

module DfE
module Analytics
# Azure client for workload identity federation with GCP using OAuth
class AzureFederatedAuth
DEFAULT_AZURE_SCOPE = 'api://AzureADTokenExchange/.default'
DEFAULT_GCP_SCOPE = 'https://www.googleapis.com/auth/cloud-platform'
ACCESS_TOKEN_EXPIRE_TIME_LEEWAY = 10.seconds

def self.gcp_client_credentials
return @gcp_client_credentials if @gcp_client_credentials && !@gcp_client_credentials.expired?

azure_token = azure_access_token

azure_google_exchange_token = azure_google_exchange_access_token(azure_token)

google_token, expire_time = google_access_token(azure_google_exchange_token)

expire_time_with_leeway = expire_time.to_datetime - ACCESS_TOKEN_EXPIRE_TIME_LEEWAY

@gcp_client_credentials = Google::Auth::UserRefreshCredentials.new(access_token: google_token, expires_at: expire_time_with_leeway)
end

def self.azure_access_token
aad_token_request_body = {
grant_type: 'client_credentials',
client_id: DfE::Analytics.config.azure_client_id,
scope: DfE::Analytics.config.azure_scope,
client_assertion_type: 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
client_assertion: File.read(DfE::Analytics.config.azure_token_path)
}

azure_token_response =
HTTParty.get(DfE::Analytics.config.google_cloud_credentials[:credential_source][:url], body: aad_token_request_body)

unless azure_token_response.success?
error_message = "Error calling azure token API: status: #{azure_token_response.code} body: #{azure_token_response.body}"

Rails.logger.error error_message

raise Error, error_message
end

azure_token_response.parsed_response['access_token']
end

def self.azure_google_exchange_access_token(azure_token)
request_body = {
grant_type: 'urn:ietf:params:oauth:grant-type:token-exchange',
audience: DfE::Analytics.config.google_cloud_credentials[:audience],
scope: DfE::Analytics.config.gcp_scope,
requested_token_type: 'urn:ietf:params:oauth:token-type:access_token',
subject_token: azure_token,
subject_token_type: DfE::Analytics.config.google_cloud_credentials[:subject_token_type]
}

exchange_token_response = HTTParty.post(DfE::Analytics.config.google_cloud_credentials[:token_url], body: request_body)

unless exchange_token_response.success?
error_message = "Error calling google exchange token API: status: #{exchange_token_response.code} body: #{exchange_token_response.body}"

Rails.logger.error error_message

raise Error, error_message
end

exchange_token_response.parsed_response['access_token']
end

def self.google_access_token(azure_google_exchange_token)
google_token_response = HTTParty.post(
DfE::Analytics.config.google_cloud_credentials[:service_account_impersonation_url],
headers: { 'Authorization' => "Bearer #{azure_google_exchange_token}" },
body: { scope: DfE::Analytics.config.gcp_scope }
)

unless google_token_response.success?
error_message = "Error calling google token API: status: #{google_token_response.code} body: #{google_token_response.body}"

Rails.logger.error error_message

raise Error, error_message
end

parsed_response = google_token_response.parsed_response

[parsed_response['accessToken'], parsed_response['expireTime']]
end

class Error < StandardError; end
end
end
end
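
The class implements a three-step exchange: an Azure AD client-credentials token is requested from credential_source.url, exchanged at token_url (Google STS) for a federated access token, and finally used against service_account_impersonation_url to mint a short-lived service account token, which is cached with a 10-second expiry leeway. The sketch below shows the shape gcp_client_credentials assumes for google_cloud_credentials; all values are placeholders for a real workload identity federation credential configuration:

# Keys read by AzureFederatedAuth (after deep_symbolize_keys); values are placeholders
google_cloud_credentials = {
  audience: '//iam.googleapis.com/projects/123456789/locations/global/workloadIdentityPools/azure-pool/providers/azure-provider',
  subject_token_type: 'urn:ietf:params:oauth:token-type:jwt',
  token_url: 'https://sts.googleapis.com/v1/token',
  service_account_impersonation_url: 'https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/bq-events@my-project.iam.gserviceaccount.com:generateAccessToken',
  credential_source: {
    url: 'https://login.microsoftonline.com/<tenant-id>/oauth2/v2.0/token'
  }
}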
71 changes: 71 additions & 0 deletions lib/dfe/analytics/big_query_api.rb
@@ -0,0 +1,71 @@
# frozen_string_literal: true

module DfE
module Analytics
# For use with workload identity federation

Contributor: @asatwal would it be possible for a team to use this API without WIF? How much effort would it be to add that flexibility? In future I can see the old API being decommissioned and some teams needing to move to the new one but not having been able to get things set up in Azure such that they can use WIF.

Contributor: @asatwal I was wondering generally if it's a strong recommendation that teams migrate to WIF?

Collaborator (Author): I think it is possible to use the new streaming APIs with the old authentication, but it would take some effort. We could consider this if there is a real need. As Erica points out above, we could insist on WIF initially, and it's a strong recommendation.


class BigQueryApi
def self.events_client
@events_client ||= begin
missing_config = %i[
bigquery_project_id
bigquery_table_name
bigquery_dataset
azure_client_id
azure_token_path
azure_scope
gcp_scope
google_cloud_credentials
].select { |val| DfE::Analytics.config.send(val).blank? }

raise(ConfigurationError, "DfE::Analytics: missing required config values: #{missing_config.join(', ')}") if missing_config.any?

Google::Apis::BigqueryV2::BigqueryService.new
end

@events_client.authorization = DfE::Analytics::AzureFederatedAuth.gcp_client_credentials
@events_client
end

def self.insert(events)
rows = events.map { |event| { json: event } }
data_request = Google::Apis::BigqueryV2::InsertAllTableDataRequest.new(rows: rows, skip_invalid_rows: true)
options = Google::Apis::RequestOptions.new(retries: DfE::Analytics.config.bigquery_retries)

response =
events_client.insert_all_table_data(
DfE::Analytics.config.bigquery_project_id,
DfE::Analytics.config.bigquery_dataset,
DfE::Analytics.config.bigquery_table_name,
data_request,
options: options
)

return unless response.insert_errors.present?

event_count = events.length
error_message = error_message_for(response)

Rails.logger.error(error_message)

events.each.with_index(1) do |event, index|
Rails.logger.info("DfE::Analytics possible error processing event (#{index}/#{event_count}): #{event.inspect}")
end

raise SendEventsError, error_message
end

def self.error_message_for(response)
message =
response
.insert_errors
.map { |insert_error| "index: #{insert_error.index} error: #{insert_error.errors.map(&:message).join(' ')} insert_error: #{insert_error}" }
.compact.join("\n")

"DfE::Analytics BigQuery API insert error for #{response.insert_errors.length} event(s):\n#{message}"
end

class ConfigurationError < StandardError; end
class SendEventsError < StandardError; end
end
end
end
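
Unlike the legacy class below, BigQueryApi talks to the BigQuery v2 REST surface through Google::Apis::BigqueryV2::BigqueryService and refreshes its authorization from the federated credentials on each call. A hedged usage sketch, with illustrative placeholder event payloads:

# Illustrative call only; in the gem this is invoked by the event-sending code path
events = [
  { 'event_type' => 'web_request',   'occurred_at' => '2024-04-30T12:00:00Z' },
  { 'event_type' => 'update_entity', 'occurred_at' => '2024-04-30T12:00:05Z' }
]

# Raises BigQueryApi::ConfigurationError if required config is blank, and
# BigQueryApi::SendEventsError if the insert response contains per-row errors
DfE::Analytics::BigQueryApi.insert(events)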
60 changes: 60 additions & 0 deletions lib/dfe/analytics/big_query_legacy_api.rb
@@ -0,0 +1,60 @@
# frozen_string_literal: true

module DfE
module Analytics
# For use with legacy authentication with a fixed API key
class BigQueryLegacyApi
def self.events_client
@events_client ||= begin
# Check for missing config items - otherwise may get obscure api errors
missing_config = %i[
bigquery_project_id
bigquery_table_name
bigquery_dataset
bigquery_api_json_key
].select { |val| DfE::Analytics.config.send(val).blank? }

raise(ConfigurationError, "DfE::Analytics: missing required config values: #{missing_config.join(', ')}") if missing_config.any?

Google::Cloud::Bigquery.new(
project: DfE::Analytics.config.bigquery_project_id,
credentials: JSON.parse(DfE::Analytics.config.bigquery_api_json_key),
retries: DfE::Analytics.config.bigquery_retries,
timeout: DfE::Analytics.config.bigquery_timeout
).dataset(DfE::Analytics.config.bigquery_dataset, skip_lookup: true)
.table(DfE::Analytics.config.bigquery_table_name, skip_lookup: true)
end
end

def self.insert(events)
response = events_client.insert(events, ignore_unknown: true)

return if response.success?

event_count = events.length
error_message = error_message_for(response)

Rails.logger.error(error_message)

events.each.with_index(1) do |event, index|
Rails.logger.info("DfE::Analytics possible error processing event (#{index}/#{event_count}): #{event.inspect}")
end

raise SendEventsError, error_message
end

def self.error_message_for(response)
message =
response
.error_rows
.map { |error_row| "index: #{response.index_for(error_row)} error: #{response.errors_for(error_row)} error_row: #{error_row}" }
.compact.join("\n")

"DfE::Analytics BigQuery API insert error for #{response.error_rows.length} event(s): response error count: #{response.error_count}\n#{message}"
end

class ConfigurationError < StandardError; end
class SendEventsError < StandardError; end
end
end
end
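
The two classes expose the same insert(events) interface, so callers can switch on the new flag. A hypothetical dispatch sketch (the gem's real call site is not part of this diff):

# Hypothetical dispatch; not taken from the gem's source
api_class =
  if DfE::Analytics.config.azure_federated_auth
    DfE::Analytics::BigQueryApi        # new streaming API with WIF credentials
  else
    DfE::Analytics::BigQueryLegacyApi  # google-cloud-bigquery client with JSON key
  end

api_class.insert(events) # events: array of event hashes, as in the sketch above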