24
24
import org .elasticsearch .action .TaskOperationFailure ;
25
25
import org .elasticsearch .action .admin .cluster .node .tasks .list .TaskInfo ;
26
26
import org .elasticsearch .action .support .ActionFilters ;
27
- import org .elasticsearch .action .support .tasks .BaseTasksRequest ;
28
27
import org .elasticsearch .action .support .tasks .TransportTasksAction ;
29
28
import org .elasticsearch .cluster .ClusterName ;
30
29
import org .elasticsearch .cluster .ClusterService ;
36
35
import org .elasticsearch .common .io .stream .StreamOutput ;
37
36
import org .elasticsearch .common .settings .Settings ;
38
37
import org .elasticsearch .tasks .CancellableTask ;
38
+ import org .elasticsearch .tasks .TaskId ;
39
39
import org .elasticsearch .threadpool .ThreadPool ;
40
40
import org .elasticsearch .transport .EmptyTransportResponseHandler ;
41
41
import org .elasticsearch .transport .TransportChannel ;
@@ -84,17 +84,17 @@ protected TaskInfo readTaskResponse(StreamInput in) throws IOException {
84
84
}
85
85
86
86
protected void processTasks (CancelTasksRequest request , Consumer <CancellableTask > operation ) {
87
- if (request .taskId () != BaseTasksRequest . ALL_TASKS ) {
87
+ if (request .taskId (). isSet () == false ) {
88
88
// we are only checking one task, we can optimize it
89
- CancellableTask task = taskManager .getCancellableTask (request .taskId ());
89
+ CancellableTask task = taskManager .getCancellableTask (request .taskId (). getId () );
90
90
if (task != null ) {
91
91
if (request .match (task )) {
92
92
operation .accept (task );
93
93
} else {
94
94
throw new IllegalArgumentException ("task [" + request .taskId () + "] doesn't support this operation" );
95
95
}
96
96
} else {
97
- if (taskManager .getTask (request .taskId ()) != null ) {
97
+ if (taskManager .getTask (request .taskId (). getId () ) != null ) {
98
98
// The task exists, but doesn't support cancellation
99
99
throw new IllegalArgumentException ("task [" + request .taskId () + "] doesn't support cancellation" );
100
100
} else {
@@ -135,11 +135,14 @@ protected boolean accumulateExceptions() {
135
135
}
136
136
137
137
private void setBanOnNodes (String reason , CancellableTask task , Set <String > nodes , BanLock banLock ) {
138
- sendSetBanRequest (nodes , new BanParentTaskRequest (clusterService .localNode ().getId (), task .getId (), reason ), banLock );
138
+ sendSetBanRequest (nodes ,
139
+ BanParentTaskRequest .createSetBanParentTaskRequest (new TaskId (clusterService .localNode ().getId (), task .getId ()), reason ),
140
+ banLock );
139
141
}
140
142
141
143
private void removeBanOnNodes (CancellableTask task , Set <String > nodes ) {
142
- sendRemoveBanRequest (nodes , new BanParentTaskRequest (clusterService .localNode ().getId (), task .getId ()));
144
+ sendRemoveBanRequest (nodes ,
145
+ BanParentTaskRequest .createRemoveBanParentTaskRequest (new TaskId (clusterService .localNode ().getId (), task .getId ())));
143
146
}
144
147
145
148
private void sendSetBanRequest (Set <String > nodes , BanParentTaskRequest request , BanLock banLock ) {
@@ -148,8 +151,8 @@ private void sendSetBanRequest(Set<String> nodes, BanParentTaskRequest request,
148
151
DiscoveryNode discoveryNode = clusterState .getNodes ().get (node );
149
152
if (discoveryNode != null ) {
150
153
// Check if node still in the cluster
151
- logger .debug ("Sending ban for tasks with the parent [{}:{} ] to the node [{}], ban [{}]" , request .parentNodeId , request
152
- . parentTaskId , node , request .ban );
154
+ logger .debug ("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]" , request .parentTaskId , node ,
155
+ request .ban );
153
156
transportService .sendRequest (discoveryNode , BAN_PARENT_ACTION_NAME , request ,
154
157
new EmptyTransportResponseHandler (ThreadPool .Names .SAME ) {
155
158
@ Override
@@ -164,8 +167,8 @@ public void handleException(TransportException exp) {
164
167
});
165
168
} else {
166
169
banLock .onBanSet ();
167
- logger .debug ("Cannot send ban for tasks with the parent [{}:{} ] to the node [{}] - the node no longer in the cluster" ,
168
- request .parentNodeId , request . parentTaskId , node );
170
+ logger .debug ("Cannot send ban for tasks with the parent [{}] to the node [{}] - the node no longer in the cluster" ,
171
+ request .parentTaskId , node );
169
172
}
170
173
}
171
174
}
@@ -176,13 +179,12 @@ private void sendRemoveBanRequest(Set<String> nodes, BanParentTaskRequest reques
176
179
DiscoveryNode discoveryNode = clusterState .getNodes ().get (node );
177
180
if (discoveryNode != null ) {
178
181
// Check if node still in the cluster
179
- logger .debug ("Sending remove ban for tasks with the parent [{}:{}] to the node [{}]" , request .parentNodeId ,
180
- request .parentTaskId , node );
182
+ logger .debug ("Sending remove ban for tasks with the parent [{}] to the node [{}]" , request .parentTaskId , node );
181
183
transportService .sendRequest (discoveryNode , BAN_PARENT_ACTION_NAME , request , EmptyTransportResponseHandler
182
184
.INSTANCE_SAME );
183
185
} else {
184
- logger .debug ("Cannot send remove ban request for tasks with the parent [{}:{} ] to the node [{}] - the node no longer in " +
185
- "the cluster" , request .parentNodeId , request . parentTaskId , node );
186
+ logger .debug ("Cannot send remove ban request for tasks with the parent [{}] to the node [{}] - the node no longer in " +
187
+ "the cluster" , request .parentTaskId , node );
186
188
}
187
189
}
188
190
}
@@ -218,23 +220,27 @@ public void finish() {
218
220
219
221
private static class BanParentTaskRequest extends TransportRequest {
220
222
221
- private String parentNodeId ;
222
-
223
- private long parentTaskId ;
223
+ private TaskId parentTaskId ;
224
224
225
225
private boolean ban ;
226
226
227
227
private String reason ;
228
228
229
- BanParentTaskRequest (String parentNodeId , long parentTaskId , String reason ) {
230
- this .parentNodeId = parentNodeId ;
229
+ static BanParentTaskRequest createSetBanParentTaskRequest (TaskId parentTaskId , String reason ) {
230
+ return new BanParentTaskRequest (parentTaskId , reason );
231
+ }
232
+
233
+ static BanParentTaskRequest createRemoveBanParentTaskRequest (TaskId parentTaskId ) {
234
+ return new BanParentTaskRequest (parentTaskId );
235
+ }
236
+
237
+ private BanParentTaskRequest (TaskId parentTaskId , String reason ) {
231
238
this .parentTaskId = parentTaskId ;
232
239
this .ban = true ;
233
240
this .reason = reason ;
234
241
}
235
242
236
- BanParentTaskRequest (String parentNodeId , long parentTaskId ) {
237
- this .parentNodeId = parentNodeId ;
243
+ private BanParentTaskRequest (TaskId parentTaskId ) {
238
244
this .parentTaskId = parentTaskId ;
239
245
this .ban = false ;
240
246
}
@@ -245,8 +251,7 @@ public BanParentTaskRequest() {
245
251
@ Override
246
252
public void readFrom (StreamInput in ) throws IOException {
247
253
super .readFrom (in );
248
- parentNodeId = in .readString ();
249
- parentTaskId = in .readLong ();
254
+ parentTaskId = new TaskId (in );
250
255
ban = in .readBoolean ();
251
256
if (ban ) {
252
257
reason = in .readString ();
@@ -256,8 +261,7 @@ public void readFrom(StreamInput in) throws IOException {
256
261
@ Override
257
262
public void writeTo (StreamOutput out ) throws IOException {
258
263
super .writeTo (out );
259
- out .writeString (parentNodeId );
260
- out .writeLong (parentTaskId );
264
+ parentTaskId .writeTo (out );
261
265
out .writeBoolean (ban );
262
266
if (ban ) {
263
267
out .writeString (reason );
@@ -269,13 +273,13 @@ class BanParentRequestHandler implements TransportRequestHandler<BanParentTaskRe
269
273
@ Override
270
274
public void messageReceived (final BanParentTaskRequest request , final TransportChannel channel ) throws Exception {
271
275
if (request .ban ) {
272
- logger .debug ("Received ban for the parent [{}:{} ] on the node [{}], reason: [{}]" , request .parentNodeId , request
273
- . parentTaskId , clusterService .localNode ().getId (), request .reason );
274
- taskManager .setBan (request .parentNodeId , request . parentTaskId , request .reason );
276
+ logger .debug ("Received ban for the parent [{}] on the node [{}], reason: [{}]" , request .parentTaskId ,
277
+ clusterService .localNode ().getId (), request .reason );
278
+ taskManager .setBan (request .parentTaskId , request .reason );
275
279
} else {
276
- logger .debug ("Removing ban for the parent [{}:{} ] on the node [{}]" , request . parentNodeId , request .parentTaskId ,
280
+ logger .debug ("Removing ban for the parent [{}] on the node [{}]" , request .parentTaskId ,
277
281
clusterService .localNode ().getId ());
278
- taskManager .removeBan (request .parentNodeId , request . parentTaskId );
282
+ taskManager .removeBan (request .parentTaskId );
279
283
}
280
284
channel .sendResponse (TransportResponse .Empty .INSTANCE );
281
285
}
0 commit comments