Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added leader-follower/etc/LeaderFollower.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
87 changes: 87 additions & 0 deletions leader-follower/etc/LeaderFollower.ucls
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<class-diagram version="1.1.8" icons="true" automaticImage="PNG" always-add-relationships="false" generalizations="true"
realizations="true" associations="true" dependencies="false" nesting-relationships="true">
<class id="1" language="java" name="com.iluwatar.leaderfollower.App" project="leader-follower"
file="/leader-follower/src/main/java/com/iluwatar/leaderfollower/App.java" binary="false" corner="BOTTOM_RIGHT">
<position height="-1" width="-1" x="364" y="21"/>
<display autosize="true" stereotype="true" package="true" initial-value="false" signature="true"
sort-features="false" accessors="true" visibility="true">
<attributes public="true" package="true" protected="true" private="true" static="true"/>
<operations public="true" package="true" protected="true" private="true" static="true"/>
</display>
</class>
<class id="2" language="java" name="com.iluwatar.leaderfollower.WorkStation" project="leader-follower"
file="/leader-follower/src/main/java/com/iluwatar/leaderfollower/WorkStation.java" binary="false"
corner="BOTTOM_RIGHT">
<position height="223" width="185" x="40" y="-11"/>
<display autosize="true" stereotype="true" package="true" initial-value="false" signature="true"
sort-features="false" accessors="true" visibility="true">
<attributes public="true" package="true" protected="true" private="true" static="true"/>
<operations public="true" package="true" protected="true" private="true" static="true"/>
</display>
</class>
<class id="3" language="java" name="com.iluwatar.leaderfollower.Work" project="leader-follower"
file="/leader-follower/src/main/java/com/iluwatar/leaderfollower/Work.java" binary="false" corner="BOTTOM_RIGHT">
<position height="97" width="122" x="313" y="139"/>
<display autosize="true" stereotype="true" package="true" initial-value="false" signature="true"
sort-features="false" accessors="true" visibility="true">
<attributes public="true" package="true" protected="true" private="true" static="true"/>
<operations public="true" package="true" protected="true" private="true" static="true"/>
</display>
</class>
<class id="4" language="java" name="com.iluwatar.leaderfollower.Worker" project="leader-follower"
file="/leader-follower/src/main/java/com/iluwatar/leaderfollower/Worker.java" binary="false" corner="BOTTOM_RIGHT">
<position height="-1" width="-1" x="749" y="97"/>
<display autosize="true" stereotype="true" package="true" initial-value="false" signature="true"
sort-features="false" accessors="true" visibility="true">
<attributes public="true" package="true" protected="true" private="true" static="true"/>
<operations public="true" package="true" protected="true" private="true" static="true"/>
</display>
</class>
<association id="5">
<end type="SOURCE" refId="2" navigable="false">
<attribute id="6" name="workers"/>
<multiplicity id="7" minimum="0" maximum="2147483647"/>
</end>
<end type="TARGET" refId="4" navigable="true"/>
<display labels="true" multiplicity="true"/>
</association>
<association id="8">
<end type="SOURCE" refId="4" navigable="false">
<attribute id="9" name="queue"/>
<multiplicity id="10" minimum="0" maximum="2147483647"/>
</end>
<end type="TARGET" refId="3" navigable="true"/>
<display labels="true" multiplicity="true"/>
</association>
<association id="11">
<end type="SOURCE" refId="4" navigable="false">
<attribute id="12" name="workstation"/>
<multiplicity id="13" minimum="0" maximum="1"/>
</end>
<end type="TARGET" refId="2" navigable="true"/>
<display labels="true" multiplicity="true"/>
</association>
<association id="14">
<end type="SOURCE" refId="2" navigable="false">
<attribute id="15" name="leader"/>
<multiplicity id="16" minimum="0" maximum="1"/>
</end>
<end type="TARGET" refId="4" navigable="true"/>
<display labels="true" multiplicity="true"/>
</association>
<association id="17">
<end type="SOURCE" refId="4" navigable="false">
<attribute id="18" name="workers"/>
<multiplicity id="19" minimum="0" maximum="2147483647"/>
</end>
<end type="TARGET" refId="4" navigable="true"/>
<display labels="true" multiplicity="true"/>
</association>
<classifier-display autosize="true" stereotype="true" package="true" initial-value="false" signature="true"
sort-features="false" accessors="true" visibility="true">
<attributes public="true" package="true" protected="true" private="true" static="true"/>
<operations public="true" package="true" protected="true" private="true" static="true"/>
</classifier-display>
<association-display labels="true" multiplicity="true"/>
</class-diagram>
16 changes: 16 additions & 0 deletions leader-follower/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
layout: pattern
title: Leader Follower
folder: leader-follower
permalink: /patterns/leader-follower/
categories: Concurrency
tags: Java
---

**Intent:** Leader Follower is a concurrency pattern where multiple threads can efficiently demultiplex
events and dispatch to event handlers.
![alt text](./etc/LeaderFollower.png "Leader Follower")

**Applicability:** Use the Leader Follower pattern when

* multiple threads can receive events , process reponses and demutiplex connection using a shared HandleSet.
18 changes: 18 additions & 0 deletions leader-follower/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.iluwatar</groupId>
<artifactId>java-design-patterns</artifactId>
<version>1.7.0</version>
</parent>
<artifactId>leader-follower</artifactId>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
30 changes: 30 additions & 0 deletions leader-follower/src/main/java/com/iluwatar/leaderfollower/App.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.iluwatar.leaderfollower;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
*
* Leader Follower is a concurrency pattern where multiple threads can efficiently demultiplex
* events and dispatch to event handlers.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please explain a bit more!

