Skip to content

Commit

Permalink
feat(groups): add groups, bind roles, get group permissions
Browse files Browse the repository at this point in the history
  • Loading branch information
devopstales committed Feb 25, 2024
1 parent fe85285 commit fec40a8
Show file tree
Hide file tree
Showing 18 changed files with 761 additions and 68 deletions.
138 changes: 99 additions & 39 deletions src/kubedash/functions/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -2202,18 +2202,52 @@ def k8sRoleBindingGet(obeject_name, namespace):
ERROR = "k8sRoleBindingGet: %s" % error
ErrorHandler(logger, "error", ERROR)
return None, "Unknow Error"

def k8sRoleBindingCreate(user_role, namespace, username):

def k8sRoleBindingGroupGet(group_name, username_role, user_token):
k8sClientConfigGet(username_role, user_token)
group_role_binding = list()
namespace_list, error = k8sNamespaceListGet("Admin", None)
for ns in namespace_list:
role_binding_list = k8sRoleBindingListGet(username_role, user_token, ns)
for role_binding in role_binding_list:
if group_name in role_binding["group"]:
role_binding["namespace"] = ns
group_role_binding.append(role_binding)
return group_role_binding

def k8sRoleBindingCreate(user_role, namespace, username, group_name):
k8sClientConfigGet("Admin", None)
with k8s_client.ApiClient() as api_client:
api_instance = k8s_client.RbacAuthorizationV1Api(api_client)
pretty = 'true'
field_manager = 'KubeDash'
if email_check(username):
user = username.split("@")[0]

if username:
if email_check(username):
user = username.split("@")[0]
else:
user = username

obeject_name = user + "---" + "kubedash" + "---" + user_role
body_subjects = [
k8s_client.V1Subject(
api_group = "rbac.authorization.k8s.io",
kind = "User",
name = username,
namespace = namespace,
)
]
else:
user = username
obeject_name = user + "---" + "kubedash" + "---" + user_role
obeject_name = group_name + "---" + "kubedash" + "---" + user_role
body_subjects = [
k8s_client.V1Subject(
api_group = "rbac.authorization.k8s.io",
kind = "Group",
name = group_name,
namespace = namespace,
)
]

