Skip to content

Commit ae7aecd

Browse files
committed
Replaced consumer with Handler<T>
1 parent 0e8ef1e commit ae7aecd

6 files changed

Lines changed: 36 additions & 24 deletions

File tree

README.md

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,13 @@ Internal message queue:
6868
## 🛠️ Usage Example
6969

7070
```java
71-
AffinityDispatcher<String> dispatcher = new AffinityDispatcher<>(
72-
"testDispatcher",
73-
value -> {},
74-
DefaultHashCodeProvider.INSTANCE,
75-
Config.builder().build()
76-
);
77-
78-
dispatcher.start(); // ✅ Start workers
79-
80-
dispatcher.dispatch("key1", "Hello World"); // 📤 Dispatch messages
81-
dispatcher.dispatch(42, "Another message");
82-
83-
dispatcher.shutdown(); // 🛑 Stop workers
71+
Handler<Object> handler = (workerName, value) -> {
72+
System.out.println("Handled by " + workerName + ": " + value);
73+
};
74+
AffinityDispatcher<Object> dispatcher = new AffinityDispatcher<>("test", handler, DefaultHashCodeProvider.INSTANCE, Config.builder().build());
75+
dispatcher.start();
76+
77+
for (int i = 0; i < 10_000_000; i++) {
78+
dispatcher.dispatch(UUID.randomUUID().toString(), i);
79+
}
8480
```

src/main/java/io/github/ryntric/AffinityDispatcher.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import java.util.HashMap;
44
import java.util.Map;
5+
import java.util.UUID;
56
import java.util.concurrent.atomic.AtomicInteger;
6-
import java.util.function.Consumer;
77

88
/**
99
* AffinityDispatcher is a high-performance dispatcher that routes messages between workers based on a key's hash code.
@@ -35,7 +35,7 @@ public final class AffinityDispatcher<T> {
3535
* @param hashCodeProvider provider to compute hash codes from keys
3636
* @param config dispatcher configuration (worker count, buffer size, etc.)
3737
*/
38-
public AffinityDispatcher(String name, Consumer<T> handler, HashCodeProvider hashCodeProvider, Config config) {
38+
public AffinityDispatcher(String name, Handler<T> handler, HashCodeProvider hashCodeProvider, Config config) {
3939
this.name = name;
4040
this.workerCount = config.getWorkerCount();
4141
this.nodesPerWorker = config.getRoutingNodePerWorker();
@@ -47,7 +47,7 @@ public AffinityDispatcher(String name, Consumer<T> handler, HashCodeProvider has
4747
this.init(name, handler, config);
4848
}
4949

50-
private void init(String name, Consumer<T> handler, Config config) {
50+
private void init(String name, Handler<T> handler, Config config) {
5151
WorkerFactory<T> workerFactory = new WorkerFactory<>(name, config.getWorkerThreadPriority(), config.getBatchSize(), handler);
5252

5353
for (int i = 0; i < workerCount; i++) {
@@ -212,4 +212,17 @@ public Map<String, Long> getChannelSizes() {
212212
return result;
213213
}
214214

215+
public static void main(String[] args) {
216+
Handler<Object> handler = (workerName, value) -> {
217+
System.out.println("Handled by " + workerName + ": " + value);
218+
};
219+
AffinityDispatcher<Object> dispatcher = new AffinityDispatcher<>("test", handler, DefaultHashCodeProvider.INSTANCE, Config.builder().build());
220+
221+
dispatcher.start();
222+
223+
for (int i = 0; i < 10_000_000; i++) {
224+
dispatcher.dispatch(UUID.randomUUID().toString(), i);
225+
}
226+
}
227+
215228
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.github.ryntric;
2+
3+
public interface Handler<T> {
4+
void handle(String workerName, T value);
5+
}

src/main/java/io/github/ryntric/Utils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.github.ryntric;
22

3-
final class Utils {
3+
public final class Utils {
44
private static final Runtime RUNTIME = Runtime.getRuntime();
55
private static final Integer AVAILABLE_PROCESSORS = RUNTIME.availableProcessors();
66

src/main/java/io/github/ryntric/WorkerFactory.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
package io.github.ryntric;
22

3-
import java.util.function.Consumer;
4-
53
final class WorkerFactory<T> {
64
private static final String NAME_TEMPLATE = "%s-worker-th-%d";
75

86
private final String name;
97
private final int priority;
108
private final ThreadGroup group;
119
private final int batchsize;
12-
private final Consumer<T> handler;
10+
private final Handler<T> handler;
1311

1412
private int nextId = 0;
1513

16-
public WorkerFactory(String name, int priority, int batchsize, Consumer<T> handler) {
14+
public WorkerFactory(String name, int priority, int batchsize, Handler<T> handler) {
1715
this.name = name;
1816
this.priority = priority;
1917
this.group = new ThreadGroup(name);

src/main/java/io/github/ryntric/WorkerThread.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@
44
import java.util.function.Consumer;
55

66
final class WorkerThread<T> extends Thread {
7-
private final Consumer<T> handler;
87
private final Channel<T> channel;
98
private final int batchsize;
109
private final AtomicBoolean isRunning;
10+
private final Consumer<T> handler;
1111

12-
public WorkerThread(String name, ThreadGroup group, int batchsize, Channel<T> channel, Consumer<T> handler) {
12+
public WorkerThread(String name, ThreadGroup group, int batchsize, Channel<T> channel, Handler<T> delegated) {
1313
super(group, name);
1414
this.batchsize = batchsize;
1515
this.channel = channel;
16-
this.handler = handler;
16+
this.handler = (value) -> delegated.handle(name, value);
1717
this.isRunning = new AtomicBoolean(false);
1818
}
1919

0 commit comments

Comments
 (0)