Diffstat (limited to 'lib/compress/zstdmt_compress.c')
-rw-r--r-- | lib/compress/zstdmt_compress.c | 302
1 file changed, 211 insertions, 91 deletions
diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c
index c7a205d8c757..f4aba1d2c494 100644
--- a/lib/compress/zstdmt_compress.c
+++ b/lib/compress/zstdmt_compress.c
@@ -37,18 +37,19 @@
 #define ZSTD_RESIZE_SEQPOOL 0
 
 /* ======   Debug   ====== */
-#if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2)
+#if defined(DEBUGLEVEL) && (DEBUGLEVEL>=2) \
+    && !defined(_MSC_VER) \
+    && !defined(__MINGW32__)
 
 #  include <stdio.h>
 #  include <unistd.h>
 #  include <sys/times.h>
-#  define DEBUGLOGRAW(l, ...) if (l<=ZSTD_DEBUG) { fprintf(stderr, __VA_ARGS__); }
 
 #  define DEBUG_PRINTHEX(l,p,n) {                 \
     unsigned debug_u;                             \
     for (debug_u=0; debug_u<(n); debug_u++)       \
-        DEBUGLOGRAW(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
-    DEBUGLOGRAW(l, " \n");                        \
+        RAWLOG(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
+    RAWLOG(l, " \n");                             \
 }
 
 static unsigned long long GetCurrentClockTimeMicroseconds(void)
@@ -62,7 +63,7 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
 
 #define MUTEX_WAIT_TIME_DLEVEL 6
 #define ZSTD_PTHREAD_MUTEX_LOCK(mutex) {          \
-    if (ZSTD_DEBUG >= MUTEX_WAIT_TIME_DLEVEL) {   \
+    if (DEBUGLEVEL >= MUTEX_WAIT_TIME_DLEVEL) {   \
         unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
         ZSTD_pthread_mutex_lock(mutex);           \
         {   unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
@@ -160,6 +161,25 @@ static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const
     ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
 }
 
+
+static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, U32 nbWorkers)
+{
+    unsigned const maxNbBuffers = 2*nbWorkers + 3;
+    if (srcBufPool==NULL) return NULL;
+    if (srcBufPool->totalBuffers >= maxNbBuffers) /* good enough */
+        return srcBufPool;
+    /* need a larger buffer pool */
+    {   ZSTD_customMem const cMem = srcBufPool->cMem;
+        size_t const bSize = srcBufPool->bufferSize;   /* forward parameters */
+        ZSTDMT_bufferPool* newBufPool;
+        ZSTDMT_freeBufferPool(srcBufPool);
+        newBufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+        if (newBufPool==NULL) return newBufPool;
+        ZSTDMT_setBufferSize(newBufPool, bSize);
+        return newBufPool;
+    }
+}
+
 /** ZSTDMT_getBuffer() :
  *  assumption : bufPool must be valid
  * @return : a buffer, with start pointer and size
@@ -229,8 +249,8 @@ static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer)
 /* store buffer for later re-use, up to pool capacity */
 static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
 {
-    if (buf.start == NULL) return;   /* compatible with release on NULL */
     DEBUGLOG(5, "ZSTDMT_releaseBuffer");
+    if (buf.start == NULL) return;   /* compatible with release on NULL */
     ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
     if (bufPool->nbBuffers < bufPool->totalBuffers) {
         bufPool->bTable[bufPool->nbBuffers++] = buf;   /* stored for later use */
@@ -300,7 +320,8 @@ static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq)
 
 static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
 {
-    ZSTDMT_seqPool* seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+    ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+    if (seqPool == NULL) return NULL;
     ZSTDMT_setNbSeq(seqPool, 0);
     return seqPool;
 }
@@ -310,6 +331,10 @@ static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)
     ZSTDMT_freeBufferPool(seqPool);
 }
 
+static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers)
+{
+    return ZSTDMT_expandBufferPool(pool, nbWorkers);
+}
 
 /* =====   CCtx Pool   ===== */
 
@@ -355,6 +380,18 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbWorkers,
     return cctxPool;
 }
 