* <p>
* In this example we use ThreadPool which basically acts as the ThreadPool. One of the Workers
* becomes Leader and listens on the {@link HandleSet} for work. HandleSet basically acts as the
* source of input events for the Workers, who are spawned and controlled by the {@link WorkStation}
* . When Work arrives which implements the {@link Handle} interface then the leader takes the work
* and calls the {@link ConcreteEventHandler}. However it also selects one of the waiting Workers as
* leader, who can then process the next work and so on.
*
*/
public class App {

public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(4);
WorkStation station = new WorkStation(exec);
station.startWork();
exec.awaitTermination(5, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.iluwatar.leaderfollower;

/**
* The ConcreteEventHandler. This is used by the {@link Worker} to process the newly arrived work.
* @author amit
*
*/
public class ConcreteEventHandler implements EventHandler{

@Override
public void handleEvent(Handle handle) {
System.out.println("Doing the work");
System.out.println("Travelled the distance " + handle.getPayLoad());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.iluwatar.leaderfollower;
/**
* The EventHandler interface. All concrete EventHanlder implementations can process a unit of work
* represented by Handle
* @author amit
*
*/
public interface EventHandler {
public void handleEvent(Handle work);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.iluwatar.leaderfollower;

/**
* Represents a Unit of Work which workers can process. In this example only the {@link Work}
* class implements this
* @author amit
*
*/
public interface Handle {
public int getPayLoad();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.iluwatar.leaderfollower;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
* The {@link HandleSet} class. All work arrive here, workers receive work from here.
* @author amit
*
*/
public class HandleSet {

private BlockingQueue<Work> queue = new ArrayBlockingQueue<>(100);

public void fireEvent(Work input) throws InterruptedException {
queue.put(input);
}

public Work getPayLoad() throws InterruptedException {
return queue.take();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.iluwatar.leaderfollower;

/**
* A unit of work to be processed by the Workers. Implements Handle.
* @author amit
*
*/
public class Work implements Handle {
public final int distance;

public Work(int distance) {
this.distance = distance;
}

@Override
public int getPayLoad() {
return distance;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.iluwatar.leaderfollower;

import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add JavaDoc

public class WorkStation {
private Worker leader;
private List<Worker> workers = new CopyOnWriteArrayList<>();
private ExecutorService executorService = Executors.newFixedThreadPool(4);

public WorkStation(ExecutorService executorService) {
this.executorService = executorService;
}

public void startWork() throws InterruptedException {

HandleSet handleSet = new HandleSet();
ConcreteEventHandler concreteEventHandler = new ConcreteEventHandler();
Worker worker = new Worker(handleSet, workers, 1, this, concreteEventHandler);
Worker worker2 = new Worker(handleSet, workers, 2, this ,concreteEventHandler);
Worker worker3 = new Worker(handleSet, workers, 3, this,concreteEventHandler);
Worker worker4 = new Worker(handleSet, workers, 4, this,concreteEventHandler);
workers.add(worker);
workers.add(worker2);
workers.add(worker3);
workers.add(worker4);
this.leader = workers.get(0);
executorService.submit(worker);
executorService.submit(worker2);
executorService.submit(worker3);
executorService.submit(worker4);
Random rand = new Random(1000);
handleSet.fireEvent(new Work(Math.abs(rand.nextInt())));
handleSet.fireEvent(new Work(Math.abs(rand.nextInt())));
handleSet.fireEvent(new Work(Math.abs(rand.nextInt())));
handleSet.fireEvent(new Work(Math.abs(rand.nextInt())));
handleSet.fireEvent(new Work(Math.abs(rand.nextInt())));
// queue.
Thread.sleep(1000);
}

public Worker getLeader() {
return this.leader;
}

public void setLeader(Worker leader) {
this.leader = leader;
}

public void addWorker(Worker worker) {
if (this.workers.size() <= 0) {
this.leader = worker;
workers.add(worker);
}
}

public List<Worker> getWorkers() {
return workers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.iluwatar.leaderfollower;

import java.util.List;

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add JavaDoc

public class Worker implements Runnable {

private Object leader = new Object();
private HandleSet handleSet;
private List<Worker> workers;
private long id;
private WorkStation workstation;
private ConcreteEventHandler concreteEventHandler;

public Worker(HandleSet queue, List<Worker> workers, long id,
WorkStation workstation, ConcreteEventHandler concreteEventHandler) {
super();
this.handleSet = queue;
this.workers = workers;
this.id = id;
this.workstation = workstation;
this.concreteEventHandler = concreteEventHandler;
}

public void becomeLeader() {
synchronized (leader) {
leader.notifyAll();
}
}



@Override
public void run() {
while (!Thread.interrupted()) {
try {
if (workstation.getLeader() != null && !workstation.getLeader().equals(this)) {
// System.out.println("ID " +id + " is follower");
synchronized (leader) {
leader.wait();
}

}
//
workers.remove(this);
System.out.println("Leader: " +id);
Work work = handleSet.getPayLoad();
if (workers.size() > 0) {
workstation.getWorkers().get(0).becomeLeader();
workstation.setLeader(workstation.getWorkers().get(0));
}
else {
workstation.setLeader(null);
}
concreteEventHandler.handleEvent(work);
Thread.sleep(100);
System.out.println("The Worker with the ID " + id + " completed the task");
workstation.addWorker(this);
} catch (InterruptedException e) {
System.out.println("Thread intreuppted");
}
}
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (id ^ (id >>> 32));
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Worker other = (Worker) obj;
if (id != other.id)
return false;
return true;
}

}
Loading