-
Notifications
You must be signed in to change notification settings - Fork 218
Changes for OWLS-83431 #1855
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes for OWLS-83431 #1855
Changes from 5 commits
4994a38
6dc9cdb
619cc7d
5b2266f
beeb8e0
00edd0a
f4d2039
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ | |
import oracle.kubernetes.weblogic.domain.model.ServerSpec; | ||
import oracle.kubernetes.weblogic.domain.model.Shutdown; | ||
|
||
import static oracle.kubernetes.operator.LabelConstants.CLUSTERNAME_LABEL; | ||
import static oracle.kubernetes.operator.ProcessingConstants.SERVERS_TO_ROLL; | ||
|
||
public class PodHelper { | ||
|
@@ -85,6 +86,59 @@ public static boolean isReady(V1Pod pod) { | |
return ready; | ||
} | ||
|
||
/** | ||
* Get list of scheduled pods. | ||
* @param info Domain presence info | ||
* @return list containing scheduled pods | ||
*/ | ||
public static List<String> getScheduledPods(DomainPresenceInfo info, String clusterName) { | ||
rjeberhard marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// These are presently scheduled servers | ||
List<String> scheduledServers = new ArrayList<>(); | ||
for (Map.Entry<String, ServerKubernetesObjects> entry : info.getServers().entrySet()) { | ||
V1Pod pod = entry.getValue().getPod().get(); | ||
if (pod != null && !PodHelper.isDeleting(pod) && PodHelper.getScheduledStatus(pod)) { | ||
String wlsClusterName = pod.getMetadata().getLabels().get(CLUSTERNAME_LABEL); | ||
if ((wlsClusterName == null) || (wlsClusterName.contains(clusterName))) { | ||
scheduledServers.add(entry.getKey()); | ||
} | ||
} | ||
} | ||
return scheduledServers; | ||
} | ||
|
||
/** | ||
* Get list of ready pods. | ||
* @param info Domain presence info | ||
* @return list containing ready pods | ||
*/ | ||
public static List<String> getReadyPods(DomainPresenceInfo info, String clusterName) { | ||
// These are presently Ready servers | ||
List<String> readyServers = new ArrayList<>(); | ||
for (Map.Entry<String, ServerKubernetesObjects> entry : info.getServers().entrySet()) { | ||
V1Pod pod = entry.getValue().getPod().get(); | ||
if (pod != null && !PodHelper.isDeleting(pod) && PodHelper.getReadyStatus(pod)) { | ||
String wlsClusterName = pod.getMetadata().getLabels().get(CLUSTERNAME_LABEL); | ||
if ((wlsClusterName == null) || (wlsClusterName.contains(clusterName))) { | ||
readyServers.add(entry.getKey()); | ||
} | ||
} | ||
} | ||
return readyServers; | ||
} | ||
|
||
/** | ||
* get if pod is in scheduled state. | ||
* @param pod pod | ||
* @return true, if pod is scheduled | ||
*/ | ||
public static boolean getScheduledStatus(V1Pod pod) { | ||
V1PodSpec status = pod.getSpec(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix variable name There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
if (status != null) { | ||
return status.getNodeName() != null; | ||
} | ||
return false; | ||
} | ||
|
||
/** | ||
* get if pod is in ready state. | ||
* @param pod pod | ||
|
@@ -444,7 +498,7 @@ protected String getPodReplacedMessageKey() { | |
protected V1ObjectMeta createMetadata() { | ||
V1ObjectMeta metadata = super.createMetadata(); | ||
if (getClusterName() != null) { | ||
metadata.putLabelsItem(LabelConstants.CLUSTERNAME_LABEL, getClusterName()); | ||
metadata.putLabelsItem(CLUSTERNAME_LABEL, getClusterName()); | ||
} | ||
return metadata; | ||
} | ||
|
@@ -502,7 +556,7 @@ public NextAction apply(Packet packet) { | |
if (oldPod != null) { | ||
Map<String, String> labels = oldPod.getMetadata().getLabels(); | ||
if (labels != null) { | ||
clusterName = labels.get(LabelConstants.CLUSTERNAME_LABEL); | ||
clusterName = labels.get(CLUSTERNAME_LABEL); | ||
} | ||
|
||
ServerSpec serverSpec = info.getDomain().getServer(serverName, clusterName); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,16 +4,17 @@ | |
package oracle.kubernetes.operator.steps; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Queue; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.IntStream; | ||
|
||
import oracle.kubernetes.operator.DomainStatusUpdater; | ||
import oracle.kubernetes.operator.ProcessingConstants; | ||
|
@@ -43,9 +44,6 @@ public class ManagedServerUpIteratorStep extends Step { | |
|
||
private final Collection<ServerStartupInfo> startupInfos; | ||
|
||
private static NextStepFactory NEXT_STEP_FACTORY = | ||
(next) -> DomainStatusUpdater.createStatusUpdateStep(next); | ||
|
||
public ManagedServerUpIteratorStep(Collection<ServerStartupInfo> startupInfos, Step next) { | ||
super(next); | ||
this.startupInfos = startupInfos; | ||
|
@@ -89,12 +87,26 @@ public NextAction apply(Packet packet) { | |
.filter(ssi -> !isServerInCluster(ssi)) | ||
.map(ssi -> createManagedServerUpDetails(packet, ssi)).collect(Collectors.toList()); | ||
|
||
getStartClusteredServersStepFactories(startupInfos, packet).values() | ||
.forEach(factory -> startDetails.addAll(factory.getServerStartsStepAndPackets())); | ||
Collection<StepAndPacket> work = new ArrayList<>(); | ||
if (!startDetails.isEmpty()) { | ||
work.add( | ||
new StepAndPacket( | ||
new StartManagedServersStep(null, startDetails, null), packet)); | ||
} | ||
|
||
for (Map.Entry<String, StartClusteredServersStepFactory> entry | ||
: getStartClusteredServersStepFactories(startupInfos, packet).entrySet()) { | ||
work.add( | ||
new StepAndPacket( | ||
new StartManagedServersStep(entry.getKey(), entry.getValue().getServerStartsStepAndPackets(), | ||
null), packet.clone())); | ||
} | ||
|
||
if (!work.isEmpty()) { | ||
return doForkJoin(DomainStatusUpdater.createStatusUpdateStep(getNext()), packet, work); | ||
} | ||
|
||
return doNext( | ||
NEXT_STEP_FACTORY.createStatusUpdateStep(new StartManagedServersStep(startDetails, getNext())), | ||
packet); | ||
return doNext(DomainStatusUpdater.createStatusUpdateStep(getNext()), packet); | ||
} | ||
|
||
|
||
|
@@ -142,19 +154,53 @@ private boolean isServerInCluster(ServerStartupInfo ssi) { | |
|
||
static class StartManagedServersStep extends Step { | ||
final Collection<StepAndPacket> startDetails; | ||
final Queue<StepAndPacket> startDetailsQueue = new ConcurrentLinkedQueue<>(); | ||
final String clusterName; | ||
int numStarted = 0; | ||
int maxConcurrency = 0; | ||
|
||
StartManagedServersStep(Collection<StepAndPacket> startDetails, Step next) { | ||
StartManagedServersStep(String clusterName, Collection<StepAndPacket> startDetails, Step next) { | ||
super(next); | ||
this.clusterName = clusterName; | ||
this.startDetails = startDetails; | ||
startDetails.forEach(this::add); | ||
} | ||
|
||
Collection<StepAndPacket> getStartDetails() { | ||
return startDetails; | ||
void add(StepAndPacket serverToStart) { | ||
startDetailsQueue.add(new StepAndPacket(serverToStart.step, serverToStart.packet)); | ||
this.maxConcurrency = Optional.ofNullable( | ||
(Integer) serverToStart.packet.get(ProcessingConstants.MAX_CONCURRENCY)).orElse(0); | ||
} | ||
|
||
@Override | ||
public NextAction apply(Packet packet) { | ||
return doForkJoin(new ManagedServerUpAfterStep(getNext()), packet, startDetails); | ||
|
||
if (startDetailsQueue.isEmpty()) { | ||
return doNext(new ManagedServerUpAfterStep(getNext()), packet); | ||
} else if (isServiceOnlyOrShuttingDown()) { | ||
Collection<StepAndPacket> servers = Collections.singletonList(startDetailsQueue.poll()); | ||
return doForkJoin(this, packet, servers); | ||
} else if (serverAvailableToStart(packet.getSpi(DomainPresenceInfo.class))) { | ||
this.numStarted++; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the concurrency of this code? Will multiple threads ever be running here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's one thread per cluster and one thread for non-clustered servers. It'll be single threaded for one cluster and 2 threads for 2 cluster case. If domain also have non-clustered (standalone) servers, it'll create a separate fiber for those servers. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me be clearer in my question. Do I need to worry about the concurrency of updating the numStarted variable? I can't really tell from your answer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for not being clear in my previous answer. In 2 cluster scenario, 2 threads could reading/updating in parallel but overall concurrency will be low. I have changed |
||
return doForkJoin(this, packet, Collections.singletonList(startDetailsQueue.poll())); | ||
} else { | ||
return doDelay(this, packet, 100, TimeUnit.MILLISECONDS); | ||
} | ||
} | ||
|
||
private boolean isServiceOnlyOrShuttingDown() { | ||
return Optional.ofNullable(startDetailsQueue.peek().step) | ||
.map(step -> step.getNext() instanceof ServerDownStep).orElse(false); | ||
} | ||
|
||
private boolean serverAvailableToStart(DomainPresenceInfo info) { | ||
return ((this.numStarted < PodHelper.getScheduledPods(info, clusterName).size()) | ||
rjeberhard marked this conversation as resolved.
Show resolved
Hide resolved
|
||
&& (canStartConcurrently(PodHelper.getReadyPods(info, clusterName).size()))); | ||
} | ||
|
||
private boolean canStartConcurrently(int numReady) { | ||
return ((this.maxConcurrency > 0) && (this.numStarted < (this.maxConcurrency + numReady - 1))) | ||
rjeberhard marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|| (this.maxConcurrency == 0); | ||
} | ||
} | ||
|
||
|
@@ -172,57 +218,13 @@ private static class StartClusteredServersStepFactory { | |
} | ||
|
||
void add(StepAndPacket serverToStart) { | ||
serverToStart.packet.put(ProcessingConstants.MAX_CONCURRENCY, maxConcurrency); | ||
serversToStart.add(serverToStart); | ||
} | ||
|
||
Collection<StepAndPacket> getServerStartsStepAndPackets() { | ||
if (maxConcurrency == 0 || serversToStart.size() <= maxConcurrency) { | ||
return serversToStart; | ||
} | ||
ArrayList<StepAndPacket> steps = new ArrayList<>(maxConcurrency); | ||
IntStream.range(0, maxConcurrency) | ||
.forEach(i -> steps.add(StartClusteredServersStep.createStepAndPacket(serversToStart))); | ||
return steps; | ||
} | ||
|
||
} | ||
|
||
static class StartClusteredServersStep extends Step { | ||
|
||
private final Queue<StepAndPacket> serversToStart; | ||
|
||
static StepAndPacket createStepAndPacket(Queue<StepAndPacket> serversToStart) { | ||
return new StepAndPacket(new StartClusteredServersStep(serversToStart), null); | ||
} | ||
|
||
StartClusteredServersStep(Queue<StepAndPacket> serversToStart) { | ||
super(null); | ||
this.serversToStart = serversToStart; | ||
serversToStart.forEach(stepAndPacket -> setupSequentialStartPacket(stepAndPacket.packet)); | ||
} | ||
|
||
Collection<StepAndPacket> getServersToStart() { | ||
return serversToStart; | ||
} | ||
|
||
private void setupSequentialStartPacket(Packet packet) { | ||
packet.put(ProcessingConstants.WAIT_FOR_POD_READY, true); | ||
} | ||
|
||
@Override | ||
public NextAction apply(Packet packet) { | ||
Collection<StepAndPacket> servers = Arrays.asList(serversToStart.poll()); | ||
if (servers.isEmpty()) { | ||
return doNext(packet); | ||
} else { | ||
return doForkJoin(this, packet, servers); | ||
} | ||
} | ||
} | ||
|
||
// an interface to provide a hook for unit testing. | ||
interface NextStepFactory { | ||
Step createStatusUpdateStep(Step next); | ||
} | ||
|
||
} |
Uh oh!
There was an error while loading. Please reload this page.