20
20
"""
21
21
22
22
import abc
23
- import pathlib
24
- from kubernetes import config
25
- from jinja2 import Environment , FileSystemLoader
26
23
import os
24
+ from kubernetes import client , config
27
25
28
26
global path_set
29
27
path_set = False
30
28
31
- """
32
- auth = KubeConfigFileAuthentication(
33
- kube_config_path="config"
34
- )
35
- auth.load_kube_config()
36
-
37
-
38
- """
39
-
40
29
41
30
class Authentication (metaclass = abc .ABCMeta ):
42
31
"""
@@ -83,7 +72,7 @@ def logout(self):
83
72
84
73
class TokenAuthentication (Authentication ):
85
74
"""
86
- `TokenAuthentication` is a subclass of `Authentication`. It can be used to authenticate to an OpenShift
75
+ `TokenAuthentication` is a subclass of `Authentication`. It can be used to authenticate to a Kubernetes
87
76
cluster when the user has an API token and the API server address.
88
77
"""
89
78
@@ -93,186 +82,88 @@ def __init__(
93
82
server : str = None ,
94
83
skip_tls : bool = False ,
95
84
ca_cert_path : str = "/etc/pki/tls/certs/ca-bundle.crt" ,
96
- username : str = "user" ,
97
85
):
98
86
"""
99
87
Initialize a TokenAuthentication object that requires a value for `token`, the API Token
100
- and `server`, the API server address for authenticating to an OpenShift cluster.
88
+ and `server`, the API server address for authenticating to a Kubernetes cluster.
101
89
"""
102
90
103
91
self .token = token
104
92
self .server = server
105
93
self .skip_tls = skip_tls
106
94
self .ca_cert_path = ca_cert_path
107
- self .username = username
108
95
109
96
def login (self ) -> str :
110
97
"""
111
- This function is used to login to a Kubernetes cluster using the user's API token and API server address.
98
+ This function is used to log in to a Kubernetes cluster using the user's API token and API server address.
112
99
Depending on the cluster, a user can choose to login in with `--insecure-skip-tls-verify` by setting `skip_tls`
113
100
to `True` or `--certificate-authority` by setting `skip_tls` to false and providing a path to a ca bundle with `ca_cert_path`.
114
-
115
- If a user does not have a Kubernetes config file one is created from a template with the appropriate user functionality
116
- and if they do it is updated with new credentials.
117
101
"""
118
- dir = pathlib . Path ( __file__ ). parent . parent . resolve ()
119
- home = os . path . expanduser ( "~" )
102
+ global path_set
103
+ global api_client
120
104
try :
121
- security = "insecure-skip-tls-verify: false"
105
+ configuration = client .Configuration ()
106
+ configuration .api_key_prefix ["authorization" ] = "Bearer"
107
+ configuration .host = self .server
108
+ configuration .api_key ["authorization" ] = self .token
122
109
if self .skip_tls == False :
123
- security = "certificate-authority: %s" % self .ca_cert_path
110
+ configuration . ssl_ca_cert = self .ca_cert_path
124
111
else :
125
- security = "insecure-skip-tls-verify: true"
126
-
127
- env = Environment (
128
- loader = FileSystemLoader (f"{ dir } /templates" ),
129
- trim_blocks = True ,
130
- lstrip_blocks = True ,
131
- )
132
- template = env .get_template ("config.yaml" )
133
- server = self .server
134
- cluster_name = server [8 :].replace ("." , "-" )
135
- # If there is no .kube folder it is created.
136
- if not os .path .isdir ("%s/.kube" % home ):
137
- os .mkdir ("%s/.kube" % home )
138
-
139
- # If a config file exists then it will be updated with new fields and values.
140
- if os .path .isfile ("%s/.kube/config" % home ):
141
- file = open (r"%s/.kube/config" % home , "r" ).readlines ()
142
- write_file = open (r"%s/.kube/config" % home , "w" )
143
- existing = False
144
- # Check for existing config
145
- for line in file :
146
- if self .server in line :
147
- existing = True
148
-
149
- if existing == False :
150
- for line in file :
151
- # All of these fields are given new lines underneath with credentials info.
152
- if "clusters:" in line :
153
- write_file .write (line )
154
- write_file .write (
155
- "- cluster:\n %(security)s\n server: %(server)s\n name: %(cluster)s\n "
156
- % {
157
- "security" : security ,
158
- "server" : self .server ,
159
- "cluster" : cluster_name ,
160
- }
161
- )
162
- continue
163
- if "contexts:" in line :
164
- write_file .write (line )
165
- write_file .write (
166
- "- context:\n cluster: %(cluster)s\n namespace: default\n user: %(user)s/%(cluster)s\n name: default/%(cluster)s/%(user)s\n "
167
- % {"cluster" : cluster_name , "user" : self .username }
168
- )
169
- continue
170
- if "current-context:" in line :
171
- write_file .write (
172
- "current-context: default/{}/{}\n " .format (
173
- cluster_name , self .username
174
- )
175
- )
176
- continue
177
- if "users:" in line :
178
- write_file .write (line )
179
- write_file .write (
180
- "- name: {}/{}\n user:\n token: {}\n " .format (
181
- self .username , cluster_name , self .token
182
- )
183
- )
184
- continue
185
-
186
- write_file .write (line )
187
- else :
188
- # If there is an existing config just update the token and username
189
- for line in file :
190
- if "users:" in line :
191
- write_file .write (line )
192
- write_file .write (
193
- "- name: {}/{}\n user:\n token: {}\n " .format (
194
- self .username , cluster_name , self .token
195
- )
196
- )
197
- continue
198
- write_file .write (line )
112
+ configuration .verify_ssl = False
113
+ api_client = client .ApiClient (configuration )
114
+ path_set = False
115
+ return "Logged into %s" % self .server
116
+ except client .ApiException as exception :
117
+ return exception
199
118
200
- response = "Updated config file at %s/.kube/config" % home
201
- else :
202
- # Create a new config file from the config template and store it in HOME/.kube
203
- file = open ("%s/.kube/config" % home , "w" )
204
- file .write (
205
- template .render (
206
- security = security ,
207
- server = server ,
208
- cluster = cluster_name ,
209
- context_name = "default/{}/{}" .format (
210
- cluster_name , self .username
211
- ),
212
- current_context = "default/{}/{}" .format (
213
- cluster_name , self .username
214
- ),
215
- username = "{}/{}" .format (self .username , cluster_name ),
216
- token = self .token ,
217
- )
218
- )
219
- response = (
220
- "Logged in and created new config file at %s/.kube/config" % home
221
- )
222
- except :
223
- response = "Error logging in. Have you inputted correct credentials?"
224
- return response
119
+ def api_config_handler ():
120
+ """
121
+ This function is used to load the api client if the user has logged in
122
+ """
123
+ if api_client != None and path_set == False :
124
+ return api_client
125
+ else :
126
+ return None
225
127
226
128
def logout (self ) -> str :
227
129
"""
228
130
This function is used to logout of a Kubernetes cluster.
229
131
"""
230
- home = os .path .expanduser ("~" )
231
- file = open (r"%s/.kube/config" % home , "r" )
232
- lines = file .readlines ()
233
- line_count = 0
234
- for line in lines :
235
- if (
236
- "- name: {}/{}" .format (self .username , self .server [8 :].replace ("." , "-" ))
237
- not in line .strip ()
238
- ):
239
- line_count = line_count + 1
240
- else :
241
- break
242
- # The name, user and token are removed from the config file
243
- with open (r"%s/.kube/config" % home , "w" ) as file :
244
- for number , line in enumerate (lines ):
245
- if number not in [line_count , line_count + 1 , line_count + 2 ]:
246
- file .write (line )
247
- print ("logged out of user %s" % self .username )
132
+ global path_set
133
+ path_set = False
134
+ global api_client
135
+ api_client = None
248
136
249
137
250
138
class KubeConfigFileAuthentication (KubeConfiguration ):
251
139
"""
252
140
A class that defines the necessary methods for passing a user's own Kubernetes config file.
253
- Specifically this class defines the `load_kube_config()`, `config_check()` and `remove_config ()` functions.
141
+ Specifically this class defines the `load_kube_config()` and `config_check ()` functions.
254
142
"""
255
143
256
144
def __init__ (self , kube_config_path : str = None ):
257
145
self .kube_config_path = kube_config_path
258
146
259
147
def load_kube_config (self ):
148
+ """
149
+ Function for loading a user's own predefined Kubernetes config file.
150
+ """
260
151
global path_set
152
+ global api_client
261
153
try :
262
154
path_set = True
263
- print ("Loaded user config file at path %s" % self .kube_config_path )
264
- response = config .load_kube_config (self .kube_config_path )
155
+ api_client = None
156
+ config .load_kube_config (self .kube_config_path )
157
+ response = "Loaded user config file at path %s" % self .kube_config_path
265
158
except config .ConfigException :
266
159
path_set = False
267
160
raise Exception ("Please specify a config file path" )
268
161
return response
269
162
270
163
def config_check ():
271
- if path_set == False :
164
+ """
165
+ Function for loading the config file at the default config location ~/.kube/config if the user has not
166
+ specified their own config file or has logged in with their token and server.
167
+ """
168
+ if path_set == False and api_client == None :
272
169
config .load_kube_config ()
273
-
274
- def remove_config (self ) -> str :
275
- global path_set
276
- path_set = False
277
- os .remove (self .kube_config_path )
278
- print ("Removed config file" )
0 commit comments