From 11326c23e6878cfd29226a0848cdf8c14fa15338 Mon Sep 17 00:00:00 2001 From: Karl Osterseher Date: Fri, 3 Feb 2023 00:03:05 +0100 Subject: [PATCH] add experimental support for opus this only works for chunk sizes <= 10ms if set to 20ms server sometimes sends 20ms and sometimes 10ms frames which confuses esp client. RESYNCING HARD 2 still happen very often for some reason. remove bug in insert_pcm_chunk() resulting in dropped chunks if queue gets full, which isn't desired. --- components/lightsnapcast/player.c | 136 ++++++++++++++--- main/main.c | 243 +++++++++++++++++++++++++++++- 2 files changed, 347 insertions(+), 32 deletions(-) diff --git a/components/lightsnapcast/player.c b/components/lightsnapcast/player.c index 5802530..39b8101 100644 --- a/components/lightsnapcast/player.c +++ b/components/lightsnapcast/player.c @@ -72,6 +72,8 @@ static QueueHandle_t snapcastSettingQueueHandle = NULL; static uint32_t i2sDmaBufCnt; static uint32_t i2sDmaBufMaxLen; +static SemaphoreHandle_t playerPcmQueueMux = NULL; + static SemaphoreHandle_t snapcastSettingsMux = NULL; static snapcastSetting_t currentSnapcastSetting; @@ -193,6 +195,8 @@ static int destroy_pcm_queue(QueueHandle_t *queueHandle) { int ret = pdPASS; pcm_chunk_message_t *chnk = NULL; + xSemaphoreTake(playerPcmQueueMux, portMAX_DELAY); + if (*queueHandle == NULL) { ESP_LOGW(TAG, "no pcm chunk queue created?"); ret = pdFAIL; @@ -214,6 +218,8 @@ static int destroy_pcm_queue(QueueHandle_t *queueHandle) { ret = pdPASS; } + xSemaphoreGive(playerPcmQueueMux); + return ret; } @@ -237,6 +243,11 @@ int deinit_player(void) { ret = destroy_pcm_queue(&pcmChkQHdl); + if (playerPcmQueueMux != NULL) { + vSemaphoreDelete(playerPcmQueueMux); + playerPcmQueueMux = NULL; + } + if (latencyBufSemaphoreHandle == NULL) { ESP_LOGW(TAG, "no latency buffer semaphore created?"); } else { @@ -271,6 +282,11 @@ int init_player(void) { xSemaphoreGive(snapcastSettingsMux); } + if (playerPcmQueueMux == NULL) { + playerPcmQueueMux = xSemaphoreCreateMutex(); + xSemaphoreGive(playerPcmQueueMux); + } + ret = player_setup_i2s(I2S_NUM_0, ¤tSnapcastSetting); if (ret < 0) { ESP_LOGE(TAG, "player_setup_i2s failed: %d", ret); @@ -919,10 +935,20 @@ int32_t allocate_pcm_chunk_memory(pcm_chunk_message_t **pcmChunk, "couldn't get memory to insert chunk, inserting an chunk " "containing just 0"); + // xSemaphoreTake(playerPcmQueueMux, portMAX_DELAY); + // ESP_LOGW( + // TAG, "%d, %d, %d, %d, %d", + // heap_caps_get_free_size(MALLOC_CAP_8BIT), + // heap_caps_get_largest_free_block(MALLOC_CAP_8BIT), + // uxQueueMessagesWaiting(pcmChkQHdl), + // heap_caps_get_free_size(MALLOC_CAP_32BIT | MALLOC_CAP_EXEC), + // heap_caps_get_largest_free_block(MALLOC_CAP_32BIT | + // MALLOC_CAP_EXEC)); + // xSemaphoreGive(playerPcmQueueMux); + ESP_LOGW( - TAG, "%d, %d, %d, %d, %d", heap_caps_get_free_size(MALLOC_CAP_8BIT), + TAG, "%d, %d, %d, %d", heap_caps_get_free_size(MALLOC_CAP_8BIT), heap_caps_get_largest_free_block(MALLOC_CAP_8BIT), - uxQueueMessagesWaiting(pcmChkQHdl), heap_caps_get_free_size(MALLOC_CAP_32BIT | MALLOC_CAP_EXEC), heap_caps_get_largest_free_block(MALLOC_CAP_32BIT | MALLOC_CAP_EXEC)); @@ -952,38 +978,61 @@ int32_t insert_pcm_chunk(pcm_chunk_message_t *pcmChunk) { return -1; } + bool isFull = false; + latency_buffer_full(&isFull, portMAX_DELAY); + if (isFull == false) { + free_pcm_chunk(pcmChunk); + + // ESP_LOGW(TAG, "%s: wait for initial latency measurement to finish", + // __func__); + + return -3; + } + + xSemaphoreTake(playerPcmQueueMux, portMAX_DELAY); if (pcmChkQHdl == NULL) { ESP_LOGW(TAG, "pcm chunk queue not created"); free_pcm_chunk(pcmChunk); + xSemaphoreGive(playerPcmQueueMux); + return -2; } - if (uxQueueSpacesAvailable(pcmChkQHdl) == 0) { - pcm_chunk_message_t *element; + // if (uxQueueSpacesAvailable(pcmChkQHdl) == 0) { + // pcm_chunk_message_t *element; + // + // xQueueReceive(pcmChkQHdl, &element, portMAX_DELAY); + // + // free_pcm_chunk(element); + // } - xQueueReceive(pcmChkQHdl, &element, portMAX_DELAY); - - free_pcm_chunk(element); - } - - if (xQueueSend(pcmChkQHdl, &pcmChunk, pdMS_TO_TICKS(10)) != pdTRUE) { + // if (xQueueSend(pcmChkQHdl, &pcmChunk, pdMS_TO_TICKS(10)) != pdTRUE) { + if (xQueueSend(pcmChkQHdl, &pcmChunk, pdMS_TO_TICKS(100)) != pdTRUE) { ESP_LOGW(TAG, "send: pcmChunkQueue full, messages waiting %d", uxQueueMessagesWaiting(pcmChkQHdl)); free_pcm_chunk(pcmChunk); } + xSemaphoreGive(playerPcmQueueMux); + return 0; } int32_t pcm_chunk_queue_msg_waiting(void) { + int ret = 0; + + xSemaphoreTake(playerPcmQueueMux, portMAX_DELAY); + if (pcmChkQHdl) { - return uxQueueMessagesWaiting(pcmChkQHdl); - } else { - return 0; + ret = uxQueueMessagesWaiting(pcmChkQHdl); } + + xSemaphoreGive(playerPcmQueueMux); + + return ret; } /** @@ -1077,11 +1126,11 @@ static void player_task(void *pvParameters) { if ((__scSet.buf_ms != scSet.buf_ms) || (__scSet.chkInFrames != scSet.chkInFrames)) { - if (pcmChkQHdl != NULL) { - destroy_pcm_queue(&pcmChkQHdl); - } + destroy_pcm_queue(&pcmChkQHdl); } + // xSemaphoreTake(playerPcmQueueMux, portMAX_DELAY); + if (pcmChkQHdl == NULL) { int entries = ceil(((float)__scSet.sr / (float)__scSet.chkInFrames) * ((float)__scSet.buf_ms / 1000)); @@ -1095,6 +1144,8 @@ static void player_task(void *pvParameters) { ESP_LOGI(TAG, "created new queue with %d", entries); } + // xSemaphoreGive(playerPcmQueueMux); + ESP_LOGI(TAG, "snapserver config changed, buffer %dms, chunk %d frames, " "sample rate %d, ch %d, bits %d mute %d latency %d", @@ -1130,11 +1181,16 @@ static void player_task(void *pvParameters) { } if (chnk == NULL) { + // xSemaphoreTake(playerPcmQueueMux, portMAX_DELAY); if (pcmChkQHdl != NULL) { ret = xQueueReceive(pcmChkQHdl, &chnk, pdMS_TO_TICKS(2000)); + + // xSemaphoreGive(playerPcmQueueMux); } else { // ESP_LOGE (TAG, "Couldn't get PCM chunk, pcm queue not created"); + // xSemaphoreGive(playerPcmQueueMux); + vTaskDelay(pdMS_TO_TICKS(100)); continue; @@ -1187,12 +1243,15 @@ static void player_task(void *pvParameters) { uint32_t currentDescriptor = 0, currentDescriptorOffset = 0; uint32_t tmpCnt = CHNK_CTRL_CNT; + + // xSemaphoreTake(playerPcmQueueMux, portMAX_DELAY); while (tmpCnt) { if (chnk == NULL) { if (pcmChkQHdl != NULL) { ret = xQueueReceive(pcmChkQHdl, &chnk, portMAX_DELAY); } } + // xSemaphoreGive(playerPcmQueueMux); fragment = chnk->fragment; p_payload = fragment->payload; @@ -1290,7 +1349,9 @@ static void player_task(void *pvParameters) { uint32_t c = ceil((float)age / (float)chkDur_us); // round up // now clear all those chunks which are probably late too while (c--) { + // xSemaphoreTake(playerPcmQueueMux, portMAX_DELAY); ret = xQueueReceive(pcmChkQHdl, &chnk, pdMS_TO_TICKS(1)); + // xSemaphoreGive(playerPcmQueueMux); if (ret == pdPASS) { free_pcm_chunk(chnk); chnk = NULL; @@ -1305,12 +1366,21 @@ static void player_task(void *pvParameters) { timer_pause(TIMER_GROUP_1, TIMER_1); timer_set_auto_reload(TIMER_GROUP_1, TIMER_1, TIMER_AUTORELOAD_DIS); + // xSemaphoreTake(playerPcmQueueMux, portMAX_DELAY); + // ESP_LOGW(TAG, + // "RESYNCING HARD 1: age %lldus, latency %lldus, free + // %d, " "largest block %d, %d, rssi: %d", age, + // diff2Server, + // heap_caps_get_free_size(MALLOC_CAP_32BIT), + // heap_caps_get_largest_free_block(MALLOC_CAP_32BIT), + // uxQueueMessagesWaiting(pcmChkQHdl), ap.rssi); + // xSemaphoreGive(playerPcmQueueMux); + ESP_LOGW(TAG, "RESYNCING HARD 1: age %lldus, latency %lldus, free %d, " - "largest block %d, %d, rssi: %d", + "largest block %d, rssi: %d", age, diff2Server, heap_caps_get_free_size(MALLOC_CAP_32BIT), - heap_caps_get_largest_free_block(MALLOC_CAP_32BIT), - uxQueueMessagesWaiting(pcmChkQHdl), ap.rssi); + heap_caps_get_largest_free_block(MALLOC_CAP_32BIT), ap.rssi); dir = 0; @@ -1337,11 +1407,16 @@ static void player_task(void *pvParameters) { shortMedian = MEDIANFILTER_Insert(&shortMedianFilter, avg); miniMedian = MEDIANFILTER_Insert(&miniMedianFilter, avg); + // xSemaphoreTake(playerPcmQueueMux, portMAX_DELAY); + int msgWaiting = uxQueueMessagesWaiting(pcmChkQHdl); + // xSemaphoreGive(playerPcmQueueMux); + // resync hard if we are getting very late / early. // rest gets tuned in through apll speed control - if ((uxQueueMessagesWaiting(pcmChkQHdl) == 0) || - ((abs(avg) > hardResyncThreshold) && - MEDIANFILTER_isFull(&shortMedianFilter))) { + if ((msgWaiting == 0) || ((abs(avg) > hardResyncThreshold) && + MEDIANFILTER_isFull(&shortMedianFilter))) + // if (msgWaiting == 0) + { if (chnk != NULL) { free_pcm_chunk(chnk); chnk = NULL; @@ -1350,6 +1425,7 @@ static void player_task(void *pvParameters) { wifi_ap_record_t ap; esp_wifi_sta_get_ap_info(&ap); + // xSemaphoreTake(playerPcmQueueMux, portMAX_DELAY); ESP_LOGW(TAG, "RESYNCING HARD 2: age %lldus, latency %lldus, free " "%d, largest block %d, %d, rssi: %d", @@ -1371,6 +1447,8 @@ static void player_task(void *pvParameters) { // } // } + // xSemaphoreGive(playerPcmQueueMux); + timer_pause(TIMER_GROUP_1, TIMER_1); timer_set_auto_reload(TIMER_GROUP_1, TIMER_1, TIMER_AUTORELOAD_DIS); @@ -1421,11 +1499,15 @@ static void player_task(void *pvParameters) { msec = usec / 1000; usec = usec % 1000; + // xSemaphoreTake(playerPcmQueueMux, portMAX_DELAY); + // ESP_LOGI (TAG, "%d, %lldus, %lldus %llds, %lld.%lldms", // dir, age, avg, sec, msec, usec); - // ESP_LOGI(TAG, "%d, %lldus, %lldus, %lldus, q:%d", dir, - // avg, shortMedian, miniMedian, - // uxQueueMessagesWaiting(pcmChkQHdl)); + + // ESP_LOGI(TAG, "%d, %lldus, %lldus, %lldus, q:%d", dir, + // avg, shortMedian, miniMedian, + // uxQueueMessagesWaiting(pcmChkQHdl)); + // ESP_LOGI( TAG, "8b f // %d b %d", heap_caps_get_free_size(MALLOC_CAP_8BIT | // MALLOC_CAP_INTERNAL), @@ -1434,6 +1516,8 @@ static void player_task(void *pvParameters) { // heap_caps_get_free_size(MALLOC_CAP_32BIT | // MALLOC_CAP_EXEC), heap_caps_get_largest_free_block // (MALLOC_CAP_32BIT | MALLOC_CAP_EXEC)); + + // xSemaphoreGive(playerPcmQueueMux); } dir = 0; @@ -1518,12 +1602,14 @@ static void player_task(void *pvParameters) { msec = usec / 1000; usec = usec % 1000; + // xSemaphoreTake(playerPcmQueueMux, portMAX_DELAY); if (pcmChkQHdl != NULL) { ESP_LOGE(TAG, "Couldn't get PCM chunk, recv: messages waiting %d, " "diff2Server: %llds, %lld.%lldms", uxQueueMessagesWaiting(pcmChkQHdl), sec, msec, usec); } + // xSemaphoreGive(playerPcmQueueMux); dir = 0; diff --git a/main/main.c b/main/main.c index d9428fe..29f21af 100644 --- a/main/main.c +++ b/main/main.c @@ -668,7 +668,6 @@ static void http_get_task(void *pvParameters) { int64_t tmpDiffToServer; int64_t lastTimeSync = 0; esp_timer_handle_t timeSyncMessageTimer = NULL; - uint16_t channels; esp_err_t err = 0; server_settings_message_t server_settings_message; bool received_header = false; @@ -680,6 +679,7 @@ static void http_get_task(void *pvParameters) { // 0}; flacData_t *pFlacData = NULL; pcm_chunk_message_t *pcmData = NULL; + uint8_t *opusData = NULL; ip_addr_t remote_ip; uint16_t remotePort = 0; int rc1 = ERR_OK, rc2 = ERR_OK; @@ -933,6 +933,8 @@ static void http_get_task(void *pvParameters) { int32_t payloadDataShift = 0; int flow_drain_counter = 0; + int16_t pcm_size = 120; + #define BASE_MESSAGE_STATE 0 #define TYPED_MESSAGE_STATE 1 @@ -1348,6 +1350,138 @@ static void http_get_task(void *pvParameters) { if (received_header == true) { switch (codec) { + case OPUS: { + if (opusData == NULL) { + // TODO: insert some break condition if we wait + // too long + while ((opusData = (uint8_t *)malloc( + wire_chnk.size)) == NULL) { + ESP_LOGE(TAG, "couldn't memory for opusData"); + + vTaskDelay(pdMS_TO_TICKS(1)); + } + + payloadOffset = 0; + } + + memcpy(&opusData[payloadOffset], start, tmp); + payloadOffset += tmp; + + // ESP_LOGE(TAG,"payloadOffset + // %d, wire_chnk.size + // %d", payloadOffset, + // wire_chnk.size); + + if (payloadOffset >= wire_chnk.size) { + int frame_size = 0; + int sample_count = 0; + int samples_per_frame = 0; + int frame_count; + int16_t *audio; + + if ((samples_per_frame = + opus_packet_get_samples_per_frame( + opusData, scSet.sr)) < 0) { + ESP_LOGE(TAG, + "couldn't get samples per frame count " + "of packet"); + } + + scSet.chkInFrames = samples_per_frame; + + // if ((frame_count = + // opus_packet_get_nb_frames + // (opusData, + // wire_chnk.size)) < + // 0) { + // ESP_LOGE(TAG,"couldn't + // get frame count + // of packet"); + // } + // + // if ((sample_count + // = + // opus_decoder_get_nb_samples + // (opusDecoder, + // opusData, + // wire_chnk.size)) + // >= 0) { + // ESP_LOGI(TAG,"opus + // packet contains + // %d samples, %d + // frames, %d + // samples per + // frame", + // sample_count, + // frame_count, + // samples_per_frame); + // } + // else { + // ESP_LOGE(TAG,"couldn't + // get sample count + // of packet"); + // } + + // TODO: insert some break condition if we wait + // too long + while ((audio = (int16_t *)malloc( + samples_per_frame * scSet.ch * + scSet.bits / 8)) == NULL) { + ESP_LOGE(TAG, "couldn't memory for audio"); + + vTaskDelay(pdMS_TO_TICKS(1)); + } + + frame_size = opus_decode( + opusDecoder, opusData, wire_chnk.size, + (opus_int16 *)audio, samples_per_frame, 0); + + free(opusData); + opusData = NULL; + + // ESP_LOGI(TAG, "time stamp in: %d", + // wire_chunk_message.timestamp.sec); + if (frame_size < 0) { + ESP_LOGE(TAG, "Decode error : %d \n", + frame_size); + } else { + if (pcmData == NULL) { + if (allocate_pcm_chunk_memory( + &pcmData, frame_size * scSet.ch * + scSet.bits / 8) < 0) { + pcmData = NULL; + } else { + if (pcmData->fragment->payload) { + volatile uint32_t *sample; + + uint32_t cnt = 0; + for (int i = 0; + i < frame_size * scSet.ch * + scSet.bits / 8; + i += 4) { + sample = (volatile uint32_t *)(&( + pcmData->fragment->payload[i])); + tmpData = + (((uint32_t)audio[cnt] << 16) & + 0xFFFF0000) | + (((uint32_t)audio[i + 1] << 0) & + 0x0000FFFF); + *sample = (volatile uint32_t)tmpData; + + cnt += 2; + } + } + } + } + } + + free(audio); + audio = NULL; + } + + break; + } + case FLAC: { #if TEST_DECODER_TASK pFlacData = NULL; @@ -1507,6 +1641,69 @@ static void http_get_task(void *pvParameters) { if (typedMsgCurrentPos >= base_message_rx.size) { if (received_header == true) { switch (codec) { + case OPUS: { + // size_t decodedSize + // = pcm_size; + + // ESP_LOGW(TAG, "got + // PCM chunk, + // typedMsgCurrentPos + // %d", + // typedMsgCurrentPos); + + if (pcmData) { + pcmData->timestamp = wire_chnk.timestamp; + } + + // scSet.chkInFrames + // = decodedSize; + + if (player_send_snapcast_setting(&scSet) != + pdPASS) { + ESP_LOGE(TAG, + "Failed to notify " + "sync task about " + "codec. Did you " + "init player?"); + + return; + } +#if CONFIG_USE_DSP_PROCESSOR + if (flow_drain_counter > 0) { + flow_drain_counter--; + double dynamic_vol = + ((double)scSet.volume / 100 / + (20 - flow_drain_counter)); + if (flow_drain_counter == 0) { +#if SNAPCAST_USE_SOFT_VOL + dynamic_vol = 0.0; +#else + dynamic_vol = 1.0; +#endif + audio_hal_set_mute( + board_handle->audio_hal, + server_settings_message.muted); + } + + dsp_processor_set_volome(dynamic_vol); + } + + if ((pcmData) && (pcmData->fragment->payload)) { + dsp_processor_worker(pcmData->fragment->payload, + pcmData->fragment->size, + scSet.sr); + } +#endif + + if (pcmData) { + insert_pcm_chunk(pcmData); + } + + pcmData = NULL; + + break; + } + case FLAC: { #if TEST_DECODER_TASK pFlacData = NULL; // send NULL so we know to wait @@ -1935,10 +2132,40 @@ static void http_get_task(void *pvParameters) { flacTaskQHdl = NULL; } - if (codec == OPUS) { - ESP_LOGI(TAG, "OPUS not implemented yet"); + if (opusDecoder != NULL) { + opus_decoder_destroy(opusDecoder); + opusDecoder = NULL; + } + + if (codec == OPUS) { + // ESP_LOGI(TAG, "OPUS not + // implemented yet"); return; + uint16_t channels; + uint32_t rate; + uint16_t bits; + + memcpy(&rate, tmp + 4, sizeof(rate)); + memcpy(&bits, tmp + 8, sizeof(bits)); + memcpy(&channels, tmp + 10, sizeof(channels)); + + scSet.codec = codec; + scSet.bits = bits; + scSet.ch = channels; + scSet.sr = rate; + + ESP_LOGI(TAG, "Opus sample format: %d:%d:%d\n", rate, + bits, channels); + + int error = 0; + opusDecoder = + opus_decoder_create(rate, channels, &error); + if (error != 0) { + ESP_LOGI(TAG, "Failed to init opus coder"); + return; + } + + ESP_LOGI(TAG, "Initialized opus Decoder: %d", error); - return; } else if (codec == FLAC) { if (t_flac_decoder_task == NULL) { xTaskCreatePinnedToCore( @@ -2024,10 +2251,12 @@ static void http_get_task(void *pvParameters) { scSet.bits, scSet.ch); #endif } else if (codec == PCM) { - memcpy(&channels, tmp + 22, sizeof(channels)); + uint16_t channels; uint32_t rate; - memcpy(&rate, tmp + 24, sizeof(rate)); uint16_t bits; + + memcpy(&channels, tmp + 22, sizeof(channels)); + memcpy(&rate, tmp + 24, sizeof(rate)); memcpy(&bits, tmp + 34, sizeof(bits)); scSet.codec = codec; @@ -2730,7 +2959,7 @@ void app_main(void) { xTaskCreatePinnedToCore(&ota_server_task, "ota", 14 * 256, NULL, OTA_TASK_PRIORITY, t_ota_task, OTA_TASK_CORE_ID); - xTaskCreatePinnedToCore(&http_get_task, "http", 3 * 1024, NULL, + xTaskCreatePinnedToCore(&http_get_task, "http", 12 * 1024, NULL, HTTP_TASK_PRIORITY, &t_http_get_task, HTTP_TASK_CORE_ID);