使用异或实现冗余包方式,冗余包大,并且很难抵抗连续丢包,这里尝试用里德所罗门编码,实现丢包测试。
如下代码效果:25个包分一组,能任意恢复其中的3个包,冗余包比率为25%,抗丢包能力为10%左右。

图仅为一个示例,该情况下可能恢复不出来包。
分块分组实现FEC纠错码,就是为了更大范围的保护数据包,在该包丢失情况下,能被恢复出来。这个思路的理解,对代码实现尤为关键。
组包规则:
每个rtp包每4个字节拆分为1组,每组200个字节,所以理论上可以最多50个包一组,这里设置为25个包一组
#define DATA_PACKETS_PER_BLOCK 25 //25个包,能任意恢复3个包
#define NUM_BLOCKS 350 //350*8 = 1400 350*32=11200
#define REAL_BLOCK_SIZE 4 //
#define MAX_RESTORE_PACKET 3
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <errno.h>
#include <time.h>
// 引入 libcorrect 库头文件
#include <correct/correct.h>
#include <correct/reed-solomon.h>
// 定义系统参数
#define RTP_HEADER_SIZE sizeof(RTPHeader)
#define FEC_HEADER_SIZE sizeof(FECHeader)
#define MAX_PACKET_SIZE 1400
#define FEC_PAYLOAD_TYPE 127 // FEC 专用载荷类型
#define DATA_PACKETS_PER_BLOCK 25 // 每块 25 个 RTP 包
#define FEC_PACKETS_PER_BLOCK 1 //
#define TOTAL_PACKETS_PER_BLOCK (DATA_PACKETS_PER_BLOCK + FEC_PACKETS_PER_BLOCK) // 每块总数据包数
#define MAX_BLOCKS_IN_BUFFER 100 // 最大块缓冲区数量
#define RS_PRIMITIVE_POLYNOMIAL 0x11D // 原始多项式 (GF(2^8))
#define RS_FIRST_CONSECUTIVE_ROOT 1 // 第一个连续根
#define RS_GENERATOR_ROOT_GAP 2 // 生成器根间隔
#define RS_NUM_ROOTS 32 // 校验符号数 = M=4)
#define MAX_RTP_SIZE MAX_PACKET_SIZE
#define BLOCK_SIZE 223 // 保持RS(255,223)标准块
#define SYMBLE_SIZE 32
#define REED_SOLOMON_LEN 255
#define SYMBOL_SIZE MAX_RTP_SIZE // 符号大小=包最大长度
#define K_RS 223 // RS编码消息长度(需K_RS ≥ BLOCK_SIZE)
#define M_RS 32 // RS编码校验长度
#define K DATA_PACKETS_PER_BLOCK // 原始包数
#define M FEC_PACKETS_PER_BLOCK // 冗余包数
//#define NUM_BLOCKS 175 //175*8 = 1400 每块 1 个 FEC 包 175*32=5600 = 1400 * 2
//#define REAL_BLOCK_SIZE 8 //
//#define NUM_BLOCKS 200 //175*8 = 1400 200*32=6400
//#define REAL_BLOCK_SIZE 7 //
#define NUM_BLOCKS 350 //350*8 = 1400 350*32=11200
#define REAL_BLOCK_SIZE 4 //
#define MAX_RESTORE_PACKET 3
//#define NUM_BLOCKS 35 // 分块数=35 ,200个字节一组,5个rtp一个fec包,每包分成35个块,每块40个字节,一共35个分组 每包35*32个字节 = 1120个字节
//40*35 = 1400
//#define REAL_BLOCK_SIZE 40
#define FEC_PACKETS_SIZE NUM_BLOCKS*SYMBLE_SIZE
// 定义RS编码块大小
#define RS_MAX_DATA_LENGTH BLOCK_SIZE // // RS(255,223) 分块大小
// RTP 头部结构定义
typedef struct {
uint8_t version:2; // 版本号
uint8_t padding:1; // 填充标志
uint8_t extension:1; // 扩展标志
uint8_t cc:4; // CSRC 计数器
uint8_t marker:1; // 标记位
uint8_t payload:7; // 载荷类型
uint16_t seq; // 序列号
uint32_t timestamp; // 时间戳
uint32_t ssrc; // 同步源标识符
// 可选的 CSRC 列表、扩展头等
} __attribute__((packed)) RTPHeader;
// FEC 包头部结构(简化版)
typedef struct {
uint8_t version:2; // RTP 版本
uint8_t padding:1; // 填充标志
uint8_t extension:1; // 扩展标志
uint8_t cc:4; // CSRC 计数器
uint8_t marker:1; // 标记位
uint8_t payload:7; // 载荷类型(FEC 专用)
uint16_t seq; // 序列号
uint32_t timestamp; // 时间戳
uint32_t ssrc; // 同步源标识符
uint16_t block_id; // 块 ID
uint8_t fec_index; // FEC 包索引
uint8_t original_packet_count; // 原始数据包数量
} __attribute__((packed)) FECHeader;
typedef struct {
int block_num; // 块号
/*
组合1 200个字节一组,5个rtp一个fec包,每包分成35个块,每块40个字节,一共35个分组 每包35*32个字节 = 1120个字节
组合2 200个字节一组,25个rtp4个fec包 5600/4 = 1400个字节,每包分成175个块,每块8个字节,一共175个分组 每包175*32个字节 = 5600 字节
*组合2能恢复2个RTP包
*/
uint8_t fec_data[NUM_BLOCKS * SYMBLE_SIZE];
} __attribute__((packed)) FECPacket;
// 发送缓冲区结构
typedef struct {
uint8_t *data; // 数据包数据
int length; // 数据包长度
int seq_num; // 序列号
int timestamp; // 时间戳
int is_fec; // 是否为 FEC 包
int block_id; // 所属块 ID
int packet_id; // 在块中的 ID
} PacketBuffer;
// 编码器上下文
typedef struct {
correct_reed_solomon *rs; // libcorrect Reed-Solomon编码器
PacketBuffer packets[DATA_PACKETS_PER_BLOCK]; // 原始数据包
FECPacket fecData[M]; // 内存池: M冗余包
int packet_count; // 当前块中的数据包数量
int next_seq_num; // 下一个序列号
int next_block_id; // 下一个块 ID
int ssrc; // 同步源标识符
// 网络相关
int sockfd; // 套接字描述符
struct sockaddr_in dest_addr; // 目标地址
} EncoderContext;
// 解码器上下文
typedef struct {
correct_reed_solomon *rs; // libcorrect Reed-Solomon解码器
PacketBuffer *received_packets[MAX_BLOCKS_IN_BUFFER]; // 接收的数据包
int block_map[MAX_BLOCKS_IN_BUFFER]; // 块ID映射
int block_packet_count[MAX_BLOCKS_IN_BUFFER]; // 每个块收到的包数量
int block_recovered[MAX_BLOCKS_IN_BUFFER]; // 块是否已恢复
int ssrc; // 同步源标识符
// 网络相关
int sockfd; // 套接字描述符
struct sockaddr_in src_addr; // 源地址
// 统计信息
int total_packets; // 接收的总数据包数
int recovered_packets; // 成功恢复的数据包数
int total_blocks; // 处理的总块数
int recovered_blocks; // 成功恢复的块数
} DecoderContext;
void printf_hex(uint8_t *data, int len);
// 初始化编码器
EncoderContext* init_encoder(const char* ip, int port) {
EncoderContext* ctx = malloc(sizeof(EncoderContext));
if (!ctx) return NULL;
// 初始化libcorrect Reed-Solomon编码器
ctx->rs = correct_reed_solomon_create(
RS_PRIMITIVE_POLYNOMIAL,
RS_FIRST_CONSECUTIVE_ROOT,
RS_GENERATOR_ROOT_GAP,
RS_NUM_ROOTS
);
if (!ctx->rs) {
free(ctx);
return NULL;
}
//int k = correct_reed_solomon_message_length(ctx->rs);
//int m = correct_reed_solomon_block_length(ctx->rs);
//printf("k:%d, M:%d\r\n", k , m);
// 验证参数兼容性
//assert(correct_reed_solomon_message_length(enc->rs) == K);
//assert(correct_reed_solomon_block_length(enc->rs) == K+M);
ctx->packet_count = 0;
ctx->next_seq_num = 0;
ctx->next_block_id = 0;
ctx->ssrc = rand();
// 创建 UDP 套接字
ctx->sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (ctx->sockfd < 0) {
perror("socket creation failed");
correct_reed_solomon_destroy(ctx->rs);
free(ctx);
return NULL;
}
memset(&ctx->dest_addr, 0, sizeof(ctx->dest_addr));
ctx->dest_addr.sin_family = AF_INET;
ctx->dest_addr.sin_port = htons(port);
if (inet_pton(AF_INET, ip, &ctx->dest_addr.sin_addr) <= 0) {
perror("invalid address");
close(ctx->sockfd);
correct_reed_solomon_destroy(ctx->rs);
free(ctx);
return NULL;
}
// 初始化数据包缓冲区
for (int i = 0; i < DATA_PACKETS_PER_BLOCK; i++) {
ctx->packets[i].data = NULL;
}
return ctx;
}
// 初始化解码器
DecoderContext* init_decoder(int port) {
DecoderContext* ctx = malloc(sizeof(DecoderContext));
if (!ctx) return NULL;
// 初始化libcorrect Reed-Solomon解码器(与编码器参数一致)
ctx->rs = correct_reed_solomon_create(
RS_PRIMITIVE_POLYNOMIAL,
RS_FIRST_CONSECUTIVE_ROOT,
RS_GENERATOR_ROOT_GAP,
RS_NUM_ROOTS
);
if (!ctx->rs) {
free(ctx);
return NULL;
}
// 初始化块映射和计数器
memset(ctx->block_map, -1, sizeof(ctx->block_map));
memset(ctx->block_packet_count, 0, sizeof(ctx->block_packet_count));
memset(ctx->block_recovered, 0, sizeof(ctx->block_recovered));
ctx->ssrc = 0;
// 初始化统计信息
ctx->total_packets = 0;
ctx->recovered_packets = 0;
ctx->total_blocks = 0;
ctx->recovered_blocks = 0;
// 创建 UDP 套接字
ctx->sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (ctx->sockfd < 0) {
perror("socket creation failed");
correct_reed_solomon_destroy(ctx->rs);
free(ctx);
return NULL;
}
memset(&ctx->src_addr, 0, sizeof(ctx->src_addr));
ctx->src_addr.sin_family = AF_INET;
ctx->src_addr.sin_addr.s_addr = INADDR_ANY;
ctx->src_addr.sin_port = htons(port);
if (bind(ctx->sockfd, (const struct sockaddr *)&ctx->src_addr,
sizeof(ctx->src_addr)) < 0) {
perror("bind failed");
close(ctx->sockfd);
correct_reed_solomon_destroy(ctx->rs);
free(ctx);
return NULL;
}
// 初始化数据包缓冲区
for (int i = 0; i < MAX_BLOCKS_IN_BUFFER; i++) {
ctx->received_packets[i] = (PacketBuffer *)malloc(sizeof(PacketBuffer));
memset(ctx->received_packets[i], 0x00, sizeof(PacketBuffer));
}
return ctx;
}
// 清理编码器资源
void cleanup_encoder(EncoderContext* ctx) {
if (ctx) {
// 释放数据包缓冲区
for (int i = 0; i < DATA_PACKETS_PER_BLOCK; i++) {
if (ctx->packets[i].data) {
free(ctx->packets[i].data);
}
}
if (ctx->rs) correct_reed_solomon_destroy(ctx->rs);
if (ctx->sockfd >= 0) close(ctx->sockfd);
free(ctx);
}
}
// 清理解码器资源
void cleanup_decoder(DecoderContext* ctx) {
if (ctx) {
// 释放数据包缓冲区
for (int i = 0; i < MAX_BLOCKS_IN_BUFFER; i++) {
if (ctx->received_packets[i]) {
free(ctx->received_packets[i]);
}
}
if (ctx->rs) correct_reed_solomon_destroy(ctx->rs);
if (ctx->sockfd >= 0) close(ctx->sockfd);
// 打印统计信息
printf("解码器统计信息:\n");
printf(" 处理的总块数: %d\n", ctx->total_blocks);
printf(" 成功恢复的块数: %d\n", ctx->recovered_blocks);
printf(" 接收的总数据包: %d\n", ctx->total_packets);
printf(" 成功恢复的数据包: %d\n", ctx->recovered_packets);
printf(" 块恢复成功率: %.2f%%\n",
(float)ctx->recovered_blocks / ctx->total_blocks * 100);
printf(" 数据包恢复成功率: %.2f%%\n",
(float)ctx->recovered_packets / (ctx->total_blocks * DATA_PACKETS_PER_BLOCK) * 100);
free(ctx);
}
}
// 使用Reed-Solomon码编码并发送RTP包
int encode_and_send(EncoderContext* ctx, const uint8_t* payload, int payload_len, int timestamp) {
if (!ctx || !payload || payload_len <= 0) {
errno = EINVAL;
return -1;
}
// 确保有效载荷长度不超过最大值
if (payload_len > MAX_PACKET_SIZE ) {
payload_len = MAX_PACKET_SIZE;
}
// 计算所需缓冲区大小
int packet_size = RTP_HEADER_SIZE + payload_len;
// 分配或调整数据包缓冲区大小
if (ctx->packets[ctx->packet_count].data == NULL) {
ctx->packets[ctx->packet_count].data = malloc(packet_size);
if (!ctx->packets[ctx->packet_count].data) {
perror("内存分配失败");
return -1;
}
} else if (ctx->packets[ctx->packet_count].length < packet_size) {
ctx->packets[ctx->packet_count].data = realloc(ctx->packets[ctx->packet_count].data, packet_size);
if (!ctx->packets[ctx->packet_count].data) {
perror("内存分配失败");
return -1;
}
}
// 创建 RTP 头部
RTPHeader header;
header.version = 2;
header.padding = 0;
header.extension = 0;
header.cc = 0;
header.marker = 0;
header.payload = 96; // 假设默认载荷类型 96
header.seq = htons(ctx->next_seq_num++);
header.timestamp = htonl(timestamp);
header.ssrc = htonl(ctx->ssrc);
// 构建完整的 RTP 包
memcpy(ctx->packets[ctx->packet_count].data, &header, RTP_HEADER_SIZE);
memcpy(ctx->packets[ctx->packet_count].data + RTP_HEADER_SIZE, payload, payload_len);
ctx->packets[ctx->packet_count].length = packet_size;
ctx->packets[ctx->packet_count].seq_num = ntohs(header.seq);
ctx->packets[ctx->packet_count].timestamp = ntohl(header.timestamp);
ctx->packets[ctx->packet_count].is_fec = 0;
ctx->packets[ctx->packet_count].block_id = ctx->next_block_id;
ctx->packets[ctx->packet_count].packet_id = ctx->packet_count;
// 发送原始 RTP 包
ssize_t sent = sendto(ctx->sockfd,
ctx->packets[ctx->packet_count].data,
ctx->packets[ctx->packet_count].length,
0,
(const struct sockaddr *)&ctx->dest_addr,
sizeof(ctx->dest_addr));
if (sent < 0) {
perror("发送失败");
return -1;
}
printf("发送原始包 #%d (块 %d/%d), 长度 %d 字节\n",
ctx->packets[ctx->packet_count].seq_num,
ctx->next_block_id, ctx->packet_count + 1, payload_len);
// 增加包计数
int current_packet = ctx->packet_count++;
// 当收集到足够的包时,生成并发送 FEC 包
if (ctx->packet_count == DATA_PACKETS_PER_BLOCK) {
generate_and_send_fec(ctx);
ctx->packet_count = 0;
ctx->next_block_id++;
}
return current_packet;
}
// 处理变长包:填充至SYMBOL_SIZE
void pad_packet(char *pkt, size_t actual_len) {
if (actual_len < SYMBOL_SIZE) {
memset(pkt + actual_len, 0, SYMBOL_SIZE - actual_len);
}
}
// 使用Reed-Solomon码生成并发送FEC包
void generate_and_send_fec(EncoderContext* ctx) {
int block_id = ctx->next_block_id;
uint8_t msg[BLOCK_SIZE]; // 当前块数据(223字节)
uint8_t encoded[BLOCK_SIZE + SYMBLE_SIZE]; // 编码结果(255字节)
printf("generate_and_send_fec NUM_BLOCKS:%d\n", NUM_BLOCKS);
int group_index = 0;
//35个块*40 ==1400
//175*8=1400
for (int block=0; block<NUM_BLOCKS;block++){
int offset = block * REAL_BLOCK_SIZE;
int size = REAL_BLOCK_SIZE;
memset(msg, 0x00, sizeof(msg));
// 2. 分块处理(每块223字节)-200字节+23字节补0
//每个包取40个字节,5个rtp包 一共200个字节
for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++)
{
// 1.准备数据
uint8_t *full_data = ctx->packets[i].data + RTP_HEADER_SIZE;
int payload_len = ctx->packets[i].length - RTP_HEADER_SIZE;
// 从完整数据中提取当前块
if (block == NUM_BLOCKS - 1){
int reserved = payload_len - REAL_BLOCK_SIZE*block;
if (reserved < size){
size = reserved;
}
}
memcpy((void *)msg+i*REAL_BLOCK_SIZE, (void *)full_data + offset, size);
}
// RS编码
ssize_t ret = correct_reed_solomon_encode(ctx->rs, msg, BLOCK_SIZE, encoded);
if (ret != REED_SOLOMON_LEN){
printf("correct_reed_solomon_encode not ret:%d\r\n", ret);
}
if (block == 0){
printf_hex(encoded+BLOCK_SIZE, SYMBLE_SIZE);
}
FECPacket *hdr = (FECPacket*)&(ctx->fecData[group_index]);
hdr->block_num = block; // 标记保护的块
memcpy(hdr->fec_data + block*SYMBLE_SIZE, encoded + BLOCK_SIZE, SYMBLE_SIZE); //SYMBLE_SIZE为32个标记位
}
for (int fec_idx = 0; fec_idx < FEC_PACKETS_PER_BLOCK; fec_idx++) {
FECPacket *fecPacket = (FECPacket*)&(ctx->fecData[fec_idx]);
// 创建FEC头部
FECHeader fec_header;
fec_header.version = 2;
fec_header.padding = 0;
fec_header.extension = 0;
fec_header.cc = 0;
fec_header.marker = 0;
fec_header.payload = FEC_PAYLOAD_TYPE;
fec_header.seq = htons(ctx->next_seq_num++);
fec_header.timestamp = htonl(ctx->packets[0].timestamp);
fec_header.ssrc = htons(ctx->ssrc);
fec_header.block_id = htons(block_id);
fec_header.fec_index = fec_idx;
fec_header.original_packet_count = DATA_PACKETS_PER_BLOCK;
int fec_packet_len = FEC_HEADER_SIZE +NUM_BLOCKS * SYMBLE_SIZE; //sizeof(FECPacket)
// 计算FEC包总长度
printf("fec_idx:%d, fec_packet_len:%d,sizeof(FECPacket):%d\n", fec_idx, fec_packet_len,sizeof(FECPacket));
uint8_t* fec_packet = malloc(fec_packet_len);
if (!fec_packet) {
perror("FEC包内存分配失败");
continue;
}
// 复制头部数据
memcpy(fec_packet, (void *)&fec_header, FEC_HEADER_SIZE);
uint8_t *fec_data = fecPacket->fec_data;
memcpy((uint8_t *)(fec_packet+FEC_HEADER_SIZE), fec_data, NUM_BLOCKS * SYMBLE_SIZE);
// 发送FEC包
ssize_t sent = sendto(ctx->sockfd, fec_packet, fec_packet_len, 0,
(const struct sockaddr *)&ctx->dest_addr,
sizeof(ctx->dest_addr));
if (sent < 0) {
perror("FEC包发送失败");
} else {
printf("发送FEC包 #%d (块 %d/%d), 保护 %d 个RTP包\n",
ntohs(fec_header.seq), block_id, fec_idx + 1, DATA_PACKETS_PER_BLOCK);
}
free(fec_packet);
}
// 重置包计数器,准备下一个块
ctx->packet_count = 0;
ctx->next_block_id++;
}
//MAX_PACKET_SIZE + RTP_HEADER_SIZE + 1
#define MAX_UDP_PACKET NUM_BLOCKS*32+120
// 使用Reed-Solomon码接收并解码RTP包
int receive_and_decode(DecoderContext* ctx, uint8_t** output_buffer, int* output_len, int* seq_num) {
if (!ctx || !output_buffer || !output_len || !seq_num) {
errno = EINVAL;
return -1;
}
uint8_t buffer[MAX_UDP_PACKET];
socklen_t addr_len = sizeof(ctx->src_addr);
// 接收数据包
int recv_len = recvfrom(ctx->sockfd, buffer, MAX_UDP_PACKET, 0,
(struct sockaddr *)&ctx->src_addr, &addr_len);
if (recv_len < 0) {
perror("recvfrom failed");
return -1;
}
// 解析RTP头部
RTPHeader* rtp_header = (RTPHeader*)buffer;
uint16_t packet_seq = ntohs(rtp_header->seq);
// 检查是否是FEC包
int is_fec = (rtp_header->payload == FEC_PAYLOAD_TYPE);
// 计算块ID和包ID
int block_id = 0;
int packet_id = 0;
if (is_fec) {
// 解析FEC头部
FECHeader* fec_header = (FECHeader*)buffer;
packet_id = packet_seq % TOTAL_PACKETS_PER_BLOCK;
block_id = ntohs(fec_header->block_id);
// 计算FEC包在块中的索引
int fec_index = fec_header->fec_index;
printf("接收到FEC包 #%d (块 %d) %d len:%d\n",
packet_seq, block_id, fec_index, recv_len);
} else {
// 原始 RTP 包
block_id = packet_seq / TOTAL_PACKETS_PER_BLOCK;
packet_id = packet_seq % TOTAL_PACKETS_PER_BLOCK;
printf("接收到原始包 #%d (块 %d/%d) len:%d\n",
packet_seq, block_id, packet_id, recv_len );
}
// 查找或分配块缓冲区
int block_index = packet_id;
// 验证块ID是否有效
if (block_index < 0 || block_index >= MAX_BLOCKS_IN_BUFFER) {
printf("警告: 无效的包块ID %d\n", block_id);
return 0;
}
// 保存数据包,只缓存一组
int group_index = 0;
PacketBuffer* packet = ctx->received_packets[block_index];
// 分配或调整缓冲区大小
if (packet->data == NULL) {
packet->data = malloc( MAX_UDP_PACKET);//
if (!packet->data) {
perror("内存分配失败");
return -1;
}
} else if (packet->length < recv_len) {
packet->data = realloc(packet->data, recv_len);
if (!packet->data) {
perror("[2]内存分配失败");
return -1;
}
}
memcpy(packet->data, buffer, recv_len);
packet->length = recv_len;
packet->seq_num = packet_seq;
packet->timestamp = ntohl(rtp_header->timestamp);
packet->is_fec = is_fec;
packet->block_id = block_id;
packet->packet_id = packet_id;
// 更新接收计数
ctx->block_packet_count[block_index]++;
// 更新统计信息
ctx->total_packets++;
// 尝试恢复块数据
if (block_index >= DATA_PACKETS_PER_BLOCK) {
int recovered = recover_lost_packets(ctx, block_id);
if (recovered) {
ctx->block_recovered[block_index] = 1;
ctx->recovered_blocks++;
ctx->recovered_packets += DATA_PACKETS_PER_BLOCK;
printf("成功恢复块 %d 的所有数据包\n", block_id);
// 返回第一个可用的原始数据包
for (int i = 0; i < TOTAL_PACKETS_PER_BLOCK; i++) {
if (ctx->received_packets[i]->data) {
packet = ctx->received_packets[i];
*output_buffer = ctx->received_packets[i]->data + RTP_HEADER_SIZE;
*output_len = ctx->received_packets[i]->length - RTP_HEADER_SIZE;
*seq_num = ctx->received_packets[i]->seq_num;
// 标记为已处理
free(ctx->received_packets[i]->data);
ctx->received_packets[i]->data = NULL;
memset((void *)packet, 0x00 ,sizeof(packet));
}
}
}
}
return 0; // 没有可用的完整数据包
}
void printf_hex(uint8_t *data, int len){
printf("*****print len:%d***\n", len);
for (int i=0; i<len; i++){
printf("%02x", data[i]);
}
printf("******end**\n");
}
// 使用Reed-Solomon码恢复丢失的数据包
int recover_lost_packets(DecoderContext* ctx, int block_index) {
int group_index = 0;
int missing_packets = 0;
int received[TOTAL_PACKETS_PER_BLOCK] = {0};
int erasures[BLOCK_SIZE] = {0}, num_erasures = 0;
int random = rand() % (TOTAL_PACKETS_PER_BLOCK -1);
int random2 = rand() % (TOTAL_PACKETS_PER_BLOCK -1);// random+1;//
int random3 = rand() % (TOTAL_PACKETS_PER_BLOCK -1);
//random = random2;
printf("尝试恢复块 %d,%d,%d: %d 的数据包...\n", random, random2, random3, block_index);
// 检查是否有FEC包可用
int available_fec_packets = 0;
int available_fec_packet_index = 0;
for (int i = 0; i < TOTAL_PACKETS_PER_BLOCK; i++) {
PacketBuffer* packet = ctx->received_packets[i];
if (packet->data){
if (random == i || random2 == i || random3 == i){
received[i] = 0;
uint8_t *data = packet->data+RTP_HEADER_SIZE;
printf("test lost:%d ,orign data \n", i);
printf_hex(data, packet->length - RTP_HEADER_SIZE);
missing_packets++;
}else{
received[i] = 1;
if (packet->is_fec) {
available_fec_packets++;
available_fec_packet_index = i;
//printf("fec lost:%d ,orign data \n", i);
//printf_hex(packet->data+FEC_HEADER_SIZE, 32);
}
}
}else{
received[i] = 0;
}
}
// 如果FEC包全丢,不进行恢复
if (available_fec_packets == 0) {
printf("错误: 没有可用的FEC包,无法恢复\n");
return 0;
}
printf("块 %d 有 %d 个丢失的数据包,%d 个可用的FEC包,fec_i:%d\n",
block_index, missing_packets, available_fec_packets, available_fec_packet_index);
// 检查是否有足够的FEC包来恢复
if (missing_packets > MAX_RESTORE_PACKET) {
printf("错误: 可用FEC包不足,无法恢复 (需要至少 %d 个,实际 %d 个)\n",
missing_packets, available_fec_packets);
return 0;
}
uint8_t symbols[REED_SOLOMON_LEN], restored[REED_SOLOMON_LEN];
PacketBuffer* fec_packet = ctx->received_packets[available_fec_packet_index] ;
FECHeader* fec_header = (FECHeader*)fec_packet;
// 1. 分块恢复(共35块,每块40个字节)
// 2. 分块恢复(共175块,每块8个字节)
for (int block=0; block<NUM_BLOCKS; block++) {
int offset = block * REAL_BLOCK_SIZE;
memset(symbols, 0, REED_SOLOMON_LEN);
memset(restored, 0, REED_SOLOMON_LEN);
memset(erasures, 0, BLOCK_SIZE);
num_erasures = 0;
// 组包1:收集当前块的原始数据 5个包,每个包40个字节
// 组包2:收集当前块的原始数据 175个包,每个包8个字节
for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) {
if (received[i] ) {
uint8_t *data = ctx->received_packets[i]->data;
uint8_t *full_data = data + RTP_HEADER_SIZE;
memcpy(symbols + i*REAL_BLOCK_SIZE, full_data + offset, REAL_BLOCK_SIZE);
}else{
for (int j=0; j<REAL_BLOCK_SIZE; j++){
erasures[num_erasures++] = i*REAL_BLOCK_SIZE+j;
}
//num_erasures += REAL_BLOCK_SIZE;
}
}
// 收集当前块的校验数据(1个FEC包)
memcpy(symbols + BLOCK_SIZE , (void*)fec_packet->data + FEC_HEADER_SIZE + block*SYMBLE_SIZE, SYMBLE_SIZE);
if (block == 0){
printf("***************num_erasures:%d data:",num_erasures);
for (int j=0; j<num_erasures; j++){
printf("%d,",erasures[j]);
}
printf("****\n",num_erasures);
printf_hex(symbols, REED_SOLOMON_LEN);
}
// RS解码
int success = correct_reed_solomon_decode_with_erasures(
ctx->rs,
symbols,
REED_SOLOMON_LEN,
erasures,
num_erasures,
restored
);
if (block == 0){
printf_hex(restored, REED_SOLOMON_LEN);
printf("#########success:%d#############\n",success);
}
// 3. 回写恢复的原始数据
if (success) {
for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) {
if (!received[i]) {
PacketBuffer *packetbuffer = ctx->received_packets[i] ;
if (packetbuffer && packetbuffer->data == NULL){
packetbuffer->data = (uint8_t *)malloc(MAX_PACKET_SIZE + RTP_HEADER_SIZE);
}
uint8_t *full_data = packetbuffer->data + RTP_HEADER_SIZE;
memcpy(full_data + offset, restored + i*REAL_BLOCK_SIZE, REAL_BLOCK_SIZE);
}
}
}
}
for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) {
if (!received[i]) {
int missing_idx= i;
if (ctx->received_packets[missing_idx]->data == NULL){
ctx->received_packets[missing_idx]->data = (uint8_t *)malloc(RTP_HEADER_SIZE+MAX_PACKET_SIZE);
}
// 创建恢复包的头部
RTPHeader* recovered_header = (RTPHeader*)ctx->received_packets[missing_idx]->data;
// 复制RTP头部(从第一个可用的包复制)
RTPHeader* header = NULL;
for (int k = 0; k < DATA_PACKETS_PER_BLOCK; k++) {
if (ctx->received_packets[k]->data) {
header = (RTPHeader*)ctx->received_packets[k]->data;
break;
}
}
*recovered_header = *header;
ctx->received_packets[missing_idx]->length = RTP_HEADER_SIZE + MAX_PACKET_SIZE;
ctx->received_packets[missing_idx]->seq_num = recovered_header->seq;
ctx->received_packets[missing_idx]->timestamp = ntohl(recovered_header->timestamp);
ctx->received_packets[missing_idx]->is_fec = 0;
ctx->received_packets[missing_idx]->block_id = fec_header->block_id;
ctx->received_packets[missing_idx]->packet_id = missing_idx;
uint8_t *data = ctx->received_packets[missing_idx]->data + RTP_HEADER_SIZE;
printf_hex(data, MAX_PACKET_SIZE);
printf("成功恢复块 %d 的数据包 #%d\n", fec_header->block_id, i);
}
}
printf("块 %d 的恢复尝试完成: 成功\n", fec_header->block_id);
return 1;
}
// 编码器主函数示例
void encoder_main(const char* dest_ip, int dest_port) {
EncoderContext* ctx = init_encoder(dest_ip, dest_port);
if (!ctx) {
printf("编码器初始化失败\n");
return;
}
printf("编码器开始发送数据到 %s:%d...\n", dest_ip, dest_port);
// 模拟发送 100 个 RTP 包
for (int i = 0; i < 100; i++) {
// 创建随机数据作为载荷
uint8_t payload[MAX_PACKET_SIZE];
for (int j = 0; j < sizeof(payload); j++) {
payload[j] = rand() % 256;
}
// 编码并发送
encode_and_send(ctx, payload, sizeof(payload), i * 90);
// 控制发送速率
usleep(10000); // 10ms
}
cleanup_encoder(ctx);
}
// 解码器主函数示例
void decoder_main(int listen_port) {
DecoderContext* ctx = init_decoder(listen_port);
if (!ctx) {
printf("解码器初始化失败\n");
return;
}
printf("解码器开始在端口 %d 接收数据...\n", listen_port);
uint8_t* output_buffer = NULL;
int output_len = 0;
int seq_num = 0;
// 持续接收和解码数据包
while (1) {
int result = receive_and_decode(ctx, &output_buffer, &output_len, &seq_num);
if (result > 0) {
printf("处理接收到的数据包 #%d, 长度 %d 字节\n", seq_num, output_len);
// 这里可以处理接收到的数据,例如写入文件或播放
// 示例中只是简单打印信息
// 注意:output_buffer 指向的是 ctx 内部缓冲区,
// 在下一次调用 receive_and_decode 前保持有效
} else if (result < 0) {
printf("接收数据时发生错误\n");
break;
}
// 控制循环速率
usleep(1000); // 1ms
}
cleanup_decoder(ctx);
}
int main(int argc, char* argv[]) {
// 初始化随机数生成器
srand(time(NULL));
if (argc < 2) {
printf("用法: %s [encoder|decoder] [参数...]\n", argv[0]);
printf("编码器: %s encoder <目标IP> <目标端口>\n", argv[0]);
printf("解码器: %s decoder <监听端口>\n", argv[0]);
return 1;
}
if (strcmp(argv[1], "encoder") == 0) {
if (argc < 4) {
printf("编码器需要目标IP和端口参数\n");
return 1;
}
const char* dest_ip = argv[2];
int dest_port = atoi(argv[3]);
encoder_main(dest_ip, dest_port);
} else if (strcmp(argv[1], "decoder") == 0) {
if (argc < 3) {
printf("解码器需要监听端口参数\n");
return 1;
}
int listen_port = atoi(argv[2]);
decoder_main(listen_port);
} else {
printf("未知命令: %s\n", argv[1]);
return 1;
}
return 0;
}测试:
接收端:
./testc decoder 5000
发送端:
./testc encoder 127.0.0.1 5000
-------------------广告线---------------
项目、合作,欢迎勾搭,邮箱:promall@qq.com
本文为呱牛笔记原创文章,转载无需和我联系,但请注明来自呱牛笔记 ,it3q.com
