diff --git a/.gitignore b/.gitignore index 2e0f9123..2572169f 100644 --- a/.gitignore +++ b/.gitignore @@ -75,3 +75,4 @@ fabric.properties target pom-development.xml +.vscode/settings.json diff --git a/src/main/java/cws/k8s/scheduler/model/SchedulerConfig.java b/src/main/java/cws/k8s/scheduler/model/SchedulerConfig.java index 99539e68..f07b0438 100644 --- a/src/main/java/cws/k8s/scheduler/model/SchedulerConfig.java +++ b/src/main/java/cws/k8s/scheduler/model/SchedulerConfig.java @@ -1,10 +1,12 @@ package cws.k8s.scheduler.model; +import com.fasterxml.jackson.databind.JsonNode; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.ToString; import java.util.List; +import java.util.Map; @ToString @NoArgsConstructor(access = AccessLevel.PRIVATE,force = true) @@ -17,10 +19,7 @@ public class SchedulerConfig { public final String namespace; public final String costFunction; public final String strategy; - - public final Integer maxCopyTasksPerNode; - - public final Integer maxWaitingCopyTasksPerNode; + public final Map additional; @ToString @NoArgsConstructor(access = AccessLevel.PRIVATE,force = true) @@ -30,4 +29,4 @@ public static class VolumeClaim { public final String subPath; } -} +} \ No newline at end of file diff --git a/src/main/java/cws/k8s/scheduler/model/TaskInput.java b/src/main/java/cws/k8s/scheduler/model/TaskInput.java index d0b8b3ab..b483b0be 100644 --- a/src/main/java/cws/k8s/scheduler/model/TaskInput.java +++ b/src/main/java/cws/k8s/scheduler/model/TaskInput.java @@ -1,12 +1,14 @@ package cws.k8s.scheduler.model; import lombok.AccessLevel; +import lombok.Getter; import lombok.NoArgsConstructor; import lombok.RequiredArgsConstructor; import java.util.List; @NoArgsConstructor( access = AccessLevel.PRIVATE, force = true ) +@Getter /** * Only for testing */ diff --git a/src/main/java/cws/k8s/scheduler/rest/SchedulerRestController.java b/src/main/java/cws/k8s/scheduler/rest/SchedulerRestController.java index 05b1e8ea..319935f6 100644 --- a/src/main/java/cws/k8s/scheduler/rest/SchedulerRestController.java +++ b/src/main/java/cws/k8s/scheduler/rest/SchedulerRestController.java @@ -6,10 +6,12 @@ import cws.k8s.scheduler.dag.Vertex; import cws.k8s.scheduler.model.SchedulerConfig; import cws.k8s.scheduler.model.TaskConfig; +import cws.k8s.scheduler.scheduler.NodeLabelAssign; import cws.k8s.scheduler.scheduler.PrioritizeAssignScheduler; import cws.k8s.scheduler.scheduler.Scheduler; import cws.k8s.scheduler.scheduler.prioritize.*; import cws.k8s.scheduler.scheduler.nodeassign.FairAssign; +import cws.k8s.scheduler.scheduler.nodeassign.LabelAssign; import cws.k8s.scheduler.scheduler.nodeassign.NodeAssign; import cws.k8s.scheduler.scheduler.nodeassign.RandomNodeAssign; import cws.k8s.scheduler.scheduler.nodeassign.RoundRobinAssign; @@ -32,6 +34,8 @@ import java.util.List; import java.util.Map; +import com.fasterxml.jackson.databind.ObjectMapper; + @RestController @Slf4j @EnableScheduling @@ -99,16 +103,23 @@ ResponseEntity registerScheduler( Scheduler scheduler; + // ObjectMapper objetMapper = new ObjectMapper(); + // Map nodelabel = objectMapper.convertValue(config.additional.get("tasklabelconfig"),Map.class); if ( schedulerHolder.containsKey( execution ) ) { return noSchedulerFor( execution ); } + + Prioritize prioritize; + NodeAssign assign; + switch ( strategy.toLowerCase() ){ + case "nodelabelassign": + scheduler = new NodeLabelAssign(execution, client, namespace, config); + break; default: { final String[] split = strategy.split( "-" ); - Prioritize prioritize; - NodeAssign assign; if ( split.length <= 2 ) { switch ( split[0].toLowerCase() ) { case "fifo": prioritize = new FifoPrioritize(); break; diff --git a/src/main/java/cws/k8s/scheduler/scheduler/NodeLabelAssign.java b/src/main/java/cws/k8s/scheduler/scheduler/NodeLabelAssign.java new file mode 100644 index 00000000..d0732992 --- /dev/null +++ b/src/main/java/cws/k8s/scheduler/scheduler/NodeLabelAssign.java @@ -0,0 +1,97 @@ +package cws.k8s.scheduler.scheduler; + +import cws.k8s.scheduler.model.*; +import cws.k8s.scheduler.scheduler.prioritize.Prioritize; +import cws.k8s.scheduler.scheduler.prioritize.RankMaxPrioritize; +import cws.k8s.scheduler.client.Informable; +import cws.k8s.scheduler.client.KubernetesClient; +import cws.k8s.scheduler.scheduler.nodeassign.LabelAssign; +import cws.k8s.scheduler.scheduler.nodeassign.NodeAssign; +import cws.k8s.scheduler.scheduler.nodeassign.FairAssign; +import cws.k8s.scheduler.util.NodeTaskAlignment; +import lombok.extern.slf4j.Slf4j; + +import java.util.*; +import java.util.stream.Collector; +import java.util.stream.Collectors; + +@Slf4j +public class NodeLabelAssign extends Scheduler { + + private final Prioritize prioritize; + private final NodeAssign nodeAssigner; + private final NodeAssign nodeLabelAssigner; + + public NodeLabelAssign(String execution, KubernetesClient client, String namespace, SchedulerConfig config) { + this(execution, client, namespace, config, new RankMaxPrioritize(), new LabelAssign(config), new FairAssign()); + } + + public NodeLabelAssign( String execution, + KubernetesClient client, + String namespace, + SchedulerConfig config, + Prioritize prioritize, + NodeAssign nodeLabelAssigner, + NodeAssign nodeAssigner ) { + super(execution, client, namespace, config); + this.prioritize = (prioritize != null) ? prioritize : new RankMaxPrioritize(); + this.nodeLabelAssigner = (nodeLabelAssigner != null) ? nodeLabelAssigner : new LabelAssign(config); + this.nodeAssigner = (nodeAssigner != null) ? nodeAssigner : new FairAssign(); + nodeAssigner.registerScheduler( this ); + if ( nodeAssigner instanceof Informable ){ + client.addInformable( (Informable) nodeAssigner ); + } + } + + @Override + public void close() { + super.close(); + if ( nodeAssigner instanceof Informable ){ + client.removeInformable( (Informable) nodeAssigner ); + } + } + + @Override + public ScheduleObject getTaskNodeAlignment( + final List unscheduledTasks, + final Map availableByNode + ){ + long start = System.currentTimeMillis(); + if ( traceEnabled ) { + int index = 1; + for ( Task unscheduledTask : unscheduledTasks ) { + unscheduledTask.getTraceRecord().setSchedulerPlaceInQueue( index++ ); + } + } + prioritize.sortTasks( unscheduledTasks ); + + + unscheduledTasks.stream().map(obj -> obj.getConfig().getName()).forEach(System.out::println); + + // first alignemnt (LabelAssign) + List alignmentLabelAssign = nodeLabelAssigner.getTaskNodeAlignment(unscheduledTasks, availableByNode); + List namesList = alignmentLabelAssign.stream().map(obj -> obj.task.getConfig().getName()).collect(Collectors.toList()); + + List filteredTasks = new LinkedList<>(); + + for (final Task task : unscheduledTasks) { + if (!namesList.contains(task.getConfig().getName())) { + filteredTasks.add(task); + } + } + + // second alignemnt (FairAssign) + List alignment = nodeAssigner.getTaskNodeAlignment(filteredTasks, availableByNode); + + + alignmentLabelAssign.addAll(alignment); + long timeDelta = System.currentTimeMillis() - start; + for ( Task unscheduledTask : unscheduledTasks ) { + unscheduledTask.getTraceRecord().setSchedulerTimeToSchedule( (int) timeDelta ); + } + + final ScheduleObject scheduleObject = new ScheduleObject(alignmentLabelAssign); + scheduleObject.setCheckStillPossible( false ); + return scheduleObject; + } +} \ No newline at end of file diff --git a/src/main/java/cws/k8s/scheduler/scheduler/Scheduler.java b/src/main/java/cws/k8s/scheduler/scheduler/Scheduler.java index 726c4b72..660cf087 100644 --- a/src/main/java/cws/k8s/scheduler/scheduler/Scheduler.java +++ b/src/main/java/cws/k8s/scheduler/scheduler/Scheduler.java @@ -152,7 +152,7 @@ public boolean validSchedulePlan( List taskNodeAlignment ){ return true; } - abstract ScheduleObject getTaskNodeAlignment( + public abstract ScheduleObject getTaskNodeAlignment( final List unscheduledTasks, final Map availableByNode ); @@ -468,6 +468,7 @@ Map getAvailableByNode(){ } logInfo.add("------------------------------------"); log.info(String.join("\n", logInfo)); + return availableByNode; } diff --git a/src/main/java/cws/k8s/scheduler/scheduler/nodeassign/LabelAssign.java b/src/main/java/cws/k8s/scheduler/scheduler/nodeassign/LabelAssign.java new file mode 100644 index 00000000..30759838 --- /dev/null +++ b/src/main/java/cws/k8s/scheduler/scheduler/nodeassign/LabelAssign.java @@ -0,0 +1,82 @@ +package cws.k8s.scheduler.scheduler.nodeassign; + +import cws.k8s.scheduler.model.NodeWithAlloc; +import cws.k8s.scheduler.model.PodWithAge; +import cws.k8s.scheduler.model.Requirements; +import cws.k8s.scheduler.model.Task; +import cws.k8s.scheduler.model.SchedulerConfig; +import cws.k8s.scheduler.util.NodeTaskAlignment; +import lombok.extern.slf4j.Slf4j; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.*; + +@Slf4j +public class LabelAssign extends NodeAssign { + + final SchedulerConfig config; + + public LabelAssign( + final SchedulerConfig config + ){ + this.config = config; + } + + @Override + public List getTaskNodeAlignment( List unscheduledTasks, Map availableByNode ) { + + // get the node-label map + ObjectMapper objectMapper = new ObjectMapper(); + Map nodelabel = objectMapper.convertValue(config.additional.get("tasklabelconfig"),Map.class); + + LinkedList alignment = new LinkedList<>(); + // final ArrayList> entries = new ArrayList<>( availableByNode.entrySet() ); + for ( final Task task : unscheduledTasks ) { + + if ( nodelabel == null ){ + log.warn("No tasklabelconfig exist in the nextflow.config file. Define a tasklabelconfig or use another scheduling strategy."); + break; + } + + String taskName = null; + String taskLabel = null; + + try { + taskName = task.getConfig().getName(); + taskLabel = taskName.split("~")[1]; + // ~ is used for a special case in which subtasks from one process in nextflow are generated + // the labels in the nextflow config have to be named like this: ~label~ + + log.info("Label for task: " + task.getConfig().getName() + " : " + taskLabel); + } catch ( Exception e ){ + log.warn( "Cannot find a label for task: " + task.getConfig().getName(), e ); + continue; + } + + final PodWithAge pod = task.getPod(); + // log.info("Pod: " + pod.getName() + " Requested Resources: " + pod.getRequest() ); + + if(nodelabel.containsKey(taskLabel)){ + String nodeName = nodelabel.get(taskLabel); + + for ( Map.Entry e : availableByNode.entrySet() ) { + final NodeWithAlloc node = e.getKey(); + + if(nodeName.equals(node.getName())){ + log.info("Aligned Pod to node: " + node.getName()); + alignment.add( new NodeTaskAlignment( node, task ) ); + availableByNode.get( node ).subFromThis(pod.getRequest()); + log.info("--> " + node.getName()); + task.getTraceRecord().foundAlignment(); + break; + } + } + } else { + log.warn( "Task Label: " + taskLabel + " does not exist in config file."); + } + } + return alignment; + } +} +