+static ZSTDMT_CCtxPool* ZSTDMT_expandCCtxPool(ZSTDMT_CCtxPool* srcPool,
+                                              unsigned nbWorkers)
+{
+    if (srcPool==NULL) return NULL;
+    if (nbWorkers <= srcPool->totalCCtx) return srcPool;   /* good enough */
+    /* need a larger cctx pool */
+    {   ZSTD_customMem const cMem = srcPool->cMem;
+        ZSTDMT_freeCCtxPool(srcPool);
+        return ZSTDMT_createCCtxPool(nbWorkers, cMem);
+    }
+}
+
 /* only works during initialization phase, not during compression */
 static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
 {
@@ -425,12 +462,11 @@ typedef struct {
     ZSTD_window_t ldmWindow;  /* A thread-safe copy of ldmState.window */
 } serialState_t;
 
-static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params)
+static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params, size_t jobSize)
 {
     /* Adjust parameters */
     if (params.ldmParams.enableLdm) {
         DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10);
-        params.ldmParams.windowLog = params.cParams.windowLog;
         ZSTD_ldm_adjustParameters(&params.ldmParams, &params.cParams);
         assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog);
         assert(params.ldmParams.hashEveryLog < 32);
@@ -453,7 +489,7 @@ static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool*
             serialState->params.ldmParams.hashLog -
             serialState->params.ldmParams.bucketSizeLog;
         /* Size the seq pool tables */
-        ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, params.jobSize));
+        ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, jobSize));
         /* Reset the window */
         ZSTD_window_clear(&serialState->ldmState.window);
         serialState->ldmWindow = serialState->ldmState.window;
@@ -473,6 +509,7 @@ static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool*
         memset(serialState->ldmState.bucketOffsets, 0, bucketSize);
     }
     serialState->params = params;
+    serialState->params.jobSize = (U32)jobSize;
     return 0;
 }
 
@@ -505,6 +542,7 @@ static void ZSTDMT_serialState_update(serialState_t* serialState,
     /* Wait for our turn */
     ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
     while (serialState->nextJobID < jobID) {
+        DEBUGLOG(5, "wait for serialState->cond");
        ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex);
     }
     /* A future job may error and skip our job */
@@ -514,6 +552,7 @@ static void ZSTDMT_serialState_update(serialState_t* serialState,
         size_t error;
         assert(seqStore.seq != NULL && seqStore.pos == 0 &&
                seqStore.size == 0 && seqStore.capacity > 0);
+        assert(src.size <= serialState->params.jobSize);
         ZSTD_window_update(&serialState->ldmState.window, src.start, src.size);
         error = ZSTD_ldm_generateSequences(
             &serialState->ldmState, &seqStore,
@@ -593,14 +632,32 @@ typedef struct {
     unsigned frameChecksumNeeded;        /* used only by mtctx */
 } ZSTDMT_jobDescription;
 
+#define JOB_ERROR(e) {                          \
+    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);   \
+    job->cSize = e;                             \
+    ZSTD_pthread_mutex_unlock(&job->job_mutex); \
+    goto _endJob;                               \
+}
+
 /* ZSTDMT_compressionJob() is a POOL_function type */
-void ZSTDMT_compressionJob(void* jobDescription)
+static void ZSTDMT_compressionJob(void* jobDescription)
 {
     ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
     ZSTD_CCtx_params jobParams = job->params;   /* do not modify job->params ! copy it, modify the copy */
     ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
     rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);
     buffer_t dstBuff = job->dstBuff;
+    size_t lastCBlockSize = 0;
+
+    /* ressources */
+    if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));
+    if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
+        dstBuff = ZSTDMT_getBuffer(job->bufPool);
+        if (dstBuff.start==NULL) JOB_ERROR(ERROR(memory_allocation));
+        job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
+    }
+    if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL)
+        JOB_ERROR(ERROR(memory_allocation));
 
     /* Don't compute the checksum for chunks, since we compute it externally,
      * but write it in the header.
@@ -609,47 +666,31 @@ void ZSTDMT_compressionJob(void* jobDescription)
     /* Don't run LDM for the chunks, since we handle it externally */
     jobParams.ldmParams.enableLdm = 0;
 
-    /* ressources */
-    if (cctx==NULL) {
-        job->cSize = ERROR(memory_allocation);
-        goto _endJob;
-    }
-    if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
-        dstBuff = ZSTDMT_getBuffer(job->bufPool);
-        if (dstBuff.start==NULL) {
-            job->cSize = ERROR(memory_allocation);
-            goto _endJob;
-        }
-        job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
-    }
 
     /* init */
     if (job->cdict) {
-        size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, job->cdict, jobParams, job->fullFrameSize);
+        size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, jobParams, job->fullFrameSize);
         assert(job->firstJob);  /* only allowed for first job */
