This patch solves a situation which can happen very rarely: - Node B is running - Node A is started and tries to create singleton membership. It also initializes service S, which tries to send a message during initialization - Just before node A finishes its move to operational state, it gets Node B's multicast message, so it moves to gather state - Node A and B create membership and move to operational state, and sync is started - Node A and B receive the message sent by node A during initialization of service S - Node A exits before sync of service is finished In this situation, node B may never execute sync_init for service S. So node B service S is not aware of the existence of node A, but it received a message from it. A similar situation can theoretically also happen during merge. Solution is to change the flow of sync, so now it looks like: - Build service_list - Call sync_init for all local services - Send service_list - Receive service_list from all members and send barrier - For all services: - Receive barrier - Call sync_activate if this is not the first service - Call sync_process for the next service, or finish sync if the previous service was the last one - Send barrier --- exec/sync.c | 174 +++++++++++++++--------------------------------------------- 1 file changed, 44 insertions(+), 130 deletions(-) diff --git a/exec/sync.c b/exec/sync.c index 283634a8..1efa3577 100644 --- a/exec/sync.c +++ b/exec/sync.c @@ -62,10 +62,8 @@ LOGSYS_DECLARE_SUBSYS ("SYNC"); #define MESSAGE_REQ_SYNC_BARRIER 0 #define MESSAGE_REQ_SYNC_SERVICE_BUILD 1 -#define MESSAGE_REQ_SYNC_MEMB_DETERMINE 2 enum sync_process_state { - INIT, PROCESS, ACTIVATE }; @@ -96,11 +94,6 @@ struct processor_entry { int received; }; -struct req_exec_memb_determine_message { - struct qb_ipc_request_header header __attribute__((aligned(8))); - struct memb_ring_id ring_id __attribute__((aligned(8))); -}; - struct req_exec_service_build_message { struct qb_ipc_request_header header __attribute__((aligned(8))); struct memb_ring_id ring_id 
__attribute__((aligned(8))); @@ -117,14 +110,6 @@ static enum sync_state my_state = SYNC_BARRIER; static struct memb_ring_id my_ring_id; -static struct memb_ring_id my_memb_determine_ring_id; - -static int my_memb_determine = 0; - -static unsigned int my_memb_determine_list[PROCESSOR_COUNT_MAX]; - -static unsigned int my_memb_determine_list_entries = 0; - static int my_processing_idx = 0; static hdb_handle_t my_schedwrk_handle; @@ -157,6 +142,8 @@ static int schedwrk_processor (const void *context); static void sync_process_enter (void); +static void sync_process_call_init (void); + static struct totempg_group sync_group = { .group = "sync", .group_len = 4 @@ -234,7 +221,6 @@ static void sync_barrier_handler (unsigned int nodeid, const void *msg) my_processing_idx += 1; if (my_service_list_entries == my_processing_idx) { - my_memb_determine_list_entries = 0; sync_synchronization_completed (); } else { sync_process_enter (); @@ -242,15 +228,6 @@ static void sync_barrier_handler (unsigned int nodeid, const void *msg) } } -static void dummy_sync_init ( - const unsigned int *trans_list, - size_t trans_list_entries, - const unsigned int *member_list, - size_t member_list_entries, - const struct memb_ring_id *ring_id) -{ -} - static void dummy_sync_abort (void) { } @@ -272,31 +249,6 @@ static int service_entry_compare (const void *a, const void *b) return (service_entry_a->service_id > service_entry_b->service_id); } -static void sync_memb_determine (unsigned int nodeid, const void *msg) -{ - const struct req_exec_memb_determine_message *req_exec_memb_determine_message = msg; - int found = 0; - int i; - - if (memcmp (&req_exec_memb_determine_message->ring_id, - &my_memb_determine_ring_id, sizeof (struct memb_ring_id)) != 0) { - - log_printf (LOGSYS_LEVEL_DEBUG, "memb determine for old ring - discarding"); - return; - } - - my_memb_determine = 1; - for (i = 0; i < my_memb_determine_list_entries; i++) { - if (my_memb_determine_list[i] == nodeid) { - found = 1; - } - } - if 
(found == 0) { - my_memb_determine_list[my_memb_determine_list_entries] = nodeid; - my_memb_determine_list_entries += 1; - } -} - static void sync_service_build_handler (unsigned int nodeid, const void *msg) { const struct req_exec_service_build_message *req_exec_service_build_message = msg; @@ -321,15 +273,14 @@ static void sync_service_build_handler (unsigned int nodeid, const void *msg) } } if (found == 0) { - my_service_list[my_service_list_entries].state = - INIT; + my_service_list[my_service_list_entries].state = PROCESS; my_service_list[my_service_list_entries].service_id = req_exec_service_build_message->service_list[i]; sprintf (my_service_list[my_service_list_entries].name, "Unknown External Service (id = %d)\n", req_exec_service_build_message->service_list[i]); my_service_list[my_service_list_entries].sync_init = - dummy_sync_init; + NULL; my_service_list[my_service_list_entries].sync_abort = dummy_sync_abort; my_service_list[my_service_list_entries].sync_process = @@ -356,6 +307,7 @@ static void sync_service_build_handler (unsigned int nodeid, const void *msg) } } if (barrier_reached) { + log_printf (LOGSYS_LEVEL_DEBUG, "enter sync process"); sync_process_enter (); } } @@ -375,31 +327,9 @@ static void sync_deliver_fn ( case MESSAGE_REQ_SYNC_SERVICE_BUILD: sync_service_build_handler (nodeid, msg); break; - case MESSAGE_REQ_SYNC_MEMB_DETERMINE: - sync_memb_determine (nodeid, msg); - break; } } -static void memb_determine_message_transmit (void) -{ - struct iovec iovec; - struct req_exec_memb_determine_message req_exec_memb_determine_message; - - req_exec_memb_determine_message.header.size = sizeof (struct req_exec_memb_determine_message); - req_exec_memb_determine_message.header.id = MESSAGE_REQ_SYNC_MEMB_DETERMINE; - - memcpy (&req_exec_memb_determine_message.ring_id, - &my_memb_determine_ring_id, - sizeof (struct memb_ring_id)); - - iovec.iov_base = (char *)&req_exec_memb_determine_message; - iovec.iov_len = sizeof (req_exec_memb_determine_message); - - 
(void)totempg_groups_mcast_joined (sync_group_handle, - &iovec, 1, TOTEMPG_AGREED); -} - static void barrier_message_transmit (void) { struct iovec iovec; @@ -441,6 +371,38 @@ static void sync_barrier_enter (void) barrier_message_transmit (); } +static void sync_process_call_init (void) +{ + unsigned int old_trans_list[PROCESSOR_COUNT_MAX]; + size_t old_trans_list_entries = 0; + int o, m; + int i; + + memcpy (old_trans_list, my_trans_list, my_trans_list_entries * + sizeof (unsigned int)); + old_trans_list_entries = my_trans_list_entries; + + my_trans_list_entries = 0; + for (o = 0; o < old_trans_list_entries; o++) { + for (m = 0; m < my_member_list_entries; m++) { + if (old_trans_list[o] == my_member_list[m]) { + my_trans_list[my_trans_list_entries] = my_member_list[m]; + my_trans_list_entries++; + break; + } + } + } + + for (i = 0; i < my_service_list_entries; i++) { + if (my_sync_callbacks_retrieve(my_service_list[i].service_id, NULL) != -1) { + my_service_list[i].sync_init (my_trans_list, + my_trans_list_entries, my_member_list, + my_member_list_entries, + &my_ring_id); + } + } +} + static void sync_process_enter (void) { int i; @@ -452,13 +414,13 @@ static void sync_process_enter (void) */ if (my_service_list_entries == 0) { my_state = SYNC_SERVICELIST_BUILD; - my_memb_determine_list_entries = 0; sync_synchronization_completed (); return; } for (i = 0; i < my_processor_list_entries; i++) { my_processor_list[i].received = 0; } + schedwrk_create (&my_schedwrk_handle, schedwrk_processor, NULL); @@ -498,7 +460,7 @@ static void sync_servicelist_build_enter ( if (sync_callbacks.sync_init == NULL) { continue; } - my_service_list[my_service_list_entries].state = INIT; + my_service_list[my_service_list_entries].state = PROCESS; my_service_list[my_service_list_entries].service_id = i; strcpy (my_service_list[my_service_list_entries].name, sync_callbacks.name); @@ -516,42 +478,16 @@ static void sync_servicelist_build_enter ( service_build.service_list_entries = 
my_service_list_entries; service_build_message_transmit (&service_build); + + log_printf (LOGSYS_LEVEL_DEBUG, "call init for locally known services"); + sync_process_call_init (); } static int schedwrk_processor (const void *context) { int res = 0; - if (my_service_list[my_processing_idx].state == INIT) { - unsigned int old_trans_list[PROCESSOR_COUNT_MAX]; - size_t old_trans_list_entries = 0; - int o, m; - my_service_list[my_processing_idx].state = PROCESS; - - memcpy (old_trans_list, my_trans_list, my_trans_list_entries * - sizeof (unsigned int)); - old_trans_list_entries = my_trans_list_entries; - - my_trans_list_entries = 0; - for (o = 0; o < old_trans_list_entries; o++) { - for (m = 0; m < my_member_list_entries; m++) { - if (old_trans_list[o] == my_member_list[m]) { - my_trans_list[my_trans_list_entries] = my_member_list[m]; - my_trans_list_entries++; - break; - } - } - } - - if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) { - my_service_list[my_processing_idx].sync_init (my_trans_list, - my_trans_list_entries, my_member_list, - my_member_list_entries, - &my_ring_id); - } - } if (my_service_list[my_processing_idx].state == PROCESS) { - my_service_list[my_processing_idx].state = PROCESS; if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) { res = my_service_list[my_processing_idx].sync_process (); } else { @@ -574,14 +510,8 @@ void sync_start ( ENTER(); memcpy (&my_ring_id, ring_id, sizeof (struct memb_ring_id)); - if (my_memb_determine) { - my_memb_determine = 0; - sync_servicelist_build_enter (my_memb_determine_list, - my_memb_determine_list_entries, ring_id); - } else { - sync_servicelist_build_enter (member_list, member_list_entries, - ring_id); - } + sync_servicelist_build_enter (member_list, member_list_entries, + ring_id); } void sync_save_transitional ( @@ -610,19 +540,3 @@ void sync_abort (void) */ memset (&my_ring_id, 0, sizeof (struct memb_ring_id)); } - -void 
sync_memb_list_determine (const struct memb_ring_id *ring_id) -{ - ENTER(); - memcpy (&my_memb_determine_ring_id, ring_id, - sizeof (struct memb_ring_id)); - - memb_determine_message_transmit (); -} - -void sync_memb_list_abort (void) -{ - ENTER(); - my_memb_determine_list_entries = 0; - memset (&my_memb_determine_ring_id, 0, sizeof (struct memb_ring_id)); -} -- 2.13.6