summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSlendi <slendi@socopon.com>2023-09-08 01:13:04 +0300
committerSlendi <slendi@socopon.com>2023-09-08 01:13:04 +0300
commit414decf167f936eca0a43267f034a3adbe573958 (patch)
treeab65bb031b6356a13f269eccf69074247623fc6c
Initial commit.HEADmaster
Signed-off-by: Slendi <slendi@socopon.com>
-rw-r--r--.gitignore1
-rw-r--r--LICENSE7
-rw-r--r--Makefile6
-rw-r--r--cpfast.c270
-rw-r--r--thpool.c553
-rw-r--r--thpool.h187
-rw-r--r--vec.c375
-rw-r--r--vec.h37
8 files changed, 1436 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4dc13a0
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+cpfast
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..c9f0ea7
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,7 @@
+Copyright (C) 2023 Slendi
+This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
+
+This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/>
+
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..a555afe
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,6 @@
+cpfast: cpfast.c
+ cc -march=native -O3 -s *.c -o cpfast
+
+clean:
+ rm -f cpfast
+
diff --git a/cpfast.c b/cpfast.c
new file mode 100644
index 0000000..def0341
--- /dev/null
+++ b/cpfast.c
@@ -0,0 +1,270 @@
+#include <dirent.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/stat.h>
+#include <sys/sysinfo.h>
+#include <unistd.h>
+#include <utime.h>
+
+#include <fcntl.h>
+#if defined(__APPLE__) || defined(__FreeBSD__)
+# include <copyfile.h>
+#else
+# include <sys/sendfile.h>
+#endif
+
+#include "thpool.h"
+#include "vec.h"
+
+#define CHMOD 0666
+
+typedef enum _OperationStatus
+{
+ SENDFILE_FAIL = -4,
+ STAT_FAIL = -3,
+ SOURCE_OPEN_FAIL = -2,
+ DESTINATION_OPEN_FAIL = -1,
+ SUCCESS = 0,
+} OperationStatus;
+
+typedef enum _QueueType
+{
+ FILE_COPY,
+ MKDIR,
+} QueueType;
+
+typedef struct _QueueItem
+{
+ char *source, *destination;
+ QueueType type;
+ bool done;
+ size_t requirement;
+} QueueItem;
+
+typedef struct
+{
+ pthread_t thread_id;
+ bool is_valid;
+} ThreadInfo;
+
+struct
+{
+ t_vec file_queue;
+ bool error;
+} g_state;
+
+void generate_queue(char *source, char *destination)
+{
+ static size_t last_dir = -1;
+
+ DIR *dir;
+ struct dirent *entry;
+ if (!(dir = opendir(source))) return;
+
+ while ((entry = readdir(dir)) != NULL) {
+ if (entry->d_type == DT_DIR) {
+ if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) continue;
+
+ char source_path[1024];
+ char dest_path[1024];
+ snprintf(source_path, sizeof(source_path), "%s/%s", source, entry->d_name);
+ snprintf(dest_path, sizeof(dest_path), "%s/%s", destination, entry->d_name);
+
+ QueueItem item;
+ item.source = strdup(source_path);
+ item.destination = strdup(dest_path);
+ item.type = MKDIR;
+ item.done = false;
+ item.requirement = last_dir;
+ last_dir = g_state.file_queue.len;
+ vec_push(&g_state.file_queue, &item);
+
+ generate_queue(source_path, dest_path);
+ } else {
+ char source_path[1024];
+ char dest_path[1024];
+ snprintf(source_path, sizeof(source_path), "%s/%s", source, entry->d_name);
+ snprintf(dest_path, sizeof(dest_path), "%s/%s", destination, entry->d_name);
+
+ QueueItem item;
+ item.source = strdup(source_path);
+ item.destination = strdup(dest_path);
+ item.type = FILE_COPY;
+ item.done = false;
+ item.requirement = last_dir;
+ vec_push(&g_state.file_queue, &item);
+ }
+ }
+
+ closedir(dir);
+}
+
+void print_operation_status(OperationStatus status)
+{
+ switch (status) {
+ case SENDFILE_FAIL: perror("Failed sendfile()"); break;
+ case STAT_FAIL: perror("Failed stat()"); break;
+ case SOURCE_OPEN_FAIL: perror("Failed source open"); break;
+ case DESTINATION_OPEN_FAIL: perror("Failed destination open"); break;
+ case SUCCESS: puts("Done!"); break;
+ }
+}
+
+#define FAILED(x) ((x) < 0)
+#define OK(x) ((x) >= 0)
+#define BYTE_TO_BINARY_PATTERN "%c%c%c%c%c%c%c%c"
+#define BYTE_TO_BINARY(byte) \
+ ((byte)&0x80 ? '1' : '0'), ((byte)&0x40 ? '1' : '0'), ((byte)&0x20 ? '1' : '0'), \
+ ((byte)&0x10 ? '1' : '0'), ((byte)&0x08 ? '1' : '0'), ((byte)&0x04 ? '1' : '0'), \
+ ((byte)&0x02 ? '1' : '0'), ((byte)&0x01 ? '1' : '0')
+OperationStatus copy_file(const char *source, const char *dest, bool preserve_mode,
+ bool preserve_time, bool preserve_guid, bool overwrite)
+{
+ struct stat source_stats;
+ mode_t mode = CHMOD;
+
+ int input_fd, output_fd;
+ if (FAILED(input_fd = open(source, O_RDONLY))) return SOURCE_OPEN_FAIL;
+ if (FAILED(
+ output_fd = open(dest, O_WRONLY | O_CREAT | (overwrite ? 0 : O_EXCL), 0666))) {
+ close(input_fd);
+ return DESTINATION_OPEN_FAIL;
+ }
+
+ if (FAILED(fstat(input_fd, &source_stats))) {
+ close(input_fd);
+ close(output_fd);
+ return STAT_FAIL;
+ }
+ if (preserve_mode) mode = source_stats.st_mode;
+
+#if defined(__APPLE__) || defined(__FreeBSD__)
+ int result = fcopyfile(input, output, 0, COPYFILE_ALL);
+#else
+ off_t copied_bytes = 0;
+ int result = sendfile(output_fd, input_fd, &copied_bytes, source_stats.st_size);
+ if (FAILED(result)) {
+ printf("Failed sneedfile(%d, %d, %p (%ld), %ld) on file %s -> %s\n", output_fd,
+ input_fd, &copied_bytes, copied_bytes, source_stats.st_size, source, dest);
+ }
+#endif
+
+ close(input_fd);
+ close(output_fd);
+
+ if (result < 0) return SENDFILE_FAIL;
+ if (preserve_time) {
+ struct utimbuf new_time;
+ new_time.actime = source_stats.st_atime;
+ new_time.modtime = source_stats.st_mtime;
+ utime(dest, &new_time);
+ }
+
+ return 0;
+}
+
+int is_regular_file(const char *path)
+{
+ struct stat path_stat;
+ stat(path, &path_stat);
+ return S_ISREG(path_stat.st_mode);
+}
+
+int handle_queue_item(QueueItem *item)
+{
+ int ret = 0;
+ if (item->type == FILE_COPY) {
+ struct stat st;
+ stat(item->source, &st);
+ if (S_ISLNK(st.st_mode)) {
+ char link_target[1024];
+ ssize_t link_size = readlink(item->source, link_target, sizeof(link_target));
+ if (link_size == -1) {
+ perror("Failed to read symbolic link");
+ return -1;
+ }
+ link_target[link_size] = '\0';
+
+ if (symlink(link_target, item->destination) == -1) {
+ perror("Failed to create symbolic link");
+ return -1;
+ }
+ } else if (S_ISREG(st.st_mode)) {
+ ret = copy_file(item->source, item->destination, false, false, false, true);
+ if (FAILED(ret)) print_operation_status(ret);
+ } else if (S_ISFIFO(st.st_mode)) {
+ if (mkfifo(item->destination, st.st_mode) == -1) {
+ perror("Failed to create FIFO");
+ return -1;
+ }
+ }
+ } else
+ ret = mkdir(item->destination, 0777);
+
+ if (OK(ret)) item->done = true;
+ if (FAILED(ret)) { printf("Failed with %d\n", ret); }
+ return ret;
+}
+
+void thread_function(void *arg)
+{
+ QueueItem *j = arg;
+
+ if (j->requirement != -1)
+ while (!((QueueItem *)vec_get(&g_state.file_queue, j->requirement))->done
+ && !g_state.error)
+ usleep(0);
+
+ if (g_state.error) return;
+
+ // FIXME: Show this if -v
+ // printf("%s -> %s [%d]\n", j->source, j->destination, j->type);
+
+ int queue_item_ret = handle_queue_item(j);
+ if (FAILED(queue_item_ret)) {
+ printf("Queue item ret: %d\n", queue_item_ret);
+ perror("handle_queue_item");
+ g_state.error = true;
+ }
+ return;
+}
+
+int main(int argc, char **argv)
+{
+ if (argc < 3) {
+ fputs("First argument is source, second argument is destination.", stderr);
+ return EXIT_FAILURE;
+ }
+
+ if (FAILED(vec_new(&g_state.file_queue, 20, sizeof(QueueItem)))) {
+ fputs("Failed creating queue.", stderr);
+ return EXIT_FAILURE;
+ }
+
+ g_state.error = false;
+
+ if (is_regular_file(argv[1])) {
+ print_operation_status(copy_file(argv[1], argv[2], false, false, false, false));
+ return 0;
+ }
+
+ generate_queue(argv[1], argv[2]);
+ mkdir(argv[2], 0777);
+
+ unsigned num_cores = get_nprocs();
+ threadpool tp = thpool_init(num_cores);
+
+ for (unsigned i = 0; i < g_state.file_queue.len; i++)
+ thpool_add_work(tp, thread_function, vec_get(&g_state.file_queue, i));
+
+ thpool_wait(tp);
+
+ if (g_state.error) { return EXIT_FAILURE; }
+
+ thpool_destroy(tp);
+
+ return EXIT_SUCCESS;
+}
diff --git a/thpool.c b/thpool.c
new file mode 100644
index 0000000..59e2e55
--- /dev/null
+++ b/thpool.c
@@ -0,0 +1,553 @@
+/* ********************************
+ * Author: Johan Hanssen Seferidis
+ * License: MIT
+ * Description: Library providing a threading pool where you can add
+ * work. For usage, check the thpool.h file or README.md
+ *
+ *//** @file thpool.h *//*
+ *
+ ********************************/
+
+#if defined(__APPLE__)
+#include <AvailabilityMacros.h>
+#else
+#ifndef _POSIX_C_SOURCE
+#define _POSIX_C_SOURCE 200809L
+#endif
+#endif
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <errno.h>
+#include <time.h>
+#if defined(__linux__)
+#include <sys/prctl.h>
+#endif
+
+#include "thpool.h"
+
+#ifdef THPOOL_DEBUG
+#define THPOOL_DEBUG 1
+#else
+#define THPOOL_DEBUG 0
+#endif
+
+#if !defined(DISABLE_PRINT) || defined(THPOOL_DEBUG)
+#define err(str) fprintf(stderr, str)
+#else
+#define err(str)
+#endif
+
+static volatile int threads_keepalive;
+static volatile int threads_on_hold;
+
+
+
+/* ========================== STRUCTURES ============================ */
+
+
+/* Binary semaphore */
+typedef struct bsem {
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ int v;
+} bsem;
+
+
+/* Job */
+typedef struct job{
+ struct job* prev; /* pointer to previous job */
+ void (*function)(void* arg); /* function pointer */
+ void* arg; /* function's argument */
+} job;
+
+
+/* Job queue */
+typedef struct jobqueue{
+ pthread_mutex_t rwmutex; /* used for queue r/w access */
+ job *front; /* pointer to front of queue */
+ job *rear; /* pointer to rear of queue */
+ bsem *has_jobs; /* flag as binary semaphore */
+ int len; /* number of jobs in queue */
+} jobqueue;
+
+
+/* Thread */
+typedef struct thread{
+ int id; /* friendly id */
+ pthread_t pthread; /* pointer to actual thread */
+ struct thpool_* thpool_p; /* access to thpool */
+} thread;
+
+
+/* Threadpool */
+typedef struct thpool_{
+ thread** threads; /* pointer to threads */
+ volatile int num_threads_alive; /* threads currently alive */
+ volatile int num_threads_working; /* threads currently working */
+ pthread_mutex_t thcount_lock; /* used for thread count etc */
+ pthread_cond_t threads_all_idle; /* signal to thpool_wait */
+ jobqueue jobqueue; /* job queue */
+} thpool_;
+
+
+
+
+
+/* ========================== PROTOTYPES ============================ */
+
+
+static int thread_init(thpool_* thpool_p, struct thread** thread_p, int id);
+static void* thread_do(struct thread* thread_p);
+static void thread_hold(int sig_id);
+static void thread_destroy(struct thread* thread_p);
+
+static int jobqueue_init(jobqueue* jobqueue_p);
+static void jobqueue_clear(jobqueue* jobqueue_p);
+static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p);
+static struct job* jobqueue_pull(jobqueue* jobqueue_p);
+static void jobqueue_destroy(jobqueue* jobqueue_p);
+
+static void bsem_init(struct bsem *bsem_p, int value);
+static void bsem_reset(struct bsem *bsem_p);
+static void bsem_post(struct bsem *bsem_p);
+static void bsem_post_all(struct bsem *bsem_p);
+static void bsem_wait(struct bsem *bsem_p);
+
+
+
+
+
+/* ========================== THREADPOOL ============================ */
+
+
+/* Initialise thread pool */
+struct thpool_* thpool_init(int num_threads){
+
+ threads_on_hold = 0;
+ threads_keepalive = 1;
+
+ if (num_threads < 0){
+ num_threads = 0;
+ }
+
+ /* Make new thread pool */
+ thpool_* thpool_p;
+ thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_));
+ if (thpool_p == NULL){
+ err("thpool_init(): Could not allocate memory for thread pool\n");
+ return NULL;
+ }
+ thpool_p->num_threads_alive = 0;
+ thpool_p->num_threads_working = 0;
+
+ /* Initialise the job queue */
+ if (jobqueue_init(&thpool_p->jobqueue) == -1){
+ err("thpool_init(): Could not allocate memory for job queue\n");
+ free(thpool_p);
+ return NULL;
+ }
+
+ /* Make threads in pool */
+ thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread *));
+ if (thpool_p->threads == NULL){
+ err("thpool_init(): Could not allocate memory for threads\n");
+ jobqueue_destroy(&thpool_p->jobqueue);
+ free(thpool_p);
+ return NULL;
+ }
+
+ pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
+ pthread_cond_init(&thpool_p->threads_all_idle, NULL);
+
+ /* Thread init */
+ int n;
+ for (n=0; n<num_threads; n++){
+ thread_init(thpool_p, &thpool_p->threads[n], n);
+#if THPOOL_DEBUG
+ printf("THPOOL_DEBUG: Created thread %d in pool \n", n);
+#endif
+ }
+
+ /* Wait for threads to initialize */
+ while (thpool_p->num_threads_alive != num_threads) {}
+
+ return thpool_p;
+}
+
+
+/* Add work to the thread pool */
+int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){
+ job* newjob;
+
+ newjob=(struct job*)malloc(sizeof(struct job));
+ if (newjob==NULL){
+ err("thpool_add_work(): Could not allocate memory for new job\n");
+ return -1;
+ }
+
+ /* add function and argument */
+ newjob->function=function_p;
+ newjob->arg=arg_p;
+
+ /* add job to queue */
+ jobqueue_push(&thpool_p->jobqueue, newjob);
+
+ return 0;
+}
+
+
+/* Wait until all jobs have finished */
+void thpool_wait(thpool_* thpool_p){
+ pthread_mutex_lock(&thpool_p->thcount_lock);
+ while (thpool_p->jobqueue.len || thpool_p->num_threads_working) {
+ pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock);
+ }
+ pthread_mutex_unlock(&thpool_p->thcount_lock);
+}
+
+
+/* Destroy the threadpool */
+void thpool_destroy(thpool_* thpool_p){
+ /* No need to destroy if it's NULL */
+ if (thpool_p == NULL) return ;
+
+ volatile int threads_total = thpool_p->num_threads_alive;
+
+ /* End each thread 's infinite loop */
+ threads_keepalive = 0;
+
+ /* Give one second to kill idle threads */
+ double TIMEOUT = 1.0;
+ time_t start, end;
+ double tpassed = 0.0;
+ time (&start);
+ while (tpassed < TIMEOUT && thpool_p->num_threads_alive){
+ bsem_post_all(thpool_p->jobqueue.has_jobs);
+ time (&end);
+ tpassed = difftime(end,start);
+ }
+
+ /* Poll remaining threads */
+ while (thpool_p->num_threads_alive){
+ bsem_post_all(thpool_p->jobqueue.has_jobs);
+ sleep(1);
+ }
+
+ /* Job queue cleanup */
+ jobqueue_destroy(&thpool_p->jobqueue);
+ /* Deallocs */
+ int n;
+ for (n=0; n < threads_total; n++){
+ thread_destroy(thpool_p->threads[n]);
+ }
+ free(thpool_p->threads);
+ free(thpool_p);
+}
+
+
+/* Pause all threads in threadpool */
+void thpool_pause(thpool_* thpool_p) {
+ int n;
+ for (n=0; n < thpool_p->num_threads_alive; n++){
+ pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1);
+ }
+}
+
+
+/* Resume all threads in threadpool */
+void thpool_resume(thpool_* thpool_p) {
+ // resuming a single threadpool hasn't been
+ // implemented yet, meanwhile this suppresses
+ // the warnings
+ (void)thpool_p;
+
+ threads_on_hold = 0;
+}
+
+
+int thpool_num_threads_working(thpool_* thpool_p){
+ return thpool_p->num_threads_working;
+}
+
+
+
+
+
+/* ============================ THREAD ============================== */
+
+
+/* Initialize a thread in the thread pool
+ *
+ * @param thread address to the pointer of the thread to be created
+ * @param id id to be given to the thread
+ * @return 0 on success, -1 otherwise.
+ */
+static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){
+
+ *thread_p = (struct thread*)malloc(sizeof(struct thread));
+ if (*thread_p == NULL){
+ err("thread_init(): Could not allocate memory for thread\n");
+ return -1;
+ }
+
+ (*thread_p)->thpool_p = thpool_p;
+ (*thread_p)->id = id;
+
+ pthread_create(&(*thread_p)->pthread, NULL, (void * (*)(void *)) thread_do, (*thread_p));
+ pthread_detach((*thread_p)->pthread);
+ return 0;
+}
+
+
+/* Sets the calling thread on hold */
+static void thread_hold(int sig_id) {
+ (void)sig_id;
+ threads_on_hold = 1;
+ while (threads_on_hold){
+ sleep(1);
+ }
+}
+
+
+/* What each thread is doing
+*
+* In principle this is an endless loop. The only time this loop gets interuppted is once
+* thpool_destroy() is invoked or the program exits.
+*
+* @param thread thread that will run this function
+* @return nothing
+*/
+static void* thread_do(struct thread* thread_p){
+
+ /* Set thread name for profiling and debugging */
+ char thread_name[16] = {0};
+ snprintf(thread_name, 16, "thpool-%d", thread_p->id);
+
+#if defined(__linux__)
+ /* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */
+ prctl(PR_SET_NAME, thread_name);
+#elif defined(__APPLE__) && defined(__MACH__)
+ pthread_setname_np(thread_name);
+#else
+ err("thread_do(): pthread_setname_np is not supported on this system");
+#endif
+
+ /* Assure all threads have been created before starting serving */
+ thpool_* thpool_p = thread_p->thpool_p;
+
+ /* Register signal handler */
+ struct sigaction act;
+ sigemptyset(&act.sa_mask);
+ act.sa_flags = 0;
+ act.sa_handler = thread_hold;
+ if (sigaction(SIGUSR1, &act, NULL) == -1) {
+ err("thread_do(): cannot handle SIGUSR1");
+ }
+
+ /* Mark thread as alive (initialized) */
+ pthread_mutex_lock(&thpool_p->thcount_lock);
+ thpool_p->num_threads_alive += 1;
+ pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+ while(threads_keepalive){
+
+ bsem_wait(thpool_p->jobqueue.has_jobs);
+
+ if (threads_keepalive){
+
+ pthread_mutex_lock(&thpool_p->thcount_lock);
+ thpool_p->num_threads_working++;
+ pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+ /* Read job from queue and execute it */
+ void (*func_buff)(void*);
+ void* arg_buff;
+ job* job_p = jobqueue_pull(&thpool_p->jobqueue);
+ if (job_p) {
+ func_buff = job_p->function;
+ arg_buff = job_p->arg;
+ func_buff(arg_buff);
+ free(job_p);
+ }
+
+ pthread_mutex_lock(&thpool_p->thcount_lock);
+ thpool_p->num_threads_working--;
+ if (!thpool_p->num_threads_working) {
+ pthread_cond_signal(&thpool_p->threads_all_idle);
+ }
+ pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+ }
+ }
+ pthread_mutex_lock(&thpool_p->thcount_lock);
+ thpool_p->num_threads_alive --;
+ pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+ return NULL;
+}
+
+
+/* Frees a thread */
+static void thread_destroy (thread* thread_p){
+ free(thread_p);
+}
+
+
+
+
+
+/* ============================ JOB QUEUE =========================== */
+
+
+/* Initialize queue */
+static int jobqueue_init(jobqueue* jobqueue_p){
+ jobqueue_p->len = 0;
+ jobqueue_p->front = NULL;
+ jobqueue_p->rear = NULL;
+
+ jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
+ if (jobqueue_p->has_jobs == NULL){
+ return -1;
+ }
+
+ pthread_mutex_init(&(jobqueue_p->rwmutex), NULL);
+ bsem_init(jobqueue_p->has_jobs, 0);
+
+ return 0;
+}
+
+
+/* Clear the queue */
+static void jobqueue_clear(jobqueue* jobqueue_p){
+
+ while(jobqueue_p->len){
+ free(jobqueue_pull(jobqueue_p));
+ }
+
+ jobqueue_p->front = NULL;
+ jobqueue_p->rear = NULL;
+ bsem_reset(jobqueue_p->has_jobs);
+ jobqueue_p->len = 0;
+
+}
+
+
+/* Add (allocated) job to queue
+ */
+static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){
+
+ pthread_mutex_lock(&jobqueue_p->rwmutex);
+ newjob->prev = NULL;
+
+ switch(jobqueue_p->len){
+
+ case 0: /* if no jobs in queue */
+ jobqueue_p->front = newjob;
+ jobqueue_p->rear = newjob;
+ break;
+
+ default: /* if jobs in queue */
+ jobqueue_p->rear->prev = newjob;
+ jobqueue_p->rear = newjob;
+
+ }
+ jobqueue_p->len++;
+
+ bsem_post(jobqueue_p->has_jobs);
+ pthread_mutex_unlock(&jobqueue_p->rwmutex);
+}
+
+
+/* Get first job from queue(removes it from queue)
+ * Notice: Caller MUST hold a mutex
+ */
+static struct job* jobqueue_pull(jobqueue* jobqueue_p){
+
+ pthread_mutex_lock(&jobqueue_p->rwmutex);
+ job* job_p = jobqueue_p->front;
+
+ switch(jobqueue_p->len){
+
+ case 0: /* if no jobs in queue */
+ break;
+
+ case 1: /* if one job in queue */
+ jobqueue_p->front = NULL;
+ jobqueue_p->rear = NULL;
+ jobqueue_p->len = 0;
+ break;
+
+ default: /* if >1 jobs in queue */
+ jobqueue_p->front = job_p->prev;
+ jobqueue_p->len--;
+ /* more than one job in queue -> post it */
+ bsem_post(jobqueue_p->has_jobs);
+
+ }
+
+ pthread_mutex_unlock(&jobqueue_p->rwmutex);
+ return job_p;
+}
+
+
+/* Free all queue resources back to the system */
+static void jobqueue_destroy(jobqueue* jobqueue_p){
+ jobqueue_clear(jobqueue_p);
+ free(jobqueue_p->has_jobs);
+}
+
+
+
+
+
+/* ======================== SYNCHRONISATION ========================= */
+
+
+/* Init semaphore to 1 or 0 */
+static void bsem_init(bsem *bsem_p, int value) {
+ if (value < 0 || value > 1) {
+ err("bsem_init(): Binary semaphore can take only values 1 or 0");
+ exit(1);
+ }
+ pthread_mutex_init(&(bsem_p->mutex), NULL);
+ pthread_cond_init(&(bsem_p->cond), NULL);
+ bsem_p->v = value;
+}
+
+
+/* Reset semaphore to 0 */
+static void bsem_reset(bsem *bsem_p) {
+ bsem_init(bsem_p, 0);
+}
+
+
+/* Post to at least one thread */
+static void bsem_post(bsem *bsem_p) {
+ pthread_mutex_lock(&bsem_p->mutex);
+ bsem_p->v = 1;
+ pthread_cond_signal(&bsem_p->cond);
+ pthread_mutex_unlock(&bsem_p->mutex);
+}
+
+
+/* Post to all threads */
+static void bsem_post_all(bsem *bsem_p) {
+ pthread_mutex_lock(&bsem_p->mutex);
+ bsem_p->v = 1;
+ pthread_cond_broadcast(&bsem_p->cond);
+ pthread_mutex_unlock(&bsem_p->mutex);
+}
+
+
+/* Wait on semaphore until semaphore has value 0 */
+static void bsem_wait(bsem* bsem_p) {
+ pthread_mutex_lock(&bsem_p->mutex);
+ while (bsem_p->v != 1) {
+ pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex);
+ }
+ bsem_p->v = 0;
+ pthread_mutex_unlock(&bsem_p->mutex);
+}
diff --git a/thpool.h b/thpool.h
new file mode 100644
index 0000000..af3e68d
--- /dev/null
+++ b/thpool.h
@@ -0,0 +1,187 @@
+/**********************************
+ * @author Johan Hanssen Seferidis
+ * License: MIT
+ *
+ **********************************/
+
+#ifndef _THPOOL_
+#define _THPOOL_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* =================================== API ======================================= */
+
+
+typedef struct thpool_* threadpool;
+
+
+/**
+ * @brief Initialize threadpool
+ *
+ * Initializes a threadpool. This function will not return until all
+ * threads have initialized successfully.
+ *
+ * @example
+ *
+ * ..
+ * threadpool thpool; //First we declare a threadpool
+ * thpool = thpool_init(4); //then we initialize it to 4 threads
+ * ..
+ *
+ * @param num_threads number of threads to be created in the threadpool
+ * @return threadpool created threadpool on success,
+ * NULL on error
+ */
+threadpool thpool_init(int num_threads);
+
+
+/**
+ * @brief Add work to the job queue
+ *
+ * Takes an action and its argument and adds it to the threadpool's job queue.
+ * If you want to add to work a function with more than one arguments then
+ * a way to implement this is by passing a pointer to a structure.
+ *
+ * NOTICE: You have to cast both the function and argument to not get warnings.
+ *
+ * @example
+ *
+ * void print_num(int num){
+ * printf("%d\n", num);
+ * }
+ *
+ * int main() {
+ * ..
+ * int a = 10;
+ * thpool_add_work(thpool, (void*)print_num, (void*)a);
+ * ..
+ * }
+ *
+ * @param threadpool threadpool to which the work will be added
+ * @param function_p pointer to function to add as work
+ * @param arg_p pointer to an argument
+ * @return 0 on success, -1 otherwise.
+ */
+int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);
+
+
+/**
+ * @brief Wait for all queued jobs to finish
+ *
+ * Will wait for all jobs - both queued and currently running to finish.
+ * Once the queue is empty and all work has completed, the calling thread
+ * (probably the main program) will continue.
+ *
+ * Smart polling is used in wait. The polling is initially 0 - meaning that
+ * there is virtually no polling at all. If after 1 seconds the threads
+ * haven't finished, the polling interval starts growing exponentially
+ * until it reaches max_secs seconds. Then it jumps down to a maximum polling
+ * interval assuming that heavy processing is being used in the threadpool.
+ *
+ * @example
+ *
+ * ..
+ * threadpool thpool = thpool_init(4);
+ * ..
+ * // Add a bunch of work
+ * ..
+ * thpool_wait(thpool);
+ * puts("All added work has finished");
+ * ..
+ *
+ * @param threadpool the threadpool to wait for
+ * @return nothing
+ */
+void thpool_wait(threadpool);
+
+
+/**
+ * @brief Pauses all threads immediately
+ *
+ * The threads will be paused no matter if they are idle or working.
+ * The threads return to their previous states once thpool_resume
+ * is called.
+ *
+ * While the thread is being paused, new work can be added.
+ *
+ * @example
+ *
+ * threadpool thpool = thpool_init(4);
+ * thpool_pause(thpool);
+ * ..
+ * // Add a bunch of work
+ * ..
+ * thpool_resume(thpool); // Let the threads start their magic
+ *
+ * @param threadpool the threadpool where the threads should be paused
+ * @return nothing
+ */
+void thpool_pause(threadpool);
+
+
+/**
+ * @brief Unpauses all threads if they are paused
+ *
+ * @example
+ * ..
+ * thpool_pause(thpool);
+ * sleep(10); // Delay execution 10 seconds
+ * thpool_resume(thpool);
+ * ..
+ *
+ * @param threadpool the threadpool where the threads should be unpaused
+ * @return nothing
+ */
+void thpool_resume(threadpool);
+
+
+/**
+ * @brief Destroy the threadpool
+ *
+ * This will wait for the currently active threads to finish and then 'kill'
+ * the whole threadpool to free up memory.
+ *
+ * @example
+ * int main() {
+ * threadpool thpool1 = thpool_init(2);
+ * threadpool thpool2 = thpool_init(2);
+ * ..
+ * thpool_destroy(thpool1);
+ * ..
+ * return 0;
+ * }
+ *
+ * @param threadpool the threadpool to destroy
+ * @return nothing
+ */
+void thpool_destroy(threadpool);
+
+
+/**
+ * @brief Show currently working threads
+ *
+ * Working threads are the threads that are performing work (not idle).
+ *
+ * @example
+ * int main() {
+ * threadpool thpool1 = thpool_init(2);
+ * threadpool thpool2 = thpool_init(2);
+ * ..
+ * printf("Working threads: %d\n", thpool_num_threads_working(thpool1));
+ * ..
+ * return 0;
+ * }
+ *
+ * @param threadpool the threadpool of interest
+ * @return integer number of threads working
+ */
+int thpool_num_threads_working(threadpool);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/vec.c b/vec.c
new file mode 100644
index 0000000..20100a2
--- /dev/null
+++ b/vec.c
@@ -0,0 +1,375 @@
+#include "vec.h"
+
+int vec_new(t_vec *dst, size_t init_len, size_t elem_size)
+{
+ if (!dst || elem_size == 0)
+ return (-1);
+ dst->alloc_size = init_len * elem_size;
+ dst->len = 0;
+ dst->elem_size = elem_size;
+ if (init_len == 0)
+ dst->memory = NULL;
+ else
+ {
+ dst->memory = malloc(dst->alloc_size);
+ if (!dst->memory)
+ return (-1);
+ }
+ return (1);
+}
+
+void vec_free(t_vec *src)
+{
+ if (!src || src->alloc_size == 0)
+ return ;
+ free(src->memory);
+ src->memory = NULL;
+ src->alloc_size = 0;
+ src->elem_size = 0;
+ src->len = 0;
+}
+
+int vec_from(t_vec *dst, void *src, size_t len, size_t elem_size)
+{
+ if (!dst || !src || elem_size == 0)
+ return (-1);
+ else if (vec_new(dst, len, elem_size) < 0)
+ return (-1);
+ memcpy(
+ dst->memory,
+ src,
+ dst->alloc_size);
+ dst->len = len;
+ return (1);
+}
+
+int vec_copy(t_vec *dst, t_vec *src)
+{
+ size_t copy_size;
+
+ if (!dst || !src || !src->memory)
+ return (-1);
+ else if (!dst->memory)
+ vec_new(dst, src->len, dst->elem_size);
+ if (src->len * src->elem_size < dst->alloc_size)
+ copy_size = src->len * src->elem_size;
+ else
+ copy_size = src->alloc_size;
+ memcpy(
+ dst->memory,
+ src->memory,
+ copy_size);
+ dst->len = copy_size / dst->elem_size;
+ return (1);
+}
+
+int vec_resize(t_vec *src, size_t target_len)
+{
+ t_vec dst;
+
+ if (!src)
+ return (-1);
+ else if (!src->memory)
+ return vec_new(src, target_len, src->elem_size);
+ else if (vec_new(&dst, target_len, src->elem_size) < 0)
+ return (-1);
+ memcpy(
+ dst.memory,
+ src->memory,
+ src->len * src->elem_size);
+ dst.len = src->len;
+ vec_free(src);
+ *src = dst;
+ return (1);
+}
+
+void *vec_get(t_vec *src, size_t index)
+{
+ unsigned char *ptr;
+
+ if (index >= src->len || !src || !src->memory)
+ return (NULL);
+ ptr = &src->memory[src->elem_size * index];
+ return (ptr);
+}
+
+int vec_push(t_vec *dst, void *src)
+{
+ if (!dst || !src)
+ return (-1);
+ else if (!dst->memory)
+ vec_new(dst, 1, dst->elem_size);
+ if (dst->elem_size * dst->len >= dst->alloc_size)
+ if (vec_resize(dst, dst->len * 2) < 0)
+ return (-1);
+ memcpy(&dst->memory[dst->elem_size * dst->len], src, dst->elem_size);
+ dst->len++;
+ return (1);
+}
+
+int vec_pop(void *dst, t_vec *src)
+{
+ if (!dst || !src)
+ return (-1);
+ else if (!src->memory || src->len == 0)
+ return (0);
+ memcpy(dst, vec_get(src, src->len - 1), src->elem_size);
+ src->len--;
+ return (1);
+}
+
+int vec_clear(t_vec *src)
+{
+ if (!src)
+ return (-1);
+ src->len = 0;
+ return (1);
+}
+
+int vec_insert(t_vec *dst, void *src, size_t index)
+{
+ if (!dst || !src || index > dst->len)
+ return (-1);
+ else if (index == dst->len)
+ return (vec_push(dst, src));
+ if (dst->elem_size * dst->len >= dst->alloc_size)
+ if (vec_resize(dst, (dst->alloc_size * 2) / dst->elem_size) < 0)
+ return (-1);
+ memmove(
+ vec_get(dst, index + 1),
+ vec_get(dst, index),
+ (dst->len - index) * dst->elem_size);
+ memcpy(
+ vec_get(dst, index),
+ src, dst->elem_size);
+ dst->len++;
+ return (1);
+}
+
+int vec_remove(t_vec *src, size_t index)
+{
+ if (!src || index > src->len)
+ return (-1);
+ else if (index == src->len)
+ {
+ src->len--;
+ return (1);
+ }
+ memmove(
+ vec_get(src, index),
+ &src->memory[src->elem_size * (index + 1)],
+ (src->len - index) * src->elem_size);
+ src->len--;
+ return (1);
+}
+
+int vec_append(t_vec *dst, t_vec *src)
+{
+ int ret;
+ size_t alloc_size;
+
+ if (!dst || !src || !src->memory)
+ return (-1);
+ else if (!dst->memory)
+ vec_new(dst, 1, dst->elem_size);
+ alloc_size = dst->len * dst->elem_size + src->len * src->elem_size;
+ if (dst->alloc_size < alloc_size)
+ {
+ if (dst->alloc_size * 2 < dst->len * alloc_size)
+ ret = vec_resize(dst, alloc_size);
+ else
+ ret = vec_resize(dst, dst->alloc_size * 2);
+ if (ret < 0)
+ return (-1);
+ }
+ memcpy(
+ &dst->memory[dst->elem_size * dst->len],
+ src->memory,
+ src->len * src->elem_size);
+ dst->len += src->len;
+ return (1);
+}
+
+int vec_prepend(t_vec *dst, t_vec *src)
+{
+ t_vec new;
+ size_t alloc_size;
+
+ if (!dst || !src)
+ return (-1);
+ else if (!dst->memory)
+ vec_new(dst, 1, dst->elem_size);
+ alloc_size = dst->len * dst->elem_size + src->len * src->elem_size;
+ if (vec_new(&new, alloc_size / dst->elem_size, dst->elem_size) < 0)
+ return (-1);
+ vec_copy(&new, src);
+ new.len = src->len + dst->len;
+ memcpy(
+ &new.memory[dst->elem_size * dst->len],
+ dst->memory,
+ dst->len * dst->elem_size);
+ vec_free(dst);
+ *dst = new;
+ return (1);
+}
+
+void vec_iter(t_vec *src, void (*f) (void *))
+{
+ void *ptr;
+ size_t i;
+
+ if (!src || !src->memory)
+ return ;
+ i = 0;
+ while (i < src->len)
+ {
+ ptr = vec_get(src, i);
+ f(ptr);
+ i++;
+ }
+}
+
+void *vec_find(t_vec *src, bool (*f) (void *))
+{
+ void *ptr;
+ size_t i;
+
+ if (!src || !src->memory)
+ return (NULL);
+ i = 0;
+ while (i < src->len)
+ {
+ ptr = vec_get(src, i);
+ if (f(ptr) == true)
+ return (ptr);
+ i++;
+ }
+ return (NULL);
+}
+
+int vec_map(t_vec *dst, t_vec *src, void (*f) (void *))
+{
+ void *ptr;
+ void *res;
+ size_t i;
+
+ if (!dst || !src || !src->memory)
+ return (-1);
+ else if (!dst->memory)
+ vec_new(dst, 1, dst->elem_size);
+ res = malloc(dst->elem_size);
+ if (!res)
+ return (-1);
+ i = 0;
+ while (i < src->len)
+ {
+ ptr = vec_get(src, i);
+ memcpy(res, ptr, dst->elem_size);
+ f(res);
+ vec_push(dst, res);
+ i++;
+ }
+ free(res);
+ return (1);
+}
+
+int vec_filter(t_vec *dst, t_vec *src, bool (*f) (void *))
+{
+ void *ptr;
+ void *res;
+ size_t i;
+
+ if (!dst || !src || !src->memory)
+ return (-1);
+ else if (!dst->memory)
+ vec_new(dst, 1, dst->elem_size);
+ res = malloc(dst->elem_size);
+ if (!res)
+ return (-1);
+ i = 0;
+ while (i < src->len)
+ {
+ ptr = vec_get(src, i);
+ memcpy(res, ptr, dst->elem_size);
+ if (f(res) == true)
+ vec_push(dst, res);
+ i++;
+ }
+ free(res);
+ return (1);
+}
+
+int vec_reduce(void *dst, t_vec *src, void (*f) (void *, void *))
+{
+ void *ptr;
+ size_t i;
+
+ if (!dst || !src || !src->memory)
+ return (-1);
+ i = 0;
+ while (i < src->len)
+ {
+ ptr = vec_get(src, i);
+ f(dst, ptr);
+ i++;
+ }
+ return (1);
+}
+
+static void memswap8(unsigned char *a, unsigned char *b)
+{
+ if (a == b)
+ return ;
+ *a ^= *b;
+ *b ^= *a;
+ *a ^= *b;
+}
+
+static void memswap(unsigned char *a, unsigned char *b, size_t bytes)
+{
+ size_t i;
+
+ if (!a || !b)
+ return ;
+ i = 0;
+ while (i < bytes)
+ {
+ memswap8(&a[i], &b[i]);
+ i++;
+ }
+}
+
+static void vec_sort_recurse(t_vec *src,
+ long int low,
+ long int high,
+ int (*f)(void *, void *))
+{
+ long int pivot;
+ long int a;
+ long int b;
+
+ if (low >= high)
+ return ;
+ pivot = low;
+ a = low;
+ b = high;
+ while (a < b)
+ {
+ while (a <= high && f(vec_get(src, a), vec_get(src, pivot)) <= 0)
+ a++;
+ while (b >= low && f(vec_get(src, b), vec_get(src, pivot)) > 0)
+ b--;
+ if (a < b)
+ memswap(vec_get(src, a), vec_get(src, b), src->elem_size);
+ }
+ memswap(vec_get(src, pivot), vec_get(src, b), src->elem_size);
+ vec_sort_recurse(src, low, b - 1, f);
+ vec_sort_recurse(src, b + 1, high, f);
+}
+
+void vec_sort(t_vec *src, int (*f)(void *, void *))
+{
+ if (!src || !src->memory)
+ return ;
+ vec_sort_recurse(src, 0, src->len - 1, f);
+}
diff --git a/vec.h b/vec.h
new file mode 100644
index 0000000..926d229
--- /dev/null
+++ b/vec.h
@@ -0,0 +1,37 @@
+#ifndef VEC_H
+# define VEC_H
+
+#include "stdlib.h"
+#include "unistd.h"
+#include "string.h"
+#include "stdbool.h"
+
+typedef struct s_vec
+{
+ unsigned char *memory;
+ size_t elem_size;
+ size_t alloc_size;
+ size_t len;
+} t_vec;
+
+int vec_new(t_vec *src, size_t init_len, size_t elem_size);
+void vec_free(t_vec *src);
+int vec_from(t_vec *dst, void *src, size_t len, size_t elem_size);
+int vec_resize(t_vec *src, size_t target_size);
+int vec_clear(t_vec *src);
+int vec_push(t_vec *src, void *elem);
+int vec_pop(void *dst, t_vec *src);
+int vec_copy(t_vec *dst, t_vec *src);
+void *vec_get(t_vec *src, size_t index);
+int vec_insert(t_vec *dst, void *elem, size_t index);
+int vec_remove(t_vec *src, size_t index);
+int vec_append(t_vec *dst, t_vec *src);
+int vec_prepend(t_vec *dst, t_vec *src);
+void vec_iter(t_vec *src, void (*f) (void *));
+void *vec_find(t_vec *src, bool (*f) (void *));
+int vec_map(t_vec *dst, t_vec *src, void (*f) (void *));
+int vec_filter(t_vec *dst, t_vec *src, bool (*f) (void *));
+int vec_reduce(void *dst, t_vec *src, void (*f) (void *, void *));
+void vec_sort(t_vec *src, int (*f)(void *, void *));
+
+#endif