MP3文件流式分发并播放,主要涉及广播分发音频文件,然后在广播端解码播放。

服务器端,读文件,发送流,为了避免粘包,先发长度,然后发送二进制流。
流格式为RTP+1300个字节的音频
struct ClientConfig {
public:
char ip[64];
unsigned short port;
bool useTcp;
};
std::vector<ClientConfig*> clients;
static volatile uint8_t running_flag = 0;
static volatile uint8_t running_end_flag = 1;
// RTP头结构
#pragma pack(push, 1)
struct RTPHeader {
uint8_t version_cc;
uint8_t marker_pt;
uint16_t seq;
uint32_t timestamp;
uint32_t ssrc;
};
DWORD WINAPI SendWorker(LPVOID param) {
char* file_name = (char*)param;
int file_size = GetFileSize(file_name);
// 一次性读取整个文件
FILE* fp = fopen(file_name, "rb");
if (fp == NULL) {
free(file_name);
return 0;
}
//fseek(fp, 0, SEEK_END);
//long file_size = ftell(fp);
fseek(fp, 0, SEEK_SET);
std::vector<SOCKET> clientSocks;
int sendBufSize = 1 * 1024 * 1024; // 1MB
for (auto* client : clients) {
if (client == NULL) {
continue;
}
SOCKET sock = client->useTcp ?
socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) :
socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
(char*)&sendBufSize, sizeof(sendBufSize));
sockaddr_in addr = {};
addr.sin_family = AF_INET;
addr.sin_port = htons(client->port);
addr.sin_addr.s_addr = inet_addr(client->ip);
if (client->useTcp) connect(sock, (sockaddr*)&addr, sizeof(addr));
clientSocks.push_back(sock);
}
#if 1
//第一个包,发送文件名,文件大小,文件信息
char send_buffer[256] = "";
std::string name = GetFileName(file_name);
sprintf(send_buffer, "start#name:%s#file_size:%d",name.c_str(), file_size);
for (size_t i = 0; i < clients.size(); ++i) {
if (clients[i]->useTcp) {
int packetSize = file_size;//strlen(send_buffer)+1;
uint32_t netSize = htonl(packetSize);
// 发送长度头
send(clientSocks[i], (char*)&netSize, sizeof(netSize), 0);
//send(clientSocks[i], send_buffer, packetSize+1, 0);
}
else {
int packetSize = strlen(send_buffer)+1;
sockaddr_in addr = {};
addr.sin_family = AF_INET;
addr.sin_port = htons(clients[i]->port);
addr.sin_addr.s_addr = inet_addr(clients[i]->ip);
sendto(clientSocks[i], send_buffer, packetSize + 1, 0,
(sockaddr*)&addr, sizeof(addr));
}
}
#endif
// 分段发送
const int chunkSize = 1300; // 有效载荷大小
RTPHeader header;
header.version_cc = 0x80; // V=2, CC=0
header.marker_pt = 0x60; // PT=96
header.ssrc = htonl(0x12345678);
char* fileData = new char[chunkSize*2];
running_end_flag = 0;
// 构造RTP包
char *packet = (char *)malloc(sizeof(RTPHeader) + chunkSize);
int seq = 1;
int file_offset = 0;
int need_pause = 0;
while (!feof(fp)) {
if (!running_flag) {
need_pause = 1;
break;
}
int readSize = fread(fileData, 1, chunkSize, fp);
if (readSize <= 0) {
break;
}
header.seq = htons(seq++);
header.timestamp = htonl(file_offset); // 模拟时间戳
memcpy((void *)packet, (void*)&header, HEADER_SIZE);
memcpy((char*)packet + HEADER_SIZE, (char*)fileData, readSize);
int packetSize = HEADER_SIZE + readSize;
// 为每个客户端发送当前块
for (size_t i = 0; i < clients.size(); ++i) {
if (clients[i]->useTcp) {
uint32_t netSize = htonl(packetSize);
// 发送长度头
send(clientSocks[i], (char*)&netSize, sizeof(netSize), 0);
send(clientSocks[i], packet, packetSize, 0);
}
else {
sockaddr_in addr = {};
addr.sin_family = AF_INET;
addr.sin_port = htons(clients[i]->port);
addr.sin_addr.s_addr = inet_addr(clients[i]->ip);
sendto(clientSocks[i], packet, packetSize, 0,
(sockaddr*)&addr, sizeof(addr));
}
}
file_offset += readSize;
if (!running_flag) {
need_pause = 1;
break;
}
Sleep(10); // 控制速率
}
char temp[64] = "";
sprintf(temp, "end!");
if (need_pause) {
sprintf(temp, "stop");
}
for (size_t i = 0; i < clients.size(); ++i) {
if (clients[i]->useTcp) {
int len = strlen(temp)+1;
uint32_t netSize = htonl(len);
// 发送长度头
send(clientSocks[i], (char*)&netSize, sizeof(netSize), 0);
send(clientSocks[i], temp, len, 0);
}
else {
sockaddr_in addr = {};
addr.sin_family = AF_INET;
addr.sin_port = htons(clients[i]->port);
addr.sin_addr.s_addr = inet_addr(clients[i]->ip);
sendto(clientSocks[i], temp, strlen(temp), 0,
(sockaddr*)&addr, sizeof(addr));
}
}
Sleep(10); // 控制速率
for (auto sock : clientSocks) closesocket(sock);
delete[] fileData;
clients.clear();
running_end_flag = 1;
running_flag = 0;
fclose(fp);
free(packet);
free(file_name);
return 0;
}Windows电脑模拟客户端:
几个关键点:1、音频流保存为文件;
2、使用ffmpeg的api探测探测音频流的格式,解码并转换为电脑播放的音频采样率、channel和音频PCM格式;
3、使用SDL的API播放音频,关键是AUDIO_F32LSB的转换,要不播放出来的声音有异常;
#define DATA_PORT 5600
#define SAMPLE_RATE 44100
#define OUT_PLAY_CHANNEL 2
enum AVSampleFormat sdl_format_to_ffmpeg(SDL_AudioFormat format);
// RTP头结构
#pragma pack(push, 1)
struct RTPHeader {
uint8_t version_cc;
uint8_t marker_pt;
uint16_t seq;
uint32_t timestamp;
uint32_t ssrc;
};
#define AUDIO_BUFFER_COUNT 4
#define MAX_FRAME_SIZE 4410*8
#define DETECT_LEN 300*1024
#define OUT_DATA_SIZE 1152 * 8
// 音频解码上下文
struct AudioContext {
AVCodecContext* codecCtx;
AVFormatContext* fmt_ctx;
AVCodecParameters* codecpar;
int audio_stream_idx;
const AVCodec* codec;
AVCodecParserContext* parser;
SwrContext* swrCtx;
HWAVEOUT hWaveOut;
bool isTcpMode;
AVFrame* frame;
AVFrame* pframePCM;
AVPacket* pkt;
uint8_t* outData[2];
uint8_t* cache_pcm_buffer;
size_t cache_capacity;
size_t cache_buffer_size;
uint8_t* recv_buffer;
size_t capacity;
size_t recv_buffer_size;
size_t read_buffer;
int file_size;
unsigned long play_start_time;
int duration;
double accumulated_play_time;
size_t dectected;
struct {
uint8_t* data;
int size;
bool inUse;
} audioBuffers[AUDIO_BUFFER_COUNT];
int currentBuffer;
pthread_mutex_t bufferMutex;
bool running;//
bool play_pause;//
SDL_AudioDeviceID audio_device;
};
static AudioContext ctx;
int configure_decoder(AudioContext* ctx) {
if (ctx->codec != NULL) {
avcodec_free_context(&ctx->codecCtx);
ctx->codecCtx = NULL;
swr_free(&ctx->swrCtx);
}
if (ctx->codecpar == NULL) {
printf("codecpar == NULL \n");
return -1;
}
// 查找解码器
ctx->codec = avcodec_find_decoder(ctx->codecpar->codec_id);
if (!ctx->codec) {
printf("avcodec_find_decoder failed \n");
return AVERROR_DECODER_NOT_FOUND;
}
// 创建解码器上下文
ctx->codecCtx = avcodec_alloc_context3(ctx->codec);
if (!ctx->codecCtx) {
printf("avcodec_alloc_context3 failed \n");
return AVERROR(ENOMEM);
}
// 复制参数到解码器上下文
int ret = avcodec_parameters_to_context(ctx->codecCtx, ctx->codecpar);
if (ret < 0) {
printf("avcodec_parameters_to_context failed: %d\n", ret);
return ret;
}
// 打开解码器
ret = avcodec_open2(ctx->codecCtx, ctx->codec, NULL);
if (ret < 0) {
printf("Could not open codec: %d\n", ret);
avcodec_free_context(&ctx->codecCtx);
return -1;
}
ctx->swrCtx = swr_alloc();
// 初始化重采样器
swr_alloc_set_opts(ctx->swrCtx,
AV_CH_LAYOUT_STEREO,
#ifdef USE_SDL_DEVICE
sdl_format_to_ffmpeg(AUDIO_F32LSB),
#else
AV_SAMPLE_FMT_S16,
#endif
SAMPLE_RATE,
av_get_default_channel_layout(ctx->codecCtx->channels),
ctx->codecCtx->sample_fmt,
ctx->codecCtx->sample_rate,
0,
NULL);
ret = swr_init(ctx->swrCtx);
if (ret < 0)
{
printf("swr_init failed: %d\n", ret);
avcodec_free_context(&ctx->codecCtx);
swr_free(&ctx->swrCtx);
return -1;
}
return ret;
}
enum AVSampleFormat sdl_format_to_ffmpeg(SDL_AudioFormat format) {
switch (format) {
case AUDIO_S16SYS:
case AUDIO_S16MSB:
return AV_SAMPLE_FMT_S16;
case AUDIO_F32LSB:
case AUDIO_F32MSB:
return AV_SAMPLE_FMT_FLT;
case AUDIO_S32LSB:
case AUDIO_S32MSB:
return AV_SAMPLE_FMT_S32;
default:
return AV_SAMPLE_FMT_NONE;
}
}
#define SDL_AUDIO_BUFFER_SIZE 4096
int start_play_with_sdl() {
// 设置SDL音频参数
SDL_AudioSpec desired, obtained;
desired.freq = SAMPLE_RATE; // 采样率
desired.format = AUDIO_F32;// AUDIO_F32LSB;// AUDIO_S16SYS; // 16位有符号整数(系统字节序)
desired.channels = 2; // 立体声
desired.silence = 0; // 静音值为0
desired.samples = SDL_AUDIO_BUFFER_SIZE; // 音频缓冲区大小
desired.callback = NULL; // 不使用回调
desired.userdata = NULL;
if (SDL_Init(SDL_INIT_AUDIO) < 0) {
fprintf(stderr, "SDL初始化失败: %s\n", SDL_GetError());
return -1;
}
// 打开音频设备
ctx.audio_device = SDL_OpenAudioDevice(NULL, 0, &desired, &obtained, SDL_AUDIO_ALLOW_FORMAT_CHANGE);
if (ctx.audio_device == 0) {
fprintf(stderr, "Failed to open audio device: %s\n", SDL_GetError());
return -1;
}
// 检查实际获得的音频格式是否符合要求
if (obtained.format != AUDIO_F32LSB) {
fprintf(stderr, "SDL did not provide AUDIO_S16SYS format\n");
return -1;
}
// 开始播放
SDL_PauseAudioDevice(ctx.audio_device, 0);
}
// 探测音频格式
int probe_audio_format(AudioContext* ctx, std::string &temp_file) {
if (ctx->fmt_ctx != NULL) {
avformat_free_context(ctx->fmt_ctx);
}
ctx->fmt_ctx = avformat_alloc_context();
if (!ctx->fmt_ctx) return AVERROR(ENOMEM);
// 探测前20KB数据(可调整)
ctx->fmt_ctx->probesize = DETECT_LEN;
ctx->fmt_ctx->max_analyze_duration = 1 * AV_TIME_BASE; // 5秒
// 探测格式但不解析完整文件
int ret = avformat_open_input(&ctx->fmt_ctx, temp_file.c_str(), NULL, NULL);
if (ret < 0) {
printf("open failed");
return ret;
}
// 查找流信息(仅头部)
ret = avformat_find_stream_info(ctx->fmt_ctx, NULL);
if (ret < 0) {
printf("avformat_find_stream_info failed");
return ret;
}
// 查找音频流
for (unsigned i = 0; i < ctx->fmt_ctx->nb_streams; i++) {
AVStream* stream = ctx->fmt_ctx->streams[i];
if (stream->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
ctx->audio_stream_idx = i;
ctx->codecpar = stream->codecpar;
break;
}
}
if (ctx->audio_stream_idx == -1) {
return AVERROR_STREAM_NOT_FOUND;
}
if (0 != configure_decoder(ctx)) {
return 0;
}
{
int64_t total_duration = ctx->fmt_ctx->duration + ((ctx->fmt_ctx->duration <= (INT64_MAX - 5000)) ? 5000 : 0);
ctx->duration = (total_duration / AV_TIME_BASE);
int us = (total_duration % AV_TIME_BASE);
printf("total_duration:%02d:%02d\r\n", ctx->duration, us);
}
struct timeval tv;
gettimeofday(&tv, NULL);
unsigned long start_time = tv.tv_sec * 1000 + tv.tv_usec / 1000;//ms
ctx->recv_buffer_size = 0;
ctx->dectected = 1;
ctx->play_start_time = start_time;// GetTickCount();// time(nullptr);
register_pjsip_thread("NetworkThread");
//start_play(ctx->codecCtx->sample_rate, ctx->codecCtx->channels, NULL, RemotePlayCallbackFunc);
#ifndef USE_SDL_DEVICE
start_play(SAMPLE_RATE, OUT_PLAY_CHANNEL, NULL, RemotePlayCallbackFunc);
#else
start_play_with_sdl();
#endif
return 0;
}
bool appendBinaryFile(const std::string& filePath, const uint8_t* binaryData, size_t dataSize) {
// 创建并打开文件流,以二进制追加模式
std::ofstream outFile(filePath, std::ios::out | std::ios::binary | std::ios::app);
// 检查文件是否成功打开
if (!outFile) {
std::cerr << "无法打开文件: " << filePath << std::endl;
return false;
}
// 将二进制数据追加写入文件
outFile.write(reinterpret_cast<const char*>(binaryData), dataSize);
// 关闭文件流
outFile.close();
return true;
}
bool initDecoder(AudioContext* ctx) {
// FFmpeg初始化
avcodec_register_all();
pthread_mutex_init(&ctx->bufferMutex, NULL);
ctx->fmt_ctx = NULL;// avformat_alloc_context();
for (int i = 0; i < AUDIO_BUFFER_COUNT; i++) {
ctx->audioBuffers[i].data = (uint8_t*)av_malloc(MAX_FRAME_SIZE);
if (!ctx->audioBuffers[i].data) return false;
ctx->audioBuffers[i].size = 0;
ctx->audioBuffers[i].inUse = false;
}
ctx->currentBuffer = 0;
ctx->cache_capacity = 400 * 1024;
ctx->cache_pcm_buffer = (uint8_t*)malloc(ctx->cache_capacity);
ctx->cache_buffer_size = 0;
ctx->capacity = 400 * 1024;
ctx->recv_buffer = (uint8_t*)malloc(ctx->capacity);
ctx->recv_buffer_size = 0;
ctx->dectected = 0;
#if 1
ctx->codecCtx = NULL;
ctx->codec = NULL;
ctx->pkt = av_packet_alloc();
av_init_packet(ctx->pkt);
ctx->frame = av_frame_alloc();
ctx->outData[0] = (uint8_t*)av_malloc(OUT_DATA_SIZE);
ctx->outData[1] = (uint8_t*)av_malloc(OUT_DATA_SIZE);
memset(ctx->outData[0], 0x00, OUT_DATA_SIZE);
memset(ctx->outData[1], 0x00, OUT_DATA_SIZE);
ctx->pframePCM = av_frame_alloc();
if (ctx->frame == NULL || ctx->pframePCM == NULL) {
return false;
}
AVFrame* pframePCM = ctx->pframePCM;
#ifdef USE_SDL_DEVICE
pframePCM->format = sdl_format_to_ffmpeg(AUDIO_F32LSB);
#else
pframePCM->format = AV_SAMPLE_FMT_S16;
#endif
pframePCM->channel_layout = AV_CH_LAYOUT_STEREO;
pframePCM->sample_rate = SAMPLE_RATE;
pframePCM->nb_samples = SAMPLE_RATE / 100;// 2 * SAMPLE_RATE / 100;//(rate*channels*AV_CH_LAYOUT_STEREO*20)/8000;//
pframePCM->channels = OUT_PLAY_CHANNEL;
av_frame_get_buffer(pframePCM, 0);
ctx->read_buffer = pframePCM->nb_samples * OUT_PLAY_CHANNEL;
ringmalloc(ctx->read_buffer * 2);
return true;
}
// 网络接收线程
int NetworkThread(void * param) {
AudioContext* ctx = (AudioContext*)param;
SOCKET sock = ctx->isTcpMode ?
socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) :
socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
sockaddr_in addr = {};
addr.sin_family = AF_INET;
addr.sin_port = htons(DATA_PORT);
addr.sin_addr.s_addr = INADDR_ANY;
bind(sock, (sockaddr*)&addr, sizeof(addr));
register_pjsip_thread("NetworkThread");
std::string curr_path = "";
GetExePath(curr_path);
std::string save_temp_path = curr_path + "\\temp.data";//
wchar_t* file_temp = str_to_wstr(save_temp_path.c_str());
// 接收数据循环
int start_recv_len = 100 * 1024;
#define BUFFER_SIZE 21480
uint8_t *buffer = (uint8_t *)malloc(BUFFER_SIZE);
int recvLen = 0;
// TCP模式监听连接
if (ctx->isTcpMode) {
do {
listen(sock, 1);
SOCKET client = accept(sock, NULL, NULL);
SOCKET new_sock = client;
ctx->dectected = 0;
ctx->recv_buffer_size = 0;
ctx->play_pause = false;
int first_packet = 1;
int server_play_end = 0;
int server_play_stop = 0;
while (1) {
uint32_t netSize = 0;
int ret = recv(new_sock, (char*)&netSize, sizeof(netSize), MSG_WAITALL);
if (ret < 0 || recvLen < 0) {
printf("1 failed: client closed.\r\n");
break;
}
if (ret != 4) {
//error
printf("1 failed: client ret failed.\r\n");
break;
}
recvLen = ntohl(netSize);
if (first_packet == 1) {
first_packet = 0;
//第一个包是文件大小
//sprintf(send_buffer, "start#name:%s#file_size:%d", name.c_str(), file_size);
ctx->file_size = recvLen;
printf("2 : server start,file_size:%d.\r\n", recvLen);
continue;
}
ret = recv(new_sock, (char*)(buffer), recvLen, 0);
if (ret <= 0 || recvLen <= 0) {
printf("2 failed: client closed.\r\n");
break;
}
if (recvLen == 5) {
buffer[recvLen] = '\0';
if (strstr((char *)buffer, "stop") != NULL) {
printf("2 failed: server closed.\r\n");
server_play_stop = 1;
Sleep(1000);
break;
}
else if (strstr((char*)buffer, "end!") != NULL) {
printf("2 failed: server play end.\r\n");
server_play_end = 1;
break;
}
}
uint8_t* payload = buffer + HEADER_SIZE;
int payload_len = recvLen - HEADER_SIZE;
if (payload_len < 0) {
continue;
}
appendBinaryFile(save_temp_path.c_str(), payload, payload_len);
ctx->recv_buffer_size += payload_len;
if (ctx->recv_buffer_size >= DETECT_LEN) {
if (ctx->dectected == 0) {
probe_audio_format(ctx, save_temp_path);
}
}
if (ctx->dectected) {
ret = decodeFrame(ctx, buffer, recvLen);
if (ret == AVERROR_EOF) {
printf("2 failed: server closed.\r\n");
break;
}
}
Sleep(5);
}
if (ctx->dectected && server_play_end) {
do {
int ret = decodeFrame(ctx, buffer, recvLen);
if (ret == AVERROR_EOF) {
printf("2 failed: file end.\r\n");
break;
}
Sleep(10);
} while (!ctx->play_pause);
do {
int ret = do_get_frame(ctx);
if (ret == -1) {
break;
}
Sleep(10);
} while (!ctx->play_pause);
}
#ifdef USE_SDL_DEVICE
while (SDL_GetQueuedAudioSize(ctx->audio_device) > 0) {
Uint32 queue_size = SDL_GetQueuedAudioSize(ctx->audio_device);
double remaining_sec = (double)queue_size / (SAMPLE_RATE * OUT_PLAY_CHANNEL * 2); // 2 bytes per sample
printf("剩余: %.2f KB (约 %.1f 秒)\r", queue_size / 1024.0, remaining_sec);
SDL_Delay(100);
}
SDL_Delay(100);
if (ctx->audio_device != 0) {
SDL_ClearQueuedAudio(ctx->audio_device); // 清空音频队列
SDL_CloseAudioDevice(ctx->audio_device);
printf("SDL音频设备已关闭\n");
}
SDL_Quit();
#endif
ctx->dectected = 0;
ctx->recv_buffer_size = 0;
closesocket(new_sock);
Sleep(100);
stop_play();
avformat_close_input(&ctx->fmt_ctx);
avformat_free_context(ctx->fmt_ctx);
avcodec_free_context(&ctx->codecCtx);
ctx->codecCtx = NULL;
ctx->fmt_ctx = NULL;
Sleep(10);
DeleteFile(file_temp);
} while (ctx->running);
}
else {
while (ctx->running) {
recvLen =
recvfrom(sock, (char*)buffer, BUFFER_SIZE, 0, NULL, NULL);
if (recvLen == 4) {
buffer[recvLen] = '\0';
if (strstr((char*)buffer, "stop") != NULL) {
break;
}
}else if (recvLen > 0) {
decodeFrame(ctx, buffer, recvLen);
}
}
}
delete[]file_temp;
free(buffer);
closesocket(sock);
return 0;
}遗留问题,保存到音频流数据,能探测到音频格式之后,播放总是播一半就结束了。
-------------------广告线---------------
项目、合作,欢迎勾搭,邮箱:promall@qq.com
本文为呱牛笔记原创文章,转载无需和我联系,但请注明来自呱牛笔记 ,it3q.com
