diff options
| -rw-r--r-- | CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/mpu.c | 197 | ||||
| -rw-r--r-- | src/mpu.h | 71 |
3 files changed, 269 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 68a9b98..0820f7e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,6 +9,7 @@ include(cmake/Diagnostics.cmake) add_executable(mpu "src/heap.c" + "src/mpu.c" "src/p2p.c" "src/main.c") diff --git a/src/mpu.c b/src/mpu.c new file mode 100644 index 0000000..32c0ffe --- /dev/null +++ b/src/mpu.c @@ -0,0 +1,197 @@ +/* SPDX-License-Identifier: GPL-3.0-or-later */ + +#include "mpu.h" +#include "heap.h" + +#include <errno.h> +#include <stddef.h> +#include <stdint.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/poll.h> +#include <time.h> + +int mpu_init(mpu_ctx_t* ctx, + int sock0, float weight0, + int sock1, float weight1, + int reorder_window, + size_t reorder_slots +) { + if (!ctx || sock0 < 0 || sock1 < 0 + || weight0 <= 0.0F || weight1 <= 0.0F + || reorder_window <= 0 + || reorder_slots == 0) { + errno = EINVAL; + return -1; + } + + ctx->paths[0] = (mpu_path_t) { + .sockfd = sock0, + .weight = weight0 + }; + ctx->paths[1] = (mpu_path_t) { + .sockfd = sock1, + .weight = weight1 + }; + + ctx->credits[0] = 0.0F; + ctx->credits[1] = 0.0F; + + ctx->send_seq_num = 1; + ctx->recv_seq_num = 1; + + ctx->reorder_window = reorder_window; + int i = heap_init(&ctx->reorder_heap, reorder_slots, sizeof(mpu_slot_t)); + if (i < 0) { + return -1; + } + + return 0; +} + +// Helper to get current time in milliseconds +static inline uint64_t get_time_now(void) { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + + return (uint64_t) ts.tv_sec * 1000ULL + + ((uint64_t) ts.tv_nsec / 1000000ULL); +} + +// FIXME: seq_num is never converted to network byte order + +// TODO: Zero-copy send() requiring the user to reserve space for the header. +// Perhaps consider MSG_MORE for sending header & payload separately or sendmsg() + +int mpu_send(mpu_ctx_t* ctx, const uint8_t* data, size_t len) { + if (!ctx || !data || len == 0 || len > MPU_MTU - sizeof(mpu_hdr_t)) { + errno = EINVAL; + return -1; + } + + // Prepare packet with header + size_t packet_len = sizeof(mpu_hdr_t) + len; + union { + uint8_t buf[MPU_MTU]; + mpu_hdr_t hdr; + } packet; + + memcpy(packet.buf + sizeof(mpu_hdr_t), data, len); + packet.hdr = (mpu_hdr_t) { + .seq_num = ctx->send_seq_num++ + }; + + // Select path using weighted round-robin scheduler + int path_idx = (ctx->credits[0] <= ctx->credits[1]) ? 0 : 1; + ctx->credits[path_idx] += (float) len / ctx->paths[path_idx].weight; + + ssize_t sent = send(ctx->paths[path_idx].sockfd, packet.buf, packet_len, 0); + if (sent < 0) { + return -1; + } + + return 0; +} + +// FIXME: Skipped packages are never purged + +// TODO: Zero-copy recv() into reorder buffer slot, requiring the user to provide a callback +// for processing received packets, instead of copying into a user buffer. + +// TODO: Attempt to optimize heap_extract() to purge multiple expired packets at once. + +ssize_t mpu_recv(mpu_ctx_t* ctx, uint8_t* data, size_t len) { + if (!ctx || !data || !len) { + errno = EINVAL; + return -1; + } + + while (true) { + // Peek heap for next in-order packet and deliver if available + heap_node_t* node = heap_next_extract(&ctx->reorder_heap); + if (node && node->value == ctx->recv_seq_num) { + mpu_slot_t* slot = (mpu_slot_t*) node->user; + + size_t data_len = slot->packet_len - sizeof(mpu_hdr_t); + if (data_len > len) { + data_len = len; + } + + memcpy(data, slot->packet.buf + sizeof(mpu_hdr_t), data_len); + + heap_extract(&ctx->reorder_heap); + ctx->recv_seq_num++; + + return (ssize_t) data_len; + } + + // Purge packets outside of reorder window from heap + uint64_t now = get_time_now(); + int expires_in = ctx->reorder_window; + + while (true) { + node = heap_next_extract(&ctx->reorder_heap); + if (!node) { + break; + } + + mpu_slot_t* slot = (mpu_slot_t*) node->user; + + int age = (int) (now - slot->timestamp); + if (age < ctx->reorder_window) { + expires_in = ctx->reorder_window - age; + break; + } + + ctx->recv_seq_num = slot->packet.hdr.seq_num + 1; + heap_extract(&ctx->reorder_heap); + } + + // Receive from either path + struct pollfd pfds[2] = { + { .fd = ctx->paths[0].sockfd, .events = POLLIN }, + { .fd = ctx->paths[1].sockfd, .events = POLLIN } + }; + int ret = poll(pfds, 2, expires_in); + if (ret < 0) { + if (errno == EINTR) { + continue; + } + return -1; + } else if (ret == 0) { + continue; + } + + for (int i = 0; i < 2; i++) { + if (!(pfds[i].revents & POLLIN)) { + continue; + } + + // Write to reorder buffer + node = heap_next_insert(&ctx->reorder_heap); + if (!node) { + // No free slot, skip packet for now + continue; + } + + mpu_slot_t* slot = (mpu_slot_t*) node->user; + + ssize_t packet_len = recv(pfds[i].fd, slot->packet.buf, MPU_MTU, 0); + if (packet_len < 0) { + if (errno == EINTR) { + continue; + } + return -1; + } else if (packet_len < (ssize_t) sizeof(mpu_hdr_t)) { + // Malformed packet, ignore + continue; + } + + slot->timestamp = get_time_now(); + slot->packet_len = (uint16_t) packet_len; + + heap_insert(&ctx->reorder_heap, slot->packet.hdr.seq_num); + } + } +} diff --git a/src/mpu.h b/src/mpu.h new file mode 100644 index 0000000..da41a25 --- /dev/null +++ b/src/mpu.h @@ -0,0 +1,71 @@ +/* SPDX-License-Identifier: GPL-3.0-or-later */ + +#pragma once + +#include "heap.h" + +#include <netinet/in.h> +#include <stddef.h> +#include <stdint.h> +#include <sys/types.h> + +// +// A two-path UDP tunnel implementation. +// +// The tunnel uses two physical paths to transmit packets to the same destination. +// +// Sent packets are assigned a sequence number and transmitted through one of the +// two paths according to a weighted round-robin scheduler. +// +// On the receiving end, packets are buffered in a reorder buffer and a best effort is made +// to deliver packets in order. +// + +#define MPU_MTU (1500U - 20 - 8) //!< Maximum payload size for traditional IP/UDP headers + +typedef struct { + int sockfd; + float weight; +} mpu_path_t; //!< Physical path for packet transmission + +typedef struct { + uint64_t seq_num; +} mpu_hdr_t; //!< Tunnel packet header + +typedef struct { + uint64_t timestamp; // in milliseconds + uint16_t packet_len; + union { + uint8_t buf[MPU_MTU]; + mpu_hdr_t hdr; + } packet; +} mpu_slot_t; //!< Packet slot for reorder buffer + +typedef struct { + mpu_path_t paths[2]; + + // Weighted round-robin scheduler state + float credits[2]; + uint64_t send_seq_num; + + // Reorder buffer + uint64_t recv_seq_num; + int reorder_window; // in milliseconds + heap_t reorder_heap; +} mpu_ctx_t; + +/// Initialize MPU context with two physical paths +/// Returns 0 on success, -1 on failure (errno is set). +int mpu_init(mpu_ctx_t* ctx, + int sock0, float weight0, + int sock1, float weight1, + int reorder_window, // in milliseconds + size_t reorder_slots); + +/// Send a packet through the tunnel. Packet must not exceed MPU_MTU - sizeof(mpu_hdr_t). +/// Returns 0 on success, -1 on failure (errno is set). +int mpu_send(mpu_ctx_t* ctx, const uint8_t* data, size_t len); + +/// Receive a packet from the tunnel, blocking until one is available. +/// Returns the length of the received data, or -1 on failure (errno is set). +ssize_t mpu_recv(mpu_ctx_t* ctx, uint8_t* data, size_t len); |
