diff options
author | Slendi <slendi@socopon.com> | 2023-09-08 01:13:04 +0300 |
---|---|---|
committer | Slendi <slendi@socopon.com> | 2023-09-08 01:13:04 +0300 |
commit | 414decf167f936eca0a43267f034a3adbe573958 (patch) | |
tree | ab65bb031b6356a13f269eccf69074247623fc6c |
Signed-off-by: Slendi <slendi@socopon.com>
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | LICENSE | 7 | ||||
-rw-r--r-- | Makefile | 6 | ||||
-rw-r--r-- | cpfast.c | 270 | ||||
-rw-r--r-- | thpool.c | 553 | ||||
-rw-r--r-- | thpool.h | 187 | ||||
-rw-r--r-- | vec.c | 375 | ||||
-rw-r--r-- | vec.h | 37 |
8 files changed, 1436 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4dc13a0 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +cpfast @@ -0,0 +1,7 @@ +Copyright (C) 2023 Slendi +This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3. + +This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/> + diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..a555afe --- /dev/null +++ b/Makefile @@ -0,0 +1,6 @@ +cpfast: cpfast.c + cc -march=native -O3 -s *.c -o cpfast + +clean: + rm -f cpfast + diff --git a/cpfast.c b/cpfast.c new file mode 100644 index 0000000..def0341 --- /dev/null +++ b/cpfast.c @@ -0,0 +1,270 @@ +#include <dirent.h> +#include <pthread.h> +#include <stdbool.h> +#include <stddef.h> +#include <stdio.h> +#include <stdlib.h> +#include <sys/stat.h> +#include <sys/sysinfo.h> +#include <unistd.h> +#include <utime.h> + +#include <fcntl.h> +#if defined(__APPLE__) || defined(__FreeBSD__) +# include <copyfile.h> +#else +# include <sys/sendfile.h> +#endif + +#include "thpool.h" +#include "vec.h" + +#define CHMOD 0666 + +typedef enum _OperationStatus +{ + SENDFILE_FAIL = -4, + STAT_FAIL = -3, + SOURCE_OPEN_FAIL = -2, + DESTINATION_OPEN_FAIL = -1, + SUCCESS = 0, +} OperationStatus; + +typedef enum _QueueType +{ + FILE_COPY, + MKDIR, +} QueueType; + +typedef struct _QueueItem +{ + char *source, *destination; + QueueType type; + bool done; + size_t requirement; +} QueueItem; + +typedef struct +{ + pthread_t thread_id; + bool is_valid; +} ThreadInfo; + +struct +{ + t_vec file_queue; + bool error; +} g_state; + +void generate_queue(char *source, char *destination) +{ + static size_t last_dir = -1; + + DIR *dir; + struct dirent *entry; + if (!(dir = opendir(source))) return; + + while ((entry = readdir(dir)) != NULL) { + if (entry->d_type == DT_DIR) { + if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) continue; + + char source_path[1024]; + char dest_path[1024]; + snprintf(source_path, sizeof(source_path), "%s/%s", source, entry->d_name); + snprintf(dest_path, sizeof(dest_path), "%s/%s", destination, entry->d_name); + + QueueItem item; + item.source = strdup(source_path); + item.destination = strdup(dest_path); + item.type = MKDIR; + item.done = false; + item.requirement = last_dir; + last_dir = g_state.file_queue.len; + vec_push(&g_state.file_queue, &item); + + generate_queue(source_path, dest_path); + } else { + char source_path[1024]; + char dest_path[1024]; + snprintf(source_path, sizeof(source_path), "%s/%s", source, entry->d_name); + snprintf(dest_path, sizeof(dest_path), "%s/%s", destination, entry->d_name); + + QueueItem item; + item.source = strdup(source_path); + item.destination = strdup(dest_path); + item.type = FILE_COPY; + item.done = false; + item.requirement = last_dir; + vec_push(&g_state.file_queue, &item); + } + } + + closedir(dir); +} + +void print_operation_status(OperationStatus status) +{ + switch (status) { + case SENDFILE_FAIL: perror("Failed sendfile()"); break; + case STAT_FAIL: perror("Failed stat()"); break; + case SOURCE_OPEN_FAIL: perror("Failed source open"); break; + case DESTINATION_OPEN_FAIL: perror("Failed destination open"); break; + case SUCCESS: puts("Done!"); break; + } +} + +#define FAILED(x) ((x) < 0) +#define OK(x) ((x) >= 0) +#define BYTE_TO_BINARY_PATTERN "%c%c%c%c%c%c%c%c" +#define BYTE_TO_BINARY(byte) \ + ((byte)&0x80 ? '1' : '0'), ((byte)&0x40 ? '1' : '0'), ((byte)&0x20 ? '1' : '0'), \ + ((byte)&0x10 ? '1' : '0'), ((byte)&0x08 ? '1' : '0'), ((byte)&0x04 ? '1' : '0'), \ + ((byte)&0x02 ? '1' : '0'), ((byte)&0x01 ? '1' : '0') +OperationStatus copy_file(const char *source, const char *dest, bool preserve_mode, + bool preserve_time, bool preserve_guid, bool overwrite) +{ + struct stat source_stats; + mode_t mode = CHMOD; + + int input_fd, output_fd; + if (FAILED(input_fd = open(source, O_RDONLY))) return SOURCE_OPEN_FAIL; + if (FAILED( + output_fd = open(dest, O_WRONLY | O_CREAT | (overwrite ? 0 : O_EXCL), 0666))) { + close(input_fd); + return DESTINATION_OPEN_FAIL; + } + + if (FAILED(fstat(input_fd, &source_stats))) { + close(input_fd); + close(output_fd); + return STAT_FAIL; + } + if (preserve_mode) mode = source_stats.st_mode; + +#if defined(__APPLE__) || defined(__FreeBSD__) + int result = fcopyfile(input, output, 0, COPYFILE_ALL); +#else + off_t copied_bytes = 0; + int result = sendfile(output_fd, input_fd, &copied_bytes, source_stats.st_size); + if (FAILED(result)) { + printf("Failed sneedfile(%d, %d, %p (%ld), %ld) on file %s -> %s\n", output_fd, + input_fd, &copied_bytes, copied_bytes, source_stats.st_size, source, dest); + } +#endif + + close(input_fd); + close(output_fd); + + if (result < 0) return SENDFILE_FAIL; + if (preserve_time) { + struct utimbuf new_time; + new_time.actime = source_stats.st_atime; + new_time.modtime = source_stats.st_mtime; + utime(dest, &new_time); + } + + return 0; +} + +int is_regular_file(const char *path) +{ + struct stat path_stat; + stat(path, &path_stat); + return S_ISREG(path_stat.st_mode); +} + +int handle_queue_item(QueueItem *item) +{ + int ret = 0; + if (item->type == FILE_COPY) { + struct stat st; + stat(item->source, &st); + if (S_ISLNK(st.st_mode)) { + char link_target[1024]; + ssize_t link_size = readlink(item->source, link_target, sizeof(link_target)); + if (link_size == -1) { + perror("Failed to read symbolic link"); + return -1; + } + link_target[link_size] = '\0'; + + if (symlink(link_target, item->destination) == -1) { + perror("Failed to create symbolic link"); + return -1; + } + } else if (S_ISREG(st.st_mode)) { + ret = copy_file(item->source, item->destination, false, false, false, true); + if (FAILED(ret)) print_operation_status(ret); + } else if (S_ISFIFO(st.st_mode)) { + if (mkfifo(item->destination, st.st_mode) == -1) { + perror("Failed to create FIFO"); + return -1; + } + } + } else + ret = mkdir(item->destination, 0777); + + if (OK(ret)) item->done = true; + if (FAILED(ret)) { printf("Failed with %d\n", ret); } + return ret; +} + +void thread_function(void *arg) +{ + QueueItem *j = arg; + + if (j->requirement != -1) + while (!((QueueItem *)vec_get(&g_state.file_queue, j->requirement))->done + && !g_state.error) + usleep(0); + + if (g_state.error) return; + + // FIXME: Show this if -v + // printf("%s -> %s [%d]\n", j->source, j->destination, j->type); + + int queue_item_ret = handle_queue_item(j); + if (FAILED(queue_item_ret)) { + printf("Queue item ret: %d\n", queue_item_ret); + perror("handle_queue_item"); + g_state.error = true; + } + return; +} + +int main(int argc, char **argv) +{ + if (argc < 3) { + fputs("First argument is source, second argument is destination.", stderr); + return EXIT_FAILURE; + } + + if (FAILED(vec_new(&g_state.file_queue, 20, sizeof(QueueItem)))) { + fputs("Failed creating queue.", stderr); + return EXIT_FAILURE; + } + + g_state.error = false; + + if (is_regular_file(argv[1])) { + print_operation_status(copy_file(argv[1], argv[2], false, false, false, false)); + return 0; + } + + generate_queue(argv[1], argv[2]); + mkdir(argv[2], 0777); + + unsigned num_cores = get_nprocs(); + threadpool tp = thpool_init(num_cores); + + for (unsigned i = 0; i < g_state.file_queue.len; i++) + thpool_add_work(tp, thread_function, vec_get(&g_state.file_queue, i)); + + thpool_wait(tp); + + if (g_state.error) { return EXIT_FAILURE; } + + thpool_destroy(tp); + + return EXIT_SUCCESS; +} diff --git a/thpool.c b/thpool.c new file mode 100644 index 0000000..59e2e55 --- /dev/null +++ b/thpool.c @@ -0,0 +1,553 @@ +/* ******************************** + * Author: Johan Hanssen Seferidis + * License: MIT + * Description: Library providing a threading pool where you can add + * work. For usage, check the thpool.h file or README.md + * + *//** @file thpool.h *//* + * + ********************************/ + +#if defined(__APPLE__) +#include <AvailabilityMacros.h> +#else +#ifndef _POSIX_C_SOURCE +#define _POSIX_C_SOURCE 200809L +#endif +#endif +#include <unistd.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <pthread.h> +#include <errno.h> +#include <time.h> +#if defined(__linux__) +#include <sys/prctl.h> +#endif + +#include "thpool.h" + +#ifdef THPOOL_DEBUG +#define THPOOL_DEBUG 1 +#else +#define THPOOL_DEBUG 0 +#endif + +#if !defined(DISABLE_PRINT) || defined(THPOOL_DEBUG) +#define err(str) fprintf(stderr, str) +#else +#define err(str) +#endif + +static volatile int threads_keepalive; +static volatile int threads_on_hold; + + + +/* ========================== STRUCTURES ============================ */ + + +/* Binary semaphore */ +typedef struct bsem { + pthread_mutex_t mutex; + pthread_cond_t cond; + int v; +} bsem; + + +/* Job */ +typedef struct job{ + struct job* prev; /* pointer to previous job */ + void (*function)(void* arg); /* function pointer */ + void* arg; /* function's argument */ +} job; + + +/* Job queue */ +typedef struct jobqueue{ + pthread_mutex_t rwmutex; /* used for queue r/w access */ + job *front; /* pointer to front of queue */ + job *rear; /* pointer to rear of queue */ + bsem *has_jobs; /* flag as binary semaphore */ + int len; /* number of jobs in queue */ +} jobqueue; + + +/* Thread */ +typedef struct thread{ + int id; /* friendly id */ + pthread_t pthread; /* pointer to actual thread */ + struct thpool_* thpool_p; /* access to thpool */ +} thread; + + +/* Threadpool */ +typedef struct thpool_{ + thread** threads; /* pointer to threads */ + volatile int num_threads_alive; /* threads currently alive */ + volatile int num_threads_working; /* threads currently working */ + pthread_mutex_t thcount_lock; /* used for thread count etc */ + pthread_cond_t threads_all_idle; /* signal to thpool_wait */ + jobqueue jobqueue; /* job queue */ +} thpool_; + + + + + +/* ========================== PROTOTYPES ============================ */ + + +static int thread_init(thpool_* thpool_p, struct thread** thread_p, int id); +static void* thread_do(struct thread* thread_p); +static void thread_hold(int sig_id); +static void thread_destroy(struct thread* thread_p); + +static int jobqueue_init(jobqueue* jobqueue_p); +static void jobqueue_clear(jobqueue* jobqueue_p); +static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p); +static struct job* jobqueue_pull(jobqueue* jobqueue_p); +static void jobqueue_destroy(jobqueue* jobqueue_p); + +static void bsem_init(struct bsem *bsem_p, int value); +static void bsem_reset(struct bsem *bsem_p); +static void bsem_post(struct bsem *bsem_p); +static void bsem_post_all(struct bsem *bsem_p); +static void bsem_wait(struct bsem *bsem_p); + + + + + +/* ========================== THREADPOOL ============================ */ + + +/* Initialise thread pool */ +struct thpool_* thpool_init(int num_threads){ + + threads_on_hold = 0; + threads_keepalive = 1; + + if (num_threads < 0){ + num_threads = 0; + } + + /* Make new thread pool */ + thpool_* thpool_p; + thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_)); + if (thpool_p == NULL){ + err("thpool_init(): Could not allocate memory for thread pool\n"); + return NULL; + } + thpool_p->num_threads_alive = 0; + thpool_p->num_threads_working = 0; + + /* Initialise the job queue */ + if (jobqueue_init(&thpool_p->jobqueue) == -1){ + err("thpool_init(): Could not allocate memory for job queue\n"); + free(thpool_p); + return NULL; + } + + /* Make threads in pool */ + thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread *)); + if (thpool_p->threads == NULL){ + err("thpool_init(): Could not allocate memory for threads\n"); + jobqueue_destroy(&thpool_p->jobqueue); + free(thpool_p); + return NULL; + } + + pthread_mutex_init(&(thpool_p->thcount_lock), NULL); + pthread_cond_init(&thpool_p->threads_all_idle, NULL); + + /* Thread init */ + int n; + for (n=0; n<num_threads; n++){ + thread_init(thpool_p, &thpool_p->threads[n], n); +#if THPOOL_DEBUG + printf("THPOOL_DEBUG: Created thread %d in pool \n", n); +#endif + } + + /* Wait for threads to initialize */ + while (thpool_p->num_threads_alive != num_threads) {} + + return thpool_p; +} + + +/* Add work to the thread pool */ +int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){ + job* newjob; + + newjob=(struct job*)malloc(sizeof(struct job)); + if (newjob==NULL){ + err("thpool_add_work(): Could not allocate memory for new job\n"); + return -1; + } + + /* add function and argument */ + newjob->function=function_p; + newjob->arg=arg_p; + + /* add job to queue */ + jobqueue_push(&thpool_p->jobqueue, newjob); + + return 0; +} + + +/* Wait until all jobs have finished */ +void thpool_wait(thpool_* thpool_p){ + pthread_mutex_lock(&thpool_p->thcount_lock); + while (thpool_p->jobqueue.len || thpool_p->num_threads_working) { + pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock); + } + pthread_mutex_unlock(&thpool_p->thcount_lock); +} + + +/* Destroy the threadpool */ +void thpool_destroy(thpool_* thpool_p){ + /* No need to destroy if it's NULL */ + if (thpool_p == NULL) return ; + + volatile int threads_total = thpool_p->num_threads_alive; + + /* End each thread 's infinite loop */ + threads_keepalive = 0; + + /* Give one second to kill idle threads */ + double TIMEOUT = 1.0; + time_t start, end; + double tpassed = 0.0; + time (&start); + while (tpassed < TIMEOUT && thpool_p->num_threads_alive){ + bsem_post_all(thpool_p->jobqueue.has_jobs); + time (&end); + tpassed = difftime(end,start); + } + + /* Poll remaining threads */ + while (thpool_p->num_threads_alive){ + bsem_post_all(thpool_p->jobqueue.has_jobs); + sleep(1); + } + + /* Job queue cleanup */ + jobqueue_destroy(&thpool_p->jobqueue); + /* Deallocs */ + int n; + for (n=0; n < threads_total; n++){ + thread_destroy(thpool_p->threads[n]); + } + free(thpool_p->threads); + free(thpool_p); +} + + +/* Pause all threads in threadpool */ +void thpool_pause(thpool_* thpool_p) { + int n; + for (n=0; n < thpool_p->num_threads_alive; n++){ + pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1); + } +} + + +/* Resume all threads in threadpool */ +void thpool_resume(thpool_* thpool_p) { + // resuming a single threadpool hasn't been + // implemented yet, meanwhile this suppresses + // the warnings + (void)thpool_p; + + threads_on_hold = 0; +} + + +int thpool_num_threads_working(thpool_* thpool_p){ + return thpool_p->num_threads_working; +} + + + + + +/* ============================ THREAD ============================== */ + + +/* Initialize a thread in the thread pool + * + * @param thread address to the pointer of the thread to be created + * @param id id to be given to the thread + * @return 0 on success, -1 otherwise. + */ +static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){ + + *thread_p = (struct thread*)malloc(sizeof(struct thread)); + if (*thread_p == NULL){ + err("thread_init(): Could not allocate memory for thread\n"); + return -1; + } + + (*thread_p)->thpool_p = thpool_p; + (*thread_p)->id = id; + + pthread_create(&(*thread_p)->pthread, NULL, (void * (*)(void *)) thread_do, (*thread_p)); + pthread_detach((*thread_p)->pthread); + return 0; +} + + +/* Sets the calling thread on hold */ +static void thread_hold(int sig_id) { + (void)sig_id; + threads_on_hold = 1; + while (threads_on_hold){ + sleep(1); + } +} + + +/* What each thread is doing +* +* In principle this is an endless loop. The only time this loop gets interuppted is once +* thpool_destroy() is invoked or the program exits. +* +* @param thread thread that will run this function +* @return nothing +*/ +static void* thread_do(struct thread* thread_p){ + + /* Set thread name for profiling and debugging */ + char thread_name[16] = {0}; + snprintf(thread_name, 16, "thpool-%d", thread_p->id); + +#if defined(__linux__) + /* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */ + prctl(PR_SET_NAME, thread_name); +#elif defined(__APPLE__) && defined(__MACH__) + pthread_setname_np(thread_name); +#else + err("thread_do(): pthread_setname_np is not supported on this system"); +#endif + + /* Assure all threads have been created before starting serving */ + thpool_* thpool_p = thread_p->thpool_p; + + /* Register signal handler */ + struct sigaction act; + sigemptyset(&act.sa_mask); + act.sa_flags = 0; + act.sa_handler = thread_hold; + if (sigaction(SIGUSR1, &act, NULL) == -1) { + err("thread_do(): cannot handle SIGUSR1"); + } + + /* Mark thread as alive (initialized) */ + pthread_mutex_lock(&thpool_p->thcount_lock); + thpool_p->num_threads_alive += 1; + pthread_mutex_unlock(&thpool_p->thcount_lock); + + while(threads_keepalive){ + + bsem_wait(thpool_p->jobqueue.has_jobs); + + if (threads_keepalive){ + + pthread_mutex_lock(&thpool_p->thcount_lock); + thpool_p->num_threads_working++; + pthread_mutex_unlock(&thpool_p->thcount_lock); + + /* Read job from queue and execute it */ + void (*func_buff)(void*); + void* arg_buff; + job* job_p = jobqueue_pull(&thpool_p->jobqueue); + if (job_p) { + func_buff = job_p->function; + arg_buff = job_p->arg; + func_buff(arg_buff); + free(job_p); + } + + pthread_mutex_lock(&thpool_p->thcount_lock); + thpool_p->num_threads_working--; + if (!thpool_p->num_threads_working) { + pthread_cond_signal(&thpool_p->threads_all_idle); + } + pthread_mutex_unlock(&thpool_p->thcount_lock); + + } + } + pthread_mutex_lock(&thpool_p->thcount_lock); + thpool_p->num_threads_alive --; + pthread_mutex_unlock(&thpool_p->thcount_lock); + + return NULL; +} + + +/* Frees a thread */ +static void thread_destroy (thread* thread_p){ + free(thread_p); +} + + + + + +/* ============================ JOB QUEUE =========================== */ + + +/* Initialize queue */ +static int jobqueue_init(jobqueue* jobqueue_p){ + jobqueue_p->len = 0; + jobqueue_p->front = NULL; + jobqueue_p->rear = NULL; + + jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem)); + if (jobqueue_p->has_jobs == NULL){ + return -1; + } + + pthread_mutex_init(&(jobqueue_p->rwmutex), NULL); + bsem_init(jobqueue_p->has_jobs, 0); + + return 0; +} + + +/* Clear the queue */ +static void jobqueue_clear(jobqueue* jobqueue_p){ + + while(jobqueue_p->len){ + free(jobqueue_pull(jobqueue_p)); + } + + jobqueue_p->front = NULL; + jobqueue_p->rear = NULL; + bsem_reset(jobqueue_p->has_jobs); + jobqueue_p->len = 0; + +} + + +/* Add (allocated) job to queue + */ +static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){ + + pthread_mutex_lock(&jobqueue_p->rwmutex); + newjob->prev = NULL; + + switch(jobqueue_p->len){ + + case 0: /* if no jobs in queue */ + jobqueue_p->front = newjob; + jobqueue_p->rear = newjob; + break; + + default: /* if jobs in queue */ + jobqueue_p->rear->prev = newjob; + jobqueue_p->rear = newjob; + + } + jobqueue_p->len++; + + bsem_post(jobqueue_p->has_jobs); + pthread_mutex_unlock(&jobqueue_p->rwmutex); +} + + +/* Get first job from queue(removes it from queue) + * Notice: Caller MUST hold a mutex + */ +static struct job* jobqueue_pull(jobqueue* jobqueue_p){ + + pthread_mutex_lock(&jobqueue_p->rwmutex); + job* job_p = jobqueue_p->front; + + switch(jobqueue_p->len){ + + case 0: /* if no jobs in queue */ + break; + + case 1: /* if one job in queue */ + jobqueue_p->front = NULL; + jobqueue_p->rear = NULL; + jobqueue_p->len = 0; + break; + + default: /* if >1 jobs in queue */ + jobqueue_p->front = job_p->prev; + jobqueue_p->len--; + /* more than one job in queue -> post it */ + bsem_post(jobqueue_p->has_jobs); + + } + + pthread_mutex_unlock(&jobqueue_p->rwmutex); + return job_p; +} + + +/* Free all queue resources back to the system */ +static void jobqueue_destroy(jobqueue* jobqueue_p){ + jobqueue_clear(jobqueue_p); + free(jobqueue_p->has_jobs); +} + + + + + +/* ======================== SYNCHRONISATION ========================= */ + + +/* Init semaphore to 1 or 0 */ +static void bsem_init(bsem *bsem_p, int value) { + if (value < 0 || value > 1) { + err("bsem_init(): Binary semaphore can take only values 1 or 0"); + exit(1); + } + pthread_mutex_init(&(bsem_p->mutex), NULL); + pthread_cond_init(&(bsem_p->cond), NULL); + bsem_p->v = value; +} + + +/* Reset semaphore to 0 */ +static void bsem_reset(bsem *bsem_p) { + bsem_init(bsem_p, 0); +} + + +/* Post to at least one thread */ +static void bsem_post(bsem *bsem_p) { + pthread_mutex_lock(&bsem_p->mutex); + bsem_p->v = 1; + pthread_cond_signal(&bsem_p->cond); + pthread_mutex_unlock(&bsem_p->mutex); +} + + +/* Post to all threads */ +static void bsem_post_all(bsem *bsem_p) { + pthread_mutex_lock(&bsem_p->mutex); + bsem_p->v = 1; + pthread_cond_broadcast(&bsem_p->cond); + pthread_mutex_unlock(&bsem_p->mutex); +} + + +/* Wait on semaphore until semaphore has value 0 */ +static void bsem_wait(bsem* bsem_p) { + pthread_mutex_lock(&bsem_p->mutex); + while (bsem_p->v != 1) { + pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex); + } + bsem_p->v = 0; + pthread_mutex_unlock(&bsem_p->mutex); +} diff --git a/thpool.h b/thpool.h new file mode 100644 index 0000000..af3e68d --- /dev/null +++ b/thpool.h @@ -0,0 +1,187 @@ +/********************************** + * @author Johan Hanssen Seferidis + * License: MIT + * + **********************************/ + +#ifndef _THPOOL_ +#define _THPOOL_ + +#ifdef __cplusplus +extern "C" { +#endif + +/* =================================== API ======================================= */ + + +typedef struct thpool_* threadpool; + + +/** + * @brief Initialize threadpool + * + * Initializes a threadpool. This function will not return until all + * threads have initialized successfully. + * + * @example + * + * .. + * threadpool thpool; //First we declare a threadpool + * thpool = thpool_init(4); //then we initialize it to 4 threads + * .. + * + * @param num_threads number of threads to be created in the threadpool + * @return threadpool created threadpool on success, + * NULL on error + */ +threadpool thpool_init(int num_threads); + + +/** + * @brief Add work to the job queue + * + * Takes an action and its argument and adds it to the threadpool's job queue. + * If you want to add to work a function with more than one arguments then + * a way to implement this is by passing a pointer to a structure. + * + * NOTICE: You have to cast both the function and argument to not get warnings. + * + * @example + * + * void print_num(int num){ + * printf("%d\n", num); + * } + * + * int main() { + * .. + * int a = 10; + * thpool_add_work(thpool, (void*)print_num, (void*)a); + * .. + * } + * + * @param threadpool threadpool to which the work will be added + * @param function_p pointer to function to add as work + * @param arg_p pointer to an argument + * @return 0 on success, -1 otherwise. + */ +int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p); + + +/** + * @brief Wait for all queued jobs to finish + * + * Will wait for all jobs - both queued and currently running to finish. + * Once the queue is empty and all work has completed, the calling thread + * (probably the main program) will continue. + * + * Smart polling is used in wait. The polling is initially 0 - meaning that + * there is virtually no polling at all. If after 1 seconds the threads + * haven't finished, the polling interval starts growing exponentially + * until it reaches max_secs seconds. Then it jumps down to a maximum polling + * interval assuming that heavy processing is being used in the threadpool. + * + * @example + * + * .. + * threadpool thpool = thpool_init(4); + * .. + * // Add a bunch of work + * .. + * thpool_wait(thpool); + * puts("All added work has finished"); + * .. + * + * @param threadpool the threadpool to wait for + * @return nothing + */ +void thpool_wait(threadpool); + + +/** + * @brief Pauses all threads immediately + * + * The threads will be paused no matter if they are idle or working. + * The threads return to their previous states once thpool_resume + * is called. + * + * While the thread is being paused, new work can be added. + * + * @example + * + * threadpool thpool = thpool_init(4); + * thpool_pause(thpool); + * .. + * // Add a bunch of work + * .. + * thpool_resume(thpool); // Let the threads start their magic + * + * @param threadpool the threadpool where the threads should be paused + * @return nothing + */ +void thpool_pause(threadpool); + + +/** + * @brief Unpauses all threads if they are paused + * + * @example + * .. + * thpool_pause(thpool); + * sleep(10); // Delay execution 10 seconds + * thpool_resume(thpool); + * .. + * + * @param threadpool the threadpool where the threads should be unpaused + * @return nothing + */ +void thpool_resume(threadpool); + + +/** + * @brief Destroy the threadpool + * + * This will wait for the currently active threads to finish and then 'kill' + * the whole threadpool to free up memory. + * + * @example + * int main() { + * threadpool thpool1 = thpool_init(2); + * threadpool thpool2 = thpool_init(2); + * .. + * thpool_destroy(thpool1); + * .. + * return 0; + * } + * + * @param threadpool the threadpool to destroy + * @return nothing + */ +void thpool_destroy(threadpool); + + +/** + * @brief Show currently working threads + * + * Working threads are the threads that are performing work (not idle). + * + * @example + * int main() { + * threadpool thpool1 = thpool_init(2); + * threadpool thpool2 = thpool_init(2); + * .. + * printf("Working threads: %d\n", thpool_num_threads_working(thpool1)); + * .. + * return 0; + * } + * + * @param threadpool the threadpool of interest + * @return integer number of threads working + */ +int thpool_num_threads_working(threadpool); + + +#ifdef __cplusplus +} +#endif + +#endif @@ -0,0 +1,375 @@ +#include "vec.h" + +int vec_new(t_vec *dst, size_t init_len, size_t elem_size) +{ + if (!dst || elem_size == 0) + return (-1); + dst->alloc_size = init_len * elem_size; + dst->len = 0; + dst->elem_size = elem_size; + if (init_len == 0) + dst->memory = NULL; + else + { + dst->memory = malloc(dst->alloc_size); + if (!dst->memory) + return (-1); + } + return (1); +} + +void vec_free(t_vec *src) +{ + if (!src || src->alloc_size == 0) + return ; + free(src->memory); + src->memory = NULL; + src->alloc_size = 0; + src->elem_size = 0; + src->len = 0; +} + +int vec_from(t_vec *dst, void *src, size_t len, size_t elem_size) +{ + if (!dst || !src || elem_size == 0) + return (-1); + else if (vec_new(dst, len, elem_size) < 0) + return (-1); + memcpy( + dst->memory, + src, + dst->alloc_size); + dst->len = len; + return (1); +} + +int vec_copy(t_vec *dst, t_vec *src) +{ + size_t copy_size; + + if (!dst || !src || !src->memory) + return (-1); + else if (!dst->memory) + vec_new(dst, src->len, dst->elem_size); + if (src->len * src->elem_size < dst->alloc_size) + copy_size = src->len * src->elem_size; + else + copy_size = src->alloc_size; + memcpy( + dst->memory, + src->memory, + copy_size); + dst->len = copy_size / dst->elem_size; + return (1); +} + +int vec_resize(t_vec *src, size_t target_len) +{ + t_vec dst; + + if (!src) + return (-1); + else if (!src->memory) + return vec_new(src, target_len, src->elem_size); + else if (vec_new(&dst, target_len, src->elem_size) < 0) + return (-1); + memcpy( + dst.memory, + src->memory, + src->len * src->elem_size); + dst.len = src->len; + vec_free(src); + *src = dst; + return (1); +} + +void *vec_get(t_vec *src, size_t index) +{ + unsigned char *ptr; + + if (index >= src->len || !src || !src->memory) + return (NULL); + ptr = &src->memory[src->elem_size * index]; + return (ptr); +} + +int vec_push(t_vec *dst, void *src) +{ + if (!dst || !src) + return (-1); + else if (!dst->memory) + vec_new(dst, 1, dst->elem_size); + if (dst->elem_size * dst->len >= dst->alloc_size) + if (vec_resize(dst, dst->len * 2) < 0) + return (-1); + memcpy(&dst->memory[dst->elem_size * dst->len], src, dst->elem_size); + dst->len++; + return (1); +} + +int vec_pop(void *dst, t_vec *src) +{ + if (!dst || !src) + return (-1); + else if (!src->memory || src->len == 0) + return (0); + memcpy(dst, vec_get(src, src->len - 1), src->elem_size); + src->len--; + return (1); +} + +int vec_clear(t_vec *src) +{ + if (!src) + return (-1); + src->len = 0; + return (1); +} + +int vec_insert(t_vec *dst, void *src, size_t index) +{ + if (!dst || !src || index > dst->len) + return (-1); + else if (index == dst->len) + return (vec_push(dst, src)); + if (dst->elem_size * dst->len >= dst->alloc_size) + if (vec_resize(dst, (dst->alloc_size * 2) / dst->elem_size) < 0) + return (-1); + memmove( + vec_get(dst, index + 1), + vec_get(dst, index), + (dst->len - index) * dst->elem_size); + memcpy( + vec_get(dst, index), + src, dst->elem_size); + dst->len++; + return (1); +} + +int vec_remove(t_vec *src, size_t index) +{ + if (!src || index > src->len) + return (-1); + else if (index == src->len) + { + src->len--; + return (1); + } + memmove( + vec_get(src, index), + &src->memory[src->elem_size * (index + 1)], + (src->len - index) * src->elem_size); + src->len--; + return (1); +} + +int vec_append(t_vec *dst, t_vec *src) +{ + int ret; + size_t alloc_size; + + if (!dst || !src || !src->memory) + return (-1); + else if (!dst->memory) + vec_new(dst, 1, dst->elem_size); + alloc_size = dst->len * dst->elem_size + src->len * src->elem_size; + if (dst->alloc_size < alloc_size) + { + if (dst->alloc_size * 2 < dst->len * alloc_size) + ret = vec_resize(dst, alloc_size); + else + ret = vec_resize(dst, dst->alloc_size * 2); + if (ret < 0) + return (-1); + } + memcpy( + &dst->memory[dst->elem_size * dst->len], + src->memory, + src->len * src->elem_size); + dst->len += src->len; + return (1); +} + +int vec_prepend(t_vec *dst, t_vec *src) +{ + t_vec new; + size_t alloc_size; + + if (!dst || !src) + return (-1); + else if (!dst->memory) + vec_new(dst, 1, dst->elem_size); + alloc_size = dst->len * dst->elem_size + src->len * src->elem_size; + if (vec_new(&new, alloc_size / dst->elem_size, dst->elem_size) < 0) + return (-1); + vec_copy(&new, src); + new.len = src->len + dst->len; + memcpy( + &new.memory[dst->elem_size * dst->len], + dst->memory, + dst->len * dst->elem_size); + vec_free(dst); + *dst = new; + return (1); +} + +void vec_iter(t_vec *src, void (*f) (void *)) +{ + void *ptr; + size_t i; + + if (!src || !src->memory) + return ; + i = 0; + while (i < src->len) + { + ptr = vec_get(src, i); + f(ptr); + i++; + } +} + +void *vec_find(t_vec *src, bool (*f) (void *)) +{ + void *ptr; + size_t i; + + if (!src || !src->memory) + return (NULL); + i = 0; + while (i < src->len) + { + ptr = vec_get(src, i); + if (f(ptr) == true) + return (ptr); + i++; + } + return (NULL); +} + +int vec_map(t_vec *dst, t_vec *src, void (*f) (void *)) +{ + void *ptr; + void *res; + size_t i; + + if (!dst || !src || !src->memory) + return (-1); + else if (!dst->memory) + vec_new(dst, 1, dst->elem_size); + res = malloc(dst->elem_size); + if (!res) + return (-1); + i = 0; + while (i < src->len) + { + ptr = vec_get(src, i); + memcpy(res, ptr, dst->elem_size); + f(res); + vec_push(dst, res); + i++; + } + free(res); + return (1); +} + +int vec_filter(t_vec *dst, t_vec *src, bool (*f) (void *)) +{ + void *ptr; + void *res; + size_t i; + + if (!dst || !src || !src->memory) + return (-1); + else if (!dst->memory) + vec_new(dst, 1, dst->elem_size); + res = malloc(dst->elem_size); + if (!res) + return (-1); + i = 0; + while (i < src->len) + { + ptr = vec_get(src, i); + memcpy(res, ptr, dst->elem_size); + if (f(res) == true) + vec_push(dst, res); + i++; + } + free(res); + return (1); +} + +int vec_reduce(void *dst, t_vec *src, void (*f) (void *, void *)) +{ + void *ptr; + size_t i; + + if (!dst || !src || !src->memory) + return (-1); + i = 0; + while (i < src->len) + { + ptr = vec_get(src, i); + f(dst, ptr); + i++; + } + return (1); +} + +static void memswap8(unsigned char *a, unsigned char *b) +{ + if (a == b) + return ; + *a ^= *b; + *b ^= *a; + *a ^= *b; +} + +static void memswap(unsigned char *a, unsigned char *b, size_t bytes) +{ + size_t i; + + if (!a || !b) + return ; + i = 0; + while (i < bytes) + { + memswap8(&a[i], &b[i]); + i++; + } +} + +static void vec_sort_recurse(t_vec *src, + long int low, + long int high, + int (*f)(void *, void *)) +{ + long int pivot; + long int a; + long int b; + + if (low >= high) + return ; + pivot = low; + a = low; + b = high; + while (a < b) + { + while (a <= high && f(vec_get(src, a), vec_get(src, pivot)) <= 0) + a++; + while (b >= low && f(vec_get(src, b), vec_get(src, pivot)) > 0) + b--; + if (a < b) + memswap(vec_get(src, a), vec_get(src, b), src->elem_size); + } + memswap(vec_get(src, pivot), vec_get(src, b), src->elem_size); + vec_sort_recurse(src, low, b - 1, f); + vec_sort_recurse(src, b + 1, high, f); +} + +void vec_sort(t_vec *src, int (*f)(void *, void *)) +{ + if (!src || !src->memory) + return ; + vec_sort_recurse(src, 0, src->len - 1, f); +} @@ -0,0 +1,37 @@ +#ifndef VEC_H +# define VEC_H + +#include "stdlib.h" +#include "unistd.h" +#include "string.h" +#include "stdbool.h" + +typedef struct s_vec +{ + unsigned char *memory; + size_t elem_size; + size_t alloc_size; + size_t len; +} t_vec; + +int vec_new(t_vec *src, size_t init_len, size_t elem_size); +void vec_free(t_vec *src); +int vec_from(t_vec *dst, void *src, size_t len, size_t elem_size); +int vec_resize(t_vec *src, size_t target_size); +int vec_clear(t_vec *src); +int vec_push(t_vec *src, void *elem); +int vec_pop(void *dst, t_vec *src); +int vec_copy(t_vec *dst, t_vec *src); +void *vec_get(t_vec *src, size_t index); +int vec_insert(t_vec *dst, void *elem, size_t index); +int vec_remove(t_vec *src, size_t index); +int vec_append(t_vec *dst, t_vec *src); +int vec_prepend(t_vec *dst, t_vec *src); +void vec_iter(t_vec *src, void (*f) (void *)); +void *vec_find(t_vec *src, bool (*f) (void *)); +int vec_map(t_vec *dst, t_vec *src, void (*f) (void *)); +int vec_filter(t_vec *dst, t_vec *src, bool (*f) (void *)); +int vec_reduce(void *dst, t_vec *src, void (*f) (void *, void *)); +void vec_sort(t_vec *src, int (*f)(void *, void *)); + +#endif |