@@ -37,6 +37,7 @@ class Action
37
37
# @since 6.2.0
38
38
def initialize ( definition )
39
39
@definition = definition
40
+ @retries = 0
40
41
end
41
42
42
43
# Execute the action. The method returns the client, in case the action created a new client
@@ -66,71 +67,9 @@ def execute(client, test = nil)
66
67
shadow_client . send ( method )
67
68
end
68
69
end
69
-
70
70
_method = chain [ -1 ]
71
- case _method
72
- when 'bulk'
73
- arguments = prepare_arguments ( args , test )
74
- arguments [ :body ] . map! do |item |
75
- if item . is_a? ( Hash )
76
- item
77
- elsif item . is_a? ( String )
78
- symbolize_keys ( JSON . parse ( item ) )
79
- end
80
- end if arguments [ :body ] . is_a? Array
81
- @response = client . send ( _method , arguments )
82
- client
83
- when 'headers'
84
- headers = prepare_arguments ( args , test )
85
- host = client . transport . instance_variable_get ( '@hosts' )
86
- transport_options = client . transport . instance_variable_get ( '@options' ) &.dig ( :transport_options ) || { }
87
- if ENV [ 'QUIET' ] == 'true'
88
- # todo: create a method on Elasticsearch::Client that can clone the client with new options
89
- Elasticsearch ::Client . new (
90
- host : host ,
91
- transport_options : transport_options . merge ( headers : headers )
92
- )
93
- else
94
- Elasticsearch ::Client . new (
95
- host : host ,
96
- tracer : Logger . new ( $stdout) ,
97
- transport_options : transport_options . merge ( headers : headers )
98
- )
99
- end
100
- when 'catch' , 'warnings' , 'allowed_warnings' , 'allowed_warnings_regex'
101
- client
102
- when 'put_trained_model_alias'
103
- args . merge! ( 'reassign' => true ) unless args [ 'reassign' ] === false
104
- @response = client . send ( _method , prepare_arguments ( args , test ) )
105
- client
106
- when 'create'
107
- begin
108
- @response = client . send ( _method , prepare_arguments ( args , test ) )
109
- rescue Elastic ::Transport ::Transport ::Errors ::BadRequest => e
110
- case e . message
111
- when /resource_already_exists_exception/
112
- client . delete ( index : args [ 'index' ] )
113
- when /failed to parse date field/
114
- body = args [ 'body' ]
115
- time_series = body [ 'settings' ] [ 'index' ] [ 'time_series' ]
116
- time_series . each { |k , v | time_series [ k ] = v . strftime ( "%FT%TZ" ) }
117
- args [ 'body' ] = body
118
- else
119
- raise e
120
- end
121
- @response = client . send ( _method , prepare_arguments ( args , test ) )
122
- end
123
- client
124
- when 'update_user_profile_data' , 'get_user_profile' , 'enable_user_profile' , 'disable_user_profile'
125
- args . each do |key , value |
126
- args [ key ] = value . gsub ( value , test . cached_values [ value . gsub ( '$' , '' ) ] ) if value . match? ( /^\$ / )
127
- end
128
- @response = client . send ( _method , prepare_arguments ( args , test ) )
129
- client
130
- else
131
- @response = client . send ( _method , prepare_arguments ( args , test ) )
132
- client
133
- end
71
+
72
+ perform_action ( _method , args , client , test )
134
73
end
135
74
end
136
75
end
@@ -188,6 +127,79 @@ def perform_internal(method, args, client, test)
188
127
client
189
128
end
190
129
130
+ def perform_action ( _method , args , client , test )
131
+ case _method
132
+ when 'bulk'
133
+ arguments = prepare_arguments ( args , test )
134
+ arguments [ :body ] . map! do |item |
135
+ if item . is_a? ( Hash )
136
+ item
137
+ elsif item . is_a? ( String )
138
+ symbolize_keys ( JSON . parse ( item ) )
139
+ end
140
+ end if arguments [ :body ] . is_a? Array
141
+ @response = client . send ( _method , arguments )
142
+ client
143
+ when 'headers'
144
+ headers = prepare_arguments ( args , test )
145
+ host = client . transport . instance_variable_get ( '@hosts' )
146
+ transport_options = client . transport . instance_variable_get ( '@options' ) &.dig ( :transport_options ) || { }
147
+ if ENV [ 'QUIET' ] == 'true'
148
+ # todo: create a method on Elasticsearch::Client that can clone the client with new options
149
+ Elasticsearch ::Client . new (
150
+ host : host ,
151
+ transport_options : transport_options . merge ( headers : headers )
152
+ )
153
+ else
154
+ Elasticsearch ::Client . new (
155
+ host : host ,
156
+ tracer : Logger . new ( $stdout) ,
157
+ transport_options : transport_options . merge ( headers : headers )
158
+ )
159
+ end
160
+ when 'catch' , 'warnings' , 'allowed_warnings' , 'allowed_warnings_regex'
161
+ client
162
+ when 'put_trained_model_alias'
163
+ args . merge! ( 'reassign' => true ) unless args [ 'reassign' ] === false
164
+ @response = client . send ( _method , prepare_arguments ( args , test ) )
165
+ client
166
+ when 'create'
167
+ begin
168
+ @response = client . send ( _method , prepare_arguments ( args , test ) )
169
+ rescue Elastic ::Transport ::Transport ::Errors ::BadRequest => e
170
+ case e . message
171
+ when /resource_already_exists_exception/
172
+ client . delete ( index : args [ 'index' ] )
173
+ when /failed to parse date field/
174
+ body = args [ 'body' ]
175
+ time_series = body [ 'settings' ] [ 'index' ] [ 'time_series' ]
176
+ time_series . each { |k , v | time_series [ k ] = v . strftime ( "%FT%TZ" ) }
177
+ args [ 'body' ] = body
178
+ else
179
+ raise e
180
+ end
181
+ @response = client . send ( _method , prepare_arguments ( args , test ) )
182
+ end
183
+ client
184
+ when 'update_user_profile_data' , 'get_user_profile' , 'enable_user_profile' , 'disable_user_profile'
185
+ args . each do |key , value |
186
+ args [ key ] = value . gsub ( value , test . cached_values [ value . gsub ( '$' , '' ) ] ) if value . match? ( /^\$ / )
187
+ end
188
+ @response = client . send ( _method , prepare_arguments ( args , test ) )
189
+ client
190
+ else
191
+ @response = client . send ( _method , prepare_arguments ( args , test ) )
192
+ client
193
+ end
194
+ rescue Elastic ::Transport ::Transport ::Error => e
195
+ if e . message . match ( /Net::ReadTimeout/ ) && @retries <= 5
196
+ @retries += 1
197
+ perform_action ( method , args , client , test )
198
+ else
199
+ raise e
200
+ end
201
+ end
202
+
191
203
def prepare_arguments ( args , test )
192
204
symbolize_keys ( args ) . tap do |args |
193
205
if test
0 commit comments