aboutsummaryrefslogtreecommitdiff
path: root/subversion/libsvn_fs_x/batch_fsync.c
diff options
context:
space:
mode:
authorPeter Wemm <peter@FreeBSD.org>2018-05-08 03:44:38 +0000
committerPeter Wemm <peter@FreeBSD.org>2018-05-08 03:44:38 +0000
commit3faf8d6bffc5d0fb2525ba37bb504c53366caf9d (patch)
tree7e47911263e75034b767fe34b2f8d3d17e91f66d /subversion/libsvn_fs_x/batch_fsync.c
parenta55fb3c0d5eca7d887798125d5b95942b1f01d4b (diff)
Diffstat (limited to 'subversion/libsvn_fs_x/batch_fsync.c')
-rw-r--r--subversion/libsvn_fs_x/batch_fsync.c588
1 files changed, 588 insertions, 0 deletions
diff --git a/subversion/libsvn_fs_x/batch_fsync.c b/subversion/libsvn_fs_x/batch_fsync.c
new file mode 100644
index 000000000000..4116d7d7614c
--- /dev/null
+++ b/subversion/libsvn_fs_x/batch_fsync.c
@@ -0,0 +1,588 @@
+/* batch_fsync.c --- efficiently fsync multiple targets
+ *
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ */
+
+#include <apr_thread_pool.h>
+#include <apr_thread_cond.h>
+
+#include "batch_fsync.h"
+#include "svn_pools.h"
+#include "svn_hash.h"
+#include "svn_dirent_uri.h"
+#include "svn_private_config.h"
+
+#include "private/svn_atomic.h"
+#include "private/svn_dep_compat.h"
+#include "private/svn_mutex.h"
+#include "private/svn_subr_private.h"
+
+/* Handy macro to check APR function results and turning them into
+ * svn_error_t upon failure.
+ *
+ * Wrapped in do { } while (0) so the macro expands to a single statement
+ * and is safe in un-braced if/else branches (avoids the dangling-else
+ * hazard of a bare block).  Because it may 'return', it must only be
+ * used inside functions returning svn_error_t *. */
+#define WRAP_APR_ERR(x,msg)                             \
+  do                                                    \
+    {                                                   \
+      apr_status_t status_ = (x);                       \
+      if (status_)                                      \
+        return svn_error_wrap_apr(status_, msg);        \
+    }                                                   \
+  while (0)
+
+
+/* A simple SVN-wrapper around the apr_thread_cond_* API.  In builds
+ * without thread support the type degenerates to a dummy int so that
+ * callers can use the same code path in both configurations. */
+#if APR_HAS_THREADS
+typedef apr_thread_cond_t svn_thread_cond__t;
+#else
+typedef int svn_thread_cond__t;
+#endif
+
+/* Set *COND to a new condition variable allocated in RESULT_POOL.
+ * Without thread support, hand out a zero-initialized dummy instead. */
+static svn_error_t *
+svn_thread_cond__create(svn_thread_cond__t **cond,
+                        apr_pool_t *result_pool)
+{
+#if APR_HAS_THREADS
+  WRAP_APR_ERR(apr_thread_cond_create(cond, result_pool),
+               _("Can't create condition variable"));
+#else
+  *cond = apr_pcalloc(result_pool, sizeof(**cond));
+#endif
+
+  return SVN_NO_ERROR;
+}
+
+/* Wake up all threads currently blocked on COND. */
+static svn_error_t *
+svn_thread_cond__broadcast(svn_thread_cond__t *cond)
+{
+#if APR_HAS_THREADS
+  WRAP_APR_ERR(apr_thread_cond_broadcast(cond),
+               _("Can't broadcast condition variable"));
+#else
+  /* Single-threaded build: nobody can be waiting. */
+#endif
+
+  return SVN_NO_ERROR;
+}
+
+/* Block on COND until it gets signalled.  MUTEX must be held by the
+ * caller; APR atomically releases it while waiting and re-acquires it
+ * before returning.  In single-threaded builds this is a no-op.
+ *
+ * Fix: the error message previously said "Can't broadcast condition
+ * variable" - a copy-paste from svn_thread_cond__broadcast(). */
+static svn_error_t *
+svn_thread_cond__wait(svn_thread_cond__t *cond,
+                      svn_mutex__t *mutex)
+{
+#if APR_HAS_THREADS
+
+  WRAP_APR_ERR(apr_thread_cond_wait(cond, svn_mutex__get(mutex)),
+               _("Can't wait on condition variable"));
+
+#endif
+
+  return SVN_NO_ERROR;
+}
+
+/* Utility construct: Clients can efficiently wait for the encapsulated
+ * counter to reach a certain value. Currently, only increments have been
+ * implemented. This whole structure can be opaque to the API users.
+ */
+typedef struct waitable_counter_t
+{
+  /* Current value, initialized to 0.  Only read / modified with MUTEX
+   * held. */
+  int value;
+
+  /* Synchronization objects.  COND is broadcast whenever VALUE changes;
+   * MUTEX serializes all access to VALUE. */
+  svn_thread_cond__t *cond;
+  svn_mutex__t *mutex;
+} waitable_counter_t;
+
+/* Set *COUNTER_P to a new waitable_counter_t instance allocated in
+ * RESULT_POOL. The initial counter value is 0. */
+static svn_error_t *
+waitable_counter__create(waitable_counter_t **counter_p,
+                         apr_pool_t *result_pool)
+{
+  waitable_counter_t *result = apr_pcalloc(result_pool, sizeof(*result));
+
+  result->value = 0;
+  SVN_ERR(svn_mutex__init(&result->mutex, TRUE, result_pool));
+  SVN_ERR(svn_thread_cond__create(&result->cond, result_pool));
+
+  *counter_p = result;
+  return SVN_NO_ERROR;
+}
+
+/* Increment the value in COUNTER by 1. */
+static svn_error_t *
+waitable_counter__increment(waitable_counter_t *counter)
+{
+  /* Bump the value and wake any waiters - all under the lock. */
+  SVN_ERR(svn_mutex__lock(counter->mutex));
+  ++counter->value;
+  SVN_ERR(svn_thread_cond__broadcast(counter->cond));
+  SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));
+
+  return SVN_NO_ERROR;
+}
+
+/* Efficiently wait for COUNTER to assume VALUE. */
+static svn_error_t *
+waitable_counter__wait_for(waitable_counter_t *counter,
+                           int value)
+{
+  svn_boolean_t done = FALSE;
+
+  /* This loop implicitly handles spurious wake-ups: after every wake-up
+   * we re-check the counter under the lock before declaring success.
+   *
+   * NOTE(review): if the cond-wait itself fails, SVN_ERR returns with
+   * the mutex still locked - confirm callers treat that as fatal. */
+  do
+    {
+      SVN_ERR(svn_mutex__lock(counter->mutex));
+
+      if (counter->value == value)
+        done = TRUE;
+      else
+        /* Atomically releases the mutex while blocked. */
+        SVN_ERR(svn_thread_cond__wait(counter->cond, counter->mutex));
+
+      SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));
+    }
+  while (!done);
+
+  return SVN_NO_ERROR;
+}
+
+/* Set the value in COUNTER to 0. */
+static svn_error_t *
+waitable_counter__reset(waitable_counter_t *counter)
+{
+  SVN_ERR(svn_mutex__lock(counter->mutex));
+  counter->value = 0;
+  SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));
+
+  /* NOTE(review): unlike waitable_counter__increment, the broadcast here
+   * happens outside the mutex.  Waiters re-check the value in a loop, so
+   * this looks safe, but confirm the asymmetry is intentional. */
+  SVN_ERR(svn_thread_cond__broadcast(counter->cond));
+
+  return SVN_NO_ERROR;
+}
+
+/* Entry type for the svn_fs_x__batch_fsync_t collection. There is one
+ * instance per file handle.
+ */
+typedef struct to_sync_t
+{
+  /* Open handle of the file / directory to fsync. */
+  apr_file_t *file;
+
+  /* Pool to use with FILE. It is private to FILE such that it can be
+   * used safely together with FILE in a separate thread.  Destroying
+   * this pool implicitly closes FILE. */
+  apr_pool_t *pool;
+
+  /* Result of the file operations.  SVN_NO_ERROR on success; composed
+   * into the caller's error chain by svn_fs_x__batch_fsync_run(). */
+  svn_error_t *result;
+
+  /* Counter to increment when we completed the task. */
+  waitable_counter_t *counter;
+} to_sync_t;
+
+/* The actual collection object. */
+struct svn_fs_x__batch_fsync_t
+{
+  /* Maps open file handles: C-string path to to_sync_t *.  Paths are
+   * duplicated into this hash's pool when entries are added. */
+  apr_hash_t *files;
+
+  /* Counts the number of completed fsync tasks.  Shared with every
+   * to_sync_t entry in FILES. */
+  waitable_counter_t *counter;
+
+  /* Perform fsyncs only if this flag has been set. */
+  svn_boolean_t flush_to_disk;
+};
+
+/* Data structures for concurrent fsync execution are only available if
+ * we have threading support.
+ */
+#if APR_HAS_THREADS
+
+/* Number of microseconds that an unused thread remains in the pool before
+ * being terminated.
+ *
+ * Higher values are useful if clients frequently send small requests and
+ * you want to minimize the latency for those.
+ */
+#define THREADPOOL_THREAD_IDLE_LIMIT 1000000
+
+/* Maximum number of threads in THREAD_POOL, i.e. number of paths we can
+ * fsync concurrently throughout the process. */
+#define MAX_THREADS 16
+
+/* Thread pool to execute the fsync tasks.  Lazily created by
+ * create_thread_pool() and torn down by thread_pool_pre_cleanup(). */
+static apr_thread_pool_t *thread_pool = NULL;
+
+#endif
+
+/* Keep track of whether we already created the THREAD_POOL. */
+static svn_atomic_t thread_pool_initialized = FALSE;
+
+/* We open non-directory files with these flags. */
+#define FILE_FLAGS (APR_READ | APR_WRITE | APR_BUFFERED | APR_CREATE)
+
+#if APR_HAS_THREADS
+
+/* Destructor function that implicitly cleans up any running threads
+   in the THREAD_POOL *once*.
+
+   Must be run as a pre-cleanup hook.
+ */
+static apr_status_t
+thread_pool_pre_cleanup(void *data)
+{
+  /* The hook is registered on more than one pool; reset the globals
+   * before destroying so that only the first invocation does any work. */
+  apr_thread_pool_t *tp = thread_pool;
+  if (!thread_pool)
+    return APR_SUCCESS;
+
+  thread_pool = NULL;
+  thread_pool_initialized = FALSE;
+
+  return apr_thread_pool_destroy(tp);
+}
+
+#endif
+
+/* Core implementation of svn_fs_x__batch_fsync_init.
+ * BATON is unused (svn_atomic__init_once callback signature). */
+static svn_error_t *
+create_thread_pool(void *baton,
+                   apr_pool_t *owning_pool)
+{
+#if APR_HAS_THREADS
+  /* The thread-pool must be allocated from a thread-safe pool.
+     OWNING_POOL may be single-threaded, though, so we create an
+     independent root pool here. */
+  apr_pool_t *pool = svn_pool_create(NULL);
+
+  /* Create the pool with 0 pre-started threads; up to MAX_THREADS get
+     spun up on demand. */
+  WRAP_APR_ERR(apr_thread_pool_create(&thread_pool, 0, MAX_THREADS, pool),
+               _("Can't create fsync thread pool in FSX"));
+
+  /* Work around an APR bug: The cleanup must happen in the pre-cleanup
+     hook instead of the normal cleanup hook. Otherwise, the sub-pools
+     containing the thread objects would already be invalid.  Register
+     the hook on both POOL and OWNING_POOL; it is idempotent, so only
+     whichever pool dies first actually destroys the thread pool. */
+  apr_pool_pre_cleanup_register(pool, NULL, thread_pool_pre_cleanup);
+  apr_pool_pre_cleanup_register(owning_pool, NULL, thread_pool_pre_cleanup);
+
+  /* let idle threads linger for a while in case more requests are
+     coming in */
+  apr_thread_pool_idle_wait_set(thread_pool, THREADPOOL_THREAD_IDLE_LIMIT);
+
+  /* don't queue requests unless we reached the worker thread limit */
+  apr_thread_pool_threshold_set(thread_pool, 0);
+
+#endif
+
+  return SVN_NO_ERROR;
+}
+
+svn_error_t *
+svn_fs_x__batch_fsync_init(apr_pool_t *owning_pool)
+{
+  /* svn_atomic__init_once guarantees CREATE_THREAD_POOL runs at most
+   * once per process, no matter how often this gets called. */
+  svn_error_t *err = svn_atomic__init_once(&thread_pool_initialized,
+                                           create_thread_pool,
+                                           NULL, owning_pool);
+  return svn_error_trace(err);
+}
+
+/* Destructor for svn_fs_x__batch_fsync_t. Releases all global pool memory
+ * and closes all open file handles. */
+static apr_status_t
+fsync_batch_cleanup(void *data)
+{
+  svn_fs_x__batch_fsync_t *batch = data;
+  apr_pool_t *hash_pool = apr_hash_pool_get(batch->files);
+  apr_hash_index_t *hi;
+
+  /* Destroying each entry's private pool closes its file handle
+   * implicitly and releases the associated memory. */
+  for (hi = apr_hash_first(hash_pool, batch->files); hi;
+       hi = apr_hash_next(hi))
+    {
+      to_sync_t *entry = apr_hash_this_val(hi);
+      svn_pool_destroy(entry->pool);
+    }
+
+  return APR_SUCCESS;
+}
+
+svn_error_t *
+svn_fs_x__batch_fsync_create(svn_fs_x__batch_fsync_t **result_p,
+                             svn_boolean_t flush_to_disk,
+                             apr_pool_t *result_pool)
+{
+  svn_fs_x__batch_fsync_t *batch = apr_pcalloc(result_pool, sizeof(*batch));
+
+  batch->flush_to_disk = flush_to_disk;
+  batch->files = svn_hash__make(result_pool);
+  SVN_ERR(waitable_counter__create(&batch->counter, result_pool));
+
+  /* Ensure all open file handles get closed and their private pools
+   * released when RESULT_POOL goes away. */
+  apr_pool_cleanup_register(result_pool, batch, fsync_batch_cleanup,
+                            apr_pool_cleanup_null);
+
+  *result_p = batch;
+  return SVN_NO_ERROR;
+}
+
+/* If BATCH does not contain a handle for PATH, yet, create one with FLAGS
+ * and add it to BATCH. Set *FILE to the open file handle.
+ * Use SCRATCH_POOL for temporaries.
+ */
+static svn_error_t *
+internal_open_file(apr_file_t **file,
+                   svn_fs_x__batch_fsync_t *batch,
+                   const char *path,
+                   apr_int32_t flags,
+                   apr_pool_t *scratch_pool)
+{
+  svn_error_t *err;
+  apr_pool_t *pool;
+  to_sync_t *to_sync;
+#ifdef SVN_ON_POSIX
+  svn_boolean_t is_new_file;
+#endif
+
+  /* If we already have a handle for PATH, return that. */
+  to_sync = svn_hash_gets(batch->files, path);
+  if (to_sync)
+    {
+      *file = to_sync->file;
+      return SVN_NO_ERROR;
+    }
+
+  /* Calling fsync in PATH is going to be expensive in any case, so we can
+   * allow for some extra overhead figuring out whether the file already
+   * exists. If it doesn't, be sure to schedule parent folder updates, if
+   * required on this platform.
+   *
+   * See svn_fs_x__batch_fsync_new_path() for when such extra fsyncs may be
+   * needed at all. */
+
+#ifdef SVN_ON_POSIX
+
+  is_new_file = FALSE;
+  if (flags & APR_CREATE)
+    {
+      svn_node_kind_t kind;
+      /* We might actually be about to create a new file.
+       * Check whether the file already exists. */
+      SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
+      is_new_file = kind == svn_node_none;
+    }
+
+#endif
+
+  /* To be able to process each file in a separate thread, they must use
+   * separate, thread-safe pools. Allocating a sub-pool from the standard
+   * memory pool achieves exactly that. */
+  pool = svn_pool_create(NULL);
+  err = svn_io_file_open(file, path, flags, APR_OS_DEFAULT, pool);
+  if (err)
+    {
+      /* Don't leak the private pool when the file could not be opened. */
+      svn_pool_destroy(pool);
+      return svn_error_trace(err);
+    }
+
+  /* Register the new handle.  TO_SYNC owns POOL (and thereby FILE). */
+  to_sync = apr_pcalloc(pool, sizeof(*to_sync));
+  to_sync->file = *file;
+  to_sync->pool = pool;
+  to_sync->result = SVN_NO_ERROR;
+  to_sync->counter = batch->counter;
+
+  /* The key must live as long as BATCH->FILES itself, hence the pstrdup
+   * into the hash's own pool. */
+  svn_hash_sets(batch->files,
+                apr_pstrdup(apr_hash_pool_get(batch->files), path),
+                to_sync);
+
+  /* If we just created a new file, schedule any additional necessary fsyncs.
+   * Note that this can only recurse once since the parent folder already
+   * exists on disk. */
+#ifdef SVN_ON_POSIX
+
+  if (is_new_file)
+    SVN_ERR(svn_fs_x__batch_fsync_new_path(batch, path, scratch_pool));
+
+#endif
+
+  return SVN_NO_ERROR;
+}
+
+svn_error_t *
+svn_fs_x__batch_fsync_open_file(apr_file_t **file,
+                                svn_fs_x__batch_fsync_t *batch,
+                                const char *filename,
+                                apr_pool_t *scratch_pool)
+{
+  apr_off_t start = 0;
+
+  SVN_ERR(internal_open_file(file, batch, filename, FILE_FLAGS,
+                             scratch_pool));
+
+  /* The handle may be reused from an earlier call; rewind it so the
+   * caller always starts at the beginning of the file. */
+  SVN_ERR(svn_io_file_seek(*file, APR_SET, &start, scratch_pool));
+
+  return SVN_NO_ERROR;
+}
+
+svn_error_t *
+svn_fs_x__batch_fsync_new_path(svn_fs_x__batch_fsync_t *batch,
+                               const char *path,
+                               apr_pool_t *scratch_pool)
+{
+  apr_file_t *file;
+
+#ifdef SVN_ON_POSIX
+
+  /* On POSIX, we need to sync the parent directory because it contains
+   * the name for the file / folder given by PATH.  Opening it read-only
+   * is sufficient for the later fsync. */
+  path = svn_dirent_dirname(path, scratch_pool);
+  SVN_ERR(internal_open_file(&file, batch, path, APR_READ, scratch_pool));
+
+#else
+
+  svn_node_kind_t kind;
+
+  /* On non-POSIX systems, we assume that sync'ing the given PATH is the
+   * right thing to do. Also, we assume that only files may be sync'ed. */
+  SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
+  if (kind == svn_node_file)
+    SVN_ERR(internal_open_file(&file, batch, path, FILE_FLAGS,
+                               scratch_pool));
+
+  /* FILE is intentionally unused here; the handle is tracked in BATCH. */
+
+#endif
+
+  return SVN_NO_ERROR;
+}
+
+/* Thread-pool task: flush the to_sync_t instance given by DATA to disk
+ * and signal completion via its counter.  TID is unused. */
+static void * APR_THREAD_FUNC
+flush_task(apr_thread_t *tid,
+           void *data)
+{
+  to_sync_t *to_sync = data;
+
+  to_sync->result = svn_error_trace(svn_io_file_flush_to_disk
+                                       (to_sync->file, to_sync->pool));
+
+  /* As soon as the increment call returns, TO_SYNC may be invalid
+     (the main thread may have woken up and released the struct).
+
+     Therefore, we cannot chain this error into TO_SYNC->RESULT.
+     OTOH, the main thread will probably deadlock anyway if we got
+     an error here, thus there is no point in trying to tell the
+     main thread what the problem was. */
+  svn_error_clear(waitable_counter__increment(to_sync->counter));
+
+  return NULL;
+}
+
+svn_error_t *
+svn_fs_x__batch_fsync_run(svn_fs_x__batch_fsync_t *batch,
+                          apr_pool_t *scratch_pool)
+{
+  apr_hash_index_t *hi;
+
+  /* Number of tasks sent to the thread pool. */
+  int tasks = 0;
+
+  /* Because we allocated the open files from our global pool, don't bail
+   * out on the first error. Instead, process all files and accumulate
+   * the errors in this chain.
+   */
+  svn_error_t *chain = SVN_NO_ERROR;
+
+  /* First, flush APR-internal buffers. This should minimize / prevent the
+   * introduction of additional meta-data changes during the next phase.
+   * We might otherwise issue redundant fsyncs.
+   *
+   * NOTE(review): any error stored in TO_SYNC->RESULT here is overwritten
+   * by the fsync phase below and never collected when FLUSH_TO_DISK is
+   * FALSE - confirm that dropping buffer-flush errors is intended.
+   */
+  for (hi = apr_hash_first(scratch_pool, batch->files);
+       hi;
+       hi = apr_hash_next(hi))
+    {
+      to_sync_t *to_sync = apr_hash_this_val(hi);
+      to_sync->result = svn_error_trace(svn_io_file_flush
+                                          (to_sync->file, to_sync->pool));
+    }
+
+  /* Make sure the task completion counter is set to 0. */
+  chain = svn_error_compose_create(chain,
+                                   waitable_counter__reset(batch->counter));
+
+  /* Start the actual fsyncing process. */
+  if (batch->flush_to_disk)
+    {
+      for (hi = apr_hash_first(scratch_pool, batch->files);
+           hi;
+           hi = apr_hash_next(hi))
+        {
+          to_sync_t *to_sync = apr_hash_this_val(hi);
+
+#if APR_HAS_THREADS
+
+          /* Forgot to call _init() or cleaned up the owning pool too early?
+           */
+          SVN_ERR_ASSERT(thread_pool);
+
+          /* If there are multiple fsyncs to perform, run them in parallel.
+           * Otherwise, skip the thread-pool and synchronization overhead. */
+          if (apr_hash_count(batch->files) > 1)
+            {
+              apr_status_t status = APR_SUCCESS;
+              status = apr_thread_pool_push(thread_pool, flush_task, to_sync,
+                                            0, NULL);
+              if (status)
+                to_sync->result = svn_error_wrap_apr(status,
+                                                     _("Can't push task"));
+              else
+                tasks++;
+            }
+          else
+
+#endif
+
+            {
+              /* Single-file batch (or no thread support): fsync inline. */
+              to_sync->result = svn_error_trace(svn_io_file_flush_to_disk
+                                                  (to_sync->file,
+                                                   to_sync->pool));
+            }
+        }
+    }
+
+  /* Wait for all outstanding flush operations to complete.  TASKS only
+   * counts successfully pushed thread-pool jobs, so this cannot wait for
+   * work that never got scheduled. */
+  chain = svn_error_compose_create(chain,
+                                   waitable_counter__wait_for(batch->counter,
+                                                              tasks));
+
+  /* Collect the results, close all files and release memory. */
+  for (hi = apr_hash_first(scratch_pool, batch->files);
+       hi;
+       hi = apr_hash_next(hi))
+    {
+      to_sync_t *to_sync = apr_hash_this_val(hi);
+      if (batch->flush_to_disk)
+        chain = svn_error_compose_create(chain, to_sync->result);
+
+      chain = svn_error_compose_create(chain,
+                                       svn_io_file_close(to_sync->file,
+                                                         scratch_pool));
+      svn_pool_destroy(to_sync->pool);
+    }
+
+  /* Don't process any file / folder twice. */
+  apr_hash_clear(batch->files);
+
+  /* Report the errors that we encountered. */
+  return svn_error_trace(chain);
+}