20
20
"""
21
21
22
22
import abc
23
- import openshift as oc
24
- from openshift import OpenShiftPythonException
23
+ import pathlib
24
+ from kubernetes import config
25
+ from jinja2 import Environment , FileSystemLoader
26
+ import os
27
+
28
+ global path_set
29
+ path_set = False
30
+
31
+ """
32
+ auth = KubeConfigFileAuthentication(
33
+ kube_config_path="config"
34
+ )
35
+ auth.load_kube_config()
36
+
37
+
38
+ """
25
39
26
40
27
41
class Authentication (metaclass = abc .ABCMeta ):
@@ -43,13 +57,44 @@ def logout(self):
43
57
pass
44
58
45
59
60
+ class KubeConfiguration (metaclass = abc .ABCMeta ):
61
+ """
62
+ An abstract class that defines the method for loading a user defined config file using the `load_kube_config()` function
63
+ """
64
+
65
+ def load_kube_config (self ):
66
+ """
67
+ Method for setting your Kubernetes configuration to a certain file
68
+ """
69
+ pass
70
+
71
+ def config_check (self ):
72
+ """
73
+ Method for setting your Kubernetes configuration to a certain file
74
+ """
75
+ pass
76
+
77
+ def logout (self ):
78
+ """
79
+ Method for logging out of the remote cluster
80
+ """
81
+ pass
82
+
83
+
46
84
class TokenAuthentication (Authentication ):
47
85
"""
48
86
`TokenAuthentication` is a subclass of `Authentication`. It can be used to authenticate to an OpenShift
49
87
cluster when the user has an API token and the API server address.
50
88
"""
51
89
52
- def __init__ (self , token : str = None , server : str = None , skip_tls : bool = False ):
90
+ def __init__ (
91
+ self ,
92
+ token : str = None ,
93
+ server : str = None ,
94
+ skip_tls : bool = False ,
95
+ ca_cert_path : str = "/etc/pki/tls/certs/ca-bundle.crt" ,
96
+ username : str = "user" ,
97
+ ):
53
98
"""
54
99
Initialize a TokenAuthentication object that requires a value for `token`, the API Token
55
100
and `server`, the API server address for authenticating to an OpenShift cluster.
@@ -58,65 +103,176 @@ def __init__(self, token: str = None, server: str = None, skip_tls: bool = False
58
103
self .token = token
59
104
self .server = server
60
105
self .skip_tls = skip_tls
106
+ self .ca_cert_path = ca_cert_path
107
+ self .username = username
61
108
62
109
def login (self ) -> str :
63
110
"""
64
- This function is used to login to an OpenShift cluster using the user's API token and API server address.
65
- Depending on the cluster, a user can choose to login in with "--insecure-skip-tls-verify` by setting `skip_tls`
66
- to `True`.
111
+ This function is used to login to a Kubernetes cluster using the user's API token and API server address.
112
+ Depending on the cluster, a user can choose to login in with `--insecure-skip-tls-verify` by setting `skip_tls`
113
+ to `True` or `--certificate-authority` by setting `skip_tls` to false and providing a path to a ca bundle with `ca_cert_path`.
114
+
115
+ If a user does not have a Kubernetes config file one is created from a template with the appropriate user functionality
116
+ and if they do it is updated with new credentials.
67
117
"""
68
- args = [f"--token={ self .token } " , f"--server={ self .server } " ]
69
- if self .skip_tls :
70
- args .append ("--insecure-skip-tls-verify" )
118
+ dir = pathlib .Path (__file__ ).parent .parent .resolve ()
119
+ home = os .path .expanduser ("~" )
71
120
try :
72
- response = oc .invoke ("login" , args )
73
- except OpenShiftPythonException as osp : # pragma: no cover
74
- error_msg = osp .result .err ()
75
- if "The server uses a certificate signed by unknown authority" in error_msg :
76
- return "Error: certificate auth failure, please set `skip_tls=True` in TokenAuthentication"
77
- elif "invalid" in error_msg :
78
- raise PermissionError (error_msg )
121
+ security = "insecure-skip-tls-verify: false"
122
+ if self .skip_tls == False :
123
+ security = "certificate-authority: %s" % self .ca_cert_path
124
+ else :
125
+ security = "insecure-skip-tls-verify: true"
126
+
127
+ env = Environment (
128
+ loader = FileSystemLoader (f"{ dir } /templates" ),
129
+ trim_blocks = True ,
130
+ lstrip_blocks = True ,
131
+ )
132
+ template = env .get_template ("config.yaml" )
133
+ server = self .server
134
+ cluster_name = server [8 :].replace ("." , "-" )
135
+ # If there is no .kube folder it is created.
136
+ if not os .path .isdir ("%s/.kube" % home ):
137
+ os .mkdir ("%s/.kube" % home )
138
+
139
+ # If a config file exists then it will be updated with new fields and values.
140
+ if os .path .isfile ("%s/.kube/config" % home ):
141
+ file = open (r"%s/.kube/config" % home , "r" ).readlines ()
142
+ write_file = open (r"%s/.kube/config" % home , "w" )
143
+ existing = False
144
+ # Check for existing config
145
+ for line in file :
146
+ if self .server in line :
147
+ existing = True
148
+
149
+ if existing == False :
150
+ for line in file :
151
+ # All of these fields are given new lines underneath with credentials info.
152
+ if "clusters:" in line :
153
+ write_file .write (line )
154
+ write_file .write (
155
+ "- cluster:\n %(security)s\n server: %(server)s\n name: %(cluster)s\n "
156
+ % {
157
+ "security" : security ,
158
+ "server" : self .server ,
159
+ "cluster" : cluster_name ,
160
+ }
161
+ )
162
+ continue
163
+ if "contexts:" in line :
164
+ write_file .write (line )
165
+ write_file .write (
166
+ "- context:\n cluster: %(cluster)s\n namespace: default\n user: %(user)s/%(cluster)s\n name: default/%(cluster)s/%(user)s\n "
167
+ % {"cluster" : cluster_name , "user" : self .username }
168
+ )
169
+ continue
170
+ if "current-context:" in line :
171
+ write_file .write (
172
+ "current-context: default/{}/{}\n " .format (
173
+ cluster_name , self .username
174
+ )
175
+ )
176
+ continue
177
+ if "users:" in line :
178
+ write_file .write (line )
179
+ write_file .write (
180
+ "- name: {}/{}\n user:\n token: {}\n " .format (
181
+ self .username , cluster_name , self .token
182
+ )
183
+ )
184
+ continue
185
+
186
+ write_file .write (line )
187
+ else :
188
+ # If there is an existing config just update the token and username
189
+ for line in file :
190
+ if "users:" in line :
191
+ write_file .write (line )
192
+ write_file .write (
193
+ "- name: {}/{}\n user:\n token: {}\n " .format (
194
+ self .username , cluster_name , self .token
195
+ )
196
+ )
197
+ continue
198
+ write_file .write (line )
199
+
200
+ response = "Updated config file at %s/.kube/config" % home
79
201
else :
80
- return error_msg
81
- return response .out ()
202
+ # Create a new config file from the config template and store it in HOME/.kube
203
+ file = open ("%s/.kube/config" % home , "w" )
204
+ file .write (
205
+ template .render (
206
+ security = security ,
207
+ server = server ,
208
+ cluster = cluster_name ,
209
+ context_name = "default/{}/{}" .format (
210
+ cluster_name , self .username
211
+ ),
212
+ current_context = "default/{}/{}" .format (
213
+ cluster_name , self .username
214
+ ),
215
+ username = "{}/{}" .format (self .username , cluster_name ),
216
+ token = self .token ,
217
+ )
218
+ )
219
+ response = (
220
+ "Logged in and created new config file at %s/.kube/config" % home
221
+ )
222
+ except :
223
+ response = "Error logging in. Have you inputted correct credentials?"
224
+ return response
82
225
83
226
def logout (self ) -> str :
84
227
"""
85
- This function is used to logout of an OpenShift cluster.
228
+ This function is used to logout of a Kubernetes cluster.
86
229
"""
87
- args = [f"--token={ self .token } " , f"--server={ self .server } " ]
88
- response = oc .invoke ("logout" , args )
89
- return response .out ()
230
+ home = os .path .expanduser ("~" )
231
+ file = open (r"%s/.kube/config" % home , "r" )
232
+ lines = file .readlines ()
233
+ line_count = 0
234
+ for line in lines :
235
+ if (
236
+ "- name: {}/{}" .format (self .username , self .server [8 :].replace ("." , "-" ))
237
+ not in line .strip ()
238
+ ):
239
+ line_count = line_count + 1
240
+ else :
241
+ break
242
+ # The name, user and token are removed from the config file
243
+ with open (r"%s/.kube/config" % home , "w" ) as file :
244
+ for number , line in enumerate (lines ):
245
+ if number not in [line_count , line_count + 1 , line_count + 2 ]:
246
+ file .write (line )
247
+ print ("logged out of user %s" % self .username )
90
248
91
249
92
- class PasswordUserAuthentication ( Authentication ):
250
+ class KubeConfigFileAuthentication ( KubeConfiguration ):
93
251
"""
94
- `PasswordUserAuthentication` is a subclass of `Authentication`. It can be used to authenticate to an OpenShift
95
- cluster when the user has a username and password .
252
+ An abstract class that defines the necessary methods for passing a user's own Kubernetes config file.
253
+ Specifically this class defines the `load_kube_config()`, `config_check()` and `remove_config()` functions .
96
254
"""
97
255
98
- def __init__ (
99
- self ,
100
- username : str = None ,
101
- password : str = None ,
102
- ):
103
- """
104
- Initialize a PasswordUserAuthentication object that requires a value for `username`
105
- and `password` for authenticating to an OpenShift cluster.
106
- """
107
- self .username = username
108
- self .password = password
256
+ def __init__ (self , kube_config_path : str = None ):
257
+ self .kube_config_path = kube_config_path
109
258
110
- def login (self ) -> str :
111
- """
112
- This function is used to login to an OpenShift cluster using the user's `username` and `password`.
113
- """
114
- response = oc .login (self .username , self .password )
115
- return response .out ()
259
+ def load_kube_config (self ):
260
+ global path_set
261
+ try :
262
+ path_set = True
263
+ print ("Loaded user config file at path %s" % self .kube_config_path )
264
+ response = config .load_kube_config (self .kube_config_path )
265
+ except config .ConfigException :
266
+ path_set = False
267
+ raise Exception ("Please specify a config file path" )
268
+ return response
116
269
117
- def logout (self ) -> str :
118
- """
119
- This function is used to logout of an OpenShift cluster.
120
- """
121
- response = oc .invoke ("logout" )
122
- return response .out ()
270
+ def config_check ():
271
+ if path_set == False :
272
+ config .load_kube_config ()
273
+
274
+ def remove_config (self ) -> str :
275
+ global path_set
276
+ path_set = False
277
+ os .remove (self .kube_config_path )
278
+ print ("Removed config file" )
0 commit comments