Subject: Parallelize dispatching --- daemons/cmirrord/Makefile.in | 2 daemons/cmirrord/cluster.c | 110 +++++++++++++++++++++++++++---------------- daemons/cmirrord/functions.c | 37 ++++++++++++-- daemons/cmirrord/local.c | 3 - lib/metadata/mirror.c | 24 --------- lib/mirror/mirrored.c | 5 + tools/lvconvert.c | 9 --- 7 files changed, 109 insertions(+), 81 deletions(-) Index: LVM2.2.02.164/daemons/cmirrord/Makefile.in =================================================================== --- LVM2.2.02.164.orig/daemons/cmirrord/Makefile.in +++ LVM2.2.02.164/daemons/cmirrord/Makefile.in @@ -26,7 +26,7 @@ TARGETS = cmirrord include $(top_builddir)/make.tmpl -LIBS += -ldevmapper +LIBS += -ldevmapper -lpthread LMLIBS += $(CPG_LIBS) $(SACKPT_LIBS) CFLAGS += $(CPG_CFLAGS) $(SACKPT_CFLAGS) $(EXTRA_EXEC_CFLAGS) LDFLAGS += $(EXTRA_EXEC_LDFLAGS) Index: LVM2.2.02.164/daemons/cmirrord/cluster.c =================================================================== --- LVM2.2.02.164.orig/daemons/cmirrord/cluster.c +++ LVM2.2.02.164/daemons/cmirrord/cluster.c @@ -22,6 +22,7 @@ #include #include #include +#include <pthread.h> #if CMIRROR_HAS_CHECKPOINT #include #include @@ -152,9 +153,11 @@ struct clog_cpg { struct checkpoint_data *checkpoint_list; int idx; char debugging[DEBUGGING_HISTORY][DEBUGGING_BUFLEN]; + pthread_t thread_pid; }; static struct dm_list clog_cpg_list; +static pthread_rwlock_t clog_cpg_lock = PTHREAD_RWLOCK_INITIALIZER; /* * cluster_send @@ -169,12 +172,14 @@ int cluster_send(struct clog_request *rq struct iovec iov; struct clog_cpg *entry; + pthread_rwlock_rdlock(&clog_cpg_lock); dm_list_iterate_items(entry, &clog_cpg_list) if (!strncmp(entry->name.value, rq->u_rq.uuid, CPG_MAX_NAME_LENGTH)) { found = 1; break; } + pthread_rwlock_unlock(&clog_cpg_lock); if (!found) { rq->u_rq.error = -ENOENT; @@ -255,11 +260,11 @@ static struct clog_request *get_matching return NULL; } -static char rq_buffer[DM_ULOG_REQUEST_SIZE]; static int handle_cluster_request(struct clog_cpg *entry
__attribute__((unused)), struct clog_request *rq, int server) { int r = 0; + char rq_buffer[DM_ULOG_REQUEST_SIZE]; struct clog_request *tmp = (struct clog_request *)rq_buffer; /* @@ -370,9 +375,13 @@ static struct clog_cpg *find_clog_cpg(cp { struct clog_cpg *match; + pthread_rwlock_rdlock(&clog_cpg_lock); dm_list_iterate_items(match, &clog_cpg_list) - if (match->handle == handle) + if (match->handle == handle) { + pthread_rwlock_unlock(&clog_cpg_lock); return match; + } + pthread_rwlock_unlock(&clog_cpg_lock); return NULL; } @@ -982,34 +991,21 @@ static int resend_requests(struct clog_c return r; } -static int do_cluster_work(void *data __attribute__((unused))) +static void cluster_thread_fn(void *data) { int r = CS_OK; - struct clog_cpg *entry, *tmp; - - dm_list_iterate_items_safe(entry, tmp, &clog_cpg_list) { - r = cpg_dispatch(entry->handle, CS_DISPATCH_ALL); - if (r != CS_OK) { - if ((r == CS_ERR_BAD_HANDLE) && - ((entry->state == INVALID) || - (entry->state == LEAVING))) - /* It's ok if we've left the cluster */ - r = CS_OK; - else - LOG_ERROR("cpg_dispatch failed: %s", - str_ais_error(r)); - } - - if (entry->free_me) { - free(entry); - continue; - } - do_checkpoints(entry, 0); - - resend_requests(entry); + struct clog_cpg *match = data; + r = cpg_dispatch(match->handle, CS_DISPATCH_BLOCKING); + if (r != CS_OK) { + if ((r == CS_ERR_BAD_HANDLE) && + ((match->state == INVALID) || + (match->state == LEAVING))) + /* It's ok if we've left the cluster */ + r = CS_OK; + else + LOG_ERROR("cpg_dispatch failed: %s", + str_ais_error(r)); } - - return (r == CS_OK) ? 0 : -1; /* FIXME: good error number? 
*/ } static int flush_startup_list(struct clog_cpg *entry) @@ -1062,23 +1058,37 @@ static int flush_startup_list(struct clo return 0; } +static void do_cpg_message_callback(struct clog_cpg *match, uint32_t nodeid, + void *msg, size_t msg_len); + static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gname __attribute__((unused)), uint32_t nodeid, uint32_t pid __attribute__((unused)), void *msg, size_t msg_len) { + struct clog_cpg *entry; + + entry = find_clog_cpg(handle); + if (!entry) { + LOG_ERROR("Unable to find clog_cpg for cluster message"); + return; + } + do_cpg_message_callback(entry, nodeid, msg, msg_len); + + do_checkpoints(entry, 0); + resend_requests(entry); + +} + +static void do_cpg_message_callback(struct clog_cpg *match, uint32_t nodeid, + void *msg, size_t msg_len) +{ int i; int r = 0; int i_am_server; int response = 0; struct clog_request *rq = msg; struct clog_request *tmp_rq, *tmp_rq2; - struct clog_cpg *match; - match = find_clog_cpg(handle); - if (!match) { - LOG_ERROR("Unable to find clog_cpg for cluster message"); - return; - } /* * Perform necessary endian and version compatibility conversions @@ -1384,7 +1394,7 @@ static void cpg_leave_callback(struct cl size_t member_list_entries) { unsigned i; - int j, fd; + int j; uint32_t lowest = match->lowest_id; struct clog_request *rq, *n; struct checkpoint_data *p_cp, *c_cp; @@ -1395,10 +1405,9 @@ static void cpg_leave_callback(struct cl /* Am I leaving? 
*/ if (my_cluster_id == left->nodeid) { LOG_DBG("Finalizing leave..."); + pthread_rwlock_wrlock(&clog_cpg_lock); dm_list_del(&match->list); - - cpg_fd_get(match->handle, &fd); - links_unregister(fd); + pthread_rwlock_unlock(&clog_cpg_lock); cluster_postsuspend(match->name.value, match->luid); @@ -1526,11 +1535,13 @@ static void cpg_config_callback(cpg_hand struct clog_cpg *match; int found = 0; + pthread_rwlock_rdlock(&clog_cpg_lock); dm_list_iterate_items(match, &clog_cpg_list) if (match->handle == handle) { found = 1; break; } + pthread_rwlock_unlock(&clog_cpg_lock); if (!found) { LOG_ERROR("Unable to find match for CPG config callback"); @@ -1547,6 +1558,16 @@ static void cpg_config_callback(cpg_hand else cpg_leave_callback(match, left_list, member_list, member_list_entries); + + + if (match->free_me) { + LOG_DBG("closing thread %x", (unsigned int)match->thread_pid); + free(match); + return; + } + + do_checkpoints(match, 0); + resend_requests(match); } cpg_callbacks_t cpg_callbacks = { @@ -1614,12 +1635,16 @@ int create_cluster_cpg(char *uuid, uint6 size_t size; struct clog_cpg *new = NULL; struct clog_cpg *tmp; + pthread_t new_pid; + pthread_rwlock_rdlock(&clog_cpg_lock); dm_list_iterate_items(tmp, &clog_cpg_list) if (!strncmp(tmp->name.value, uuid, CPG_MAX_NAME_LENGTH)) { LOG_ERROR("Log entry already exists: %s", uuid); + pthread_rwlock_unlock(&clog_cpg_lock); return -EEXIST; } + pthread_rwlock_unlock(&clog_cpg_lock); new = malloc(sizeof(*new)); if (!new) { @@ -1661,13 +1686,16 @@ int create_cluster_cpg(char *uuid, uint6 } new->cpg_state = VALID; + pthread_rwlock_wrlock(&clog_cpg_lock); dm_list_add(&clog_cpg_list, &new->list); + pthread_rwlock_unlock(&clog_cpg_lock); + LOG_DBG("New handle: %llu", (unsigned long long)new->handle); LOG_DBG("New name: %s", new->name.value); - /* FIXME: better variable */ - cpg_fd_get(new->handle, &r); - links_register(r, "cluster", do_cluster_work, NULL); + pthread_create(&new_pid, NULL, (void *)cluster_thread_fn, (void*)new); + 
new->thread_pid = new_pid; + pthread_detach(new_pid); return 0; } @@ -1736,9 +1764,11 @@ int destroy_cluster_cpg(char *uuid) { struct clog_cpg *del, *tmp; + pthread_rwlock_rdlock(&clog_cpg_lock); dm_list_iterate_items_safe(del, tmp, &clog_cpg_list) if (!strncmp(del->name.value, uuid, CPG_MAX_NAME_LENGTH)) _destroy_cluster_cpg(del); + pthread_rwlock_unlock(&clog_cpg_lock); return 0; } Index: LVM2.2.02.164/daemons/cmirrord/functions.c =================================================================== --- LVM2.2.02.164.orig/daemons/cmirrord/functions.c +++ LVM2.2.02.164/daemons/cmirrord/functions.c @@ -19,6 +19,7 @@ #include #include #include +#include <pthread.h> #define BYTE_SHIFT 3 @@ -106,6 +107,9 @@ struct recovery_request { static DM_LIST_INIT(log_list); static DM_LIST_INIT(log_pending_list); +static pthread_rwlock_t log_list_lock = PTHREAD_RWLOCK_INITIALIZER; +static pthread_rwlock_t log_pending_lock = PTHREAD_RWLOCK_INITIALIZER; + static int log_test_bit(dm_bitset_t bs, int bit) { return dm_bit(bs, bit) ?
1 : 0; @@ -152,11 +156,15 @@ static struct log_c *get_log(const char { struct log_c *lc; + pthread_rwlock_rdlock(&log_list_lock); dm_list_iterate_items(lc, &log_list) if (!strcmp(lc->uuid, uuid) && - (!luid || (luid == lc->luid))) + (!luid || (luid == lc->luid))) { + pthread_rwlock_unlock(&log_list_lock); return lc; + } + pthread_rwlock_unlock(&log_list_lock); return NULL; } @@ -172,10 +180,14 @@ static struct log_c *get_pending_log(con { struct log_c *lc; + pthread_rwlock_rdlock(&log_pending_lock); dm_list_iterate_items(lc, &log_pending_list) if (!strcmp(lc->uuid, uuid) && - (!luid || (luid == lc->luid))) + (!luid || (luid == lc->luid))) { + pthread_rwlock_unlock(&log_pending_lock); return lc; + } + pthread_rwlock_unlock(&log_pending_lock); return NULL; } @@ -520,7 +532,9 @@ static int _clog_ctr(char *uuid, uint64_ LOG_DBG("Disk log ready"); } + pthread_rwlock_wrlock(&log_pending_lock); dm_list_add(&log_pending_list, &lc->list); + pthread_rwlock_unlock(&log_pending_lock); return 0; fail: @@ -650,7 +664,10 @@ static int clog_dtr(struct dm_ulog_reque LOG_DBG("[%s] Cluster log removed", SHORT_UUID(lc->uuid)); + pthread_rwlock_wrlock(&log_list_lock); dm_list_del(&lc->list); + pthread_rwlock_unlock(&log_list_lock); + if (lc->disk_fd != -1 && close(lc->disk_fd)) LOG_ERROR("Failed to close disk log: %s", strerror(errno)); @@ -722,8 +739,13 @@ int cluster_postsuspend(char *uuid, uint lc->resume_override = 0; /* move log to pending list */ + pthread_rwlock_wrlock(&log_list_lock); dm_list_del(&lc->list); + pthread_rwlock_unlock(&log_list_lock); + + pthread_rwlock_wrlock(&log_pending_lock); dm_list_add(&log_pending_list, &lc->list); + pthread_rwlock_unlock(&log_pending_lock); return 0; } @@ -827,9 +849,9 @@ no_disk: if (commit_log && (lc->disk_fd >= 0)) { rq->error = write_log(lc); if (rq->error) - LOG_ERROR("Failed initial disk log write"); + LOG_ERROR("[%s] Failed initial disk log write", SHORT_UUID(lc->uuid)); else - LOG_DBG("Disk log initialized"); + LOG_DBG("[%s] Disk 
log initialized", SHORT_UUID(lc->uuid)); lc->touched = 0; } out: @@ -911,8 +933,13 @@ int local_resume(struct dm_ulog_request } /* move log to official list */ + pthread_rwlock_wrlock(&log_pending_lock); dm_list_del(&lc->list); + pthread_rwlock_unlock(&log_pending_lock); + + pthread_rwlock_wrlock(&log_list_lock); dm_list_add(&log_list, &lc->list); + pthread_rwlock_unlock(&log_list_lock); } return 0; @@ -1935,7 +1962,6 @@ void log_debug(void) LOG_ERROR(""); LOG_ERROR("LOG COMPONENT DEBUGGING::"); - LOG_ERROR("Official log list:"); LOG_ERROR("Pending log list:"); dm_list_iterate_items(lc, &log_pending_list) { LOG_ERROR("%s", lc->uuid); @@ -1945,6 +1971,7 @@ void log_debug(void) print_bits(lc->clean_bits, 1); } + LOG_ERROR("Official log list:"); dm_list_iterate_items(lc, &log_list) { LOG_ERROR("%s", lc->uuid); LOG_ERROR(" recoverer : %" PRIu32, lc->recoverer); Index: LVM2.2.02.164/daemons/cmirrord/local.c =================================================================== --- LVM2.2.02.164.orig/daemons/cmirrord/local.c +++ LVM2.2.02.164/daemons/cmirrord/local.c @@ -29,13 +29,13 @@ static int cn_fd = -1; /* Connector (netlink) socket fd */ static char recv_buf[2048]; -static char send_buf[2048]; /* FIXME: merge this function with kernel_send_helper */ static int kernel_ack(uint32_t seq, int error) { int r; + char send_buf[2048]; struct nlmsghdr *nlh = (struct nlmsghdr *)send_buf; struct cn_msg *msg = NLMSG_DATA(nlh); @@ -179,6 +179,7 @@ static int kernel_send_helper(void *data int r; struct nlmsghdr *nlh; struct cn_msg *msg; + char send_buf[2048]; memset(send_buf, 0, sizeof(send_buf)); Index: LVM2.2.02.164/lib/metadata/mirror.c =================================================================== --- LVM2.2.02.164.orig/lib/metadata/mirror.c +++ LVM2.2.02.164/lib/metadata/mirror.c @@ -1973,10 +1973,6 @@ int add_mirror_log(struct cmd_context *c unsigned old_log_count; int r = 0; - if (vg_is_clustered(lv->vg) && (log_count > 1)) { - log_error("Log type, \"mirrored\", is 
unavailable to cluster mirrors"); - return 0; - } if (dm_list_size(&lv->segments) != 1) { log_error("Multiple-segment mirror is not supported"); @@ -2140,26 +2136,6 @@ int lv_add_mirrors(struct cmd_context *c return 0; } - if (vg_is_clustered(lv->vg)) { - /* FIXME: move this test out of this function */ - /* Skip test for pvmove mirrors, it can use local mirror */ - if (!lv_is_pvmove(lv) && !lv_is_locked(lv) && - lv_is_active(lv) && - !lv_is_active_exclusive_locally(lv) && /* lv_is_active_remotely */ - !cluster_mirror_is_available(lv->vg->cmd)) { - log_error("Shared cluster mirrors are not available."); - return 0; - } - - /* - * No mirrored logs for cluster mirrors until - * log daemon is multi-threaded. - */ - if (log_count > 1) { - log_error("Log type, \"mirrored\", is unavailable to cluster mirrors"); - return 0; - } - } /* For corelog mirror, activation code depends on * the global mirror_in_sync status. As we are adding Index: LVM2.2.02.164/lib/mirror/mirrored.c =================================================================== --- LVM2.2.02.164.orig/lib/mirror/mirrored.c +++ LVM2.2.02.164/lib/mirror/mirrored.c @@ -293,12 +293,15 @@ static int _add_log(struct dm_pool *mem, return 0; } } else { - /* If core log, use mirror's UUID and set DM_CORELOG flag */ + /* If core log, use mirror's (UUID + CORE) and set DM_CORELOG flag */ if (!(log_dlid = build_dm_uuid(mem, seg->lv, NULL))) { log_error("Failed to build uuid for mirror LV %s.", seg->lv->name); return 0; } + if (clustered) + memcpy(&log_dlid[strlen(log_dlid) - 4], "CORE", 4); + log_flags |= DM_CORELOG; } Index: LVM2.2.02.164/tools/lvconvert.c =================================================================== --- LVM2.2.02.164.orig/tools/lvconvert.c +++ LVM2.2.02.164/tools/lvconvert.c @@ -1309,15 +1309,6 @@ static int _lvconvert_mirrors_parse_para *new_log_count = arg_int_value(cmd, mirrorlog_ARG, lp->corelog ? 
MIRROR_LOG_CORE : DEFAULT_MIRRORLOG); - /* - * No mirrored logs for cluster mirrors until - * log daemon is multi-threaded. - */ - if ((*new_log_count == MIRROR_LOG_MIRRORED) && vg_is_clustered(lv->vg)) { - log_error("Log type, \"mirrored\", is unavailable to cluster mirrors."); - return 0; - } - log_verbose("Setting logging type to %s.", get_mirror_log_name(*new_log_count)); /*