diff options
Diffstat (limited to 'contrib/adaptive-compression/adapt.c')
-rw-r--r-- | contrib/adaptive-compression/adapt.c | 1129 |
1 files changed, 0 insertions, 1129 deletions
diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c deleted file mode 100644 index 8fb4047e996b..000000000000 --- a/contrib/adaptive-compression/adapt.c +++ /dev/null @@ -1,1129 +0,0 @@ -/* - * Copyright (c) 2017-present, Facebook, Inc. - * All rights reserved. - * - * This source code is licensed under both the BSD-style license (found in the - * LICENSE file in the root directory of this source tree) and the GPLv2 (found - * in the COPYING file in the root directory of this source tree). - */ - -#include <stdio.h> /* fprintf */ -#include <stdlib.h> /* malloc, free */ -#include <pthread.h> /* pthread functions */ -#include <string.h> /* memset */ -#include "zstd_internal.h" -#include "util.h" -#include "timefn.h" /* UTIL_time_t, UTIL_getTime, UTIL_getSpanTimeMicro */ - -#define DISPLAY(...) fprintf(stderr, __VA_ARGS__) -#define PRINT(...) fprintf(stdout, __VA_ARGS__) -#define DEBUG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } } -#define FILE_CHUNK_SIZE 4 << 20 -#define MAX_NUM_JOBS 2 -#define stdinmark "/*stdin*\\" -#define stdoutmark "/*stdout*\\" -#define MAX_PATH 256 -#define DEFAULT_DISPLAY_LEVEL 1 -#define DEFAULT_COMPRESSION_LEVEL 6 -#define MAX_COMPRESSION_LEVEL_CHANGE 2 -#define CONVERGENCE_LOWER_BOUND 5 -#define CLEVEL_DECREASE_COOLDOWN 5 -#define CHANGE_BY_TWO_THRESHOLD 0.1 -#define CHANGE_BY_ONE_THRESHOLD 0.65 - -#ifndef DEBUG_MODE -static int g_displayLevel = DEFAULT_DISPLAY_LEVEL; -#else -static int g_displayLevel = DEBUG_MODE; -#endif - -static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL; -static UTIL_time_t g_startTime; -static size_t g_streamedSize = 0; -static unsigned g_useProgressBar = 1; -static unsigned g_forceCompressionLevel = 0; -static unsigned g_minCLevel = 1; -static unsigned g_maxCLevel; - -typedef struct { - void* start; - size_t size; - size_t capacity; -} buffer_t; - -typedef struct { - size_t filled; - buffer_t buffer; -} inBuff_t; - -typedef struct { - buffer_t src; - buffer_t dst; - unsigned jobID; - unsigned lastJobPlusOne; - size_t compressedSize; - size_t dictSize; -} jobDescription; - -typedef struct { - pthread_mutex_t pMutex; - int noError; -} mutex_t; - -typedef struct { - pthread_cond_t pCond; - int noError; -} cond_t; - -typedef struct { - unsigned compressionLevel; - unsigned numJobs; - unsigned nextJobID; - unsigned threadError; - - /* - * JobIDs for the next jobs to be created, compressed, and written - */ - unsigned jobReadyID; - unsigned jobCompressedID; - unsigned jobWriteID; - unsigned allJobsCompleted; - - /* - * counter for how many jobs in a row the compression level has not changed - * if the counter becomes >= CONVERGENCE_LOWER_BOUND, the next time the - * compression level tries to change (by non-zero amount) resets the counter - * to 1 and does not apply the change - */ - unsigned convergenceCounter; - - /* - * cooldown counter in order to prevent rapid successive decreases in compression level - * whenever compression level is decreased, cooldown is set to CLEVEL_DECREASE_COOLDOWN - * whenever adaptCompressionLevel() is called and cooldown != 0, it is decremented - * as long as cooldown != 0, the compression level cannot be decreased - */ - unsigned cooldown; - - /* - * XWaitYCompletion - * Range from 0.0 to 1.0 - * if the value is not 1.0, then this implies that thread X waited on thread Y to finish - * and thread Y was XWaitYCompletion finished at the time of the wait (i.e. compressWaitWriteCompletion=0.5 - * implies that the compression thread waited on the write thread and it was only 50% finished writing a job) - */ - double createWaitCompressionCompletion; - double compressWaitCreateCompletion; - double compressWaitWriteCompletion; - double writeWaitCompressionCompletion; - - /* - * Completion values - * Range from 0.0 to 1.0 - * Jobs are divided into mini-chunks in order to measure completion - * these values are updated each time a thread finishes its operation on the - * mini-chunk (i.e. finishes writing out, compressing, etc. this mini-chunk). - */ - double compressionCompletion; - double writeCompletion; - double createCompletion; - - mutex_t jobCompressed_mutex; - cond_t jobCompressed_cond; - mutex_t jobReady_mutex; - cond_t jobReady_cond; - mutex_t allJobsCompleted_mutex; - cond_t allJobsCompleted_cond; - mutex_t jobWrite_mutex; - cond_t jobWrite_cond; - mutex_t compressionCompletion_mutex; - mutex_t createCompletion_mutex; - mutex_t writeCompletion_mutex; - mutex_t compressionLevel_mutex; - size_t lastDictSize; - inBuff_t input; - jobDescription* jobs; - ZSTD_CCtx* cctx; -} adaptCCtx; - -typedef struct { - adaptCCtx* ctx; - FILE* dstFile; -} outputThreadArg; - -typedef struct { - FILE* srcFile; - adaptCCtx* ctx; - outputThreadArg* otArg; -} fcResources; - -static void freeCompressionJobs(adaptCCtx* ctx) -{ - unsigned u; - for (u=0; u<ctx->numJobs; u++) { - jobDescription job = ctx->jobs[u]; - free(job.dst.start); - free(job.src.start); - } -} - -static int destroyMutex(mutex_t* mutex) -{ - if (mutex->noError) { - int const ret = pthread_mutex_destroy(&mutex->pMutex); - return ret; - } - return 0; -} - -static int destroyCond(cond_t* cond) -{ - if (cond->noError) { - int const ret = pthread_cond_destroy(&cond->pCond); - return ret; - } - return 0; -} - -static int freeCCtx(adaptCCtx* ctx) -{ - if (!ctx) return 0; - { - int error = 0; - error |= destroyMutex(&ctx->jobCompressed_mutex); - error |= destroyCond(&ctx->jobCompressed_cond); - error |= destroyMutex(&ctx->jobReady_mutex); - error |= destroyCond(&ctx->jobReady_cond); - error |= destroyMutex(&ctx->allJobsCompleted_mutex); - error |= destroyCond(&ctx->allJobsCompleted_cond); - error |= destroyMutex(&ctx->jobWrite_mutex); - error |= destroyCond(&ctx->jobWrite_cond); - error |= destroyMutex(&ctx->compressionCompletion_mutex); - error |= destroyMutex(&ctx->createCompletion_mutex); - error |= destroyMutex(&ctx->writeCompletion_mutex); - error |= destroyMutex(&ctx->compressionLevel_mutex); - error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx)); - free(ctx->input.buffer.start); - if (ctx->jobs){ - freeCompressionJobs(ctx); - free(ctx->jobs); - } - free(ctx); - return error; - } -} - -static int initMutex(mutex_t* mutex) -{ - int const ret = pthread_mutex_init(&mutex->pMutex, NULL); - mutex->noError = !ret; - return ret; -} - -static int initCond(cond_t* cond) -{ - int const ret = pthread_cond_init(&cond->pCond, NULL); - cond->noError = !ret; - return ret; -} - -static int initCCtx(adaptCCtx* ctx, unsigned numJobs) -{ - ctx->compressionLevel = g_compressionLevel; - { - int pthreadError = 0; - pthreadError |= initMutex(&ctx->jobCompressed_mutex); - pthreadError |= initCond(&ctx->jobCompressed_cond); - pthreadError |= initMutex(&ctx->jobReady_mutex); - pthreadError |= initCond(&ctx->jobReady_cond); - pthreadError |= initMutex(&ctx->allJobsCompleted_mutex); - pthreadError |= initCond(&ctx->allJobsCompleted_cond); - pthreadError |= initMutex(&ctx->jobWrite_mutex); - pthreadError |= initCond(&ctx->jobWrite_cond); - pthreadError |= initMutex(&ctx->compressionCompletion_mutex); - pthreadError |= initMutex(&ctx->createCompletion_mutex); - pthreadError |= initMutex(&ctx->writeCompletion_mutex); - pthreadError |= initMutex(&ctx->compressionLevel_mutex); - if (pthreadError) return pthreadError; - } - ctx->numJobs = numJobs; - ctx->jobReadyID = 0; - ctx->jobCompressedID = 0; - ctx->jobWriteID = 0; - ctx->lastDictSize = 0; - - - ctx->createWaitCompressionCompletion = 1; - ctx->compressWaitCreateCompletion = 1; - ctx->compressWaitWriteCompletion = 1; - ctx->writeWaitCompressionCompletion = 1; - ctx->createCompletion = 1; - ctx->writeCompletion = 1; - ctx->compressionCompletion = 1; - ctx->convergenceCounter = 0; - ctx->cooldown = 0; - - ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); - - if (!ctx->jobs) { - DISPLAY("Error: could not allocate space for jobs during context creation\n"); - return 1; - } - - /* initializing jobs */ - { - unsigned jobNum; - for (jobNum=0; jobNum<numJobs; jobNum++) { - jobDescription* job = &ctx->jobs[jobNum]; - job->src.start = malloc(2 * FILE_CHUNK_SIZE); - job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE)); - job->lastJobPlusOne = 0; - if (!job->src.start || !job->dst.start) { - DISPLAY("Could not allocate buffers for jobs\n"); - return 1; - } - job->src.capacity = FILE_CHUNK_SIZE; - job->dst.capacity = ZSTD_compressBound(FILE_CHUNK_SIZE); - } - } - - ctx->nextJobID = 0; - ctx->threadError = 0; - ctx->allJobsCompleted = 0; - - ctx->cctx = ZSTD_createCCtx(); - if (!ctx->cctx) { - DISPLAY("Error: could not allocate ZSTD_CCtx\n"); - return 1; - } - - ctx->input.filled = 0; - ctx->input.buffer.capacity = 2 * FILE_CHUNK_SIZE; - - ctx->input.buffer.start = malloc(ctx->input.buffer.capacity); - if (!ctx->input.buffer.start) { - DISPLAY("Error: could not allocate input buffer\n"); - return 1; - } - return 0; -} - -static adaptCCtx* createCCtx(unsigned numJobs) -{ - - adaptCCtx* const ctx = calloc(1, sizeof(adaptCCtx)); - if (ctx == NULL) { - DISPLAY("Error: could not allocate space for context\n"); - return NULL; - } - { - int const error = initCCtx(ctx, numJobs); - if (error) { - freeCCtx(ctx); - return NULL; - } - return ctx; - } -} - -static void signalErrorToThreads(adaptCCtx* ctx) -{ - ctx->threadError = 1; - pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); - pthread_cond_signal(&ctx->jobReady_cond.pCond); - pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); - - pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); - pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond); - pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); - - pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); - pthread_cond_signal(&ctx->jobWrite_cond.pCond); - pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); - - pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex); - pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond); - pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex); -} - -static void waitUntilAllJobsCompleted(adaptCCtx* ctx) -{ - if (!ctx) return; - pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex); - while (ctx->allJobsCompleted == 0 && !ctx->threadError) { - pthread_cond_wait(&ctx->allJobsCompleted_cond.pCond, &ctx->allJobsCompleted_mutex.pMutex); - } - pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex); -} - -/* map completion percentages to values for changing compression level */ -static unsigned convertCompletionToChange(double completion) -{ - if (completion < CHANGE_BY_TWO_THRESHOLD) { - return 2; - } - else if (completion < CHANGE_BY_ONE_THRESHOLD) { - return 1; - } - else { - return 0; - } -} - -/* - * Compression level is changed depending on which part of the compression process is lagging - * Currently, three theads exist for job creation, compression, and file writing respectively. - * adaptCompressionLevel() increments or decrements compression level based on which of the threads is lagging - * job creation or file writing lag => increased compression level - * compression thread lag => decreased compression level - * detecting which thread is lagging is done by keeping track of how many calls each thread makes to pthread_cond_wait - */ -static void adaptCompressionLevel(adaptCCtx* ctx) -{ - double createWaitCompressionCompletion; - double compressWaitCreateCompletion; - double compressWaitWriteCompletion; - double writeWaitCompressionCompletion; - double const threshold = 0.00001; - unsigned prevCompressionLevel; - - pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); - prevCompressionLevel = ctx->compressionLevel; - pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); - - - if (g_forceCompressionLevel) { - pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); - ctx->compressionLevel = g_compressionLevel; - pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); - return; - } - - - DEBUG(2, "adapting compression level %u\n", prevCompressionLevel); - - /* read and reset completion measurements */ - pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); - DEBUG(2, "createWaitCompressionCompletion %f\n", ctx->createWaitCompressionCompletion); - DEBUG(2, "writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion); - createWaitCompressionCompletion = ctx->createWaitCompressionCompletion; - writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion; - pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); - - pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); - DEBUG(2, "compressWaitWriteCompletion %f\n", ctx->compressWaitWriteCompletion); - compressWaitWriteCompletion = ctx->compressWaitWriteCompletion; - pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); - - pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); - DEBUG(2, "compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion); - compressWaitCreateCompletion = ctx->compressWaitCreateCompletion; - pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); - DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter); - - assert(g_minCLevel <= prevCompressionLevel && g_maxCLevel >= prevCompressionLevel); - - /* adaptation logic */ - if (ctx->cooldown) ctx->cooldown--; - - if ((1-createWaitCompressionCompletion > threshold || 1-writeWaitCompressionCompletion > threshold) && ctx->cooldown == 0) { - /* create or write waiting on compression */ - /* use whichever one waited less because it was slower */ - double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion); - unsigned const change = convertCompletionToChange(completion); - unsigned const boundChange = MIN(change, prevCompressionLevel - g_minCLevel); - if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) { - /* reset convergence counter, might have been a spike */ - ctx->convergenceCounter = 0; - DEBUG(2, "convergence counter reset, no change applied\n"); - } - else if (boundChange != 0) { - pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); - ctx->compressionLevel -= boundChange; - pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); - ctx->cooldown = CLEVEL_DECREASE_COOLDOWN; - ctx->convergenceCounter = 1; - - DEBUG(2, "create or write threads waiting on compression, tried to decrease compression level by %u\n\n", boundChange); - } - } - else if (1-compressWaitWriteCompletion > threshold || 1-compressWaitCreateCompletion > threshold) { - /* compress waiting on write */ - double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion); - unsigned const change = convertCompletionToChange(completion); - unsigned const boundChange = MIN(change, g_maxCLevel - prevCompressionLevel); - if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) { - /* reset convergence counter, might have been a spike */ - ctx->convergenceCounter = 0; - DEBUG(2, "convergence counter reset, no change applied\n"); - } - else if (boundChange != 0) { - pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); - ctx->compressionLevel += boundChange; - pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); - ctx->cooldown = 0; - ctx->convergenceCounter = 1; - - DEBUG(2, "compress waiting on write or create, tried to increase compression level by %u\n\n", boundChange); - } - - } - - pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); - if (ctx->compressionLevel == prevCompressionLevel) { - ctx->convergenceCounter++; - } - pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); -} - -static size_t getUseableDictSize(unsigned compressionLevel) -{ - ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0); - unsigned const overlapLog = compressionLevel >= (unsigned)ZSTD_maxCLevel() ? 0 : 3; - size_t const overlapSize = 1 << (params.cParams.windowLog - overlapLog); - return overlapSize; -} - -static void* compressionThread(void* arg) -{ - adaptCCtx* const ctx = (adaptCCtx*)arg; - unsigned currJob = 0; - for ( ; ; ) { - unsigned const currJobIndex = currJob % ctx->numJobs; - jobDescription* const job = &ctx->jobs[currJobIndex]; - DEBUG(2, "starting compression for job %u\n", currJob); - - { - /* check if compression thread will have to wait */ - unsigned willWaitForCreate = 0; - unsigned willWaitForWrite = 0; - - pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); - if (currJob + 1 > ctx->jobReadyID) willWaitForCreate = 1; - pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); - - pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); - if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1; - pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); - - - pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); - if (willWaitForCreate) { - DEBUG(2, "compression will wait for create on job %u\n", currJob); - ctx->compressWaitCreateCompletion = ctx->createCompletion; - DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion); - - } - else { - ctx->compressWaitCreateCompletion = 1; - } - pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); - - pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); - if (willWaitForWrite) { - DEBUG(2, "compression will wait for write on job %u\n", currJob); - ctx->compressWaitWriteCompletion = ctx->writeCompletion; - DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion); - } - else { - ctx->compressWaitWriteCompletion = 1; - } - pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); - - } - - /* wait until job is ready */ - pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); - while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) { - pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); - } - pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); - - /* wait until job previously in this space is written */ - pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); - while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) { - pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); - } - pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); - /* reset compression completion */ - pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); - ctx->compressionCompletion = 0; - pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); - - /* adapt compression level */ - if (currJob) adaptCompressionLevel(ctx); - - pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); - DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel); - pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); - - /* compress the data */ - { - size_t const compressionBlockSize = ZSTD_BLOCKSIZE_MAX; /* 128 KB */ - unsigned cLevel; - unsigned blockNum = 0; - size_t remaining = job->src.size; - size_t srcPos = 0; - size_t dstPos = 0; - - pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); - cLevel = ctx->compressionLevel; - pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); - - /* reset compressed size */ - job->compressedSize = 0; - DEBUG(2, "calling ZSTD_compressBegin()\n"); - /* begin compression */ - { - size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize); - ZSTD_parameters params = ZSTD_getParams(cLevel, 0, useDictSize); - params.cParams.windowLog = 23; - { - size_t const initError = ZSTD_compressBegin_advanced(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, params, 0); - size_t const windowSizeError = ZSTD_CCtx_setParameter(ctx->cctx, ZSTD_c_forceMaxWindow, 1); - if (ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) { - DISPLAY("Error: something went wrong while starting compression\n"); - signalErrorToThreads(ctx); - return arg; - } - } - } - DEBUG(2, "finished with ZSTD_compressBegin()\n"); - - do { - size_t const actualBlockSize = MIN(remaining, compressionBlockSize); - - /* continue compression */ - if (currJob != 0 || blockNum != 0) { /* not first block of first job flush/overwrite the frame header */ - size_t const hSize = ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, 0); - if (ZSTD_isError(hSize)) { - DISPLAY("Error: something went wrong while continuing compression\n"); - job->compressedSize = hSize; - signalErrorToThreads(ctx); - return arg; - } - ZSTD_invalidateRepCodes(ctx->cctx); - } - { - size_t const ret = (job->lastJobPlusOne == currJob + 1 && remaining == actualBlockSize) ? - ZSTD_compressEnd (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) : - ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize); - if (ZSTD_isError(ret)) { - DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(ret)); - signalErrorToThreads(ctx); - return arg; - } - job->compressedSize += ret; - remaining -= actualBlockSize; - srcPos += actualBlockSize; - dstPos += ret; - blockNum++; - - /* update completion */ - pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); - ctx->compressionCompletion = 1 - (double)remaining/job->src.size; - pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); - } - } while (remaining != 0); - job->dst.size = job->compressedSize; - } - pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); - ctx->jobCompressedID++; - pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond); - pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); - if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) { - /* finished compressing all jobs */ - break; - } - DEBUG(2, "finished compressing job %u\n", currJob); - currJob++; - } - return arg; -} - -static void displayProgress(unsigned cLevel, unsigned last) -{ - UTIL_time_t currTime = UTIL_getTime(); - if (!g_useProgressBar) return; - { double const timeElapsed = (double)(UTIL_getSpanTimeMicro(g_startTime, currTime) / 1000.0); - double const sizeMB = (double)g_streamedSize / (1 << 20); - double const avgCompRate = sizeMB * 1000 / timeElapsed; - fprintf(stderr, "\r| Comp. Level: %2u | Time Elapsed: %7.2f s | Data Size: %7.1f MB | Avg Comp. Rate: %6.2f MB/s |", cLevel, timeElapsed/1000.0, sizeMB, avgCompRate); - if (last) { - fprintf(stderr, "\n"); - } else { - fflush(stderr); - } } -} - -static void* outputThread(void* arg) -{ - outputThreadArg* const otArg = (outputThreadArg*)arg; - adaptCCtx* const ctx = otArg->ctx; - FILE* const dstFile = otArg->dstFile; - - unsigned currJob = 0; - for ( ; ; ) { - unsigned const currJobIndex = currJob % ctx->numJobs; - jobDescription* const job = &ctx->jobs[currJobIndex]; - unsigned willWaitForCompress = 0; - DEBUG(2, "starting write for job %u\n", currJob); - - pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); - if (currJob + 1 > ctx->jobCompressedID) willWaitForCompress = 1; - pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); - - - pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); - if (willWaitForCompress) { - /* write thread is waiting on compression thread */ - ctx->writeWaitCompressionCompletion = ctx->compressionCompletion; - DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion); - } - else { - ctx->writeWaitCompressionCompletion = 1; - } - pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); - - pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); - while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { - pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); - } - pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); - - /* reset write completion */ - pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); - ctx->writeCompletion = 0; - pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); - - { - size_t const compressedSize = job->compressedSize; - size_t remaining = compressedSize; - if (ZSTD_isError(compressedSize)) { - DISPLAY("Error: an error occurred during compression\n"); - signalErrorToThreads(ctx); - return arg; - } - { - size_t const blockSize = MAX(compressedSize >> 7, 1 << 10); - size_t pos = 0; - for ( ; ; ) { - size_t const writeSize = MIN(remaining, blockSize); - size_t const ret = fwrite(job->dst.start + pos, 1, writeSize, dstFile); - if (ret != writeSize) break; - pos += ret; - remaining -= ret; - - /* update completion variable for writing */ - pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex); - ctx->writeCompletion = 1 - (double)remaining/compressedSize; - pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex); - - if (remaining == 0) break; - } - if (pos != compressedSize) { - DISPLAY("Error: an error occurred during file write operation\n"); - signalErrorToThreads(ctx); - return arg; - } - } - } - { - unsigned cLevel; - pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); - cLevel = ctx->compressionLevel; - pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); - displayProgress(cLevel, job->lastJobPlusOne == currJob + 1); - } - pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); - ctx->jobWriteID++; - pthread_cond_signal(&ctx->jobWrite_cond.pCond); - pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); - - if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) { - /* finished with all jobs */ - pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex); - ctx->allJobsCompleted = 1; - pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond); - pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex); - break; - } - DEBUG(2, "finished writing job %u\n", currJob); - currJob++; - - } - return arg; -} - -static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) -{ - unsigned const nextJob = ctx->nextJobID; - unsigned const nextJobIndex = nextJob % ctx->numJobs; - jobDescription* const job = &ctx->jobs[nextJobIndex]; - - - job->src.size = srcSize; - job->jobID = nextJob; - if (last) job->lastJobPlusOne = nextJob + 1; - { - /* swap buffer */ - void* const copy = job->src.start; - job->src.start = ctx->input.buffer.start; - ctx->input.buffer.start = copy; - } - job->dictSize = ctx->lastDictSize; - - ctx->nextJobID++; - /* if not on the last job, reuse data as dictionary in next job */ - if (!last) { - size_t const oldDictSize = ctx->lastDictSize; - memcpy(ctx->input.buffer.start, job->src.start + oldDictSize, srcSize); - ctx->lastDictSize = srcSize; - ctx->input.filled = srcSize; - } - - /* signal job ready */ - pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); - ctx->jobReadyID++; - pthread_cond_signal(&ctx->jobReady_cond.pCond); - pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); - - return 0; -} - -static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadArg* otArg) -{ - /* early error check to exit */ - if (!ctx || !srcFile || !otArg) { - return 1; - } - - /* create output thread */ - { - pthread_t out; - if (pthread_create(&out, NULL, &outputThread, otArg)) { - DISPLAY("Error: could not create output thread\n"); - signalErrorToThreads(ctx); - return 1; - } - else if (pthread_detach(out)) { - DISPLAY("Error: could not detach output thread\n"); - signalErrorToThreads(ctx); - return 1; - } - } - - /* create compression thread */ - { - pthread_t compression; - if (pthread_create(&compression, NULL, &compressionThread, ctx)) { - DISPLAY("Error: could not create compression thread\n"); - signalErrorToThreads(ctx); - return 1; - } - else if (pthread_detach(compression)) { - DISPLAY("Error: could not detach compression thread\n"); - signalErrorToThreads(ctx); - return 1; - } - } - { - unsigned currJob = 0; - /* creating jobs */ - for ( ; ; ) { - size_t pos = 0; - size_t const readBlockSize = 1 << 15; - size_t remaining = FILE_CHUNK_SIZE; - unsigned const nextJob = ctx->nextJobID; - unsigned willWaitForCompress = 0; - DEBUG(2, "starting creation of job %u\n", currJob); - - pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); - if (nextJob - ctx->jobCompressedID >= ctx->numJobs) willWaitForCompress = 1; - pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); - - pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); - if (willWaitForCompress) { - /* creation thread is waiting, take measurement of completion */ - ctx->createWaitCompressionCompletion = ctx->compressionCompletion; - DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion); - } - else { - ctx->createWaitCompressionCompletion = 1; - } - pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); - - /* wait until the job has been compressed */ - pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); - while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) { - pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); - } - pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); - - /* reset create completion */ - pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); - ctx->createCompletion = 0; - pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); - - while (remaining != 0 && !feof(srcFile)) { - size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile); - if (ret != readBlockSize && !feof(srcFile)) { - /* error could not read correct number of bytes */ - DISPLAY("Error: problem occurred during read from src file\n"); - signalErrorToThreads(ctx); - return 1; - } - pos += ret; - remaining -= ret; - pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); - ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE); - pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); - } - if (remaining != 0 && !feof(srcFile)) { - DISPLAY("Error: problem occurred during read from src file\n"); - signalErrorToThreads(ctx); - return 1; - } - g_streamedSize += pos; - /* reading was fine, now create the compression job */ - { - int const last = feof(srcFile); - int const error = createCompressionJob(ctx, pos, last); - if (error != 0) { - signalErrorToThreads(ctx); - return error; - } - } - DEBUG(2, "finished creating job %u\n", currJob); - currJob++; - if (feof(srcFile)) { - break; - } - } - } - /* success -- created all jobs */ - return 0; -} - -static fcResources createFileCompressionResources(const char* const srcFilename, const char* const dstFilenameOrNull) -{ - fcResources fcr; - unsigned const stdinUsed = !strcmp(srcFilename, stdinmark); - FILE* const srcFile = stdinUsed ? stdin : fopen(srcFilename, "rb"); - const char* const outFilenameIntermediate = (stdinUsed && !dstFilenameOrNull) ? stdoutmark : dstFilenameOrNull; - const char* outFilename = outFilenameIntermediate; - char fileAndSuffix[MAX_PATH]; - size_t const numJobs = MAX_NUM_JOBS; - - memset(&fcr, 0, sizeof(fcr)); - - if (!outFilenameIntermediate) { - if (snprintf(fileAndSuffix, MAX_PATH, "%s.zst", srcFilename) + 1 > MAX_PATH) { - DISPLAY("Error: output filename is too long\n"); - return fcr; - } - outFilename = fileAndSuffix; - } - - { - unsigned const stdoutUsed = !strcmp(outFilename, stdoutmark); - FILE* const dstFile = stdoutUsed ? stdout : fopen(outFilename, "wb"); - fcr.otArg = malloc(sizeof(outputThreadArg)); - if (!fcr.otArg) { - DISPLAY("Error: could not allocate space for output thread argument\n"); - return fcr; - } - fcr.otArg->dstFile = dstFile; - } - /* checking for errors */ - if (!fcr.otArg->dstFile || !srcFile) { - DISPLAY("Error: some file(s) could not be opened\n"); - return fcr; - } - - /* creating context */ - fcr.ctx = createCCtx(numJobs); - fcr.otArg->ctx = fcr.ctx; - fcr.srcFile = srcFile; - return fcr; -} - -static int freeFileCompressionResources(fcResources* fcr) -{ - int ret = 0; - waitUntilAllJobsCompleted(fcr->ctx); - ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0; - ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0; - if (fcr->otArg) { - ret |= (fcr->otArg->dstFile != stdout) ? fclose(fcr->otArg->dstFile) : 0; - free(fcr->otArg); - /* no need to freeCCtx() on otArg->ctx because it should be the same context */ - } - return ret; -} - -static int compressFilename(const char* const srcFilename, const char* const dstFilenameOrNull) -{ - int ret = 0; - fcResources fcr = createFileCompressionResources(srcFilename, dstFilenameOrNull); - g_streamedSize = 0; - ret |= performCompression(fcr.ctx, fcr.srcFile, fcr.otArg); - ret |= freeFileCompressionResources(&fcr); - return ret; -} - -static int compressFilenames(const char** filenameTable, unsigned numFiles, unsigned forceStdout) -{ - int ret = 0; - unsigned fileNum; - for (fileNum=0; fileNum<numFiles; fileNum++) { - const char* filename = filenameTable[fileNum]; - if (!forceStdout) { - ret |= compressFilename(filename, NULL); - } - else { - ret |= compressFilename(filename, stdoutmark); - } - - } - return ret; -} - -/*! readU32FromChar() : - @return : unsigned integer value read from input in `char` format - allows and interprets K, KB, KiB, M, MB and MiB suffix. - Will also modify `*stringPtr`, advancing it to position where it stopped reading. - Note : function result can overflow if digit string > MAX_UINT */ -static unsigned readU32FromChar(const char** stringPtr) -{ - unsigned result = 0; - while ((**stringPtr >='0') && (**stringPtr <='9')) - result *= 10, result += **stringPtr - '0', (*stringPtr)++ ; - if ((**stringPtr=='K') || (**stringPtr=='M')) { - result <<= 10; - if (**stringPtr=='M') result <<= 10; - (*stringPtr)++ ; - if (**stringPtr=='i') (*stringPtr)++; - if (**stringPtr=='B') (*stringPtr)++; - } - return result; -} - -static void help(const char* progPath) -{ - PRINT("Usage:\n"); - PRINT(" %s [options] [file(s)]\n", progPath); - PRINT("\n"); - PRINT("Options:\n"); - PRINT(" -oFILE : specify the output file name\n"); - PRINT(" -i# : provide initial compression level -- default %d, must be in the range [L, U] where L and U are bound values (see below for defaults)\n", DEFAULT_COMPRESSION_LEVEL); - PRINT(" -h : display help/information\n"); - PRINT(" -f : force the compression level to stay constant\n"); - PRINT(" -c : force write to stdout\n"); - PRINT(" -p : hide progress bar\n"); - PRINT(" -q : quiet mode -- do not show progress bar or other information\n"); - PRINT(" -l# : provide lower bound for compression level -- default 1\n"); - PRINT(" -u# : provide upper bound for compression level -- default %u\n", ZSTD_maxCLevel()); -} -/* return 0 if successful, else return error */ -int main(int argCount, const char* argv[]) -{ - const char* outFilename = NULL; - const char** filenameTable = (const char**)malloc(argCount*sizeof(const char*)); - unsigned filenameIdx = 0; - unsigned forceStdout = 0; - unsigned providedInitialCLevel = 0; - int ret = 0; - int argNum; - filenameTable[0] = stdinmark; - g_maxCLevel = ZSTD_maxCLevel(); - - if (filenameTable == NULL) { - DISPLAY("Error: could not allocate sapce for filename table.\n"); - return 1; - } - - for (argNum=1; argNum<argCount; argNum++) { - const char* argument = argv[argNum]; - - /* output filename designated with "-o" */ - if (argument[0]=='-' && strlen(argument) > 1) { - switch (argument[1]) { - case 'o': - argument += 2; - outFilename = argument; - break; - case 'i': - argument += 2; - g_compressionLevel = readU32FromChar(&argument); - providedInitialCLevel = 1; - break; - case 'h': - help(argv[0]); - goto _main_exit; - case 'p': - g_useProgressBar = 0; - break; - case 'c': - forceStdout = 1; - outFilename = stdoutmark; - break; - case 'f': - g_forceCompressionLevel = 1; - break; - case 'q': - g_useProgressBar = 0; - g_displayLevel = 0; - break; - case 'l': - argument += 2; - g_minCLevel = readU32FromChar(&argument); - break; - case 'u': - argument += 2; - g_maxCLevel = readU32FromChar(&argument); - break; - default: - DISPLAY("Error: invalid argument provided\n"); - ret = 1; - goto _main_exit; - } - continue; - } - - /* regular files to be compressed */ - filenameTable[filenameIdx++] = argument; - } - - /* check initial, max, and min compression levels */ - { - unsigned const minMaxInconsistent = g_minCLevel > g_maxCLevel; - unsigned const initialNotInRange = g_minCLevel > g_compressionLevel || g_maxCLevel < g_compressionLevel; - if (minMaxInconsistent || (initialNotInRange && providedInitialCLevel)) { - DISPLAY("Error: provided compression level parameters are invalid\n"); - ret = 1; - goto _main_exit; - } - else if (initialNotInRange) { - g_compressionLevel = g_minCLevel; - } - } - - /* error checking with number of files */ - if (filenameIdx > 1 && (outFilename != NULL && strcmp(outFilename, stdoutmark))) { - DISPLAY("Error: multiple input files provided, cannot use specified output file\n"); - ret = 1; - goto _main_exit; - } - - /* compress files */ - if (filenameIdx <= 1) { - ret |= compressFilename(filenameTable[0], outFilename); - } - else { - ret |= compressFilenames(filenameTable, filenameIdx, forceStdout); - } -_main_exit: - free(filenameTable); - return ret; -} |