/* SPDX-License-Identifier: GPL-3.0-or-later */

#include "mpu.h"

#include "heap.h"

// NOTE(review): the system header names below were reconstructed from the
// identifiers this file uses -- confirm against the project's original list.
#include <endian.h>
#include <errno.h>
#include <poll.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>

/*
 * Initialise a two-path context.
 *
 * ctx             context to fill in
 * sock0, sock1    socket descriptors of the two paths (must be >= 0)
 * weight0/1       scheduler weights; a larger weight makes a path absorb
 *                 more payload bytes per unit of credit (must be > 0)
 * reorder_window  how long, in milliseconds, the receiver waits for a
 *                 missing sequence number (must be > 0)
 * reorder_slots   capacity handed to heap_init() (must be > 0)
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL on bad arguments, or
 * -1 when heap_init() fails (errno is whatever heap_init() left behind).
 */
int mpu_init(mpu_ctx_t* ctx,
    int sock0, float weight0,
    int sock1, float weight1,
    int reorder_window,
    size_t reorder_slots)
{
    // Weights are tested as !(w > 0) instead of (w <= 0) so that NaN, which
    // compares false against everything, is rejected as well.
    if (!ctx || sock0 < 0 || sock1 < 0
        || !(weight0 > 0.0F) || !(weight1 > 0.0F)
        || reorder_window <= 0 || reorder_slots == 0) {
        errno = EINVAL;
        return -1;
    }

    ctx->paths[0] = (mpu_path_t) { .sockfd = sock0, .weight = weight0 };
    ctx->paths[1] = (mpu_path_t) { .sockfd = sock1, .weight = weight1 };
    ctx->credits[0] = 0.0F;
    ctx->credits[1] = 0.0F;

    // Sequence numbers start at 1 on both the sending and the receiving side
    ctx->send_seq_num = 1;
    ctx->recv_seq_num = 1;
    ctx->reorder_window = reorder_window;

    if (heap_init(&ctx->reorder_heap, reorder_slots, sizeof(mpu_slot_t)) < 0) {
        return -1;
    }

    return 0;
}

// Helper to get the current CLOCK_MONOTONIC time in milliseconds
static inline uint64_t get_time_now(void)
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);

    uint64_t from_sec = (uint64_t) ts.tv_sec * 1000ULL;
    uint64_t from_nsec = (uint64_t) ts.tv_nsec / 1000000ULL;
    return from_sec + from_nsec;
}

/*
 * Send one payload as a single datagram on one of the two paths.
 *
 * The payload is prefixed with an mpu_hdr_t carrying the little-endian
 * sequence number. len must be non-zero and must fit into MPU_MTU together
 * with the header.
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL on bad arguments, or
 * -1 with the errno of send() when the transmission fails.
 */
int mpu_send(mpu_ctx_t* ctx, const uint8_t* data, size_t len)
{
    if (!ctx || !data || len == 0 || len > MPU_MTU - sizeof(mpu_hdr_t)) {
        errno = EINVAL;
        return -1;
    }

    // Prepare packet with header
    size_t packet_len = sizeof(mpu_hdr_t) + len;
    union {
        uint8_t buf[MPU_MTU];
        mpu_hdr_t hdr;
    } packet;

    // The header is stored before the payload: storing into one union member
    // leaves the bytes that belong only to the other members unspecified, so
    // the payload has to be written last.
    packet.hdr = (mpu_hdr_t) { .seq_num = htole64(ctx->send_seq_num) };
    memcpy(packet.buf + sizeof(mpu_hdr_t), data, len);

    // Select path using weighted round-robin scheduler: the path with the
    // lower credit goes next and is charged len / weight
    int path_idx = (ctx->credits[0] <= ctx->credits[1]) ? 0 : 1;
    ctx->credits[path_idx] += (float) len / ctx->paths[path_idx].weight;

    // Only the difference between the two credits matters. Rebasing them
    // keeps the values small; otherwise the floats eventually grow so large
    // that the per-packet charge is rounded away and the choice gets stuck.
    float base = (ctx->credits[0] < ctx->credits[1]) ? ctx->credits[0] : ctx->credits[1];
    ctx->credits[0] -= base;
    ctx->credits[1] -= base;

    ssize_t sent = send(ctx->paths[path_idx].sockfd, packet.buf, packet_len, 0);
    if (sent < 0) {
        // The credit stays charged so that a failing path is not picked over
        // and over again, but the sequence number is not consumed: nothing
        // carrying it reached the wire, and burning it would make the
        // receiver wait a full reorder window for a packet that never existed.
        return -1;
    }

    ctx->send_seq_num++;
    return 0;
}