-        if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
+        if (ZSTD_isError(initError)) JOB_ERROR(initError);
     } else {  /* srcStart points at reloaded section */
         U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size;
         {   size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob);
-            if (ZSTD_isError(forceWindowError)) {
-                job->cSize = forceWindowError;
-                goto _endJob;
-            }   }
+            if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError);
+        }
         {   size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
                         job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
+                        ZSTD_dtlm_fast,
                         NULL, /*cdict*/
                         jobParams, pledgedSrcSize);
-            if (ZSTD_isError(initError)) {
-                job->cSize = initError;
-                goto _endJob;
-            }   }
+            if (ZSTD_isError(initError)) JOB_ERROR(initError);
+    }   }
 
     /* Perform serial step as early as possible, but after CCtx initialization */
     ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);
 
     if (!job->firstJob) {  /* flush and overwrite frame header when it's not first job */
         size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);
-        if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
+        if (ZSTD_isError(hSize)) JOB_ERROR(hSize);
         DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize);
         ZSTD_invalidateRepCodes(cctx);
     }
@@ -667,7 +708,7 @@ void ZSTDMT_compressionJob(void* jobDescription)
         assert(job->cSize == 0);
         for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {
             size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize);
-            if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
+            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
             ip += chunkSize;
             op += cSize; assert(op < oend);
             /* stats */
@@ -680,18 +721,16 @@ void ZSTDMT_compressionJob(void* jobDescription)
             ZSTD_pthread_mutex_unlock(&job->job_mutex);
         }
         /* last block */
-        assert(chunkSize > 0); assert((chunkSize & (chunkSize - 1)) == 0);  /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
+        assert(chunkSize > 0);
+        assert((chunkSize & (chunkSize - 1)) == 0);  /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
         if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) {
             size_t const lastBlockSize1 = job->src.size & (chunkSize-1);
             size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1;
             size_t const cSize = (job->lastJob) ?
                  ZSTD_compressEnd     (cctx, op, oend-op, ip, lastBlockSize) :
                  ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
-            if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
-            /* stats */
-            ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
-            job->cSize += cSize;
-            ZSTD_pthread_mutex_unlock(&job->job_mutex);
+            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
+            lastCBlockSize = cSize;
     }   }
 
 _endJob:
@@ -704,7 +743,9 @@ _endJob:
     ZSTDMT_releaseCCtx(job->cctxPool, cctx);
     /* report */
     ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
-    job->consumed = job->src.size;
+    if (ZSTD_isError(job->cSize)) assert(lastCBlockSize == 0);
+    job->cSize += lastCBlockSize;
+    job->consumed = job->src.size;   /* when job->consumed == job->src.size , compression job is presumed completed */
     ZSTD_pthread_cond_signal(&job->job_cond);
     ZSTD_pthread_mutex_unlock(&job->job_mutex);
 }
@@ -745,9 +786,9 @@ struct ZSTDMT_CCtx_s {
     ZSTD_CCtx_params params;
     size_t targetSectionSize;
     size_t targetPrefixSize;
-    roundBuff_t roundBuff;
+    int jobReady;        /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */
     inBuff_t inBuff;
-    int jobReady;        /* 1 => one job is already prepared, but pool has shortage of workers. Don't create another one. */
+    roundBuff_t roundBuff;
     serialState_t serial;
     unsigned singleBlockingThread;
     unsigned jobIDMask;
@@ -798,6 +839,20 @@ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_custom
     return jobTable;
 }
 
+static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {
+    U32 nbJobs = nbWorkers + 2;
+    if (nbJobs > mtctx->jobIDMask+1) {  /* need more job capacity */
+        ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
+        mtctx->jobIDMask = 0;
+        mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
+        if (mtctx->jobs==NULL) return ERROR(memory_allocation);
+        assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0));  /* ensure nbJobs is a power of 2 */
+        mtctx->jobIDMask = nbJobs - 1;
+    }
+    return 0;
+}
+
+
 /* ZSTDMT_CCtxParam_setNbWorkers():
  * Internal use only */
 size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
