Skip to content

Changes for OWLS-83431 #1855

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 12, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public interface ProcessingConstants {
String INTROSPECTOR_JOB_FAILURE_LOGGED = "introspectorJobfailureLogged";

String WAIT_FOR_POD_READY = "waitForPodReady";
String MAX_CONCURRENCY = "MaxConcurrency";

/** Key to an object of type MakeRightDomainOperation. */
String MAKE_RIGHT_DOMAIN_OPERATION = "makeRightOp";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import oracle.kubernetes.weblogic.domain.model.ServerSpec;
import oracle.kubernetes.weblogic.domain.model.Shutdown;

import static oracle.kubernetes.operator.LabelConstants.CLUSTERNAME_LABEL;
import static oracle.kubernetes.operator.ProcessingConstants.SERVERS_TO_ROLL;

public class PodHelper {
Expand Down Expand Up @@ -85,6 +86,59 @@ public static boolean isReady(V1Pod pod) {
return ready;
}

/**
* Get list of scheduled pods.
* @param info Domain presence info
* @return list containing scheduled pods
*/
public static List<String> getScheduledPods(DomainPresenceInfo info, String clusterName) {
// These are presently scheduled servers
List<String> scheduledServers = new ArrayList<>();
for (Map.Entry<String, ServerKubernetesObjects> entry : info.getServers().entrySet()) {
V1Pod pod = entry.getValue().getPod().get();
if (pod != null && !PodHelper.isDeleting(pod) && PodHelper.getScheduledStatus(pod)) {
String wlsClusterName = pod.getMetadata().getLabels().get(CLUSTERNAME_LABEL);
if ((wlsClusterName == null) || (wlsClusterName.contains(clusterName))) {
scheduledServers.add(entry.getKey());
}
}
}
return scheduledServers;
}

/**
* Get list of ready pods.
* @param info Domain presence info
* @return list containing ready pods
*/
public static List<String> getReadyPods(DomainPresenceInfo info, String clusterName) {
// These are presently Ready servers
List<String> readyServers = new ArrayList<>();
for (Map.Entry<String, ServerKubernetesObjects> entry : info.getServers().entrySet()) {
V1Pod pod = entry.getValue().getPod().get();
if (pod != null && !PodHelper.isDeleting(pod) && PodHelper.getReadyStatus(pod)) {
String wlsClusterName = pod.getMetadata().getLabels().get(CLUSTERNAME_LABEL);
if ((wlsClusterName == null) || (wlsClusterName.contains(clusterName))) {
readyServers.add(entry.getKey());
}
}
}
return readyServers;
}

/**
* get if pod is in scheduled state.
* @param pod pod
* @return true, if pod is scheduled
*/
public static boolean getScheduledStatus(V1Pod pod) {
V1PodSpec status = pod.getSpec();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix variable name

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

if (status != null) {
return status.getNodeName() != null;
}
return false;
}

/**
* get if pod is in ready state.
* @param pod pod
Expand Down Expand Up @@ -444,7 +498,7 @@ protected String getPodReplacedMessageKey() {
protected V1ObjectMeta createMetadata() {
V1ObjectMeta metadata = super.createMetadata();
if (getClusterName() != null) {
metadata.putLabelsItem(LabelConstants.CLUSTERNAME_LABEL, getClusterName());
metadata.putLabelsItem(CLUSTERNAME_LABEL, getClusterName());
}
return metadata;
}
Expand Down Expand Up @@ -502,7 +556,7 @@ public NextAction apply(Packet packet) {
if (oldPod != null) {
Map<String, String> labels = oldPod.getMetadata().getLabels();
if (labels != null) {
clusterName = labels.get(LabelConstants.CLUSTERNAME_LABEL);
clusterName = labels.get(CLUSTERNAME_LABEL);
}

ServerSpec serverSpec = info.getDomain().getServer(serverName, clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
package oracle.kubernetes.operator.steps;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import oracle.kubernetes.operator.DomainStatusUpdater;
import oracle.kubernetes.operator.ProcessingConstants;
Expand Down Expand Up @@ -43,9 +44,6 @@ public class ManagedServerUpIteratorStep extends Step {

private final Collection<ServerStartupInfo> startupInfos;

private static NextStepFactory NEXT_STEP_FACTORY =
(next) -> DomainStatusUpdater.createStatusUpdateStep(next);

public ManagedServerUpIteratorStep(Collection<ServerStartupInfo> startupInfos, Step next) {
super(next);
this.startupInfos = startupInfos;
Expand Down Expand Up @@ -89,12 +87,26 @@ public NextAction apply(Packet packet) {
.filter(ssi -> !isServerInCluster(ssi))
.map(ssi -> createManagedServerUpDetails(packet, ssi)).collect(Collectors.toList());

getStartClusteredServersStepFactories(startupInfos, packet).values()
.forEach(factory -> startDetails.addAll(factory.getServerStartsStepAndPackets()));
Collection<StepAndPacket> work = new ArrayList<>();
if (!startDetails.isEmpty()) {
work.add(
new StepAndPacket(
new StartManagedServersStep(null, startDetails, null), packet));
}

for (Map.Entry<String, StartClusteredServersStepFactory> entry
: getStartClusteredServersStepFactories(startupInfos, packet).entrySet()) {
work.add(
new StepAndPacket(
new StartManagedServersStep(entry.getKey(), entry.getValue().getServerStartsStepAndPackets(),
null), packet.clone()));
}

if (!work.isEmpty()) {
return doForkJoin(DomainStatusUpdater.createStatusUpdateStep(getNext()), packet, work);
}

return doNext(
NEXT_STEP_FACTORY.createStatusUpdateStep(new StartManagedServersStep(startDetails, getNext())),
packet);
return doNext(DomainStatusUpdater.createStatusUpdateStep(getNext()), packet);
}


Expand Down Expand Up @@ -142,19 +154,53 @@ private boolean isServerInCluster(ServerStartupInfo ssi) {

static class StartManagedServersStep extends Step {
final Collection<StepAndPacket> startDetails;
final Queue<StepAndPacket> startDetailsQueue = new ConcurrentLinkedQueue<>();
final String clusterName;
int numStarted = 0;
int maxConcurrency = 0;

StartManagedServersStep(Collection<StepAndPacket> startDetails, Step next) {
StartManagedServersStep(String clusterName, Collection<StepAndPacket> startDetails, Step next) {
super(next);
this.clusterName = clusterName;
this.startDetails = startDetails;
startDetails.forEach(this::add);
}

Collection<StepAndPacket> getStartDetails() {
return startDetails;
void add(StepAndPacket serverToStart) {
startDetailsQueue.add(new StepAndPacket(serverToStart.step, serverToStart.packet));
this.maxConcurrency = Optional.ofNullable(
(Integer) serverToStart.packet.get(ProcessingConstants.MAX_CONCURRENCY)).orElse(0);
}

@Override
public NextAction apply(Packet packet) {
return doForkJoin(new ManagedServerUpAfterStep(getNext()), packet, startDetails);

if (startDetailsQueue.isEmpty()) {
return doNext(new ManagedServerUpAfterStep(getNext()), packet);
} else if (isServiceOnlyOrShuttingDown()) {
Collection<StepAndPacket> servers = Collections.singletonList(startDetailsQueue.poll());
return doForkJoin(this, packet, servers);
} else if (serverAvailableToStart(packet.getSpi(DomainPresenceInfo.class))) {
this.numStarted++;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the concurrency of this code? Will multiple threads ever be running here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's one thread per cluster and one thread for non-clustered servers. It'll be single threaded for one cluster and 2 threads for 2 cluster case. If domain also have non-clustered (standalone) servers, it'll create a separate fiber for those servers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me be clearer in my question. Do I need to worry about the concurrency of updating the numStarted variable? I can't really tell from your answer.

Copy link
Member Author

@ankedia ankedia Aug 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for not being clear in my previous answer. In 2 cluster scenario, 2 threads could reading/updating in parallel but overall concurrency will be low. I have changed numStarted to AtomicInteger as we discussed and made numConcurrency final by setting it in constructor. Please let me know if I missed anything. Thanks.

return doForkJoin(this, packet, Collections.singletonList(startDetailsQueue.poll()));
} else {
return doDelay(this, packet, 100, TimeUnit.MILLISECONDS);
}
}

private boolean isServiceOnlyOrShuttingDown() {
return Optional.ofNullable(startDetailsQueue.peek().step)
.map(step -> step.getNext() instanceof ServerDownStep).orElse(false);
}

private boolean serverAvailableToStart(DomainPresenceInfo info) {
return ((this.numStarted < PodHelper.getScheduledPods(info, clusterName).size())
&& (canStartConcurrently(PodHelper.getReadyPods(info, clusterName).size())));
}

private boolean canStartConcurrently(int numReady) {
return ((this.maxConcurrency > 0) && (this.numStarted < (this.maxConcurrency + numReady - 1)))
|| (this.maxConcurrency == 0);
}
}

Expand All @@ -172,57 +218,13 @@ private static class StartClusteredServersStepFactory {
}

void add(StepAndPacket serverToStart) {
serverToStart.packet.put(ProcessingConstants.MAX_CONCURRENCY, maxConcurrency);
serversToStart.add(serverToStart);
}

Collection<StepAndPacket> getServerStartsStepAndPackets() {
if (maxConcurrency == 0 || serversToStart.size() <= maxConcurrency) {
return serversToStart;
}
ArrayList<StepAndPacket> steps = new ArrayList<>(maxConcurrency);
IntStream.range(0, maxConcurrency)
.forEach(i -> steps.add(StartClusteredServersStep.createStepAndPacket(serversToStart)));
return steps;
}

}

static class StartClusteredServersStep extends Step {

private final Queue<StepAndPacket> serversToStart;

static StepAndPacket createStepAndPacket(Queue<StepAndPacket> serversToStart) {
return new StepAndPacket(new StartClusteredServersStep(serversToStart), null);
}

StartClusteredServersStep(Queue<StepAndPacket> serversToStart) {
super(null);
this.serversToStart = serversToStart;
serversToStart.forEach(stepAndPacket -> setupSequentialStartPacket(stepAndPacket.packet));
}

Collection<StepAndPacket> getServersToStart() {
return serversToStart;
}

private void setupSequentialStartPacket(Packet packet) {
packet.put(ProcessingConstants.WAIT_FOR_POD_READY, true);
}

@Override
public NextAction apply(Packet packet) {
Collection<StepAndPacket> servers = Arrays.asList(serversToStart.poll());
if (servers.isEmpty()) {
return doNext(packet);
} else {
return doForkJoin(this, packet, servers);
}
}
}

// an interface to provide a hook for unit testing.
interface NextStepFactory {
Step createStatusUpdateStep(Step next);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public String getServiceAccountName() {
return serverPod.getServiceAccountName();
}

void setNodeName(String nodeName) {
public void setNodeName(String nodeName) {
serverPod.setNodeName(nodeName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class DomainProcessorTestSetup {
public static final String NS = "namespace";
public static final String SECRET_NAME = "secret-name";
public static final String KUBERNETES_UID = "12345";
public static final String NODE_NAME = "Node1";

private static final String INTROSPECTION_JOB = LegalNames.toJobIntrospectorName(UID);
private static final String INTROSPECT_RESULT =
Expand Down Expand Up @@ -88,13 +89,14 @@ private static V1ObjectMeta withTimestamps(V1ObjectMeta meta) {
* @return a domain
*/
public static Domain createTestDomain() {
DomainSpec ds = new DomainSpec()
.withWebLogicCredentialsSecret(new V1SecretReference().name(SECRET_NAME).namespace(NS));
ds.setNodeName(NODE_NAME);
return new Domain()
.withApiVersion(KubernetesConstants.DOMAIN_GROUP + "/" + KubernetesConstants.DOMAIN_VERSION)
.withKind(KubernetesConstants.DOMAIN)
.withMetadata(withTimestamps(new V1ObjectMeta().name(UID).namespace(NS).uid(KUBERNETES_UID)))
.withSpec(
new DomainSpec()
.withWebLogicCredentialsSecret(new V1SecretReference().name(SECRET_NAME).namespace(NS)));
.withSpec(ds);
}

/**
Expand Down
Loading