// Copy the payload of the packet at the head of the reorder heap to the
// caller (truncated to len bytes), release it and advance the expected
// sequence number. slot must belong to the node heap_next_extract() returned.
static ssize_t mpu_deliver(mpu_ctx_t* ctx, const mpu_slot_t* slot, uint8_t* data, size_t len)
{
    size_t data_len = slot->packet_len - sizeof(mpu_hdr_t);
    if (data_len > len) {
        data_len = len;
    }
    memcpy(data, slot->packet.buf + sizeof(mpu_hdr_t), data_len);

    heap_extract(&ctx->reorder_heap);
    ctx->recv_seq_num++;
    return (ssize_t) data_len;
}

// Read one datagram from every socket poll() flagged and store it in the
// reorder heap. Sets *overflow when a flagged socket had to be left unread
// because the heap had no free slot. Returns 0, or -1 on a fatal recv() error.
static int mpu_buffer_incoming(mpu_ctx_t* ctx, const struct pollfd* pfds, size_t npfds, bool* overflow)
{
    // Error conditions are handled like readability: recv() then reports the
    // pending error to the caller instead of poll() returning immediately
    // forever.
    const short ready = POLLIN | POLLERR | POLLHUP | POLLNVAL;

    for (size_t i = 0; i < npfds; i++) {
        if (!(pfds[i].revents & ready)) {
            continue;
        }

        // Write to reorder buffer
        heap_node_t* node = heap_next_insert(&ctx->reorder_heap);
        if (!node) {
            // No free slot: the datagram stays queued in the socket
            *overflow = true;
            continue;
        }

        mpu_slot_t* slot = (mpu_slot_t*) node->user;
        ssize_t packet_len = recv(pfds[i].fd, slot->packet.buf, MPU_MTU, 0);
        if (packet_len < 0) {
            // Interrupted, or readiness was spurious; try again later
            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
                continue;
            }
            return -1;
        }
        if (packet_len < (ssize_t) sizeof(mpu_hdr_t)) {
            // Malformed packet, ignore
            continue;
        }

        slot->timestamp = get_time_now();
        slot->packet_len = (uint16_t) packet_len;
        heap_insert(&ctx->reorder_heap, le64toh(slot->packet.hdr.seq_num));
    }

    return 0;
}

/*
 * Receive the next payload in sequence order, blocking until one is
 * available.
 *
 * Datagrams from both paths are buffered in the reorder heap. A packet is
 * handed out when it carries the expected sequence number, when it has waited
 * reorder_window milliseconds for a missing predecessor, or when the heap is
 * full and nothing more can be buffered. In the latter two cases the missing
 * sequence numbers are given up on.
 *
 * NOTE(review): relies on heap_next_extract() peeking at the entry with the
 * smallest sequence number and heap_extract() removing that entry -- confirm
 * against heap.h.
 *
 * At most len payload bytes are copied into data; a longer payload is
 * silently truncated.
 *
 * Returns the number of bytes copied. Returns -1 with errno = EINVAL on bad
 * arguments, or -1 with the errno of poll()/recv() on a socket error.
 */
ssize_t mpu_recv(mpu_ctx_t* ctx, uint8_t* data, size_t len)
{
    if (!ctx || !data || !len) {
        errno = EINVAL;
        return -1;
    }

    // Set when the previous round found the heap full
    bool overflow = false;

    while (true) {
        int expires_in = ctx->reorder_window;

        heap_node_t* node;
        while ((node = heap_next_extract(&ctx->reorder_heap)) != NULL) {
            mpu_slot_t* slot = (mpu_slot_t*) node->user;
            uint64_t seq = le64toh(slot->packet.hdr.seq_num);

            // Late or duplicate packet: its sequence number is already behind
            if (seq < ctx->recv_seq_num) {
                heap_extract(&ctx->reorder_heap);
                continue;
            }

            uint64_t age = get_time_now() - slot->timestamp;
            bool expired = age >= (uint64_t) ctx->reorder_window;

            // Stop waiting for the gap in front of this packet once it has
            // waited long enough or the heap cannot take any more packets.
            // The packet itself is kept and delivered; only the missing
            // sequence numbers are skipped.
            if (seq != ctx->recv_seq_num && (expired || overflow)) {
                ctx->recv_seq_num = seq;
            }

            if (seq == ctx->recv_seq_num) {
                return mpu_deliver(ctx, slot, data, len);
            }

            // Still inside the window (age < reorder_window here): wake up
            // when it ends
            expires_in = ctx->reorder_window - (int) age;
            break;
        }
        overflow = false;

        // Receive from either path
        struct pollfd pfds[2] = {
            { .fd = ctx->paths[0].sockfd, .events = POLLIN },
            { .fd = ctx->paths[1].sockfd, .events = POLLIN }
        };
        int ret = poll(pfds, 2, expires_in);
        if (ret < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (ret == 0) {
            // Timed out; the next round re-evaluates the head of the heap
            continue;
        }

        if (mpu_buffer_incoming(ctx, pfds, 2, &overflow) < 0) {
            return -1;
        }
    }
}