From 00968fd6e647af286e67bbd616168c4de7a39208 Mon Sep 17 00:00:00 2001 From: Armin Novak Date: Wed, 3 Sep 2025 14:40:47 +0200 Subject: [PATCH 067/100] [codec] use default threadpool Backported together with the following commits so that it applies cleanly: [codec,yuv] fix thread count calculation - 17315c593655d476cccfb9a1b56e41f37030f8e1 [codec,yuv] fix worker object handling - 1cc64eb58b2f5ce9e4ad8aa8610e085f696e578c Fix YUV conversion for systems with lots of CPUs - 774ee652a9c73c0bc3cdba26eba0c112f079cee4 --- libfreerdp/codec/progressive.c | 5 ++- libfreerdp/codec/rfx.c | 56 +++++++--------------------------- libfreerdp/codec/rfx_types.h | 6 ---- libfreerdp/codec/yuv.c | 32 ++----------------- 4 files changed, 16 insertions(+), 83 deletions(-) diff --git a/libfreerdp/codec/progressive.c b/libfreerdp/codec/progressive.c index 6396e0f2a..8b5200546 100644 --- a/libfreerdp/codec/progressive.c +++ b/libfreerdp/codec/progressive.c @@ -1698,9 +1698,8 @@ static INLINE SSIZE_T progressive_process_tiles( if (progressive->rfx_context->priv->UseThreads) { - progressive->work_objects[idx] = - CreateThreadpoolWork(progressive_process_tiles_tile_work_callback, (void*)param, - &progressive->rfx_context->priv->ThreadPoolEnv); + progressive->work_objects[idx] = CreateThreadpoolWork( + progressive_process_tiles_tile_work_callback, (void*)param, NULL); if (!progressive->work_objects[idx]) { WLog_Print(progressive->log, WLOG_ERROR, diff --git a/libfreerdp/codec/rfx.c b/libfreerdp/codec/rfx.c index 8832cd740..2ec4eb521 100644 --- a/libfreerdp/codec/rfx.c +++ b/libfreerdp/codec/rfx.c @@ -205,16 +205,8 @@ RFX_CONTEXT* rfx_context_new(BOOL encoder) RFX_CONTEXT* rfx_context_new_ex(BOOL encoder, UINT32 ThreadingFlags) { - HKEY hKey = NULL; - LONG status = 0; - DWORD dwType = 0; - DWORD dwSize = 0; - DWORD dwValue = 0; - SYSTEM_INFO sysinfo; - RFX_CONTEXT* context = NULL; - wObject* pool = NULL; RFX_CONTEXT_PRIV* priv = NULL; - context = (RFX_CONTEXT*)winpr_aligned_calloc(1, sizeof(RFX_CONTEXT), 32); +
RFX_CONTEXT* context = (RFX_CONTEXT*)winpr_aligned_calloc(1, sizeof(RFX_CONTEXT), 32); if (!context) return NULL; @@ -233,7 +225,7 @@ RFX_CONTEXT* rfx_context_new_ex(BOOL encoder, UINT32 ThreadingFlags) if (!priv->TilePool) goto fail; - pool = ObjectPool_Object(priv->TilePool); + wObject* pool = ObjectPool_Object(priv->TilePool); pool->fnObjectInit = rfx_tile_init; if (context->encoder) @@ -267,29 +259,22 @@ RFX_CONTEXT* rfx_context_new_ex(BOOL encoder, UINT32 ThreadingFlags) if (!(ThreadingFlags & THREADING_FLAGS_DISABLE_THREADS)) { + HKEY hKey = NULL; priv->UseThreads = TRUE; - GetNativeSystemInfo(&sysinfo); - priv->MinThreadCount = sysinfo.dwNumberOfProcessors; - priv->MaxThreadCount = 0; - status = RegOpenKeyExA(HKEY_LOCAL_MACHINE, RFX_KEY, 0, KEY_READ | KEY_WOW64_64KEY, &hKey); + const LONG status = + RegOpenKeyExA(HKEY_LOCAL_MACHINE, RFX_KEY, 0, KEY_READ | KEY_WOW64_64KEY, &hKey); if (status == ERROR_SUCCESS) { - dwSize = sizeof(dwValue); + DWORD dwType = 0; + DWORD dwValue = 0; + DWORD dwSize = sizeof(dwValue); if (RegQueryValueEx(hKey, _T("UseThreads"), NULL, &dwType, (BYTE*)&dwValue, &dwSize) == ERROR_SUCCESS) priv->UseThreads = dwValue ? 1 : 0; - if (RegQueryValueEx(hKey, _T("MinThreadCount"), NULL, &dwType, (BYTE*)&dwValue, - &dwSize) == ERROR_SUCCESS) - priv->MinThreadCount = dwValue; - - if (RegQueryValueEx(hKey, _T("MaxThreadCount"), NULL, &dwType, (BYTE*)&dwValue, - &dwSize) == ERROR_SUCCESS) - priv->MaxThreadCount = dwValue; - RegCloseKey(hKey); } } @@ -304,20 +289,6 @@ RFX_CONTEXT* rfx_context_new_ex(BOOL encoder, UINT32 ThreadingFlags) /* from multiple threads. 
This call will initialize all function pointers correctly */ /* before any decoding threads are started */ primitives_get(); - priv->ThreadPool = CreateThreadpool(NULL); - - if (!priv->ThreadPool) - goto fail; - - InitializeThreadpoolEnvironment(&priv->ThreadPoolEnv); - SetThreadpoolCallbackPool(&priv->ThreadPoolEnv, priv->ThreadPool); - - if (priv->MinThreadCount) - if (!SetThreadpoolThreadMinimum(priv->ThreadPool, priv->MinThreadCount)) - goto fail; - - if (priv->MaxThreadCount) - SetThreadpoolThreadMaximum(priv->ThreadPool, priv->MaxThreadCount); } /* initialize the default pixel format */ @@ -370,9 +341,6 @@ void rfx_context_free(RFX_CONTEXT* context) ObjectPool_Free(priv->TilePool); if (priv->UseThreads) { - if (priv->ThreadPool) - CloseThreadpool(priv->ThreadPool); - DestroyThreadpoolEnvironment(&priv->ThreadPoolEnv); winpr_aligned_free((void*)priv->workObjects); winpr_aligned_free(priv->tileWorkParams); #ifdef WITH_PROFILER @@ -1086,9 +1054,8 @@ static INLINE BOOL rfx_process_message_tileset(RFX_CONTEXT* WINPR_RESTRICT conte params[i].context = context; params[i].tile = message->tiles[i]; - if (!(work_objects[i] = - CreateThreadpoolWork(rfx_process_message_tile_work_callback, - (void*)¶ms[i], &context->priv->ThreadPoolEnv))) + if (!(work_objects[i] = CreateThreadpoolWork(rfx_process_message_tile_work_callback, + (void*)¶ms[i], NULL))) { WLog_Print(context->priv->log, WLOG_ERROR, "CreateThreadpoolWork failed."); rc = FALSE; @@ -1829,8 +1796,7 @@ RFX_MESSAGE* rfx_encode_message(RFX_CONTEXT* WINPR_RESTRICT context, workParam->tile = tile; if (!(*workObject = CreateThreadpoolWork(rfx_compose_message_tile_work_callback, - (void*)workParam, - &context->priv->ThreadPoolEnv))) + (void*)workParam, NULL))) { goto skip_encoding_loop; } diff --git a/libfreerdp/codec/rfx_types.h b/libfreerdp/codec/rfx_types.h index a9cd314da..c5a62259b 100644 --- a/libfreerdp/codec/rfx_types.h +++ b/libfreerdp/codec/rfx_types.h @@ -69,11 +69,11 @@ struct S_RFX_CONTEXT_PRIV PTP_WORK* 
workObjects; RFX_TILE_COMPOSE_WORK_PARAM* tileWorkParams; - DWORD MinThreadCount; + DWORD MinThreadCount; /* Kept only to silence abidiff */ - DWORD MaxThreadCount; + DWORD MaxThreadCount; /* Kept only to silence abidiff */ - PTP_POOL ThreadPool; + PTP_POOL ThreadPool; /* Kept only to silence abidiff */ - TP_CALLBACK_ENVIRON ThreadPoolEnv; + TP_CALLBACK_ENVIRON ThreadPoolEnv; /* Kept only to silence abidiff */ wBufferPool* BufferPool; diff --git a/libfreerdp/codec/yuv.c b/libfreerdp/codec/yuv.c index 75d8a6a89..4025782dd 100644 --- a/libfreerdp/codec/yuv.c +++ b/libfreerdp/codec/yuv.c @@ -55,11 +55,11 @@ struct S_YUV_CONTEXT UINT32 width, height; BOOL useThreads; BOOL encoder; - UINT32 nthreads; + UINT32 nthreads; /* Kept only to silence abidiff */ UINT32 heightStep; - PTP_POOL threadPool; + PTP_POOL threadPool; /* Kept only to silence abidiff */ - TP_CALLBACK_ENVIRON ThreadPoolEnv; + TP_CALLBACK_ENVIRON ThreadPoolEnv; /* Kept only to silence abidiff */ UINT32 work_object_count; PTP_WORK* work_objects; @@ -166,16 +162,21 @@ BOOL yuv_context_reset(YUV_CONTEXT* WINPR_RESTRICT context, UINT32 width, UINT32 context->width = width; context->height = height; - context->heightStep = (height / context->nthreads); + + context->heightStep = height; if (context->useThreads) { - const UINT32 pw = (width + TILE_SIZE - width % TILE_SIZE) / TILE_SIZE; - const UINT32 ph = (height + TILE_SIZE - height % TILE_SIZE) / TILE_SIZE; + context->heightStep = 16; + /* Preallocate workers for 16x16 tiles. + * this is overallocation for most cases. + * + * ~2MB total for a 4k resolution, so negligible. + */ + const size_t pw = (width + TILE_SIZE - width % TILE_SIZE) / 16; + const size_t ph = (height + TILE_SIZE - height % TILE_SIZE) / 16; - /* We´ve calculated the amount of workers for 64x64 tiles, but the decoder - * might get 16x16 tiles mixed in. 
*/ - const UINT32 count = pw * ph * 16; + const size_t count = pw * ph; context->work_object_count = 0; if (context->encoder) @@ -237,33 +234,13 @@ YUV_CONTEXT* yuv_context_new(BOOL encoder, UINT32 ThreadingFlags) primitives_get(); ret->encoder = encoder; - ret->nthreads = 1; if (!(ThreadingFlags & THREADING_FLAGS_DISABLE_THREADS)) { GetNativeSystemInfo(&sysInfos); ret->useThreads = (sysInfos.dwNumberOfProcessors > 1); - if (ret->useThreads) - { - ret->nthreads = sysInfos.dwNumberOfProcessors; - ret->threadPool = CreateThreadpool(NULL); - if (!ret->threadPool) - { - goto error_threadpool; - } - - InitializeThreadpoolEnvironment(&ret->ThreadPoolEnv); - SetThreadpoolCallbackPool(&ret->ThreadPoolEnv, ret->threadPool); - } } return ret; - -error_threadpool: - WINPR_PRAGMA_DIAG_PUSH - WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC - yuv_context_free(ret); - WINPR_PRAGMA_DIAG_POP - return NULL; } void yuv_context_free(YUV_CONTEXT* context) @@ -272,9 +249,6 @@ void yuv_context_free(YUV_CONTEXT* context) return; if (context->useThreads) { - if (context->threadPool) - CloseThreadpool(context->threadPool); - DestroyThreadpoolEnvironment(&context->ThreadPoolEnv); winpr_aligned_free((void*)context->work_objects); winpr_aligned_free(context->work_combined_params); winpr_aligned_free(context->work_enc_params); @@ -330,7 +304,7 @@ static BOOL submit_object(PTP_WORK* WINPR_RESTRICT work_object, PTP_WORK_CALLBAC if (!param || !context) return FALSE; - *work_object = CreateThreadpoolWork(cb, cnv.pv, &context->ThreadPoolEnv); + *work_object = CreateThreadpoolWork(cb, cnv.pv, NULL); if (!*work_object) return FALSE; @@ -439,11 +417,8 @@ static BOOL pool_decode(YUV_CONTEXT* WIN if (context->work_object_count <= waitCount) { - WLog_ERR(TAG, - "YUV decoder: invalid number of tiles, only support less than %" PRIu32 - ", got %" PRIu32, - context->work_object_count, waitCount); - goto fail; + free_objects(context->work_objects, context->work_object_count); + waitCount = 0; } 
YUV_PROCESS_WORK_PARAM* cur = &context->work_dec_params[waitCount]; @@ -586,11 +561,8 @@ static BOOL pool_decode_rect(YUV_CONTEXT if (context->work_object_count <= waitCount) { - WLog_ERR(TAG, - "YUV rect decoder: invalid number of tiles, only support less than %" PRIu32 - ", got %" PRIu32, - context->work_object_count, waitCount); - goto fail; + free_objects(context->work_objects, context->work_object_count); + waitCount = 0; } current = &context->work_combined_params[waitCount]; *current = pool_decode_rect_param(®ionRects[waitCount], context, type, pYUVData, iStride, @@ -848,11 +820,8 @@ static BOOL pool_encode(YUV_CONTEXT* WIN if (context->work_object_count <= waitCount) { - WLog_ERR(TAG, - "YUV encoder: invalid number of tiles, only support less than %" PRIu32 - ", got %" PRIu32, - context->work_object_count, waitCount); - goto fail; + free_objects(context->work_objects, context->work_object_count); + waitCount = 0; } current = &context->work_enc_params[waitCount]; -- 2.51.0