-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for workload identity federation and consolidate GCP/BigQuery APIs into seperate classes #120
Add support for workload identity federation and consolidate GCP/BigQuery APIs into seperate classes #120
Changes from all commits
0aebc67
dc6e29b
17ac625
45c14a0
4157346
f976de1
977c1ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
# frozen_string_literal: true | ||
|
||
require 'googleauth' | ||
|
||
module DfE | ||
module Analytics | ||
# Azure client for workload identity federation with GCP using OAuth | ||
class AzureFederatedAuth | ||
DEFAULT_AZURE_SCOPE = 'api://AzureADTokenExchange/.default' | ||
DEFAULT_GCP_SCOPE = 'https://www.googleapis.com/auth/cloud-platform' | ||
ACCESS_TOKEN_EXPIRE_TIME_LEEWAY = 10.seconds | ||
|
||
def self.gcp_client_credentials | ||
return @gcp_client_credentials if @gcp_client_credentials && !@gcp_client_credentials.expired? | ||
|
||
azure_token = azure_access_token | ||
|
||
azure_google_exchange_token = azure_google_exchange_access_token(azure_token) | ||
|
||
google_token, expire_time = google_access_token(azure_google_exchange_token) | ||
|
||
expire_time_with_leeway = expire_time.to_datetime - ACCESS_TOKEN_EXPIRE_TIME_LEEWAY | ||
|
||
@gcp_client_credentials = Google::Auth::UserRefreshCredentials.new(access_token: google_token, expires_at: expire_time_with_leeway) | ||
end | ||
|
||
def self.azure_access_token | ||
aad_token_request_body = { | ||
grant_type: 'client_credentials', | ||
client_id: DfE::Analytics.config.azure_client_id, | ||
scope: DfE::Analytics.config.azure_scope, | ||
client_assertion_type: 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer', | ||
client_assertion: File.read(DfE::Analytics.config.azure_token_path) | ||
} | ||
|
||
azure_token_response = | ||
HTTParty.get(DfE::Analytics.config.google_cloud_credentials[:credential_source][:url], body: aad_token_request_body) | ||
|
||
unless azure_token_response.success? | ||
error_message = "Error calling azure token API: status: #{azure_token_response.code} body: #{azure_token_response.body}" | ||
|
||
Rails.logger.error error_message | ||
|
||
raise Error, error_message | ||
end | ||
|
||
azure_token_response.parsed_response['access_token'] | ||
end | ||
|
||
def self.azure_google_exchange_access_token(azure_token) | ||
request_body = { | ||
grant_type: 'urn:ietf:params:oauth:grant-type:token-exchange', | ||
audience: DfE::Analytics.config.google_cloud_credentials[:audience], | ||
scope: DfE::Analytics.config.gcp_scope, | ||
requested_token_type: 'urn:ietf:params:oauth:token-type:access_token', | ||
subject_token: azure_token, | ||
subject_token_type: DfE::Analytics.config.google_cloud_credentials[:subject_token_type] | ||
} | ||
|
||
exchange_token_response = HTTParty.post(DfE::Analytics.config.google_cloud_credentials[:token_url], body: request_body) | ||
|
||
unless exchange_token_response.success? | ||
error_message = "Error calling google exchange token API: status: #{exchange_token_response.code} body: #{exchange_token_response.body}" | ||
|
||
Rails.logger.error error_message | ||
|
||
raise Error, error_message | ||
end | ||
|
||
exchange_token_response.parsed_response['access_token'] | ||
end | ||
|
||
def self.google_access_token(azure_google_exchange_token) | ||
google_token_response = HTTParty.post( | ||
DfE::Analytics.config.google_cloud_credentials[:service_account_impersonation_url], | ||
headers: { 'Authorization' => "Bearer #{azure_google_exchange_token}" }, | ||
body: { scope: DfE::Analytics.config.gcp_scope } | ||
) | ||
|
||
unless google_token_response.success? | ||
error_message = "Error calling google token API: status: #{google_token_response.code} body: #{google_token_response.body}" | ||
|
||
Rails.logger.error error_message | ||
|
||
raise Error, error_message | ||
end | ||
|
||
parsed_response = google_token_response.parsed_response | ||
|
||
[parsed_response['accessToken'], parsed_response['expireTime']] | ||
end | ||
|
||
class Error < StandardError; end | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
# frozen_string_literal: true | ||
|
||
module DfE | ||
module Analytics | ||
# For use with for workload identity federeation | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @asatwal would it be possible for a team to use this API without WIF? How much effort would it be to add that flexibility? In future I can see the old API being decommissioned and some teams needing to move to the new one but not having been able to get things set up in Azure such that they can use WIF. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @asatwal I was wondering generally if it's a strong recommend that teams migrate to WIF? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is possible to use the new streaming APIs with the old authentication, but this would be effort though. I think we could consider this if there is a real need for this. As Erica points out above, we could insist on WIF initially and it's a strong recommendation. |
||
class BigQueryApi | ||
def self.events_client | ||
@events_client ||= begin | ||
missing_config = %i[ | ||
bigquery_project_id | ||
bigquery_table_name | ||
bigquery_dataset | ||
azure_client_id | ||
azure_token_path | ||
azure_scope | ||
gcp_scope | ||
google_cloud_credentials | ||
].select { |val| DfE::Analytics.config.send(val).blank? } | ||
|
||
raise(ConfigurationError, "DfE::Analytics: missing required config values: #{missing_config.join(', ')}") if missing_config.any? | ||
|
||
Google::Apis::BigqueryV2::BigqueryService.new | ||
end | ||
|
||
@events_client.authorization = DfE::Analytics::AzureFederatedAuth.gcp_client_credentials | ||
@events_client | ||
end | ||
|
||
def self.insert(events) | ||
rows = events.map { |event| { json: event } } | ||
data_request = Google::Apis::BigqueryV2::InsertAllTableDataRequest.new(rows: rows, skip_invalid_rows: true) | ||
options = Google::Apis::RequestOptions.new(retries: DfE::Analytics.config.bigquery_retries) | ||
|
||
response = | ||
events_client.insert_all_table_data( | ||
DfE::Analytics.config.bigquery_project_id, | ||
DfE::Analytics.config.bigquery_dataset, | ||
DfE::Analytics.config.bigquery_table_name, | ||
data_request, | ||
options: options | ||
) | ||
|
||
return unless response.insert_errors.present? | ||
|
||
event_count = events.length | ||
error_message = error_message_for(response) | ||
|
||
Rails.logger.error(error_message) | ||
|
||
events.each.with_index(1) do |event, index| | ||
Rails.logger.info("DfE::Analytics possible error processing event (#{index}/#{event_count}): #{event.inspect}") | ||
end | ||
|
||
raise SendEventsError, error_message | ||
end | ||
|
||
def self.error_message_for(response) | ||
message = | ||
response | ||
.insert_errors | ||
.map { |insert_error| "index: #{insert_error.index} error: #{insert_error.errors.map(&:message).join(' ')} insert_error: #{insert_error}" } | ||
.compact.join("\n") | ||
|
||
"DfE::Analytics BigQuery API insert error for #{response.insert_errors.length} event(s):\n#{message}" | ||
end | ||
|
||
class ConfigurationError < StandardError; end | ||
class SendEventsError < StandardError; end | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
# frozen_string_literal: true | ||
|
||
module DfE | ||
module Analytics | ||
# For use with legacy authentication with fixed api key | ||
class BigQueryLegacyApi | ||
def self.events_client | ||
@events_client ||= begin | ||
# Check for missing config items - otherwise may get obscure api errors | ||
missing_config = %i[ | ||
bigquery_project_id | ||
bigquery_table_name | ||
bigquery_dataset | ||
bigquery_api_json_key | ||
].select { |val| DfE::Analytics.config.send(val).blank? } | ||
|
||
raise(ConfigurationError, "DfE::Analytics: missing required config values: #{missing_config.join(', ')}") if missing_config.any? | ||
|
||
Google::Cloud::Bigquery.new( | ||
project: DfE::Analytics.config.bigquery_project_id, | ||
credentials: JSON.parse(DfE::Analytics.config.bigquery_api_json_key), | ||
retries: DfE::Analytics.config.bigquery_retries, | ||
timeout: DfE::Analytics.config.bigquery_timeout | ||
).dataset(DfE::Analytics.config.bigquery_dataset, skip_lookup: true) | ||
.table(DfE::Analytics.config.bigquery_table_name, skip_lookup: true) | ||
end | ||
end | ||
|
||
def self.insert(events) | ||
response = events_client.insert(events, ignore_unknown: true) | ||
|
||
return if response.success? | ||
|
||
event_count = events.length | ||
error_message = error_message_for(response) | ||
|
||
Rails.logger.error(error_message) | ||
|
||
events.each.with_index(1) do |event, index| | ||
Rails.logger.info("DfE::Analytics possible error processing event (#{index}/#{event_count}): #{event.inspect}") | ||
end | ||
|
||
raise SendEventsError, error_message | ||
end | ||
|
||
def self.error_message_for(response) | ||
message = | ||
response | ||
.error_rows | ||
.map { |error_row| "index: #{response.index_for(error_row)} error: #{response.errors_for(error_row)} error_row: #{error_row}" } | ||
.compact.join("\n") | ||
|
||
"DfE::Analytics BigQuery API insert error for #{response.error_rows.length} event(s): response error count: #{response.error_count}\n#{message}" | ||
end | ||
|
||
class ConfigurationError < StandardError; end | ||
class SendEventsError < StandardError; end | ||
end | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@asatwal does this also switch between APIs? If so could we add that to the description?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this does switch APIs as the old APIs don't work with the new WIF Auth. Good point I'll add to this description.