body = k8s_client.V1RoleBinding(
api_version = "rbac.authorization.k8s.io/v1",
kind = "RoleBinding",
Expand All @@ -2226,14 +2260,7 @@ def k8sRoleBindingCreate(user_role, namespace, username):
kind = "ClusterRole",
name = "template-namespaced-resources---" + user_role,
),
subjects = [
k8s_client.V1Subject(
api_group = "rbac.authorization.k8s.io",
kind = "User",
name = username,
namespace = namespace,
)
]
subjects = body_subjects
)
try:
api_response = api_instance.create_namespaced_role_binding(
Expand All @@ -2248,16 +2275,22 @@ def k8sRoleBindingCreate(user_role, namespace, username):
return False, None


def k8sRoleBindingAdd(user_role, username, user_namespaces, user_all_namespaces):
if email_check(username):
user = username.split("@")[0]
def k8sRoleBindingAdd(user_role, username, group_name, user_namespaces, user_all_namespaces):
if username:
if email_check(username):
user = username.split("@")[0]
else:
user = username

obeject_name = user + "---" + "kubedash" + "---" + user_role
else:
user = username
obeject_name = user + "---" + "kubedash" + "---" + user_role
obeject_name = group_name + "---" + "kubedash" + "---" + user_role

if user_all_namespaces:
namespace_list, error = k8sNamespaceListGet("Admin", None)
else:
namespace_list = user_namespaces

for namespace in namespace_list:
is_rolebinding_exists, error = k8sRoleBindingGet(obeject_name, namespace)
if error:
Expand All @@ -2267,7 +2300,7 @@ def k8sRoleBindingAdd(user_role, username, user_namespaces, user_all_namespaces)
ErrorHandler(logger, "CannotConnect", "RoleBinding %s alredy exists in %s namespace" % (obeject_name, namespace))
logger.info("RoleBinding %s alredy exists" % obeject_name) # WARNING
else:
k8sRoleBindingCreate(user_role, namespace, username)
k8sRoleBindingCreate(user_role, namespace, username, group_name)


##############################################################
Expand Down Expand Up @@ -2362,18 +2395,46 @@ def k8sClusterRoleBindingGet(obeject_name):
ERROR = "k8sClusterRoleBindingGet: %s" % error
ErrorHandler(logger, "error", ERROR)
return None, "Unknow Error"

def k8sClusterRoleBindingCreate(user_cluster_role, username):

def k8sClusterRoleBindingGroupGet(group_name, username_role, user_token):
k8sClientConfigGet(username_role, user_token)
cluster_role_binding_list = k8sClusterRoleBindingListGet(username_role, user_token)
group_cluster_role_binding = list()
for cluster_role_binding in cluster_role_binding_list:
if group_name in cluster_role_binding["group"]:
group_cluster_role_binding.append(cluster_role_binding)
return group_cluster_role_binding

def k8sClusterRoleBindingCreate(user_cluster_role, username, group_name):
k8sClientConfigGet("Admin", None)
with k8s_client.ApiClient() as api_client:
api_instance = k8s_client.RbacAuthorizationV1Api(api_client)
pretty = 'true'
field_manager = 'KubeDash'
if email_check(username):
user = username.split("@")[0]
if username:
if email_check(username):
user = username.split("@")[0]
else:
user = username

obeject_name = user + "---" + "kubedash" + "---" + user_cluster_role
body_subjects = [
k8s_client.V1Subject(
api_group = "rbac.authorization.k8s.io",
kind = "User",
name = username,
)
]
else:
user = username
obeject_name = user + "---" + "kubedash" + "---" + user_cluster_role
obeject_name = group_name + "---" + "kubedash" + "---" + user_cluster_role
body_subjects = [
k8s_client.V1Subject(
api_group = "rbac.authorization.k8s.io",
kind = "Group",
name = group_name,
)
]

body = k8s_client.V1ClusterRoleBinding(
api_version = "rbac.authorization.k8s.io/v1",
kind = "ClusterRoleBinding",
Expand All @@ -2385,13 +2446,7 @@ def k8sClusterRoleBindingCreate(user_cluster_role, username):
kind = "ClusterRole",
name = "template-cluster-resources---" + user_cluster_role,
),
subjects = [
k8s_client.V1Subject(
api_group = "rbac.authorization.k8s.io",
kind = "User",
name = username,
)
]
subjects = body_subjects
)
try:
pi_response = api_instance.create_cluster_role_binding(
Expand All @@ -2404,12 +2459,17 @@ def k8sClusterRoleBindingCreate(user_cluster_role, username):
else:
logger.info("ClusterRoleBinding %s alredy exists" % obeject_name) # WARNING

def k8sClusterRoleBindingAdd(user_cluster_role, username):
if email_check(username):
user = username.split("@")[0]
def k8sClusterRoleBindingAdd(user_cluster_role, username, group_name):
if username:
if email_check(username):
user = username.split("@")[0]
else:
user = username

obeject_name = user + "---" + "kubedash" + "---" + user_cluster_role
else:
user = username
obeject_name = user + "---" + "kubedash" + "---" + user_cluster_role
obeject_name = group_name + "---" + "kubedash" + "---" + user_cluster_role

is_clusterrolebinding_exists, error = k8sClusterRoleBindingGet(obeject_name)
if error:
ErrorHandler(logger, error, "get ClusterRoleBinding %s" % obeject_name)
Expand All @@ -2418,7 +2478,7 @@ def k8sClusterRoleBindingAdd(user_cluster_role, username):
ErrorHandler(logger, "CannotConnect", "ClusterRoleBinding %s alredy exists" % obeject_name)
logger.info("ClusterRoleBinding %s alredy exists" % obeject_name) # WARNING
else:
k8sClusterRoleBindingCreate(user_cluster_role, username)
k8sClusterRoleBindingCreate(user_cluster_role, username, group_name)

##############################################################
# Security
Expand Down
Loading

0 comments on commit fec40a8

Please sign in to comment.