summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPancakeTAS <pancake@mgnet.work>2026-06-13 15:18:14 +0200
committerPancakeTAS <pancake@mgnet.work>2026-06-14 16:26:40 +0200
commit4ed4b809bc6198a7c2f5cdff74dac90fec8bd1bf (patch)
tree01af5b8eb3f71944e82f0ea25ed2d9bf22cd599a
parentfeat: Implement min-heap data structure (diff)
feat: Implement two-path UDP context
Nearly complete implementation reaching ~19 Gbps on my system
-rw-r--r--CMakeLists.txt1
-rw-r--r--src/mpu.c197
-rw-r--r--src/mpu.h71
3 files changed, 269 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 68a9b98..0820f7e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -9,6 +9,7 @@ include(cmake/Diagnostics.cmake)
add_executable(mpu
"src/heap.c"
+ "src/mpu.c"
"src/p2p.c"
"src/main.c")
diff --git a/src/mpu.c b/src/mpu.c
new file mode 100644
index 0000000..32c0ffe
--- /dev/null
+++ b/src/mpu.c
@@ -0,0 +1,197 @@
+/* SPDX-License-Identifier: GPL-3.0-or-later */
+
+#include "mpu.h"
+#include "heap.h"
+
+#include <errno.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/poll.h>
+#include <time.h>
+
+int mpu_init(mpu_ctx_t* ctx,
+ int sock0, float weight0,
+ int sock1, float weight1,
+ int reorder_window,
+ size_t reorder_slots
+) {
+ if (!ctx || sock0 < 0 || sock1 < 0
+ || weight0 <= 0.0F || weight1 <= 0.0F
+ || reorder_window <= 0
+ || reorder_slots == 0) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ ctx->paths[0] = (mpu_path_t) {
+ .sockfd = sock0,
+ .weight = weight0
+ };
+ ctx->paths[1] = (mpu_path_t) {
+ .sockfd = sock1,
+ .weight = weight1
+ };
+
+ ctx->credits[0] = 0.0F;
+ ctx->credits[1] = 0.0F;
+
+ ctx->send_seq_num = 1;
+ ctx->recv_seq_num = 1;
+
+ ctx->reorder_window = reorder_window;
+ int i = heap_init(&ctx->reorder_heap, reorder_slots, sizeof(mpu_slot_t));
+ if (i < 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+// Helper to get current time in milliseconds
+static inline uint64_t get_time_now(void) {
+ struct timespec ts;
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+
+ return (uint64_t) ts.tv_sec * 1000ULL
+ + ((uint64_t) ts.tv_nsec / 1000000ULL);
+}
+
+// FIXME: seq_num is never converted to network byte order
+
+// TODO: Zero-copy send() requiring the user to reserve space for the header.
+// Perhaps consider MSG_MORE for sending header & payload separately or sendmsg()
+
+int mpu_send(mpu_ctx_t* ctx, const uint8_t* data, size_t len) {
+ if (!ctx || !data || len == 0 || len > MPU_MTU - sizeof(mpu_hdr_t)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Prepare packet with header
+ size_t packet_len = sizeof(mpu_hdr_t) + len;
+ union {
+ uint8_t buf[MPU_MTU];
+ mpu_hdr_t hdr;
+ } packet;
+
+ memcpy(packet.buf + sizeof(mpu_hdr_t), data, len);
+ packet.hdr = (mpu_hdr_t) {
+ .seq_num = ctx->send_seq_num++
+ };
+
+ // Select path using weighted round-robin scheduler
+ int path_idx = (ctx->credits[0] <= ctx->credits[1]) ? 0 : 1;
+ ctx->credits[path_idx] += (float) len / ctx->paths[path_idx].weight;
+
+ ssize_t sent = send(ctx->paths[path_idx].sockfd, packet.buf, packet_len, 0);
+ if (sent < 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+// FIXME: Skipped packages are never purged
+
+// TODO: Zero-copy recv() into reorder buffer slot, requiring the user to provide a callback
+// for processing received packets, instead of copying into a user buffer.
+
+// TODO: Attempt to optimize heap_extract() to purge multiple expired packets at once.
+
+ssize_t mpu_recv(mpu_ctx_t* ctx, uint8_t* data, size_t len) {
+ if (!ctx || !data || !len) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ while (true) {
+ // Peek heap for next in-order packet and deliver if available
+ heap_node_t* node = heap_next_extract(&ctx->reorder_heap);
+ if (node && node->value == ctx->recv_seq_num) {
+ mpu_slot_t* slot = (mpu_slot_t*) node->user;
+
+ size_t data_len = slot->packet_len - sizeof(mpu_hdr_t);
+ if (data_len > len) {
+ data_len = len;
+ }
+
+ memcpy(data, slot->packet.buf + sizeof(mpu_hdr_t), data_len);
+
+ heap_extract(&ctx->reorder_heap);
+ ctx->recv_seq_num++;
+
+ return (ssize_t) data_len;
+ }
+
+ // Purge packets outside of reorder window from heap
+ uint64_t now = get_time_now();
+ int expires_in = ctx->reorder_window;
+
+ while (true) {
+ node = heap_next_extract(&ctx->reorder_heap);
+ if (!node) {
+ break;
+ }
+
+ mpu_slot_t* slot = (mpu_slot_t*) node->user;
+
+ int age = (int) (now - slot->timestamp);
+ if (age < ctx->reorder_window) {
+ expires_in = ctx->reorder_window - age;
+ break;
+ }
+
+ ctx->recv_seq_num = slot->packet.hdr.seq_num + 1;
+ heap_extract(&ctx->reorder_heap);
+ }
+
+ // Receive from either path
+ struct pollfd pfds[2] = {
+ { .fd = ctx->paths[0].sockfd, .events = POLLIN },
+ { .fd = ctx->paths[1].sockfd, .events = POLLIN }
+ };
+ int ret = poll(pfds, 2, expires_in);
+ if (ret < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ return -1;
+ } else if (ret == 0) {
+ continue;
+ }
+
+ for (int i = 0; i < 2; i++) {
+ if (!(pfds[i].revents & POLLIN)) {
+ continue;
+ }
+
+ // Write to reorder buffer
+ node = heap_next_insert(&ctx->reorder_heap);
+ if (!node) {
+ // No free slot, skip packet for now
+ continue;
+ }
+
+ mpu_slot_t* slot = (mpu_slot_t*) node->user;
+
+ ssize_t packet_len = recv(pfds[i].fd, slot->packet.buf, MPU_MTU, 0);
+ if (packet_len < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ return -1;
+ } else if (packet_len < (ssize_t) sizeof(mpu_hdr_t)) {
+ // Malformed packet, ignore
+ continue;
+ }
+
+ slot->timestamp = get_time_now();
+ slot->packet_len = (uint16_t) packet_len;
+
+ heap_insert(&ctx->reorder_heap, slot->packet.hdr.seq_num);
+ }
+ }
+}
diff --git a/src/mpu.h b/src/mpu.h
new file mode 100644
index 0000000..da41a25
--- /dev/null
+++ b/src/mpu.h
@@ -0,0 +1,71 @@
+/* SPDX-License-Identifier: GPL-3.0-or-later */
+
+#pragma once
+
+#include "heap.h"
+
+#include <netinet/in.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <sys/types.h>
+
+//
+// A two-path UDP tunnel implementation.
+//
+// The tunnel uses two physical paths to transmit packets to the same destination.
+//
+// Sent packets are assigned a sequence number and transmitted through one of the
+// two paths according to a weighted round-robin scheduler.
+//
+// On the receiving end, packets are buffered in a reorder buffer and a best effort is made
+// to deliver packets in order.
+//
+
+#define MPU_MTU (1500U - 20 - 8) //!< Maximum payload size for traditional IP/UDP headers
+
+typedef struct {
+ int sockfd;
+ float weight;
+} mpu_path_t; //!< Physical path for packet transmission
+
+typedef struct {
+ uint64_t seq_num;
+} mpu_hdr_t; //!< Tunnel packet header
+
+typedef struct {
+ uint64_t timestamp; // in milliseconds
+ uint16_t packet_len;
+ union {
+ uint8_t buf[MPU_MTU];
+ mpu_hdr_t hdr;
+ } packet;
+} mpu_slot_t; //!< Packet slot for reorder buffer
+
+typedef struct {
+ mpu_path_t paths[2];
+
+ // Weighted round-robin scheduler state
+ float credits[2];
+ uint64_t send_seq_num;
+
+ // Reorder buffer
+ uint64_t recv_seq_num;
+ int reorder_window; // in milliseconds
+ heap_t reorder_heap;
+} mpu_ctx_t;
+
+/// Initialize MPU context with two physical paths
+/// Returns 0 on success, -1 on failure (errno is set).
+int mpu_init(mpu_ctx_t* ctx,
+ int sock0, float weight0,
+ int sock1, float weight1,
+ int reorder_window, // in milliseconds
+ size_t reorder_slots);
+
+/// Send a packet through the tunnel. Packet must not exceed MPU_MTU - sizeof(mpu_hdr_t).
+/// Returns 0 on success, -1 on failure (errno is set).
+int mpu_send(mpu_ctx_t* ctx, const uint8_t* data, size_t len);
+
+/// Receive a packet from the tunnel, blocking until one is available.
+/// Returns the length of the received data, or -1 on failure (errno is set).
+ssize_t mpu_recv(mpu_ctx_t* ctx, uint8_t* data, size_t len);