329 lines
12 KiB
Diff
329 lines
12 KiB
Diff
|
From 6557497b7095399424fe24e83bd936a3d54d4175 Mon Sep 17 00:00:00 2001
|
||
|
From: Doron Somech <somdoron@gmail.com>
|
||
|
Date: Wed, 13 May 2020 17:32:06 +0300
|
||
|
Subject: [PATCH] problem: zeromq connects peer before handshake is completed
|
||
|
|
||
|
Solution: delay connecting the peer pipe until the handshake is completed
|
||
|
(cherry picked from commit e7f0090b161ce6344f6bd35009816a925c070b09)
|
||
|
|
||
|
Conflicts:
|
||
|
src/i_engine.hpp
|
||
|
src/norm_engine.hpp
|
||
|
src/pgm_receiver.hpp
|
||
|
src/pgm_sender.hpp
|
||
|
src/raw_engine.cpp
|
||
|
src/session_base.cpp
|
||
|
src/session_base.hpp
|
||
|
src/stream_engine_base.cpp
|
||
|
src/stream_engine_base.hpp
|
||
|
src/udp_engine.hpp
|
||
|
src/ws_engine.cpp
|
||
|
src/zmtp_engine.cpp
|
||
|
---
|
||
|
src/i_engine.hpp | 4 ++++
|
||
|
src/ipc_connecter.cpp | 2 +-
|
||
|
src/ipc_listener.cpp | 2 +-
|
||
|
src/norm_engine.hpp | 2 ++
|
||
|
src/pgm_receiver.hpp | 1 +
|
||
|
src/pgm_sender.hpp | 1 +
|
||
|
src/session_base.cpp | 19 +++++++++++++------
|
||
|
src/session_base.hpp | 1 +
|
||
|
src/socks_connecter.cpp | 2 +-
|
||
|
src/stream_engine.cpp | 12 ++++++++++--
|
||
|
src/stream_engine.hpp | 8 +++++++-
|
||
|
src/tcp_connecter.cpp | 2 +-
|
||
|
src/tcp_listener.cpp | 2 +-
|
||
|
src/tipc_connecter.cpp | 2 +-
|
||
|
src/tipc_listener.cpp | 2 +-
|
||
|
src/udp_engine.hpp | 2 ++
|
||
|
16 files changed, 48 insertions(+), 16 deletions(-)
|
||
|
|
||
|
Index: zeromq-4.2.3/src/i_engine.hpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/i_engine.hpp
|
||
|
+++ zeromq-4.2.3/src/i_engine.hpp
|
||
|
@@ -41,6 +41,10 @@ namespace zmq
|
||
|
{
|
||
|
virtual ~i_engine () {}
|
||
|
|
||
|
+ // Indicates whether the engine has a handshake stage.
|
||
|
+ // If the engine has a handshake stage, it must call session.engine_ready when the handshake is complete.
|
||
|
+ virtual bool has_handshake_stage () = 0;
|
||
|
+
|
||
|
// Plug the engine to the session.
|
||
|
virtual void plug (zmq::io_thread_t *io_thread_,
|
||
|
class session_base_t *session_) = 0;
|
||
|
Index: zeromq-4.2.3/src/ipc_connecter.cpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/ipc_connecter.cpp
|
||
|
+++ zeromq-4.2.3/src/ipc_connecter.cpp
|
||
|
@@ -123,7 +123,7 @@ void zmq::ipc_connecter_t::out_event ()
|
||
|
}
|
||
|
// Create the engine object for this connection.
|
||
|
stream_engine_t *engine = new (std::nothrow)
|
||
|
- stream_engine_t (fd, options, endpoint);
|
||
|
+ stream_engine_t (fd, options, endpoint, !options.raw_socket);
|
||
|
alloc_assert (engine);
|
||
|
|
||
|
// Attach the engine to the corresponding session object.
|
||
|
Index: zeromq-4.2.3/src/ipc_listener.cpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/ipc_listener.cpp
|
||
|
+++ zeromq-4.2.3/src/ipc_listener.cpp
|
||
|
@@ -172,7 +172,7 @@ void zmq::ipc_listener_t::in_event ()
|
||
|
|
||
|
// Create the engine object for this connection.
|
||
|
stream_engine_t *engine = new (std::nothrow)
|
||
|
- stream_engine_t (fd, options, endpoint);
|
||
|
+ stream_engine_t (fd, options, endpoint, !options.raw_socket);
|
||
|
alloc_assert (engine);
|
||
|
|
||
|
// Choose I/O thread to run connecter in. Given that we are already
|
||
|
Index: zeromq-4.2.3/src/norm_engine.hpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/norm_engine.hpp
|
||
|
+++ zeromq-4.2.3/src/norm_engine.hpp
|
||
|
@@ -27,6 +27,8 @@ namespace zmq
|
||
|
int init(const char* network_, bool send, bool recv);
|
||
|
void shutdown();
|
||
|
|
||
|
+ bool has_handshake_stage () { return false; };
|
||
|
+
|
||
|
// i_engine interface implementation.
|
||
|
// Plug the engine to the session.
|
||
|
virtual void plug (zmq::io_thread_t *io_thread_,
|
||
|
Index: zeromq-4.2.3/src/pgm_receiver.hpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/pgm_receiver.hpp
|
||
|
+++ zeromq-4.2.3/src/pgm_receiver.hpp
|
||
|
@@ -58,6 +58,7 @@ namespace zmq
|
||
|
int init (bool udp_encapsulation_, const char *network_);
|
||
|
|
||
|
// i_engine interface implementation.
|
||
|
+ bool has_handshake_stage () { return false; };
|
||
|
void plug (zmq::io_thread_t *io_thread_,
|
||
|
zmq::session_base_t *session_);
|
||
|
void terminate ();
|
||
|
Index: zeromq-4.2.3/src/pgm_sender.hpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/pgm_sender.hpp
|
||
|
+++ zeromq-4.2.3/src/pgm_sender.hpp
|
||
|
@@ -57,6 +57,7 @@ namespace zmq
|
||
|
int init (bool udp_encapsulation_, const char *network_);
|
||
|
|
||
|
// i_engine interface implementation.
|
||
|
+ bool has_handshake_stage () { return false; };
|
||
|
void plug (zmq::io_thread_t *io_thread_,
|
||
|
zmq::session_base_t *session_);
|
||
|
void terminate ();
|
||
|
Index: zeromq-4.2.3/src/session_base.cpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/session_base.cpp
|
||
|
+++ zeromq-4.2.3/src/session_base.cpp
|
||
|
@@ -280,7 +280,8 @@ void zmq::session_base_t::read_activated
|
||
|
}
|
||
|
|
||
|
if (unlikely (engine == NULL)) {
|
||
|
- pipe->check_read ();
|
||
|
+ if (pipe)
|
||
|
+ pipe->check_read ();
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
@@ -383,7 +384,18 @@ bool zmq::session_base_t::zap_enabled ()
|
||
|
void zmq::session_base_t::process_attach (i_engine *engine_)
|
||
|
{
|
||
|
zmq_assert (engine_ != NULL);
|
||
|
+ zmq_assert (!engine);
|
||
|
+ engine = engine_;
|
||
|
+
|
||
|
+ if (!engine_->has_handshake_stage ())
|
||
|
+ engine_ready ();
|
||
|
+
|
||
|
+ // Plug in the engine.
|
||
|
+ engine->plug (io_thread, this);
|
||
|
+}
|
||
|
|
||
|
+void zmq::session_base_t::engine_ready ()
|
||
|
+{
|
||
|
// Create the pipe if it does not exist yet.
|
||
|
if (!pipe && !is_terminating ()) {
|
||
|
object_t *parents [2] = {this, socket};
|
||
|
@@ -412,11 +424,6 @@ void zmq::session_base_t::process_attach
|
||
|
// Ask socket to plug into the remote end of the pipe.
|
||
|
send_bind (socket, pipes [1]);
|
||
|
}
|
||
|
-
|
||
|
- // Plug in the engine.
|
||
|
- zmq_assert (!engine);
|
||
|
- engine = engine_;
|
||
|
- engine->plug (io_thread, this);
|
||
|
}
|
||
|
|
||
|
void zmq::session_base_t::engine_error (
|
||
|
Index: zeromq-4.2.3/src/session_base.hpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/session_base.hpp
|
||
|
+++ zeromq-4.2.3/src/session_base.hpp
|
||
|
@@ -67,6 +67,7 @@ namespace zmq
|
||
|
virtual void reset ();
|
||
|
void flush ();
|
||
|
void engine_error (zmq::stream_engine_t::error_reason_t reason);
|
||
|
+ void engine_ready ();
|
||
|
|
||
|
// i_pipe_events interface implementation.
|
||
|
void read_activated (zmq::pipe_t *pipe_);
|
||
|
Index: zeromq-4.2.3/src/socks_connecter.cpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/socks_connecter.cpp
|
||
|
+++ zeromq-4.2.3/src/socks_connecter.cpp
|
||
|
@@ -151,7 +151,7 @@ void zmq::socks_connecter_t::in_event ()
|
||
|
else {
|
||
|
// Create the engine object for this connection.
|
||
|
stream_engine_t *engine = new (std::nothrow)
|
||
|
- stream_engine_t (s, options, endpoint);
|
||
|
+ stream_engine_t (s, options, endpoint, !options.raw_socket);
|
||
|
alloc_assert (engine);
|
||
|
|
||
|
// Attach the engine to the corresponding session object.
|
||
|
Index: zeromq-4.2.3/src/stream_engine.cpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/stream_engine.cpp
|
||
|
+++ zeromq-4.2.3/src/stream_engine.cpp
|
||
|
@@ -63,7 +63,8 @@
|
||
|
#include "wire.hpp"
|
||
|
|
||
|
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
|
||
|
- const std::string &endpoint_) :
|
||
|
+ const std::string &endpoint_,
|
||
|
+ bool has_handshake_stage_) :
|
||
|
s (fd_),
|
||
|
as_server(false),
|
||
|
handle((handle_t)NULL),
|
||
|
@@ -78,6 +79,7 @@ zmq::stream_engine_t::stream_engine_t (f
|
||
|
greeting_size (v2_greeting_size),
|
||
|
greeting_bytes_read (0),
|
||
|
session (NULL),
|
||
|
+ _has_handshake_stage (has_handshake_stage_),
|
||
|
options (options_),
|
||
|
endpoint (endpoint_),
|
||
|
plugged (false),
|
||
|
@@ -290,9 +292,12 @@ void zmq::stream_engine_t::in_event ()
|
||
|
zmq_assert (!io_error);
|
||
|
|
||
|
// If still handshaking, receive and process the greeting message.
|
||
|
- if (unlikely (handshaking))
|
||
|
+ if (unlikely (handshaking)) {
|
||
|
if (!handshake ())
|
||
|
return;
|
||
|
+ else if (mechanism == NULL && _has_handshake_stage)
|
||
|
+ session->engine_ready ();
|
||
|
+ }
|
||
|
|
||
|
zmq_assert (decoder);
|
||
|
|
||
|
@@ -839,6 +844,9 @@ void zmq::stream_engine_t::mechanism_rea
|
||
|
has_heartbeat_timer = true;
|
||
|
}
|
||
|
|
||
|
+ if (_has_handshake_stage)
|
||
|
+ session->engine_ready ();
|
||
|
+
|
||
|
if (options.recv_routing_id) {
|
||
|
msg_t routing_id;
|
||
|
mechanism->peer_routing_id (&routing_id);
|
||
|
Index: zeromq-4.2.3/src/stream_engine.hpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/stream_engine.hpp
|
||
|
+++ zeromq-4.2.3/src/stream_engine.hpp
|
||
|
@@ -69,10 +69,12 @@ namespace zmq
|
||
|
};
|
||
|
|
||
|
stream_engine_t (fd_t fd_, const options_t &options_,
|
||
|
- const std::string &endpoint);
|
||
|
+ const std::string &endpoint,
|
||
|
+ bool has_handshake_stage);
|
||
|
~stream_engine_t ();
|
||
|
|
||
|
// i_engine interface implementation.
|
||
|
+ bool has_handshake_stage () { return _has_handshake_stage; };
|
||
|
void plug (zmq::io_thread_t *io_thread_,
|
||
|
zmq::session_base_t *session_);
|
||
|
void terminate ();
|
||
|
@@ -176,6 +178,10 @@ namespace zmq
|
||
|
// The session this engine is attached to.
|
||
|
zmq::session_base_t *session;
|
||
|
|
||
|
+ // Indicates whether the engine has a handshake stage; if it does, the engine must call session.engine_ready
|
||
|
+ // when the handshake is completed.
|
||
|
+ bool _has_handshake_stage;
|
||
|
+
|
||
|
options_t options;
|
||
|
|
||
|
// String representation of endpoint
|
||
|
Index: zeromq-4.2.3/src/tcp_connecter.cpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/tcp_connecter.cpp
|
||
|
+++ zeromq-4.2.3/src/tcp_connecter.cpp
|
||
|
@@ -156,7 +156,7 @@ void zmq::tcp_connecter_t::out_event ()
|
||
|
|
||
|
// Create the engine object for this connection.
|
||
|
stream_engine_t *engine = new (std::nothrow)
|
||
|
- stream_engine_t (fd, options, endpoint);
|
||
|
+ stream_engine_t (fd, options, endpoint, !options.raw_socket);
|
||
|
alloc_assert (engine);
|
||
|
|
||
|
// Attach the engine to the corresponding session object.
|
||
|
Index: zeromq-4.2.3/src/tcp_listener.cpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/tcp_listener.cpp
|
||
|
+++ zeromq-4.2.3/src/tcp_listener.cpp
|
||
|
@@ -108,7 +108,7 @@ void zmq::tcp_listener_t::in_event ()
|
||
|
|
||
|
// Create the engine object for this connection.
|
||
|
stream_engine_t *engine = new (std::nothrow)
|
||
|
- stream_engine_t (fd, options, endpoint);
|
||
|
+ stream_engine_t (fd, options, endpoint, !options.raw_socket);
|
||
|
alloc_assert (engine);
|
||
|
|
||
|
// Choose I/O thread to run connecter in. Given that we are already
|
||
|
Index: zeromq-4.2.3/src/tipc_connecter.cpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/tipc_connecter.cpp
|
||
|
+++ zeromq-4.2.3/src/tipc_connecter.cpp
|
||
|
@@ -123,7 +123,7 @@ void zmq::tipc_connecter_t::out_event ()
|
||
|
return;
|
||
|
}
|
||
|
// Create the engine object for this connection.
|
||
|
- stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
|
||
|
+ stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
|
||
|
alloc_assert (engine);
|
||
|
|
||
|
// Attach the engine to the corresponding session object.
|
||
|
Index: zeromq-4.2.3/src/tipc_listener.cpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/tipc_listener.cpp
|
||
|
+++ zeromq-4.2.3/src/tipc_listener.cpp
|
||
|
@@ -91,7 +91,7 @@ void zmq::tipc_listener_t::in_event ()
|
||
|
}
|
||
|
|
||
|
// Create the engine object for this connection.
|
||
|
- stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
|
||
|
+ stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_socket);
|
||
|
alloc_assert (engine);
|
||
|
|
||
|
// Choose I/O thread to run connecter in. Given that we are already
|
||
|
Index: zeromq-4.2.3/src/udp_engine.hpp
|
||
|
===================================================================
|
||
|
--- zeromq-4.2.3.orig/src/udp_engine.hpp
|
||
|
+++ zeromq-4.2.3/src/udp_engine.hpp
|
||
|
@@ -23,6 +23,8 @@ namespace zmq
|
||
|
|
||
|
int init (address_t *address_, bool send_, bool recv_);
|
||
|
|
||
|
+ bool has_handshake_stage () { return false; };
|
||
|
+
|
||
|
// i_engine interface implementation.
|
||
|
// Plug the engine to the session.
|
||
|
void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_);
|