import io.aeron.Aeron; import io.aeron.Subscription; import io.aeron.Image; import io.aeron.driver.MediaDriver; import io.aeron.logbuffer.FragmentHandler; import org.agrona.DirectBuffer; import org.agrona.concurrent.BackoffIdleStrategy; import org.agrona.concurrent.IdleStrategy; import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicBoolean; public class Subscriber { public static void main(String[] args) { final AtomicBoolean running = new AtomicBoolean(true); MediaDriver driver = MediaDriver.launchEmbedded(); Aeron.Context ctx = new Aeron.Context() .availableImageHandler((Image img) -> System.out.println("Publisher available: " + img.sourceIdentity())) .unavailableImageHandler((Image img) -> System.out.println("Publisher gone: " + img.sourceIdentity())) .aeronDirectoryName(driver.aeronDirectoryName()); Aeron aeron = Aeron.connect(ctx); String channel = "aeron:udp?endpoint=0.0.0.0:40123"; int streamId = 1001; Subscription sub = aeron.addSubscription(channel, streamId); IdleStrategy idleStrategy = new BackoffIdleStrategy(1, 1, 1, 1); System.out.println("Subscriber ready, waiting for messages..."); // Fragment handler FragmentHandler handler = (DirectBuffer buffer, int offset, int length, io.aeron.logbuffer.Header header) -> { String msg = buffer.getStringWithoutLengthUtf8(offset, length).trim(); System.out.println("RAW MESSAGE: '" + msg + "'"); String[] parts = msg.split(","); if (parts.length == 2) { try { long counter = Long.parseLong(parts[0].trim()); long sendTime = Long.parseLong(parts[1].trim()); long latency = System.currentTimeMillis() - sendTime; System.out.printf("Received: %d | Latency: %d ms%n", counter, latency); } catch (NumberFormatException e) { System.err.println("Failed to parse message: " + msg); } } else { System.err.println("Malformed message: " + msg); } }; while (running.get()) { int fragments = sub.poll(handler, 10); idleStrategy.idle(fragments); } sub.close(); aeron.close(); driver.close(); } }