clvmd: stop waiting for replies from cluster nodes that have left

When corosync reports that a node left the CPG group, reduce the number
of expected replies of every in-flight local request that the node has
not answered yet, and wake the waiting pre_and_post thread once no more
replies are outstanding.

The per-client mutex is taken exactly once per client and is released on
every path, including the one where the departed node had already replied.

---
 daemons/clvmd/clvmd-corosync.c |    6 +++-
 daemons/clvmd/clvmd.c          |   59 ++++++++++++++++++++++++++++++++++++++++-
 daemons/clvmd/clvmd.h          |    2 +
 3 files changed, 65 insertions(+), 2 deletions(-)

--- a/daemons/clvmd/clvmd-corosync.c
+++ b/daemons/clvmd/clvmd-corosync.c
@@ -251,8 +251,12 @@ static void corosync_cpg_confchg_callbac
 		ninfo = dm_hash_lookup_binary(node_hash,
 					      (char *)&left_list[i].nodeid,
 					      COROSYNC_CSID_LEN);
-		if (ninfo)
+		if (ninfo) {
 			ninfo->state = NODE_DOWN;
+			char name[MAX_CLUSTER_MEMBER_NAME_LEN];
+			snprintf(name, sizeof(name), "%x", ninfo->nodeid);
+			decrease_inflight_expected_reply(name);
+		}
 	}
 
 	num_nodes = member_list_entries;
--- a/daemons/clvmd/clvmd.c
+++ b/daemons/clvmd/clvmd.c
@@ -1613,6 +1613,63 @@ static void process_remote_command(struc
 	dm_free(replyargs);
 }
 
+/*
+ * A remote node has left the cluster: stop waiting for its reply on every
+ * in-flight local request that it has not answered yet, and wake the
+ * pre_and_post thread if no more replies are outstanding.
+ */
+void decrease_inflight_expected_reply(const char *nodename)
+{
+	struct local_client *thisfd;
+	struct node_reply *reply;
+
+	DEBUGLOG("remote node %s down", nodename);
+
+	for (thisfd = &local_client_head; thisfd != NULL;
+	     thisfd = thisfd->next) {
+		/* in-flight request */
+		if (thisfd->type == LOCAL_SOCK &&
+		    thisfd->bits.localsock.sent_out &&
+		    thisfd->bits.localsock.in_progress &&
+		    !thisfd->bits.localsock.finished &&
+		    thisfd->bits.localsock.expected_replies >
+		    thisfd->bits.localsock.num_replies) {
+
+			pthread_mutex_lock(&thisfd->bits.localsock.mutex);
+
+			reply = thisfd->bits.localsock.replies;
+			while (reply && strcmp(reply->node, nodename) != 0) {
+				reply = reply->next;
+			}
+			/*
+			 * If the node that went down has already replied,
+			 * do not decrease expected_replies.  Never leave
+			 * this block without dropping the mutex.
+			 */
+			if (!reply) {
+				thisfd->bits.localsock.expected_replies--;
+				DEBUGLOG("remote node down, decrement the expected replies to (%d), num_replies(%d)",
+					 thisfd->bits.localsock.expected_replies,
+					 thisfd->bits.localsock.num_replies);
+
+				/*
+				 * Tell pre_and_post thread to finish.  The
+				 * mutex is already held here, so it must not
+				 * be locked again.
+				 */
+				if (thisfd->bits.localsock.expected_replies <=
+				    thisfd->bits.localsock.num_replies &&
+				    thisfd->bits.localsock.threadid) {
+					thisfd->bits.localsock.all_success = 0;
+					thisfd->bits.localsock.state = POST_COMMAND;
+					pthread_cond_signal(&thisfd->bits.localsock.cond);
+				}
+			}
+			pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
+		}
+	}
+}
+
 /* Add a reply to a command to the list of replies for this client.
    If we have got a full set then send them to the waiting client down the local
    socket */
@@ -1652,7 +1709,7 @@ static void add_reply_to_list(struct loc
 
 	/* If we have the whole lot then do the post-process */
 	/* Post-process the command */
-	if (++client->bits.localsock.num_replies ==
+	if (++client->bits.localsock.num_replies >=
 	    client->bits.localsock.expected_replies) {
 		client->bits.localsock.state = POST_COMMAND;
 		pthread_cond_signal(&client->bits.localsock.cond);
--- a/daemons/clvmd/clvmd.h
+++ b/daemons/clvmd/clvmd.h
@@ -110,6 +110,8 @@ extern int do_post_command(struct local_
 extern void cmd_client_cleanup(struct local_client *client);
 extern int add_client(struct local_client *new_client);
 
+
+extern void decrease_inflight_expected_reply(const char *nodename);
 extern void clvmd_cluster_init_completed(void);
 extern void process_message(struct local_client *client, char *buf,
 			    int len, const char *csid);