@@ -875,7 +930,7 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
         unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
         ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
         while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) {
-            DEBUGLOG(5, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
+            DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
             ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
         }
         ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
@@ -924,6 +979,8 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params,
         if ( (value > 0)  /* value==0 => automatic job size */
            & (value < ZSTDMT_JOBSIZE_MIN) )
             value = ZSTDMT_JOBSIZE_MIN;
+        if (value > ZSTDMT_JOBSIZE_MAX)
+            value = ZSTDMT_JOBSIZE_MAX;
         params->jobSize = value;
         return value;
     case ZSTDMT_p_overlapSectionLog :
@@ -950,6 +1007,21 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter,
     }
 }
 
+size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned* value)
+{
+    switch (parameter) {
+    case ZSTDMT_p_jobSize:
+        *value = mtctx->params.jobSize;
+        break;
+    case ZSTDMT_p_overlapSectionLog:
+        *value = mtctx->params.overlapSizeLog;
+        break;
+    default:
+        return ERROR(parameter_unsupported);
+    }
+    return 0;
+}
+
 /* Sets parameters relevant to the compression job,
  * initializing others to default values. */
 static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
@@ -960,13 +1032,30 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
     jobParams.cParams = params.cParams;
     jobParams.fParams = params.fParams;
     jobParams.compressionLevel = params.compressionLevel;
-    jobParams.disableLiteralCompression = params.disableLiteralCompression;
 
     return jobParams;
 }
 
+
+/* ZSTDMT_resize() :
+ * @return : error code if fails, 0 on success */
+static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
+{
+    if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);
+    CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbWorkers) );
+    mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
+    if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
+    mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);
+    if (mtctx->cctxPool == NULL) return ERROR(memory_allocation);
+    mtctx->seqPool = ZSTDMT_expandSeqPool(mtctx->seqPool, nbWorkers);
+    if (mtctx->seqPool == NULL) return ERROR(memory_allocation);
+    ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);
+    return 0;
+}
+
+
 /*! ZSTDMT_updateCParams_whileCompressing() :
- *  Updates only a selected set of compression parameters, to remain compatible with current frame.
+ *  Updates a selected set of compression parameters, remaining compatible with currently active frame.
  *  New parameters will be applied to next compression job. */
 void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams)
 {
@@ -981,38 +1070,36 @@ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_p
     }
 }
 
-/* ZSTDMT_getNbWorkers():
- * @return nb threads currently active in mtctx.
- * mtctx must be valid */
-unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx)
-{
-    assert(mtctx != NULL);
-    return mtctx->params.nbWorkers;
-}
-
 /* ZSTDMT_getFrameProgression():
  * tells how much data has been consumed (input) and produced (output) for current frame.
  * able to count progression inside worker threads.
- * Note : mutex will be acquired during statistics collection. */
+ * Note : mutex will be acquired during statistics collection inside workers. */
 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 {
     ZSTD_frameProgression fps;
-    DEBUGLOG(6, "ZSTDMT_getFrameProgression");
-    fps.consumed = mtctx->consumed;
-    fps.produced = mtctx->produced;
+    DEBUGLOG(5, "ZSTDMT_getFrameProgression");
     fps.ingested = mtctx->consumed + mtctx->inBuff.filled;
+    fps.consumed = mtctx->consumed;
+    fps.produced = fps.flushed = mtctx->produced;
+    fps.currentJobID = mtctx->nextJobID;
+    fps.nbActiveWorkers = 0;
    {   unsigned jobNb;
        unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);
        DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",
                    mtctx->doneJobID, lastJobNb, mtctx->jobReady)
        for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
            unsigned const wJobID = jobNb & mtctx->jobIDMask;
-            ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex);
-            {   size_t const cResult = mtctx->jobs[wJobID].cSize;
+            ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
+            ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+            {   size_t const cResult = jobPtr->cSize;
                size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
-                fps.consumed += mtctx->jobs[wJobID].consumed;
-                fps.ingested += mtctx->jobs[wJobID].src.size;
+                size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
+                assert(flushed <= produced);
+                fps.ingested += jobPtr->src.size;
+                fps.consumed += jobPtr->consumed;
                fps.produced += produced;
+                fps.flushed  += flushed;
+                fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size);
            }
            ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
        }
