forked from pool/corosync
160 lines
5.5 KiB
Diff
160 lines
5.5 KiB
Diff
|
diff --git a/lib/cpg.c b/lib/cpg.c
|
||
|
index c831390b..a0e662f0 100644
|
||
|
--- a/lib/cpg.c
|
||
|
+++ b/lib/cpg.c
|
||
|
@@ -80,6 +80,15 @@
|
||
|
*/
|
||
|
#define CPG_MEMORY_MAP_UMASK 077
|
||
|
|
||
|
+struct cpg_assembly_data
|
||
|
+{
|
||
|
+ struct list_head list;
|
||
|
+ uint32_t nodeid;
|
||
|
+ uint32_t pid;
|
||
|
+ char *assembly_buf;
|
||
|
+ uint32_t assembly_buf_ptr;
|
||
|
+};
|
||
|
+
|
||
|
struct cpg_inst {
|
||
|
qb_ipcc_connection_t *c;
|
||
|
int finalize;
|
||
|
@@ -89,14 +98,8 @@ struct cpg_inst {
|
||
|
cpg_model_v1_data_t model_v1_data;
|
||
|
};
|
||
|
struct list_head iteration_list_head;
|
||
|
- uint32_t max_msg_size;
|
||
|
- char *assembly_buf;
|
||
|
- uint32_t assembly_buf_ptr;
|
||
|
- int assembling; /* Flag that says we have started assembling a message.
|
||
|
- * It's here to catch the situation where a node joins
|
||
|
- * the cluster/group in the middle of a CPG message send
|
||
|
- * so we don't pass on a partial message to the client.
|
||
|
- */
|
||
|
+ uint32_t max_msg_size;
|
||
|
+ struct list_head assembly_list_head;
|
||
|
};
|
||
|
static void cpg_inst_free (void *inst);
|
||
|
|
||
|
@@ -231,6 +234,8 @@ cs_error_t cpg_model_initialize (
|
||
|
|
||
|
list_init(&cpg_inst->iteration_list_head);
|
||
|
|
||
|
+ list_init(&cpg_inst->assembly_list_head);
|
||
|
+
|
||
|
hdb_handle_put (&cpg_handle_t_db, *handle);
|
||
|
|
||
|
return (CS_OK);
|
||
|
@@ -382,6 +387,8 @@ cs_error_t cpg_dispatch (
|
||
|
struct cpg_address left_list[CPG_MEMBERS_MAX];
|
||
|
struct cpg_address joined_list[CPG_MEMBERS_MAX];
|
||
|
struct cpg_name group_name;
|
||
|
+ struct cpg_assembly_data *assembly_data;
|
||
|
+ struct list_head *iter, *tmp_iter;
|
||
|
mar_cpg_address_t *left_list_start;
|
||
|
mar_cpg_address_t *joined_list_start;
|
||
|
unsigned int i;
|
||
|
@@ -471,32 +478,63 @@ cs_error_t cpg_dispatch (
|
||
|
&group_name,
|
||
|
&res_cpg_partial_deliver_callback->group_name);
|
||
|
|
||
|
+ /*
|
||
|
+ * Search for assembly data for the current message's (nodeid, pid) pair in the list of assemblies
|
||
|
+ */
|
||
|
+ assembly_data = NULL;
|
||
|
+ for (iter = cpg_inst->assembly_list_head.next; iter != &cpg_inst->assembly_list_head; iter = iter->next) {
|
||
|
+ struct cpg_assembly_data *current_assembly_data = list_entry (iter, struct cpg_assembly_data, list);
|
||
|
+ if (current_assembly_data->nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->pid == res_cpg_partial_deliver_callback->pid) {
|
||
|
+ assembly_data = current_assembly_data;
|
||
|
+ break;
|
||
|
+ }
|
||
|
+ }
|
||
|
+
|
||
|
if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
|
||
|
+
|
||
|
/*
|
||
|
- * Allocate a buffer to contain a full message.
|
||
|
+ * As this is LIBCPG_PARTIAL_FIRST packet, check that there is no ongoing assembly
|
||
|
*/
|
||
|
- cpg_inst->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
|
||
|
- if (!cpg_inst->assembly_buf) {
|
||
|
+ if (assembly_data) {
|
||
|
+ error = CS_ERR_MESSAGE_ERROR;
|
||
|
+ goto error_put;
|
||
|
+ }
|
||
|
+
|
||
|
+ assembly_data = malloc(sizeof(struct cpg_assembly_data));
|
||
|
+ if (!assembly_data) {
|
||
|
error = CS_ERR_NO_MEMORY;
|
||
|
goto error_put;
|
||
|
}
|
||
|
- cpg_inst->assembling = 1;
|
||
|
- cpg_inst->assembly_buf_ptr = 0;
|
||
|
+
|
||
|
+ assembly_data->nodeid = res_cpg_partial_deliver_callback->nodeid;
|
||
|
+ assembly_data->pid = res_cpg_partial_deliver_callback->pid;
|
||
|
+ assembly_data->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
|
||
|
+ if (!assembly_data->assembly_buf) {
|
||
|
+ free(assembly_data);
|
||
|
+ error = CS_ERR_NO_MEMORY;
|
||
|
+ goto error_put;
|
||
|
+ }
|
||
|
+ assembly_data->assembly_buf_ptr = 0;
|
||
|
+ list_init (&assembly_data->list);
|
||
|
+
|
||
|
+ list_add (&assembly_data->list, &cpg_inst->assembly_list_head);
|
||
|
}
|
||
|
- if (cpg_inst->assembling) {
|
||
|
- memcpy(cpg_inst->assembly_buf + cpg_inst->assembly_buf_ptr,
|
||
|
- res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
|
||
|
- cpg_inst->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
|
||
|
+ if (assembly_data) {
|
||
|
+ memcpy(assembly_data->assembly_buf + assembly_data->assembly_buf_ptr,
|
||
|
+ res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
|
||
|
+ assembly_data->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
|
||
|
|
||
|
if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
|
||
|
cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
|
||
|
&group_name,
|
||
|
res_cpg_partial_deliver_callback->nodeid,
|
||
|
res_cpg_partial_deliver_callback->pid,
|
||
|
- cpg_inst->assembly_buf,
|
||
|
+ assembly_data->assembly_buf,
|
||
|
res_cpg_partial_deliver_callback->msglen);
|
||
|
- free(cpg_inst->assembly_buf);
|
||
|
- cpg_inst->assembling = 0;
|
||
|
+
|
||
|
+ list_del (&assembly_data->list);
|
||
|
+ free(assembly_data->assembly_buf);
|
||
|
+ free(assembly_data);
|
||
|
}
|
||
|
}
|
||
|
break;
|
||
|
@@ -538,6 +576,24 @@ cs_error_t cpg_dispatch (
|
||
|
joined_list,
|
||
|
res_cpg_confchg_callback->joined_list_entries);
|
||
|
|
||
|
+ /*
|
||
|
+ * If a member left while its partial packet was being assembled, its assembly data must be removed from the list
|
||
|
+ */
|
||
|
+ for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
|
||
|
+ for (iter = cpg_inst->assembly_list_head.next; iter != &cpg_inst->assembly_list_head;iter = tmp_iter) {
|
||
|
+ struct cpg_assembly_data *current_assembly_data = list_entry (iter, struct cpg_assembly_data, list);
|
||
|
+
|
||
|
+ tmp_iter = iter->next;
|
||
|
+
|
||
|
+ if (current_assembly_data->nodeid != left_list[i].nodeid || current_assembly_data->pid != left_list[i].pid)
|
||
|
+ continue;
|
||
|
+
|
||
|
+ list_del (&current_assembly_data->list);
|
||
|
+ free(current_assembly_data->assembly_buf);
|
||
|
+ free(current_assembly_data);
|
||
|
+ }
|
||
|
+ }
|
||
|
+
|
||
|
break;
|
||
|
case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK:
|
||
|
if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
|
||
|
--
|
||
|
2.13.6
|
||
|
|