commit 976b4f88ccdbff0c26af20b3193c823b3461f942 Author: gwg313 Date: Thu Jan 15 12:21:18 2026 -0500 initial commit diff --git a/README.md b/README.md new file mode 100644 index 0000000..cab84e4 --- /dev/null +++ b/README.md @@ -0,0 +1,8 @@ +To run use docker-compose or podman-compose + +```bash +podman-compose down +podman system prune -f +podman-compose build --no-cache +podman-compose up +``` diff --git a/aeron/docker-compose.yaml b/aeron/docker-compose.yaml new file mode 100644 index 0000000..491b34b --- /dev/null +++ b/aeron/docker-compose.yaml @@ -0,0 +1,12 @@ +version: "3.8" + +services: + subscriber: + build: ./subscriber + ports: + - "40123:40123" + + publisher: + build: ./publisher + depends_on: + - subscriber diff --git a/aeron/publisher/Dockerfile b/aeron/publisher/Dockerfile new file mode 100644 index 0000000..3afecab --- /dev/null +++ b/aeron/publisher/Dockerfile @@ -0,0 +1,15 @@ +FROM eclipse-temurin:21-jdk + +WORKDIR /app + + +ENV JAVA_TOOL_OPTIONS="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" + +ADD https://repo1.maven.org/maven2/io/aeron/aeron-all/1.44.1/aeron-all-1.44.1.jar aeron.jar +ADD https://repo1.maven.org/maven2/org/agrona/agrona/1.18.1/agrona-1.18.1.jar agrona.jar + +COPY Publisher.java . + +RUN javac -cp "aeron.jar:agrona.jar" Publisher.java + +CMD ["java", "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", "-cp", ".:aeron.jar:agrona.jar", "Publisher"] diff --git a/aeron/publisher/Publisher.java b/aeron/publisher/Publisher.java new file mode 100644 index 0000000..03ffaca --- /dev/null +++ b/aeron/publisher/Publisher.java @@ -0,0 +1,49 @@ +import io.aeron.Aeron; +import io.aeron.Publication; +import io.aeron.driver.MediaDriver; +import org.agrona.concurrent.UnsafeBuffer; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +public class Publisher { + public static void main(String[] args) throws Exception { + MediaDriver driver = MediaDriver.launchEmbedded(); + + Aeron.Context ctx = new Aeron.Context() + .aeronDirectoryName(driver.aeronDirectoryName()); + + Aeron aeron = Aeron.connect(ctx); + + String channel = "aeron:udp?endpoint=subscriber:40123"; + int streamId = 1001; + + Publication publication = aeron.addPublication(channel, streamId); + UnsafeBuffer buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(256)); + + long counter = 0; + + System.out.println("Publisher ready"); + + while (true) { + long timestamp = System.currentTimeMillis(); + String msg = counter + "," + timestamp; + + buffer.setMemory(0, buffer.capacity(), (byte) 0); + + byte[] bytes = msg.getBytes(StandardCharsets.UTF_8); + buffer.putBytes(0, bytes); + + long result; + do { + result = publication.offer(buffer, 0, bytes.length); + if (result < 0) + Thread.yield(); + } while (result < 0); + + System.out.println("Sent: " + msg); + counter++; + Thread.sleep(1000); + } + } +} diff --git a/aeron/subscriber/Dockerfile b/aeron/subscriber/Dockerfile new file mode 100644 index 0000000..4e5597d --- /dev/null +++ b/aeron/subscriber/Dockerfile @@ -0,0 +1,14 @@ +FROM eclipse-temurin:21-jdk + +WORKDIR /app + +ENV JAVA_TOOL_OPTIONS="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" + +ADD https://repo1.maven.org/maven2/io/aeron/aeron-all/1.44.1/aeron-all-1.44.1.jar aeron.jar +ADD https://repo1.maven.org/maven2/org/agrona/agrona/1.18.1/agrona-1.18.1.jar agrona.jar + +COPY Subscriber.java . + +RUN javac -cp "aeron.jar:agrona.jar" Subscriber.java + +CMD ["java", "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", "-cp", ".:aeron.jar:agrona.jar", "Subscriber"] diff --git a/aeron/subscriber/Subscriber.java b/aeron/subscriber/Subscriber.java new file mode 100644 index 0000000..0c7b7a5 --- /dev/null +++ b/aeron/subscriber/Subscriber.java @@ -0,0 +1,65 @@ +import io.aeron.Aeron; +import io.aeron.Subscription; +import io.aeron.Image; +import io.aeron.driver.MediaDriver; +import io.aeron.logbuffer.FragmentHandler; +import org.agrona.DirectBuffer; +import org.agrona.concurrent.BackoffIdleStrategy; +import org.agrona.concurrent.IdleStrategy; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicBoolean; + +public class Subscriber { + + public static void main(String[] args) { + final AtomicBoolean running = new AtomicBoolean(true); + + MediaDriver driver = MediaDriver.launchEmbedded(); + + Aeron.Context ctx = new Aeron.Context() + .availableImageHandler((Image img) -> System.out.println("Publisher available: " + img.sourceIdentity())) + .unavailableImageHandler((Image img) -> System.out.println("Publisher gone: " + img.sourceIdentity())) + .aeronDirectoryName(driver.aeronDirectoryName()); + + Aeron aeron = Aeron.connect(ctx); + + String channel = "aeron:udp?endpoint=0.0.0.0:40123"; + int streamId = 1001; + + Subscription sub = aeron.addSubscription(channel, streamId); + IdleStrategy idleStrategy = new BackoffIdleStrategy(1, 1, 1, 1); + + System.out.println("Subscriber ready, waiting for messages..."); + + // Fragment handler + FragmentHandler handler = (DirectBuffer buffer, int offset, int length, io.aeron.logbuffer.Header header) -> { + String msg = buffer.getStringWithoutLengthUtf8(offset, length).trim(); + System.out.println("RAW MESSAGE: '" + msg + "'"); + + String[] parts = msg.split(","); + if (parts.length == 2) { + try { + long counter = Long.parseLong(parts[0].trim()); + long sendTime = Long.parseLong(parts[1].trim()); + long latency = System.currentTimeMillis() - sendTime; + + System.out.printf("Received: %d | Latency: %d ms%n", counter, latency); + } catch (NumberFormatException e) { + System.err.println("Failed to parse message: " + msg); + } + } else { + System.err.println("Malformed message: " + msg); + } + }; + + while (running.get()) { + int fragments = sub.poll(handler, 10); + idleStrategy.idle(fragments); + } + + sub.close(); + aeron.close(); + driver.close(); + } +} diff --git a/natsio/docker-compose.yaml b/natsio/docker-compose.yaml new file mode 100644 index 0000000..fbe98af --- /dev/null +++ b/natsio/docker-compose.yaml @@ -0,0 +1,17 @@ +version: "3.8" + +services: + nats: + image: nats:2.10 + ports: + - "4222:4222" + + subscriber: + build: ./subscriber + depends_on: + - nats + + publisher: + build: ./publisher + depends_on: + - nats diff --git a/natsio/publisher/Dockerfile b/natsio/publisher/Dockerfile new file mode 100644 index 0000000..f46403a --- /dev/null +++ b/natsio/publisher/Dockerfile @@ -0,0 +1,11 @@ +FROM eclipse-temurin:21-jdk + +WORKDIR /app + +ADD https://repo1.maven.org/maven2/io/nats/jnats/2.16.14/jnats-2.16.14.jar nats.jar + +COPY Publisher.java . + +RUN javac -cp nats.jar Publisher.java + +CMD ["java", "-cp", ".:nats.jar", "Publisher"] diff --git a/natsio/publisher/Publisher.java b/natsio/publisher/Publisher.java new file mode 100644 index 0000000..749f943 --- /dev/null +++ b/natsio/publisher/Publisher.java @@ -0,0 +1,22 @@ +import io.nats.client.Connection; +import io.nats.client.Nats; +import java.time.Duration; + +public class Publisher { + public static void main(String[] args) throws Exception { + Connection nc = Nats.connect("nats://nats:4222"); + + int counter = 0; + + while (true) { + long sendTime = System.currentTimeMillis(); + String msg = counter + "," + sendTime; + nc.publish("demo.subject", msg.getBytes()); + nc.flush(Duration.ofSeconds(1)); + + System.out.println("Sent: " + counter); + counter++; + Thread.sleep(1000); + } + } +} diff --git a/natsio/subscriber/Dockerfile b/natsio/subscriber/Dockerfile new file mode 100644 index 0000000..d53e8d4 --- /dev/null +++ b/natsio/subscriber/Dockerfile @@ -0,0 +1,11 @@ +FROM eclipse-temurin:21-jdk + +WORKDIR /app + +ADD https://repo1.maven.org/maven2/io/nats/jnats/2.16.14/jnats-2.16.14.jar nats.jar + +COPY Subscriber.java . + +RUN javac -cp nats.jar Subscriber.java + +CMD ["java", "-cp", ".:nats.jar", "Subscriber"] diff --git a/natsio/subscriber/Subscriber.java b/natsio/subscriber/Subscriber.java new file mode 100644 index 0000000..784d410 --- /dev/null +++ b/natsio/subscriber/Subscriber.java @@ -0,0 +1,25 @@ +import io.nats.client.Connection; +import io.nats.client.Dispatcher; +import io.nats.client.Nats; + +public class Subscriber { + public static void main(String[] args) throws Exception { + Connection nc = Nats.connect("nats://nats:4222"); + + Dispatcher dispatcher = nc.createDispatcher(msg -> { + String received = new String(msg.getData()).trim(); + String[] parts = received.split(","); + if (parts.length >= 2) { + int counter = Integer.parseInt(parts[0].trim()); + long sendTime = Long.parseLong(parts[1].trim()); + long latency = System.currentTimeMillis() - sendTime; + + System.out.printf("Received: %d | Latency: %d ms%n", counter, latency); + } + }); + + dispatcher.subscribe("demo.subject"); + + Thread.sleep(Long.MAX_VALUE); + } +}