@@ -1021,6 +1108,34 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 }
 
 
+size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
+{
+    size_t toFlush;
+    unsigned const jobID = mtctx->doneJobID;
+    assert(jobID <= mtctx->nextJobID);
+    if (jobID == mtctx->nextJobID) return 0;   /* no active job => nothing to flush */
+
+    /* look into oldest non-fully-flushed job */
+    {   unsigned const wJobID = jobID & mtctx->jobIDMask;
+        ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID];
+        ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+        {   size_t const cResult = jobPtr->cSize;
+            size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
+            size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
+            assert(flushed <= produced);
+            toFlush = produced - flushed;
+            if (toFlush==0 && (jobPtr->consumed >= jobPtr->src.size)) {
+                /* doneJobID is not-fully-flushed, but toFlush==0 : doneJobID should be compressing some more data */
+                assert(jobPtr->consumed < jobPtr->src.size);
+            }
+        }
+        ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
+    }
+
+    return toFlush;
+}
+
+
 /* ------------------------------------------ */
 /* =====   Multi-threaded compression   ===== */
 /* ------------------------------------------ */
@@ -1087,18 +1202,10 @@ static size_t ZSTDMT_compress_advanced_internal(
 
     assert(avgJobSize >= 256 KB);  /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
     ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
-    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
+    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize))
         return ERROR(memory_allocation);
 
-    if (nbJobs > mtctx->jobIDMask+1) {  /* enlarge job table */
-        U32 jobsTableSize = nbJobs;
-        ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
-        mtctx->jobIDMask = 0;
-        mtctx->jobs = ZSTDMT_createJobsTable(&jobsTableSize, mtctx->cMem);
-        if (mtctx->jobs==NULL) return ERROR(memory_allocation);
-        assert((jobsTableSize != 0) && ((jobsTableSize & (jobsTableSize - 1)) == 0));  /* ensure jobsTableSize is a power of 2 */
-        mtctx->jobIDMask = jobsTableSize - 1;
-    }
+    CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbJobs) );  /* only expands if necessary */
 
     {   unsigned u;
         for (u=0; u<nbJobs; u++) {
@@ -1221,17 +1328,18 @@ size_t ZSTDMT_initCStream_internal(
         const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
         unsigned long long pledgedSrcSize)
 {
-    DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u, disableLiteralCompression=%i)",
-                (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx, params.disableLiteralCompression);
-    /* params are supposed to be fully validated at this point */
+    DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)",
+                (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx);
+
+    /* params supposed partially validated at this point */
     assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
     assert(!((dict) && (cdict)));  /* either dict or cdict, not both */
-    assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);
 
     /* init */
-    if (params.jobSize == 0) {
-        params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params);
-    }
+    if (params.nbWorkers != mtctx->params.nbWorkers)
+        CHECK_F( ZSTDMT_resize(mtctx, params.nbWorkers) );
+
+    if (params.jobSize > 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN;
     if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
 
     mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN);  /* do not trigger multi-threading when srcSize is too small */
@@ -1270,7 +1378,9 @@ size_t ZSTDMT_initCStream_internal(
     mtctx->targetPrefixSize = (size_t)1 << ZSTDMT_computeOverlapLog(params);
     DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10));
     mtctx->targetSectionSize = params.jobSize;
-    if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
+    if (mtctx->targetSectionSize == 0) {
+        mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params);
+    }
     if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize;  /* job size must be >= overlap size */
     DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize);
     DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
@@ -1312,7 +1422,7 @@ size_t ZSTDMT_initCStream_internal(
     mtctx->allJobsCompleted = 0;
     mtctx->consumed = 0;
     mtctx->produced = 0;
-    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
+    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize))
         return ERROR(memory_allocation);
     return 0;
 }
@@ -1420,7 +1530,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
         mtctx->jobs[jobID].jobID = mtctx->nextJobID;
         mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
         mtctx->jobs[jobID].lastJob = endFrame;
-        mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
+        mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0);
         mtctx->jobs[jobID].dstFlushed = 0;
 
         /* Update the round buffer pos and clear the input buffer to be reset */
