Design a Multi-Threaded Task Scheduler (LLD + Multi-Threaded Construct)

Akhil Gupta
5 min read · Mar 7, 2023

Appropriate audience for this article

  1. Anyone interested in how to design a multi-threaded task scheduler.
  2. Anyone interested in low-level design who also wants to learn how to handle multi-threaded constructs.

Problem Statement: Design a multi-threaded task scheduler with n threads which has the capability to schedule recurring tasks or one-time tasks.

Use Cases:

  1. A one-time job scheduled at a particular time.
  2. A recurring job with a particular interval of recurrence.
  3. The user should be able to configure the number of worker threads.
  4. When there are not enough idle threads to execute all the pending tasks, the tasks with the earliest execution time are executed first.

As a basic template for low-level design, I find it useful to divide a problem statement into multiple components (objects) that interact with each other. Each object added to the system should have well-defined responsibilities, and how the objects interact with each other also deserves attention. Typically, low-level code contains three types of objects.

  1. Entities and Enums (A form of representation of data)
  2. Data Stores (Where all the data will be stored)
  3. Abstract and Functional Classes (Contains all the business logic)

Let’s first discuss all the entities that will be involved in the design implementation.

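import java.util.Optional;

public interface ExecutionContext {

    void execute();
}

public abstract class ScheduledTask {

    public final ExecutionContext context;

    public ScheduledTask(ExecutionContext context) {
        this.context = context;
    }

    // True if the task should be scheduled again after it completes.
    public abstract boolean isRecurring();

    public void execute() {
        context.execute();
    }

    // The next occurrence of this task, or empty for a one-time task.
    public abstract Optional<ScheduledTask> nextScheduledTask();

    // Execution time in epoch milliseconds.
    public abstract long getNextExecutionTime();
}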

In the above code snippet, ScheduledTask is the base class on top of which we can define different types of tasks, such as one-time tasks and recurring tasks. Note that I also introduced the ExecutionContext interface, an instance of which is passed to the constructor. Whenever we want to process a task, we simply invoke the execute() method of the ExecutionContext that was provided.
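Because ExecutionContext declares a single method, it can be supplied as a lambda. The following is a minimal sketch of that idea, not part of the original design: the ExecutionContextDemo class and its runOnce helper are hypothetical names used only for illustration, and the interface is redeclared so the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

class ExecutionContextDemo {

    // Same shape as the article's ExecutionContext: one no-argument method.
    interface ExecutionContext {
        void execute();
    }

    // Hypothetical helper: builds a context from a lambda, runs it once,
    // and returns what the context recorded.
    static List<String> runOnce(String message) {
        List<String> log = new ArrayList<>();
        ExecutionContext context = () -> log.add(message);
        context.execute();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runOnce("send report")); // prints [send report]
    }
}
```

The task classes never need to know what the work is; they only hold on to the context and call execute() when the time comes.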

To cater to use cases 1 and 2 mentioned above, we can simply define two concrete implementations of the abstract class ScheduledTask.

import java.util.Optional;

public class OneTimeTask extends ScheduledTask {

    private final long executionTime;

    public OneTimeTask(ExecutionContext context, long executionTime) {
        super(context);
        this.executionTime = executionTime;
    }

    @Override
    public long getNextExecutionTime() {
        return executionTime;
    }

    @Override
    public boolean isRecurring() {
        return false;
    }

    @Override
    public Optional<ScheduledTask> nextScheduledTask() {
        return Optional.empty();
    }
}

public class RecurringTask extends ScheduledTask {

    private final long executionTime;

    private final long interval;

    public RecurringTask(ExecutionContext context, long executionTime, long interval) {
        super(context);
        this.executionTime = executionTime;
        this.interval = interval;
    }

    @Override
    public long getNextExecutionTime() {
        return executionTime;
    }

    @Override
    public boolean isRecurring() {
        return true;
    }

    @Override
    public Optional<ScheduledTask> nextScheduledTask() {
        // The next occurrence is one interval after this one's scheduled time.
        return Optional.of(new RecurringTask(context, executionTime + interval, interval));
    }
}

Here, the isRecurring() method is used to find out whether the task is to be performed again after completion. The rest of the methods are self-explanatory. This pretty much sums up all the entity classes required for our use cases!
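To make the recurrence logic concrete, here is a small sketch that follows the chain produced by nextScheduledTask(). It is an illustration under assumptions: RecurringSlot is a hypothetical, trimmed-down stand-in for RecurringTask that keeps only the timing fields, and firstExecutionTimes is a helper invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class RecurrenceDemo {

    // Hypothetical stand-in for RecurringTask: only the timing logic is kept.
    static final class RecurringSlot {
        final long executionTime;
        final long interval;

        RecurringSlot(long executionTime, long interval) {
            this.executionTime = executionTime;
            this.interval = interval;
        }

        // Mirrors nextScheduledTask(): the next occurrence is one interval later.
        Optional<RecurringSlot> next() {
            return Optional.of(new RecurringSlot(executionTime + interval, interval));
        }
    }

    // Collects the first `count` execution times, starting from `start`.
    static List<Long> firstExecutionTimes(long start, long interval, int count) {
        List<Long> times = new ArrayList<>();
        Optional<RecurringSlot> slot = Optional.of(new RecurringSlot(start, interval));
        for (int i = 0; i < count && slot.isPresent(); i++) {
            times.add(slot.get().executionTime);
            slot = slot.get().next();
        }
        return times;
    }

    public static void main(String[] args) {
        System.out.println(firstExecutionTimes(1_000L, 500L, 4)); // prints [1000, 1500, 2000, 2500]
    }
}
```

Note that each occurrence is derived from the previous scheduled time, not from the time the task actually finished, so a slow run does not push later occurrences back.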

Now comes the second part of the design implementation, i.e., storing these tasks in memory. Taking use case 4 into consideration, we need a data store that returns the next task to be executed while keeping all the other pending tasks in memory. Here is one possible design of a data store that accomplishes this.

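import java.util.Comparator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;

public interface TaskStore<T extends ScheduledTask> {

    T peek();

    T poll();

    // Blocks until a task is available; TaskRunner relies on this.
    T take() throws InterruptedException;

    void add(T task);

    boolean remove(T task);

    boolean isEmpty();
}

public class PriorityBlockingQueueTaskStore implements TaskStore<ScheduledTask> {

    private final PriorityBlockingQueue<ScheduledTask> taskQueue;

    // Tasks currently held by the store; a concurrent set, since several workers touch it.
    private final Set<ScheduledTask> tasks;

    public PriorityBlockingQueueTaskStore(Comparator<ScheduledTask> comparator, int initialCapacity) {
        // PriorityBlockingQueue is unbounded; the size is only an initial capacity.
        this.taskQueue = new PriorityBlockingQueue<>(initialCapacity, comparator);
        this.tasks = ConcurrentHashMap.newKeySet();
    }

    @Override
    public void add(ScheduledTask task) {
        tasks.add(task);
        taskQueue.offer(task);
    }

    @Override
    public ScheduledTask poll() {
        ScheduledTask task = taskQueue.poll();
        if (task != null) {
            tasks.remove(task);
        }
        return task;
    }

    @Override
    public ScheduledTask take() throws InterruptedException {
        ScheduledTask task = taskQueue.take();
        tasks.remove(task);
        return task;
    }

    @Override
    public ScheduledTask peek() {
        return taskQueue.peek();
    }

    @Override
    public boolean remove(ScheduledTask task) {
        // Only tasks the store still holds can be removed.
        return tasks.remove(task) && taskQueue.remove(task);
    }

    @Override
    public boolean isEmpty() {
        return taskQueue.isEmpty();
    }
}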

Note that I also added a set that tracks the tasks held by the task store. In a typical production environment, we would store them in some form of persistent layer. Also, let me know in the comments if you want the non-blocking priority queue-based implementation too! It is slightly complicated but still fun to learn for sure.
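The store's constructor takes a Comparator, and that comparator decides which task counts as "next". A minimal sketch of one plausible choice, ordering by execution time, is shown here. The Job class is a hypothetical stand-in for ScheduledTask, and the initial capacity of 11 is an arbitrary value chosen for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class EarliestFirstDemo {

    // Hypothetical stand-in for ScheduledTask: a name plus an execution time.
    static final class Job {
        final String name;
        final long executionTime;

        Job(String name, long executionTime) {
            this.name = name;
            this.executionTime = executionTime;
        }

        long getNextExecutionTime() {
            return executionTime;
        }
    }

    // Adds jobs in arbitrary order, then drains the queue and records the order.
    static List<String> drainOrder() {
        Comparator<Job> earliestFirst = Comparator.comparingLong(Job::getNextExecutionTime);
        PriorityBlockingQueue<Job> queue = new PriorityBlockingQueue<>(11, earliestFirst);
        queue.offer(new Job("c", 3_000L));
        queue.offer(new Job("a", 1_000L));
        queue.offer(new Job("b", 2_000L));

        List<String> order = new ArrayList<>();
        Job job;
        while ((job = queue.poll()) != null) {
            order.add(job.name);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // prints [a, b, c]
    }
}
```

With the article's classes, the equivalent comparator would presumably be Comparator.comparingLong(ScheduledTask::getNextExecutionTime), passed to PriorityBlockingQueueTaskStore.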

Finally, all that remains is designing a Task Scheduler class that takes TaskStore as an argument in the constructor.

import java.time.Instant;

public class TaskRunner implements Runnable {

    private final TaskStore<ScheduledTask> taskStore;

    private volatile boolean running;

    public TaskRunner(TaskStore<ScheduledTask> taskStore) {
        this.taskStore = taskStore;
        this.running = true;
    }

    @Override
    public void run() {
        while (running && !Thread.currentThread().isInterrupted()) {
            try {
                ScheduledTask scheduledTask = taskStore.take(); // Block until a task is available
                long delay = scheduledTask.getNextExecutionTime() - Instant.now().toEpochMilli();
                if (delay > 0) {
                    // Re-queue the task and wait until it is ready
                    taskStore.add(scheduledTask);
                    synchronized (this) {
                        wait(delay);
                    }
                } else {
                    scheduledTask.execute();
                    if (scheduledTask.isRecurring()) {
                        // Unwrap the Optional and queue the next occurrence
                        scheduledTask.nextScheduledTask().ifPresent(taskStore::add);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Preserve the interrupted status
            }
        }
    }

    public void stop() {
        this.running = false;
        synchronized (this) {
            notify(); // Notify the thread to wake up and stop
        }
    }
}

public class ExecutorConfig {

    private int numThreads;

    public ExecutorConfig(int numThreads) {
        this.numThreads = numThreads;
    }

    public int getNumThreads() {
        return numThreads;
    }

    public void setNumThreads(int numThreads) {
        this.numThreads = numThreads;
    }
}

import java.util.ArrayList;
import java.util.List;

public class TaskScheduler {

    private final List<Thread> threads;

    private final List<TaskRunner> taskRunners;

    private final TaskStore<ScheduledTask> taskStore;

    public TaskScheduler(ExecutorConfig executorConfig, TaskStore<ScheduledTask> taskStore) {
        this.threads = new ArrayList<>();
        this.taskRunners = new ArrayList<>();
        this.taskStore = taskStore;
        // One TaskRunner per worker thread, all sharing the same task store
        for (int i = 0; i < executorConfig.getNumThreads(); i++) {
            TaskRunner taskRunner = new TaskRunner(taskStore);
            Thread thread = new Thread(taskRunner);
            thread.start();
            threads.add(thread);
            taskRunners.add(taskRunner);
        }
    }

    public void stop() {
        taskRunners.forEach(TaskRunner::stop);
        threads.forEach(t -> {
            t.interrupt();
            try {
                t.join(); // Wait for the thread to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}

First, I created a TaskRunner class that implements the Runnable interface. A Runnable only runs on its own thread when it is wrapped in a Thread and start() is called on that thread, as TaskScheduler does; calling run() directly would execute the loop on the caller's thread. Each TaskRunner runs indefinitely until we stop it manually. It takes the next task from the task store and, once the task is due, executes its ExecutionContext.
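One detail worth making explicit: the TaskScheduler code above calls thread.start(), not run(). The sketch below illustrates the difference. StartVersusRunDemo and the thread names are hypothetical and exist only for this example.

```java
class StartVersusRunDemo {

    // Returns { name of the thread that executed via run(),
    //           name of the thread that executed via start() }.
    static String[] threadNames() {
        String[] names = new String[2];

        Thread direct = new Thread(
                () -> names[0] = Thread.currentThread().getName(), "worker-direct");
        direct.run(); // plain method call: the body executes on the calling thread

        Thread started = new Thread(
                () -> names[1] = Thread.currentThread().getName(), "worker-started");
        started.start(); // spawns a new thread named "worker-started"
        try {
            started.join(); // join() also makes the write to names[1] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return names;
    }

    public static void main(String[] args) {
        String[] names = threadNames();
        System.out.println("run():   " + names[0]);
        System.out.println("start(): " + names[1]);
    }
}
```

The first name is whatever thread called threadNames(), while the second is the newly created worker. A TaskRunner invoked through run() would therefore block its caller in the scheduling loop instead of working in the background.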

Figure: What our simple Task Scheduler construct looks like

I have also created an ExecutorConfig class that holds the configuration required by the Task Scheduler, such as the number of threads. So here, we create numThreads TaskRunner instances, each running on its own thread. We have already ensured thread safety by using a thread-safe task store.

Let me know in the comments if I missed anything. Until then, best of luck!
