

Enable RBAC check
This allows krkn to check whether the current user context is permitted to perform a given action, so that alternate flows in the krkn code can be selected based on the user's privileges/RBAC.

Signed-off-by: yogananth subramanian <ysubrama@redhat.com>
yogananth-subramanian committed Aug 1, 2024
1 parent a55662e commit f250993
Showing 1 changed file with 42 additions and 0 deletions.
42 changes: 42 additions & 0 deletions src/krkn_lib/k8s/krkn_kubernetes.py
Expand Up @@ -155,6 +155,7 @@ def __initialize_clients(self, kubeconfig_path: str = None):
self.apps_api = client.AppsV1Api(self.api_client)
self.batch_cli = client.BatchV1Api(self.k8s_client)
self.net_cli = client.NetworkingV1Api(self.api_client)
self.auth_cli = client.AuthorizationV1Api(self.k8s_client)
self.custom_object_client = client.CustomObjectsApi(
self.k8s_client
)
Expand Down Expand Up @@ -1640,6 +1641,47 @@ def check_if_pvc_exists(
logging.error("Namespace '%s' doesn't exist", str(namespace))
return False

def check_rbac_access(self, resource: str, verb: str,
namespace: str = None) -> bool:
"""
Check if the current user can perform an action in the given namespace.
If namespace is not passed, check would be performed against all namespace.
:param resource: One of the existing resource types
:param verb: Verb is a kubernetes resource API verb.
:param namespace: Namespace is the namespace of the action being requested.
:return: boolean value indicating whether
the user is allowed to do the requested action.
"""

if namespace:
body = client.V1SelfSubjectAccessReview(
spec=client.V1SelfSubjectAccessReviewSpec(
resource_attributes=client.V1ResourceAttributes(
namespace=namespace,
resource=resource,
verb=verb
)
)
)
else:
body = client.V1SelfSubjectAccessReview(
spec=client.V1SelfSubjectAccessReviewSpec(
resource_attributes=client.V1ResourceAttributes(
resource=resource,
verb=verb
)
)
)

try:
api_response = self.auth_cli.create_self_subject_access_review(body=body)
allowed=api_response.status.allowed
except ApiException as e:
logging.error("Exception when calling AuthorizationV1Api->create_self_subject_access_review: %s\n", str(e))

return allowed

def get_pvc_info(self, name: str, namespace: str) -> PVC:
"""
Retrieve information about a Persistent Volume Claim in a
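Review bot: In `check_rbac_access`, the `except ApiException` branch only logs, so `allowed` is never assigned on that path and `return allowed` raises `UnboundLocalError` instead of giving the caller a result; since callers will branch on this value for privilege-dependent flows, the failure mode should be explicit and fail closed. Initialize `allowed = False` before the `try` (or `return False` in the `except`), and state in the docstring that an API error is reported as not-allowed so a caller does not confuse an outage with a permission denial. The two `V1SelfSubjectAccessReview` constructions differ only in the `namespace` kwarg, so a single build with `namespace=namespace` (None when absent) would remove the duplication, and `namespace: str = None` should be annotated `str | None`. `auth_cli` is created on `self.k8s_client` while `apps_api` and `net_cli` use `self.api_client`; presumably intentional, matching `batch_cli`, but worth confirming.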