@@ -1468,6 +1578,8 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
 
 
 /*! ZSTDMT_flushProduced() :
+ *  flush whatever data has been produced but not yet flushed in current job.
+ *  move to next job if current one is fully flushed.
  * `output` : `pos` will be updated with amount of data flushed .
  * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .
  * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
@@ -1496,7 +1608,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
     /* try to flush something */
     {   size_t cSize = mtctx->jobs[wJobID].cSize;                  /* shared */
         size_t const srcConsumed = mtctx->jobs[wJobID].consumed;   /* shared */
-        size_t const srcSize = mtctx->jobs[wJobID].src.size;        /* read-only, could be done after mutex lock, but no-declaration-after-statement */
+        size_t const srcSize = mtctx->jobs[wJobID].src.size;       /* read-only, could be done after mutex lock, but no-declaration-after-statement */
         ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         if (ZSTD_isError(cSize)) {
             DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
@@ -1516,6 +1628,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
                 mtctx->jobs[wJobID].cSize += 4;  /* can write this shared value, as worker is no longer active */
                 mtctx->jobs[wJobID].frameChecksumNeeded = 0;
             }
+
         if (cSize > 0) {   /* compression is ongoing or completed */
             size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos);
             DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)",
@@ -1529,11 +1642,12 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
             output->pos += toFlush;
             mtctx->jobs[wJobID].dstFlushed += toFlush;  /* can write : this value is only used by mtctx */
 
-            if ( (srcConsumed == srcSize)    /* job completed */
+            if ( (srcConsumed == srcSize)    /* job is completed */
              && (mtctx->jobs[wJobID].dstFlushed == cSize) ) {   /* output buffer fully flushed => free this job position */
                 DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
                         mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
                 ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
+                DEBUGLOG(5, "dstBuffer released");
                 mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
                 mtctx->jobs[wJobID].cSize = 0;   /* ensure this job slot is considered "not started" in future check */
                 mtctx->consumed += srcSize;
@@ -1610,6 +1724,7 @@ static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
     range_t extDict;
     range_t prefix;
 
+    DEBUGLOG(5, "ZSTDMT_doesOverlapWindow");
     extDict.start = window.dictBase + window.lowLimit;
     extDict.size = window.dictLimit - window.lowLimit;
 
@@ -1630,12 +1745,13 @@ static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)
 {
     if (mtctx->params.ldmParams.enableLdm) {
         ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex;
+        DEBUGLOG(5, "ZSTDMT_waitForLdmComplete");
         DEBUGLOG(5, "source  [0x%zx, 0x%zx)",
                 (size_t)buffer.start,
                 (size_t)buffer.start + buffer.capacity);
         ZSTD_PTHREAD_MUTEX_LOCK(mutex);
         while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
-            DEBUGLOG(6, "Waiting for LDM to finish...");
+            DEBUGLOG(5, "Waiting for LDM to finish...");
            ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);
        }
        DEBUGLOG(6, "Done waiting for LDM to finish");
@@ -1655,6 +1771,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
     size_t const target = mtctx->targetSectionSize;
     buffer_t buffer;
 
+    DEBUGLOG(5, "ZSTDMT_tryGetInputRange");
     assert(mtctx->inBuff.buffer.start == NULL);
     assert(mtctx->roundBuff.capacity >= target);
 
@@ -1668,7 +1785,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
         buffer.start = start;
         buffer.capacity = prefixSize;
         if (ZSTDMT_isOverlapped(buffer, inUse)) {
-            DEBUGLOG(6, "Waiting for buffer...");
+            DEBUGLOG(5, "Waiting for buffer...");
             return 0;
         }
         ZSTDMT_waitForLdmComplete(mtctx, buffer);
@@ -1680,7 +1797,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
         buffer.capacity = target;
 
         if (ZSTDMT_isOverlapped(buffer, inUse)) {
-            DEBUGLOG(6, "Waiting for buffer...");
+            DEBUGLOG(5, "Waiting for buffer...");
             return 0;
         }
         assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix));
@@ -1753,8 +1870,10 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                 /* It is only possible for this operation to fail if there are
                  * still compression jobs ongoing.
                  */
+                DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed");
                 assert(mtctx->doneJobID != mtctx->nextJobID);
-            }
+            } else
+                DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start);
         }
         if (mtctx->inBuff.buffer.start != NULL) {
             size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
@@ -1782,6 +1901,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     /* check for potential compressed data ready to be flushed */
     {   size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp);  /* block if there was no forward input progress */
         if (input->pos < input->size) return MAX(remainingToFlush, 1);  /* input not consumed : do not end flush yet */
+        DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush);
        return remainingToFlush;
    }
}