initial commit

This commit is contained in:
gwg313 2026-01-15 12:21:18 -05:00
commit 976b4f88cc
Signed by: gwg313
GPG key ID: 60FF63B4826B7400
11 changed files with 249 additions and 0 deletions

12
aeron/docker-compose.yaml Normal file
View file

@ -0,0 +1,12 @@
version: "3.8"
services:
subscriber:
build: ./subscriber
ports:
- "40123:40123"
publisher:
build: ./publisher
depends_on:
- subscriber

View file

@ -0,0 +1,15 @@
FROM eclipse-temurin:21-jdk
WORKDIR /app
ENV JAVA_TOOL_OPTIONS="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
ADD https://repo1.maven.org/maven2/io/aeron/aeron-all/1.44.1/aeron-all-1.44.1.jar aeron.jar
ADD https://repo1.maven.org/maven2/org/agrona/agrona/1.18.1/agrona-1.18.1.jar agrona.jar
COPY Publisher.java .
RUN javac -cp "aeron.jar:agrona.jar" Publisher.java
CMD ["java", "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", "-cp", ".:aeron.jar:agrona.jar", "Publisher"]

View file

@ -0,0 +1,49 @@
import io.aeron.Aeron;
import io.aeron.Publication;
import io.aeron.driver.MediaDriver;
import org.agrona.concurrent.UnsafeBuffer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class Publisher {
public static void main(String[] args) throws Exception {
MediaDriver driver = MediaDriver.launchEmbedded();
Aeron.Context ctx = new Aeron.Context()
.aeronDirectoryName(driver.aeronDirectoryName());
Aeron aeron = Aeron.connect(ctx);
String channel = "aeron:udp?endpoint=subscriber:40123";
int streamId = 1001;
Publication publication = aeron.addPublication(channel, streamId);
UnsafeBuffer buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(256));
long counter = 0;
System.out.println("Publisher ready");
while (true) {
long timestamp = System.currentTimeMillis();
String msg = counter + "," + timestamp;
buffer.setMemory(0, buffer.capacity(), (byte) 0);
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
buffer.putBytes(0, bytes);
long result;
do {
result = publication.offer(buffer, 0, bytes.length);
if (result < 0)
Thread.yield();
} while (result < 0);
System.out.println("Sent: " + msg);
counter++;
Thread.sleep(1000);
}
}
}

View file

@ -0,0 +1,14 @@
FROM eclipse-temurin:21-jdk
WORKDIR /app
ENV JAVA_TOOL_OPTIONS="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
ADD https://repo1.maven.org/maven2/io/aeron/aeron-all/1.44.1/aeron-all-1.44.1.jar aeron.jar
ADD https://repo1.maven.org/maven2/org/agrona/agrona/1.18.1/agrona-1.18.1.jar agrona.jar
COPY Subscriber.java .
RUN javac -cp "aeron.jar:agrona.jar" Subscriber.java
CMD ["java", "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", "-cp", ".:aeron.jar:agrona.jar", "Subscriber"]

View file

@ -0,0 +1,65 @@
import io.aeron.Aeron;
import io.aeron.Subscription;
import io.aeron.Image;
import io.aeron.driver.MediaDriver;
import io.aeron.logbuffer.FragmentHandler;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.agrona.concurrent.IdleStrategy;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
public class Subscriber {
public static void main(String[] args) {
final AtomicBoolean running = new AtomicBoolean(true);
MediaDriver driver = MediaDriver.launchEmbedded();
Aeron.Context ctx = new Aeron.Context()
.availableImageHandler((Image img) -> System.out.println("Publisher available: " + img.sourceIdentity()))
.unavailableImageHandler((Image img) -> System.out.println("Publisher gone: " + img.sourceIdentity()))
.aeronDirectoryName(driver.aeronDirectoryName());
Aeron aeron = Aeron.connect(ctx);
String channel = "aeron:udp?endpoint=0.0.0.0:40123";
int streamId = 1001;
Subscription sub = aeron.addSubscription(channel, streamId);
IdleStrategy idleStrategy = new BackoffIdleStrategy(1, 1, 1, 1);
System.out.println("Subscriber ready, waiting for messages...");
// Fragment handler
FragmentHandler handler = (DirectBuffer buffer, int offset, int length, io.aeron.logbuffer.Header header) -> {
String msg = buffer.getStringWithoutLengthUtf8(offset, length).trim();
System.out.println("RAW MESSAGE: '" + msg + "'");
String[] parts = msg.split(",");
if (parts.length == 2) {
try {
long counter = Long.parseLong(parts[0].trim());
long sendTime = Long.parseLong(parts[1].trim());
long latency = System.currentTimeMillis() - sendTime;
System.out.printf("Received: %d | Latency: %d ms%n", counter, latency);
} catch (NumberFormatException e) {
System.err.println("Failed to parse message: " + msg);
}
} else {
System.err.println("Malformed message: " + msg);
}
};
while (running.get()) {
int fragments = sub.poll(handler, 10);
idleStrategy.idle(fragments);
}
sub.close();
aeron.close();
driver.close();
}
}