initial commit
This commit is contained in:
commit
3e7357aba2
11 changed files with 248 additions and 0 deletions
65
aeron/subscriber/Subscriber.java
Normal file
65
aeron/subscriber/Subscriber.java
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
import io.aeron.Aeron;
|
||||
import io.aeron.Subscription;
|
||||
import io.aeron.Image;
|
||||
import io.aeron.driver.MediaDriver;
|
||||
import io.aeron.logbuffer.FragmentHandler;
|
||||
import org.agrona.DirectBuffer;
|
||||
import org.agrona.concurrent.BackoffIdleStrategy;
|
||||
import org.agrona.concurrent.IdleStrategy;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class Subscriber {
|
||||
|
||||
public static void main(String[] args) {
|
||||
final AtomicBoolean running = new AtomicBoolean(true);
|
||||
|
||||
MediaDriver driver = MediaDriver.launchEmbedded();
|
||||
|
||||
Aeron.Context ctx = new Aeron.Context()
|
||||
.availableImageHandler((Image img) -> System.out.println("Publisher available: " + img.sourceIdentity()))
|
||||
.unavailableImageHandler((Image img) -> System.out.println("Publisher gone: " + img.sourceIdentity()))
|
||||
.aeronDirectoryName(driver.aeronDirectoryName());
|
||||
|
||||
Aeron aeron = Aeron.connect(ctx);
|
||||
|
||||
String channel = "aeron:udp?endpoint=0.0.0.0:40123";
|
||||
int streamId = 1001;
|
||||
|
||||
Subscription sub = aeron.addSubscription(channel, streamId);
|
||||
IdleStrategy idleStrategy = new BackoffIdleStrategy(1, 1, 1, 1);
|
||||
|
||||
System.out.println("Subscriber ready, waiting for messages...");
|
||||
|
||||
// Fragment handler
|
||||
FragmentHandler handler = (DirectBuffer buffer, int offset, int length, io.aeron.logbuffer.Header header) -> {
|
||||
String msg = buffer.getStringWithoutLengthUtf8(offset, length).trim();
|
||||
System.out.println("RAW MESSAGE: '" + msg + "'");
|
||||
|
||||
String[] parts = msg.split(",");
|
||||
if (parts.length == 2) {
|
||||
try {
|
||||
long counter = Long.parseLong(parts[0].trim());
|
||||
long sendTime = Long.parseLong(parts[1].trim());
|
||||
long latency = System.currentTimeMillis() - sendTime;
|
||||
|
||||
System.out.printf("Received: %d | Latency: %d ms%n", counter, latency);
|
||||
} catch (NumberFormatException e) {
|
||||
System.err.println("Failed to parse message: " + msg);
|
||||
}
|
||||
} else {
|
||||
System.err.println("Malformed message: " + msg);
|
||||
}
|
||||
};
|
||||
|
||||
while (running.get()) {
|
||||
int fragments = sub.poll(handler, 10);
|
||||
idleStrategy.idle(fragments);
|
||||
}
|
||||
|
||||
sub.close();
|
||||
aeron.close();
|
||||
driver.close();
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue