#include "kvm/threadpool.h" #include "kvm/mutex.h" #include "kvm/kvm.h" #include #include #include #include static DEFINE_MUTEX(job_mutex); static DEFINE_MUTEX(thread_mutex); static pthread_cond_t job_cond = PTHREAD_COND_INITIALIZER; static LIST_HEAD(head); static pthread_t *threads; static long threadcount; static bool running; static struct thread_pool__job *thread_pool__job_pop_locked(void) { struct thread_pool__job *job; if (list_empty(&head)) return NULL; job = list_first_entry(&head, struct thread_pool__job, queue); list_del_init(&job->queue); return job; } static void thread_pool__job_push_locked(struct thread_pool__job *job) { list_add_tail(&job->queue, &head); } static struct thread_pool__job *thread_pool__job_pop(void) { struct thread_pool__job *job; mutex_lock(&job_mutex); job = thread_pool__job_pop_locked(); mutex_unlock(&job_mutex); return job; } static void thread_pool__job_push(struct thread_pool__job *job) { mutex_lock(&job_mutex); thread_pool__job_push_locked(job); mutex_unlock(&job_mutex); } static void thread_pool__handle_job(struct thread_pool__job *job) { while (job) { job->callback(job->kvm, job->data); mutex_lock(&job->mutex); if (--job->signalcount > 0) /* If the job was signaled again while we were working */ thread_pool__job_push(job); mutex_unlock(&job->mutex); job = thread_pool__job_pop(); } } static void thread_pool__threadfunc_cleanup(void *param) { mutex_unlock(&job_mutex); } static void *thread_pool__threadfunc(void *param) { pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL); kvm__set_thread_name("threadpool-worker"); while (running) { struct thread_pool__job *curjob = NULL; mutex_lock(&job_mutex); while (running && (curjob = thread_pool__job_pop_locked()) == NULL) pthread_cond_wait(&job_cond, &job_mutex.mutex); mutex_unlock(&job_mutex); if (running) thread_pool__handle_job(curjob); } pthread_cleanup_pop(0); return NULL; } static int thread_pool__addthread(void) { int res; void *newthreads; mutex_lock(&thread_mutex); newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t)); if (newthreads == NULL) { mutex_unlock(&thread_mutex); return -1; } threads = newthreads; res = pthread_create(threads + threadcount, NULL, thread_pool__threadfunc, NULL); if (res == 0) threadcount++; mutex_unlock(&thread_mutex); return res; } int thread_pool__init(struct kvm *kvm) { unsigned long i; unsigned int thread_count = sysconf(_SC_NPROCESSORS_ONLN); running = true; for (i = 0; i < thread_count; i++) if (thread_pool__addthread() < 0) return i; return i; } late_init(thread_pool__init); int thread_pool__exit(struct kvm *kvm) { int i; void *NUL = NULL; running = false; for (i = 0; i < threadcount; i++) { mutex_lock(&job_mutex); pthread_cond_signal(&job_cond); mutex_unlock(&job_mutex); } for (i = 0; i < threadcount; i++) { pthread_join(threads[i], NUL); } return 0; } late_exit(thread_pool__exit); void thread_pool__do_job(struct thread_pool__job *job) { struct thread_pool__job *jobinfo = job; if (jobinfo == NULL || jobinfo->callback == NULL) return; mutex_lock(&jobinfo->mutex); if (jobinfo->signalcount++ == 0) thread_pool__job_push(job); mutex_unlock(&jobinfo->mutex); mutex_lock(&job_mutex); pthread_cond_signal(&job_cond); mutex_unlock(&job_mutex); } void thread_pool__cancel_job(struct thread_pool__job *job) { bool running; /* * If the job is queued but not running, remove it. Otherwise, wait for * the signalcount to drop to 0, indicating that it has finished * running. We assume that nobody is queueing this job - * thread_pool__do_job() isn't called - while this function is running. */ do { mutex_lock(&job_mutex); if (list_empty(&job->queue)) { running = job->signalcount > 0; } else { list_del_init(&job->queue); job->signalcount = 0; running = false; } mutex_unlock(&job_mutex); } while (running); }