diff --git a/src/ustreamer/workers.c b/src/ustreamer/workers.c index 4ba8e4fd..ad5b6262 100644 --- a/src/ustreamer/workers.c +++ b/src/ustreamer/workers.c @@ -30,6 +30,7 @@ #include "../libs/tools.h" #include "../libs/threading.h" #include "../libs/logging.h" +#include "../libs/list.h" static void *_worker_thread(void *v_worker); @@ -53,13 +54,13 @@ us_workers_pool_s *us_workers_pool_init( atomic_init(&pool->stop, false); pool->n_workers = n_workers; - US_CALLOC(pool->workers, pool->n_workers); US_MUTEX_INIT(pool->free_workers_mutex); US_COND_INIT(pool->free_workers_cond); for (uint index = 0; index < pool->n_workers; ++index) { - us_worker_s *const wr = &pool->workers[index]; + us_worker_s *wr; + US_CALLOC(wr, 1); wr->number = index; US_ASPRINTF(wr->name, "%s-%u", wr_prefix, index); @@ -73,6 +74,8 @@ us_workers_pool_s *us_workers_pool_init( US_THREAD_CREATE(wr->tid, _worker_thread, (void*)wr); pool->free_workers += 1; + + US_LIST_APPEND(pool->workers, wr); } return pool; } @@ -81,9 +84,7 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) { US_LOG_INFO("Destroying workers pool %s ...", pool->name); atomic_store(&pool->stop, true); - for (uint index = 0; index < pool->n_workers; ++index) { - us_worker_s *const wr = &pool->workers[index]; - + US_LIST_ITERATE(pool->workers, wr, { // cppcheck-suppress constStatement US_MUTEX_LOCK(wr->has_job_mutex); atomic_store(&wr->has_job, true); // Final job: die US_MUTEX_UNLOCK(wr->has_job_mutex); @@ -93,83 +94,56 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) { US_MUTEX_DESTROY(wr->has_job_mutex); US_COND_DESTROY(wr->has_job_cond); - free(wr->name); - pool->job_destroy(wr->job); - } + + free(wr->name); + free(wr); + }); US_MUTEX_DESTROY(pool->free_workers_mutex); US_COND_DESTROY(pool->free_workers_cond); - free(pool->workers); free(pool); } us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool) { - us_worker_s *ready_wr = NULL; - US_MUTEX_LOCK(pool->free_workers_mutex); US_COND_WAIT_FOR(pool->free_workers, pool->free_workers_cond, pool->free_workers_mutex); US_MUTEX_UNLOCK(pool->free_workers_mutex); - if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) { - ready_wr = pool->oldest_wr; - ready_wr->job_timely = true; - } else { - for (uint index = 0; index < pool->n_workers; ++index) { - us_worker_s *const wr = &pool->workers[index]; - if ( - !atomic_load(&wr->has_job) && ( - ready_wr == NULL - || ready_wr->job_start_ts < wr->job_start_ts - ) - ) { - ready_wr = wr; - break; - } + us_worker_s *found = NULL; + US_LIST_ITERATE(pool->workers, wr, { + if (!atomic_load(&wr->has_job) && (found == NULL || found->job_start_ts <= wr->job_start_ts)) { + found = wr; } - assert(ready_wr != NULL); - ready_wr->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате) + }); + assert(found != NULL); + US_LIST_REMOVE(pool->workers, found); + US_LIST_APPEND(pool->workers, found); // Перемещаем в конец списка + + found->job_timely = (found->job_start_ts > pool->job_timely_ts); + if (found->job_timely) { + pool->job_timely_ts = found->job_start_ts; } - return ready_wr; + return found; } -void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, void *job*/) { - if (pool->oldest_wr == ready_wr) { - pool->oldest_wr = pool->oldest_wr->next_wr; - } - if (pool->oldest_wr == NULL) { - pool->oldest_wr = ready_wr; - pool->latest_wr = pool->oldest_wr; - } else { - if (ready_wr->next_wr != NULL) { - ready_wr->next_wr->prev_wr = ready_wr->prev_wr; - } - if (ready_wr->prev_wr != NULL) { - ready_wr->prev_wr->next_wr = ready_wr->next_wr; - } - ready_wr->prev_wr = pool->latest_wr; - pool->latest_wr->next_wr = ready_wr; - pool->latest_wr = ready_wr; - } - pool->latest_wr->next_wr = NULL; - - US_MUTEX_LOCK(ready_wr->has_job_mutex); - //ready_wr->job = job; - atomic_store(&ready_wr->has_job, true); - US_MUTEX_UNLOCK(ready_wr->has_job_mutex); - US_COND_SIGNAL(ready_wr->has_job_cond); +void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *wr) { + US_MUTEX_LOCK(wr->has_job_mutex); + atomic_store(&wr->has_job, true); + US_MUTEX_UNLOCK(wr->has_job_mutex); + US_COND_SIGNAL(wr->has_job_cond); US_MUTEX_LOCK(pool->free_workers_mutex); pool->free_workers -= 1; US_MUTEX_UNLOCK(pool->free_workers_mutex); } -ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr) { - const ldf approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1; +ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *wr) { + const ldf approx_job_time = pool->approx_job_time * 0.9 + wr->last_job_time * 0.1; US_LOG_VERBOSE("Correcting pool's %s approx_job_time: %.3Lf -> %.3Lf (last_job_time=%.3Lf)", - pool->name, pool->approx_job_time, approx_job_time, ready_wr->last_job_time); + pool->name, pool->approx_job_time, approx_job_time, wr->last_job_time); pool->approx_job_time = approx_job_time; @@ -203,7 +177,6 @@ static void *_worker_thread(void *v_worker) { wr->job_start_ts = job_start_ts; wr->last_job_time = us_get_now_monotonic() - wr->job_start_ts; } - //wr->job = NULL; atomic_store(&wr->has_job, false); } diff --git a/src/ustreamer/workers.h b/src/ustreamer/workers.h index fe03cca5..749fb4f4 100644 --- a/src/ustreamer/workers.h +++ b/src/ustreamer/workers.h @@ -27,6 +27,7 @@ #include #include "../libs/types.h" +#include "../libs/list.h" typedef struct us_worker_sx { @@ -44,10 +45,9 @@ typedef struct us_worker_sx { ldf job_start_ts; pthread_cond_t has_job_cond; - struct us_worker_sx *prev_wr; - struct us_worker_sx *next_wr; - struct us_workers_pool_sx *pool; + + US_LIST_DECLARE; } us_worker_s; typedef void *(*us_workers_pool_job_init_f)(void *arg); @@ -63,8 +63,7 @@ typedef struct us_workers_pool_sx { uint n_workers; us_worker_s *workers; - us_worker_s *oldest_wr; - us_worker_s *latest_wr; + ldf job_timely_ts; ldf approx_job_time; @@ -85,6 +84,6 @@ us_workers_pool_s *us_workers_pool_init( void us_workers_pool_destroy(us_workers_pool_s *pool); us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool); -void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, void *job*/); +void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr); ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr);