diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java index 71685e3f1ab4d..56db598c8ca28 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java @@ -34,39 +34,24 @@ * credential. */ public class GcpCredentialFactory implements CredentialFactory { - /** - * The scope cloud-platform provides access to all Cloud Platform resources. cloud-platform isn't - * sufficient yet for talking to datastore so we request those resources separately. - * - *

Note that trusted scope relationships don't apply to OAuth tokens, so for services we access - * directly (GCS) as opposed to through the backend (BigQuery, GCE), we need to explicitly request - * that scope. - */ - private static final List SCOPES = - Arrays.asList( - "https://www.googleapis.com/auth/cloud-platform", - "https://www.googleapis.com/auth/devstorage.full_control", - "https://www.googleapis.com/auth/userinfo.email", - "https://www.googleapis.com/auth/datastore", - "https://www.googleapis.com/auth/bigquery", - "https://www.googleapis.com/auth/bigquery.insertdata", - "https://www.googleapis.com/auth/pubsub"); - + // A list of OAuth scopes to request when creating a credential. + private List oauthScopes; // If non-null, a list of service account emails to be used as an impersonation chain. private @Nullable List impersonateServiceAccountChain; - private GcpCredentialFactory(@Nullable List impersonateServiceAccountChain) { + private GcpCredentialFactory( + List oauthScopes, @Nullable List impersonateServiceAccountChain) { if (impersonateServiceAccountChain != null) { checkArgument(impersonateServiceAccountChain.size() > 0); } + this.oauthScopes = oauthScopes; this.impersonateServiceAccountChain = impersonateServiceAccountChain; } public static GcpCredentialFactory fromOptions(PipelineOptions options) { - @Nullable - String impersonateServiceAccountArg = - options.as(GcpOptions.class).getImpersonateServiceAccount(); + GcpOptions gcpOptions = options.as(GcpOptions.class); + @Nullable String impersonateServiceAccountArg = gcpOptions.getImpersonateServiceAccount(); @Nullable List impersonateServiceAccountChain = @@ -74,7 +59,7 @@ public static GcpCredentialFactory fromOptions(PipelineOptions options) { ? null : Arrays.asList(impersonateServiceAccountArg.split(",")); - return new GcpCredentialFactory(impersonateServiceAccountChain); + return new GcpCredentialFactory(gcpOptions.getGcpOauthScopes(), impersonateServiceAccountChain); } /** Returns a default GCP {@link Credentials} or null when it fails. */ @@ -82,7 +67,7 @@ public static GcpCredentialFactory fromOptions(PipelineOptions options) { public @Nullable Credentials getCredential() { try { GoogleCredentials applicationDefaultCredentials = - GoogleCredentials.getApplicationDefault().createScoped(SCOPES); + GoogleCredentials.getApplicationDefault().createScoped(oauthScopes); if (impersonateServiceAccountChain == null) { return applicationDefaultCredentials; @@ -94,7 +79,7 @@ public static GcpCredentialFactory fromOptions(PipelineOptions options) { GoogleCredentials impersonationCredentials = ImpersonatedCredentials.create( - applicationDefaultCredentials, targetPrincipal, delegationChain, SCOPES, 0); + applicationDefaultCredentials, targetPrincipal, delegationChain, oauthScopes, 0); return impersonationCredentials; } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index cd60835e85b93..d0daa870cf128 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -37,6 +37,8 @@ import java.nio.charset.StandardCharsets; import java.nio.file.FileAlreadyExistsException; import java.security.GeneralSecurityException; +import java.util.Arrays; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.regex.Matcher; @@ -140,6 +142,43 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { void setWorkerZone(String workerZone); + /** + * Controls the OAuth scopes that will be requested when creating {@link Credentials} with the + * {@link GcpCredentialFactory} (which is the default {@link CredentialFactory}). If the {@link + * #setGcpCredential credential} or {@link #setCredentialFactoryClass credential factory} have + * been set then this field may do nothing. + */ + @Default.InstanceFactory(GcpOAuthScopesFactory.class) + @Description( + "Controls the OAuth scopes that will be requested when creating credentials with the GcpCredentialFactory (which is the default credential factory). If the GCP credential or credential factory have been set then this property may do nothing.") + List getGcpOauthScopes(); + + void setGcpOauthScopes(List oauthScopes); + + /** Returns the default set of OAuth scopes. */ + class GcpOAuthScopesFactory implements DefaultValueFactory> { + + @Override + public List create(PipelineOptions options) { + /** + * The scope cloud-platform provides access to all Cloud Platform resources. cloud-platform + * isn't sufficient yet for talking to datastore so we request those resources separately. + * + *

Note that trusted scope relationships don't apply to OAuth tokens, so for services we + * access directly (GCS) as opposed to through the backend (BigQuery, GCE), we need to + * explicitly request that scope. + */ + return Arrays.asList( + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/devstorage.full_control", + "https://www.googleapis.com/auth/userinfo.email", + "https://www.googleapis.com/auth/datastore", + "https://www.googleapis.com/auth/bigquery", + "https://www.googleapis.com/auth/bigquery.insertdata", + "https://www.googleapis.com/auth/pubsub"); + } + } + /** * The class of the credential factory that should be created and used to create credentials. If * gcpCredential has not been set explicitly, an instance of this class will be constructed and @@ -291,7 +330,7 @@ public Credentials create(PipelineOptions options) { } } - /** EneableStreamingEngine defaults to false unless one of the two experiments is set. */ + /** EnableStreamingEngine defaults to false unless one of the two experiments is set. */ class EnableStreamingEngineFactory implements DefaultValueFactory { @Override public Boolean create(PipelineOptions options) { diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 27a3c40cd4b32..699ec79d4b8ad 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -24,7 +24,6 @@ import threading from apache_beam.options.pipeline_options import GoogleCloudOptions -from apache_beam.options.pipeline_options import PipelineOptions # google.auth is only available when Beam is installed with the gcp extra. try: @@ -44,16 +43,6 @@ _LOGGER = logging.getLogger(__name__) -CLIENT_SCOPES = [ - 'https://www.googleapis.com/auth/bigquery', - 'https://www.googleapis.com/auth/cloud-platform', - 'https://www.googleapis.com/auth/devstorage.full_control', - 'https://www.googleapis.com/auth/userinfo.email', - 'https://www.googleapis.com/auth/datastore', - 'https://www.googleapis.com/auth/spanner.admin', - 'https://www.googleapis.com/auth/spanner.data' -] - def set_running_in_gce(worker_executing_project): """For internal use only; no backwards-compatibility guarantees. @@ -153,7 +142,9 @@ def _get_service_credentials(pipeline_options): return None try: - credentials, _ = google.auth.default(scopes=CLIENT_SCOPES) # pylint: disable=c-extension-no-member + # pylint: disable=c-extension-no-member + credentials, _ = google.auth.default( + scopes=pipeline_options.view_as(GoogleCloudOptions).gcp_oauth_scopes) credentials = _Credentials._add_impersonation_credentials( credentials, pipeline_options) credentials = _ApitoolsCredentialsAdapter(credentials) @@ -170,14 +161,9 @@ def _get_service_credentials(pipeline_options): @staticmethod def _add_impersonation_credentials(credentials, pipeline_options): - if isinstance(pipeline_options, PipelineOptions): - gcs_options = pipeline_options.view_as(GoogleCloudOptions) - impersonate_service_account = gcs_options.impersonate_service_account - elif isinstance(pipeline_options, dict): - impersonate_service_account = pipeline_options.get( - 'impersonate_service_account') - else: - return credentials + gcs_options = pipeline_options.view_as(GoogleCloudOptions) + impersonate_service_account = gcs_options.impersonate_service_account + scopes = gcs_options.gcp_oauth_scopes if impersonate_service_account: _LOGGER.info('Impersonating: %s', impersonate_service_account) impersonate_accounts = impersonate_service_account.split(',') @@ -187,6 +173,6 @@ def _add_impersonation_credentials(credentials, pipeline_options): source_credentials=credentials, target_principal=target_principal, delegates=delegate_to, - target_scopes=CLIENT_SCOPES, + target_scopes=scopes, ) return credentials diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 0bec4664777f9..05c752b3cf5f1 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -56,6 +56,7 @@ from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.metrics import monitoring_infos from apache_beam.options import value_provider +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.transforms import DoFn from apache_beam.typehints.typehints import Any from apache_beam.utils import retry @@ -331,7 +332,7 @@ class BigQueryWrapper(object): def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None): self.client = client or bigquery.BigqueryV2( http=get_new_http(), - credentials=auth.get_service_credentials(None), + credentials=auth.get_service_credentials(PipelineOptions()), response_encoding='utf8', additional_http_headers={ "user-agent": "apache-beam-%s" % apache_beam.__version__ diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 7fa9396ed2e95..faeae76541ed7 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -640,6 +640,14 @@ class GoogleCloudOptions(PipelineOptions): COMPUTE_API_SERVICE = 'compute.googleapis.com' STORAGE_API_SERVICE = 'storage.googleapis.com' DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com' + OAUTH_SCOPES = [ + 'https://www.googleapis.com/auth/bigquery', + 'https://www.googleapis.com/auth/cloud-platform', + 'https://www.googleapis.com/auth/devstorage.full_control', + 'https://www.googleapis.com/auth/userinfo.email', + 'https://www.googleapis.com/auth/datastore', + 'https://www.googleapis.com/auth/spanner' + ] @classmethod def _add_argparse_args(cls, parser): @@ -773,6 +781,16 @@ def _add_argparse_args(cls, parser): 'either a single service account as the impersonator, or a ' 'comma-separated list of service accounts to create an ' 'impersonation delegation chain.') + parser.add_argument( + '--gcp_oauth_scope', + '--gcp_oauth_scopes', + dest='gcp_oauth_scopes', + action='append', + default=cls.OAUTH_SCOPES, + help=( + 'Controls the OAuth scopes that will be requested when creating ' + 'GCP credentials. Note: If set programmatically, must be set as a ' + 'list of strings')) def _create_default_gcs_bucket(self): try: diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py index 4807a693126d6..bdd9ab4d1a89c 100644 --- a/sdks/python/apache_beam/runners/interactive/utils.py +++ b/sdks/python/apache_beam/runners/interactive/utils.py @@ -35,6 +35,7 @@ from apache_beam.internal.gcp import auth from apache_beam.internal.http_client import get_new_http from apache_beam.io.gcp.internal.clients import storage +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.pipeline import Pipeline from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners.interactive.caching.cacheable import Cacheable @@ -452,7 +453,7 @@ def assert_bucket_exists(bucket_name): try: from apitools.base.py.exceptions import HttpError storage_client = storage.StorageV1( - credentials=auth.get_service_credentials(None), + credentials=auth.get_service_credentials(PipelineOptions()), get_credentials=False, http=get_new_http(), response_encoding='utf8')