SHA256
1
0
forked from pool/thrift

Accepting request 200385 from home:dstoecker

Framework library not yet in official OBS repos. Taken from home:jblunck:messaging.

OBS-URL: https://build.opensuse.org/request/show/200385
OBS-URL: https://build.opensuse.org/package/show/devel:tools/thrift?expand=0&rev=1
This commit is contained in:
Marcus Meissner 2013-09-25 14:04:58 +00:00 committed by Git OBS Bridge
commit e2cbd2fdb3
8 changed files with 1355 additions and 0 deletions

23
.gitattributes vendored Normal file
View File

@ -0,0 +1,23 @@
## Default LFS
*.7z filter=lfs diff=lfs merge=lfs -text
*.bsp filter=lfs diff=lfs merge=lfs -text
*.bz2 filter=lfs diff=lfs merge=lfs -text
*.gem filter=lfs diff=lfs merge=lfs -text
*.gz filter=lfs diff=lfs merge=lfs -text
*.jar filter=lfs diff=lfs merge=lfs -text
*.lz filter=lfs diff=lfs merge=lfs -text
*.lzma filter=lfs diff=lfs merge=lfs -text
*.obscpio filter=lfs diff=lfs merge=lfs -text
*.oxt filter=lfs diff=lfs merge=lfs -text
*.pdf filter=lfs diff=lfs merge=lfs -text
*.png filter=lfs diff=lfs merge=lfs -text
*.rpm filter=lfs diff=lfs merge=lfs -text
*.tbz filter=lfs diff=lfs merge=lfs -text
*.tbz2 filter=lfs diff=lfs merge=lfs -text
*.tgz filter=lfs diff=lfs merge=lfs -text
*.ttf filter=lfs diff=lfs merge=lfs -text
*.txz filter=lfs diff=lfs merge=lfs -text
*.whl filter=lfs diff=lfs merge=lfs -text
*.xz filter=lfs diff=lfs merge=lfs -text
*.zip filter=lfs diff=lfs merge=lfs -text
*.zst filter=lfs diff=lfs merge=lfs -text

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
.osc

View File

@ -0,0 +1,48 @@
Index: thrift-0.9.0/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp
===================================================================
--- thrift-0.9.0.orig/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp 2012-12-02 07:53:16.470513572 -0600
+++ thrift-0.9.0/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp 2012-12-02 07:54:47.362509517 -0600
@@ -23,6 +23,7 @@
#include <cctype>
#include <cstdio>
#include <stdexcept>
+#include <limits>
#include <boost/static_assert.hpp>
#include <boost/lexical_cast.hpp>
Index: thrift-0.9.0/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
===================================================================
--- thrift-0.9.0.orig/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp 2012-12-02 07:52:51.706514677 -0600
+++ thrift-0.9.0/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp 2012-12-02 07:54:06.918511327 -0600
@@ -89,6 +89,7 @@
#define __STDC_LIMIT_MACROS
#include <stdint.h>
+#include <limits>
#include <thrift/protocol/TDenseProtocol.h>
#include <thrift/TReflectionLocal.h>
Index: thrift-0.9.0/lib/cpp/src/thrift/protocol/TJSONProtocol.cpp
===================================================================
--- thrift-0.9.0.orig/lib/cpp/src/thrift/protocol/TJSONProtocol.cpp 2012-12-02 07:53:04.218514119 -0600
+++ thrift-0.9.0/lib/cpp/src/thrift/protocol/TJSONProtocol.cpp 2012-12-02 07:54:24.122510554 -0600
@@ -20,6 +20,7 @@
#include "TJSONProtocol.h"
#include <math.h>
+#include <limits>
#include <boost/lexical_cast.hpp>
#include "TBase64Utils.h"
#include <thrift/transport/TTransportException.h>
Index: thrift-0.9.0/lib/cpp/src/thrift/transport/THttpClient.cpp
===================================================================
--- thrift-0.9.0.orig/lib/cpp/src/thrift/transport/THttpClient.cpp 2012-12-02 07:53:24.610513212 -0600
+++ thrift-0.9.0/lib/cpp/src/thrift/transport/THttpClient.cpp 2012-12-02 07:53:46.750512234 -0600
@@ -19,6 +19,7 @@
#include <cstdlib>
#include <sstream>
+#include <limits>
#include <boost/algorithm/string.hpp>
#include <thrift/transport/THttpClient.h>

View File

@ -0,0 +1,981 @@
Index: thrift-0.9.0/lib/cpp/Makefile.am
===================================================================
--- thrift-0.9.0.orig/lib/cpp/Makefile.am 2012-10-12 02:58:06.000000000 +0200
+++ thrift-0.9.0/lib/cpp/Makefile.am 2013-08-15 16:53:40.000000000 +0200
@@ -181,7 +181,8 @@
src/thrift/transport/TTransportUtils.h \
src/thrift/transport/TBufferTransports.h \
src/thrift/transport/TShortReadTransport.h \
- src/thrift/transport/TZlibTransport.h
+ src/thrift/transport/TZlibTransport.h \
+ src/thrift/transport/TLibEventTransport.h
include_serverdir = $(include_thriftdir)/server
include_server_HEADERS = \
Index: thrift-0.9.0/lib/cpp/Makefile.in
===================================================================
--- thrift-0.9.0.orig/lib/cpp/Makefile.in 2012-10-12 02:59:49.000000000 +0200
+++ thrift-0.9.0/lib/cpp/Makefile.in 2013-08-15 16:53:40.000000000 +0200
@@ -618,7 +618,8 @@
src/thrift/transport/TTransportUtils.h \
src/thrift/transport/TBufferTransports.h \
src/thrift/transport/TShortReadTransport.h \
- src/thrift/transport/TZlibTransport.h
+ src/thrift/transport/TZlibTransport.h \
+ src/thrift/transport/TLibEventTransport.h
include_serverdir = $(include_thriftdir)/server
include_server_HEADERS = \
Index: thrift-0.9.0/lib/cpp/src/thrift/server/TNonblockingServer.cpp
===================================================================
--- thrift-0.9.0.orig/lib/cpp/src/thrift/server/TNonblockingServer.cpp 2012-10-12 02:58:06.000000000 +0200
+++ thrift-0.9.0/lib/cpp/src/thrift/server/TNonblockingServer.cpp 2013-08-15 17:11:07.000000000 +0200
@@ -25,10 +25,13 @@
#include "TNonblockingServer.h"
#include <thrift/concurrency/Exception.h>
-#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TLibEventTransport.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <boost/bind.hpp>
+
#include <iostream>
+#include <queue>
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
@@ -76,13 +79,6 @@
using apache::thrift::transport::TTransportException;
using boost::shared_ptr;
-/// Three states for sockets: recv frame size, recv data, and send mode
-enum TSocketState {
- SOCKET_RECV_FRAMING,
- SOCKET_RECV,
- SOCKET_SEND
-};
-
/**
* Five states for the nonblocking server:
* 1) initialize
@@ -93,7 +89,6 @@
*/
enum TAppState {
APP_INIT,
- APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
APP_WAIT_TASK,
APP_SEND_RESULT,
@@ -116,7 +111,7 @@
boost::shared_ptr<TProcessor> processor_;
/// Object wrapping network socket
- boost::shared_ptr<TSocket> tSocket_;
+ boost::shared_ptr<TLibEventTransport> tSocket_;
/// Libevent object
struct event event_;
@@ -124,8 +119,8 @@
/// Libevent flags
short eventFlags_;
- /// Socket mode
- TSocketState socketState_;
+ /// Tells whether the frame size was read completely.
+ bool readFrameSize_;
/// Application state
TAppState appState_;
@@ -142,15 +137,12 @@
/// Read buffer size
uint32_t readBufferSize_;
- /// Write buffer
- uint8_t* writeBuffer_;
-
- /// Write buffer size
- uint32_t writeBufferSize_;
-
/// How far through writing are we?
uint32_t writeBufferPos_;
+ /// Write position for external events
+ uint externalWriteBufferPos_;
+
/// Largest size of write buffer seen since buffer was constructed
size_t largestWriteBufferSize_;
@@ -169,6 +161,9 @@
/// Transport that processor writes to
boost::shared_ptr<TMemoryBuffer> outputTransport_;
+ /// Transport that processes writes from an external libevent source.
+ boost::shared_ptr<TMemoryBuffer> externalTransport_;
+
/// extra transport generated by transport factory (e.g. BufferedRouterTransport)
boost::shared_ptr<TTransport> factoryInputTransport_;
boost::shared_ptr<TTransport> factoryOutputTransport_;
@@ -185,6 +180,16 @@
/// Thrift call context, if any
void *connectionContext_;
+ /// Node for queuing write requests from internal and external sources.
+ typedef std::pair<bool, uint32_t> WriteRequest;
+
+ /// Queue with write requests from outputTransport_ and externalTransport_.
+ std::queue<WriteRequest> writeRequests_;
+
+ /// Queues a write request for internal or external buffer and notifies
+ /// libevent.
+ void scheduleWrite(bool externalBuffer, uint32_t len);
+
/// Go into read mode
void setRead() {
setFlags(EV_READ | EV_PERSIST);
@@ -195,6 +200,11 @@
setFlags(EV_WRITE | EV_PERSIST);
}
+ /// Resets write mode after all write requests were send.
+ void clearWrite() {
+ clearFlags(EV_WRITE);
+ }
+
/// Set socket idle
void setIdle() {
setFlags(0);
@@ -208,12 +218,37 @@
void setFlags(short eventFlags);
/**
+ * Clear event flags for this connection.
+ *
+ * @param eventFlags flags we pass to libevent for the connection.
+ */
+ void clearFlags(short eventFlags);
+
+ /**
* Libevent handler called (via our static wrapper) when the connection
* socket had something happen. Rather than use the flags libevent passed,
* we use the connection state to determine whether we need to read or
* write the socket.
*/
- void workSocket();
+ void workSocket(short ev_type);
+
+ /**
+ * Reads frame size and message from socket.
+ */
+ bool readSocket();
+
+ /**
+ * Writes as many queued requests as possible
+ */
+ bool writeSocket();
+
+ /**
+ * Sets the frame size for a message that was written into a TMemoryBuffer.
+ *
+ * @param buf buffer including a new message at the end
+ * @param msgSize Size of the message including frame header
+ */
+ void setFrameSize(TMemoryBuffer& buf, uint32_t msgSize);
/// Close this connection and free or reset its resources.
void close();
@@ -236,7 +271,16 @@
inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
outputTransport_.reset(new TMemoryBuffer(
server_->getWriteBufferDefaultSize()));
- tSocket_.reset(new TSocket());
+ externalTransport_.reset(new TMemoryBuffer(
+ server_->getWriteBufferDefaultSize()));
+
+ boost::shared_ptr<TProtocol> externalProtocol =
+ server_->getOutputProtocolFactory()->getProtocol(externalTransport_);
+
+ tSocket_.reset(new TLibEventTransport(
+ externalProtocol,
+ boost::bind(&TConnection::beginExternalWriteEvent, this),
+ boost::bind(&TConnection::endExternalWriteEvent, this)));
init(socket, ioThread, addr, addrLen);
}
@@ -244,6 +288,18 @@
std::free(readBuffer_);
}
+ /**
+ * Callback for TLibEventTransport to write 4 bytes for the frame header
+ * into externalTransport_.
+ */
+ void beginExternalWriteEvent();
+
+ /**
+ * Callback for TLibEventTransport to calculate the frame size and notifiy
+ * libevent for writting.
+ */
+ void endExternalWriteEvent();
+
/**
* Check buffers against any size limits and shrink it if exceeded.
*
@@ -267,13 +323,13 @@
* C-callable event handler for connection events. Provides a callback
* that libevent can understand which invokes connection_->workSocket().
*
- * @param fd the descriptor the event occurred on.
- * @param which the flags associated with the event.
- * @param v void* callback arg where we placed TConnection's "this".
+ * @param fd the descriptor the event occurred on.
+ * @param ev_type the flags associated with the event.
+ * @param v void* callback arg where we placed TConnection's "this".
*/
- static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
+ static void eventHandler(evutil_socket_t fd, short ev_type, void* v) {
assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
- ((TConnection*)v)->workSocket();
+ ((TConnection*)v)->workSocket(ev_type);
}
/**
@@ -394,6 +450,7 @@
socklen_t addrLen) {
tSocket_->setSocketFD(socket);
tSocket_->setCachedAddress(addr, addrLen);
+ tSocket_->setEventBase(*(ioThread->getEventBase()));
ioThread_ = ioThread;
server_ = ioThread->getServer();
@@ -403,12 +460,9 @@
readBufferPos_ = 0;
readWant_ = 0;
- writeBuffer_ = NULL;
- writeBufferSize_ = 0;
- writeBufferPos_ = 0;
largestWriteBufferSize_ = 0;
- socketState_ = SOCKET_RECV_FRAMING;
+ readFrameSize_ = true;
callsForResize_ = 0;
// get input/transports
@@ -436,12 +490,29 @@
processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}
-void TNonblockingServer::TConnection::workSocket() {
- int got=0, left=0, sent=0;
+void TNonblockingServer::TConnection::workSocket(short ev_type) {
+
+ if (ev_type & EV_READ) {
+ if (!readSocket()) {
+ return;
+ }
+ }
+
+ if (ev_type & EV_WRITE) {
+ if (!writeSocket()) {
+ return;
+ }
+ }
+
+ if (ev_type & ~(EV_READ | EV_WRITE))
+ GlobalOutput.printf("TConnection::workSocket: unexpected event type %d",
+ ev_type & ~(EV_READ | EV_WRITE));
+}
+
+bool TNonblockingServer::TConnection::readSocket() {
uint32_t fetch = 0;
- switch (socketState_) {
- case SOCKET_RECV_FRAMING:
+ if (readFrameSize_) {
union {
uint8_t buf[sizeof(uint32_t)];
uint32_t size;
@@ -457,20 +528,22 @@
if (fetch == 0) {
// Whenever we get here it means a remote disconnect
close();
- return;
+
+ return false;
}
readBufferPos_ += fetch;
- } catch (TTransportException& te) {
- GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+ } catch(TTransportException& te) {
+ GlobalOutput.printf("TConnection::readSocket(): %s", te.what());
close();
- return;
+ return false;
}
if (readBufferPos_ < sizeof(framing.size)) {
// more needed before frame size is known -- save what we have so far
readWant_ = framing.size;
- return;
+
+ return true;
}
readWant_ = ntohl(framing.size);
@@ -483,84 +556,154 @@
readWant_, server_->getMaxFrameSize(),
tSocket_->getSocketInfo().c_str());
close();
- return;
- }
- // size known; now get the rest of the frame
- transition();
- return;
-
- case SOCKET_RECV:
- // It is an error to be in this state if we already have all the data
- assert(readBufferPos_ < readWant_);
- try {
- // Read from the socket
- fetch = readWant_ - readBufferPos_;
- got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
+ return false;
}
- catch (TTransportException& te) {
- GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
- close();
- return;
- }
+ readFrameSize_ = false;
- if (got > 0) {
- // Move along in the buffer
- readBufferPos_ += got;
-
- // Check that we did not overdo it
- assert(readBufferPos_ <= readWant_);
-
- // We are done reading, move onto the next state
- if (readBufferPos_ == readWant_) {
- transition();
+ // We just read the request length
+ // Double the buffer size until it is big enough
+ if (readWant_ > readBufferSize_) {
+ if (readBufferSize_ == 0) {
+ readBufferSize_ = 1;
}
- return;
+ uint32_t newSize = readBufferSize_;
+ while (readWant_ > newSize) {
+ newSize *= 2;
+ }
+
+ uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
+ if (newBuffer == NULL) {
+ // nothing else to be done...
+ throw std::bad_alloc();
+ }
+ readBuffer_ = newBuffer;
+ readBufferSize_ = newSize;
}
- // Whenever we get down here it means a remote disconnect
+ readBufferPos_= 0;
+ }
+
+ // size known; now get the rest of the frame
+
+ // It is an error to be in this state if we already have all the data
+ assert(readBufferPos_ < readWant_);
+
+ uint32_t got=0;
+
+ try {
+ // Read from the socket
+ fetch = readWant_ - readBufferPos_;
+ got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
+ }
+ catch (TTransportException& te) {
+ GlobalOutput.printf("TConnection::readSocket(): %s", te.what());
close();
- return;
+ return false;
+ }
- case SOCKET_SEND:
- // Should never have position past size
- assert(writeBufferPos_ <= writeBufferSize_);
+ if (got > 0) {
+ // Move along in the buffer
+ readBufferPos_ += got;
+
+ // Check that we did not overdo it
+ assert(readBufferPos_ <= readWant_);
+
+ // We are done reading, move onto the next state
+ if (readBufferPos_ == readWant_) {
+ transition();
+ }
+
+ return true;
+ }
+
+ // Whenever we get down here it means a remote disconnect
+ close();
+
+ return false;
+}
+
+bool TNonblockingServer::TConnection::writeSocket() {
+ while(writeRequests_.empty() == false) {
+ WriteRequest& writeReq = writeRequests_.front();
+
+ TMemoryBuffer* transport = writeReq.first ? externalTransport_.get() :
+ outputTransport_.get();
+
+ uint8_t* writeBuffer;
+ uint32_t writeBufferSize;
+
+ transport->getBuffer(&writeBuffer, &writeBufferSize);
+ transport->borrow(0, &writeBufferSize);
+
+ uint32_t writeSize = writeReq.second;
+ if(writeBufferSize < writeReq.second) {
+ GlobalOutput.printf("WARNING: Write request size %u > %u buffer size.\n",
+ writeReq.second, writeBufferSize);
+ writeSize = writeBufferSize;
+ }
// If there is no data to send, then let us move on
- if (writeBufferPos_ == writeBufferSize_) {
+ if (writeBufferSize == 0) {
GlobalOutput("WARNING: Send state with no data to send\n");
- transition();
- return;
+ writeRequests_.pop();
+ continue;
+ }
+
+ if (writeBufferSize > largestWriteBufferSize_) {
+ largestWriteBufferSize_ = writeBufferSize;
}
+ uint32_t sent = 0;
+
try {
- left = writeBufferSize_ - writeBufferPos_;
- sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
+ sent = tSocket_->write_partial(writeBuffer, writeSize);
}
- catch (TTransportException& te) {
- GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
+ catch(TTransportException& te) {
+ GlobalOutput.printf("TConnection::writeSocket(): %s ", te.what());
close();
- return;
+
+ return false;
}
- writeBufferPos_ += sent;
+ // We are done!
+ if (sent == writeBufferSize) {
+ transport->consume(sent);
+ writeRequests_.pop();
+ }
+ else if (sent > 0) {
+ // Message was not written completely.
+ transport->consume(sent);
- // Did we overdo it?
- assert(writeBufferPos_ <= writeBufferSize_);
+ // Adjust size in write request.
+ writeReq.second -= sent;
- // We are done!
- if (writeBufferPos_ == writeBufferSize_) {
- transition();
+ return true;
}
+ }
- return;
+ // All queued requests were written, clear write flag.
+ clearWrite();
- default:
- GlobalOutput.printf("Unexpected Socket State %d", socketState_);
- assert(0);
+ // Verify that there's no incomplete message in the output buffers.
+ if(outputTransport_->peek() || externalTransport_->peek())
+ return true;
+
+ // it's now safe to perform buffer size housekeeping.
+ if (server_->getResizeBufferEveryN() > 0
+ && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
+ checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
+ server_->getIdleWriteBufferLimit());
+ callsForResize_ = 0;
}
+
+ // Reset write buffer position to the beginning of the buffer.
+ outputTransport_->resetBuffer();
+ externalTransport_->resetBuffer();
+
+ return true;
}
/**
@@ -580,9 +723,9 @@
// We are done reading the request, package the read buffer into transport
// and get back some data from the dispatch function
inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
- outputTransport_->resetBuffer();
// Prepend four bytes of blank space to the buffer so we can
// write the frame size there later.
+ writeBufferPos_ = outputTransport_->writeEnd();
outputTransport_->getWritePtr(4);
outputTransport_->wroteBytes(4);
@@ -615,10 +758,10 @@
return;
} else {
try {
- if (serverEventHandler_ != NULL) {
- serverEventHandler_->processContext(connectionContext_,
- getTSocket());
- }
+ if (serverEventHandler_ != NULL) {
+ serverEventHandler_->processContext(connectionContext_,
+ getTSocket());
+ }
// Invoke the processor
processor_->process(inputProtocol_, outputProtocol_,
connectionContext_);
@@ -646,107 +789,36 @@
// the writeBuffer_
case APP_WAIT_TASK:
+ {
// We have now finished processing a task and the result has been written
// into the outputTransport_, so we grab its contents and place them into
// the writeBuffer_ for actual writing by the libevent thread
server_->decrementActiveProcessors();
- // Get the result of the operation
- outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
- // If the function call generated return data, then move into the send
- // state and get going
- // 4 bytes were reserved for frame size
- if (writeBufferSize_ > 4) {
-
- // Move into write state
- writeBufferPos_ = 0;
- socketState_ = SOCKET_SEND;
-
- // Put the frame size into the write buffer
- int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
- memcpy(writeBuffer_, &frameSize, 4);
-
- // Socket into write mode
- appState_ = APP_SEND_RESULT;
- setWrite();
-
- // Try to work the socket immediately
- // workSocket();
-
- return;
- }
-
- // In this case, the request was oneway and we should fall through
- // right back into the read frame header state
- goto LABEL_APP_INIT;
-
- case APP_SEND_RESULT:
- // it's now safe to perform buffer size housekeeping.
- if (writeBufferSize_ > largestWriteBufferSize_) {
- largestWriteBufferSize_ = writeBufferSize_;
- }
- if (server_->getResizeBufferEveryN() > 0
- && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
- checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
- server_->getIdleWriteBufferLimit());
- callsForResize_ = 0;
+ // If the processor did not write a response we have to remove again
+ // the 4 bytes from the frame size.
+ uint32_t writePos = outputTransport_->writeEnd();
+ if (writePos == writeBufferPos_ + 4)
+ outputTransport_->revertLastWrite(4);
+ else {
+ uint32_t size = writePos - writeBufferPos_;
+ setFrameSize(*outputTransport_, size);
+ scheduleWrite(false, size);
}
// N.B.: We also intentionally fall through here into the INIT state!
-
- LABEL_APP_INIT:
+ }
case APP_INIT:
- // Clear write buffer variables
- writeBuffer_ = NULL;
- writeBufferPos_ = 0;
- writeBufferSize_ = 0;
-
// Into read4 state we go
- socketState_ = SOCKET_RECV_FRAMING;
- appState_ = APP_READ_FRAME_SIZE;
+ readFrameSize_ = true;
+ appState_ = APP_READ_REQUEST;
readBufferPos_ = 0;
// Register read event
setRead();
-
- // Try to work the socket right away
- // workSocket();
-
- return;
-
- case APP_READ_FRAME_SIZE:
- // We just read the request length
- // Double the buffer size until it is big enough
- if (readWant_ > readBufferSize_) {
- if (readBufferSize_ == 0) {
- readBufferSize_ = 1;
- }
- uint32_t newSize = readBufferSize_;
- while (readWant_ > newSize) {
- newSize *= 2;
- }
-
- uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
- if (newBuffer == NULL) {
- // nothing else to be done...
- throw std::bad_alloc();
- }
- readBuffer_ = newBuffer;
- readBufferSize_ = newSize;
- }
-
- readBufferPos_= 0;
-
- // Move into read request state
- socketState_ = SOCKET_RECV;
- appState_ = APP_READ_REQUEST;
-
- // Work the socket right away
- // workSocket();
-
return;
case APP_CLOSE_CONNECTION:
@@ -760,9 +832,59 @@
}
}
+void TNonblockingServer::TConnection::beginExternalWriteEvent() {
+ // Prepend four bytes of blank space to the buffer so we can
+ // write the frame size there later.
+ externalWriteBufferPos_ = externalTransport_->writeEnd();
+ externalTransport_->getWritePtr(4);
+ externalTransport_->wroteBytes(4);
+}
+
+void TNonblockingServer::TConnection::endExternalWriteEvent() {
+ // If the external function did not write anything we have to remove again
+ // the 4 bytes from the frame size.
+ uint32_t writePos = externalTransport_->writeEnd();
+
+ if (writePos == externalWriteBufferPos_ + 4)
+ externalTransport_->revertLastWrite(4);
+ else {
+ uint32_t size = writePos - externalWriteBufferPos_;
+ setFrameSize(*externalTransport_, size);
+
+ scheduleWrite(true, size);
+ }
+}
+
+void TNonblockingServer::TConnection::setFrameSize(
+ TMemoryBuffer& buf, uint32_t size) {
+ uint8_t* begin = buf.getWritePtr(0) - size;
+
+ // Put the frame size into the write buffer (subtract 4 bytes from
+ // for frame size header).
+ int32_t frameSize = (int32_t)htonl(size - 4);
+ memcpy(begin, &frameSize, 4);
+}
+
+void TNonblockingServer::TConnection::scheduleWrite(
+ bool externalBuffer, uint32_t len) {
+ if( writeRequests_.empty() ) {
+ writeRequests_.push(WriteRequest(externalBuffer, len));
+ setWrite();
+ return;
+ }
+
+ WriteRequest& req = writeRequests_.back();
+
+ // We can extend the last write request if the buffer is the same.
+ if(externalBuffer == req.first)
+ req.second += len;
+ else
+ writeRequests_.push(WriteRequest(externalBuffer, len));
+}
+
void TNonblockingServer::TConnection::setFlags(short eventFlags) {
// Catch the do nothing case
- if (eventFlags_ == eventFlags) {
+ if ((eventFlags_ | eventFlags) == eventFlags_) {
return;
}
@@ -775,7 +897,7 @@
}
// Update in memory structure
- eventFlags_ = eventFlags;
+ eventFlags_ |= eventFlags;
// Do not call event_set if there are no flags
if (!eventFlags_) {
@@ -819,10 +941,43 @@
}
}
+void TNonblockingServer::TConnection::clearFlags(short eventFlags) {
+ if ((~eventFlags & eventFlags_) == eventFlags_)
+ return;
+
+ short newFlags = ~eventFlags & eventFlags_;
+
+ eventFlags_ = 0;
+ if (event_del(&event_) == -1) {
+ GlobalOutput("TConnection::setFlags event_del");
+ return;
+ }
+
+ setFlags(newFlags);
+}
+
/**
* Closes a connection
*/
void TNonblockingServer::TConnection::close() {
+ // Close the socket
+ tSocket_->close();
+
+ // close any factory produced transports
+ factoryInputTransport_->close();
+ factoryOutputTransport_->close();
+
+ // Give this object back to the server that owns it
+ processor_.reset();
+
+ // Remove all queued write requests
+ while(writeRequests_.empty() == false)
+ writeRequests_.pop();
+
+ // Reset write buffer position to the beginning of the buffer.
+ outputTransport_->resetBuffer();
+ externalTransport_->resetBuffer();
+
// Delete the registered libevent
if (event_del(&event_) == -1) {
GlobalOutput.perror("TConnection::close() event_del", errno);
@@ -831,16 +986,9 @@
if (serverEventHandler_ != NULL) {
serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
}
- ioThread_ = NULL;
-
- // Close the socket
- tSocket_->close();
- // close any factory produced transports
- factoryInputTransport_->close();
- factoryOutputTransport_->close();
+ ioThread_ = NULL;
- // Give this object back to the server that owns it
server_->returnConnection(this);
}
@@ -856,6 +1004,7 @@
if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
// just start over
outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
+ externalTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
largestWriteBufferSize_ = 0;
}
}
@@ -1122,7 +1271,7 @@
void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
threadManager_ = threadManager;
if (threadManager != NULL) {
- threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
+ threadManager->setExpireCallback(boost::bind(&TNonblockingServer::expireClose, this, _1));
threadPoolProcessing_ = true;
} else {
threadPoolProcessing_ = false;
Index: thrift-0.9.0/lib/cpp/src/thrift/transport/TBufferTransports.cpp
===================================================================
--- thrift-0.9.0.orig/lib/cpp/src/thrift/transport/TBufferTransports.cpp 2012-10-12 02:58:06.000000000 +0200
+++ thrift-0.9.0/lib/cpp/src/thrift/transport/TBufferTransports.cpp 2013-08-15 16:53:40.000000000 +0200
@@ -374,6 +374,14 @@
wBase_ += len;
}
+void TMemoryBuffer::revertLastWrite(uint32_t len)
+{
+ if (wBase_ - len < rBase_)
+ throw TTransportException("Can't reset write pointer before read pointer");
+
+ wBase_ -= len;
+}
+
const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf, uint32_t* len) {
(void) buf;
rBound_ = wBase_;
Index: thrift-0.9.0/lib/cpp/src/thrift/transport/TBufferTransports.h
===================================================================
--- thrift-0.9.0.orig/lib/cpp/src/thrift/transport/TBufferTransports.h 2012-10-12 02:58:06.000000000 +0200
+++ thrift-0.9.0/lib/cpp/src/thrift/transport/TBufferTransports.h 2013-08-15 16:53:40.000000000 +0200
@@ -683,6 +683,9 @@
// that had been provided by getWritePtr().
void wroteBytes(uint32_t len);
+ // Resets the write pointer by len bytes if the were not yet read.
+ void revertLastWrite(uint32_t len);
+
/*
* TVirtualTransport provides a default implementation of readAll().
* We want to use the TBufferBase version instead.
Index: thrift-0.9.0/lib/cpp/src/thrift/transport/TLibEventTransport.h
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ thrift-0.9.0/lib/cpp/src/thrift/transport/TLibEventTransport.h 2013-08-15 16:53:40.000000000 +0200
@@ -0,0 +1,114 @@
+#ifndef _THRIFT_TRANSPORT_TLIB_EVENTTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TLIB_EVENTTRANSPORT_H_ 1
+
+#include "TSocket.h"
+#include "TBufferTransports.h"
+
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+
+struct event_base;
+
+namespace apache { namespace thrift {
+namespace protocol {
+ class TProtocol;
+}
+namespace transport {
+
+/**
+ * TLibEventTransport is used by TNonblockingServer to integrate events from
+ * an external libevent source.
+ *
+ * To support asynchronous communication between a thrift client and server the
+ * following pattern is required:
+ * - all request in a Thrift service must be defined as oneway
+ * - an additional "callback" service is required to send asynchronous
+ * responses and updates from server to client. The "callback" service is
+ * implemented by the client. Here as well all request have to be oneway.
+ * Since all request are oneway they can share the same thrift connection.
+ * - In the server the callback client object must be initialized in the
+ * handler factory with the protocol from TConnectionInfo. For
+ * TLibEventTransport one has to use TLibEventTransport::getProtocol().
+ * - In the client one has to create an own thread to handle the callbacks.
+ * One only has to call process of the generated "callback" processor with
+ * the protocols created for the server connection.
+ *
+ * For an external event beginWriteEvent() must be called before using the
+ * the callback object to write 4 bytes for the frame size into the output
+ * transport. After using the callback object endWriteEvent() must be called
+ * to calculate and set the frame size and notify libevent for writting.
+ */
+class TLibEventTransport : public TSocket
+{
+public:
+ typedef boost::function<void ()> WriteEventCb;
+
+ /*!
+ * TLibEventTransport is created by TNonblockingServer when a new client
+ * connection is accepted.
+ *
+ * @param protocol protocol for the callback object
+ * @param beginEventCb callback to start sending an asynchronous event
+ * @param endEventCb callback to finish sending an asynchronous event
+ */
+ TLibEventTransport(boost::shared_ptr<protocol::TProtocol>& protocol,
+ WriteEventCb beginEventCb,
+ WriteEventCb endEventCb) :
+ eventBase_(0),
+ protocol_(protocol),
+ beginWriteEventCb_(beginEventCb),
+ endWriteEventCb_(endEventCb) {}
+
+ /**
+ * @returns event_base used by this transport.
+ */
+ event_base& getEventBase() const {
+ assert(eventBase_);
+ return *eventBase_;
+ }
+
+ /*!
+ * Setting correct event_base from TNonblockingServer.
+ *
+ * @param ev event_base used by TNonblockingServer
+ */
+ void setEventBase(event_base& ev) {
+ eventBase_ = &ev;
+ }
+
+ /**
+ * @returns the protocol for the callback object.
+ */
+ boost::shared_ptr<protocol::TProtocol>& getProtocol() {
+ return protocol_;
+ }
+
+ /**
+ * Writes 4 bytes for the frame header into the output transport.
+ *
+ * Must be called before using a request from the callback object.
+ */
+ void beginWriteEvent() {
+ beginWriteEventCb_();
+ }
+
+ /**
+ * Calculates the frame size and notifies libevent for writting.
+ *
+ * Must be called after using a request from the callback object.
+ */
+ void endWriteEvent() {
+ endWriteEventCb_();
+ }
+
+private:
+ event_base* eventBase_;
+ boost::shared_ptr<protocol::TProtocol> protocol_;
+ WriteEventCb beginWriteEventCb_;
+ WriteEventCb endWriteEventCb_;
+};
+
+}}} // apache::thrift::transport
+
+
+#endif // _THRIFT_TRANSPORT_TLIB_EVENTTRANSPORT_H_

22
0003-TDenseProtocol.patch Normal file
View File

@ -0,0 +1,22 @@
Index: thrift-0.9.0/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
===================================================================
--- thrift-0.9.0.orig/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
+++ thrift-0.9.0/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
@@ -304,7 +304,7 @@ uint32_t TDenseProtocol::writeStructBegi
// We need a new field index for this structure.
idx_stack_.push_back(0);
- return 0;
+ return xfer;
}
uint32_t TDenseProtocol::writeStructEnd() {
@@ -538,7 +538,7 @@ uint32_t TDenseProtocol::readStructBegin
// We need a new field index for this structure.
idx_stack_.push_back(0);
- return 0;
+ return xfer;
}
uint32_t TDenseProtocol::readStructEnd() {

3
thrift-0.9.0.tar.bz2 Normal file
View File

@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:9ef99a899b56619a51b218481e8536bbdef38116fe84f1213ae1c33e44ea602e
size 2160871

106
thrift.changes Normal file
View File

@ -0,0 +1,106 @@
-------------------------------------------------------------------
Wed Sep 25 08:54:20 UTC 2013 - d.desai@rtsgroup.net
- number of bytes written to transport and number of bytes
returned from write() method was not same. Fixed this
problem in TDenseProtocol.
-------------------------------------------------------------------
Thu Aug 15 15:15:51 UTC 2013 - o.herrmann217@googlemail.com
- Extended 0002-TNonblockingServer-TLibEventTransport.patch to
properly close TConnection when a client disconnects while
still receiving updates from libevent.
-------------------------------------------------------------------
Wed Aug 7 12:07:33 UTC 2013 - d.desai@rtsgroup.net
- Fixed multiple crashes in TNonblockingServer to work with lib_event.
-------------------------------------------------------------------
Fri Jun 21 11:12:57 UTC 2013 - o.herrmann217@googlemail.com
- Changed 0002-TNonblockingServer-TLibEventTransport.patch to
install new header file with automake.
-------------------------------------------------------------------
Thu Jun 20 21:12:12 UTC 2013 - o.herrmann217@googlemail.com
- Replaced std::tr1::bind with boost::bind in TNonblockingServer
to fix compilation error on RHEL 5.
-------------------------------------------------------------------
Thu Jun 20 11:51:54 UTC 2013 - o.herrmann217@googlemail.com
- Removed BuildRequires for boost-static
-------------------------------------------------------------------
Thu Jun 20 11:13:43 UTC 2013 - o.herrmann217@googlemail.com
- Created new patch for integrating external libevent clients in
TNonblockingServer.
- Removed 0003-TNonblokingServer-release-handler-on-close.patch.
It is now part of 0002-TNonblockingServer-TLibEventTransport.patch
-------------------------------------------------------------------
Tue May 14 07:40:45 UTC 2013 - d.desai@rtsgroup.net
- Reverted changes for dependency of openssl for sles_11.
-------------------------------------------------------------------
Tue May 14 07:02:05 UTC 2013 - d.desai@rtsgroup.net
- Added dependency of openssl for sles_11.
-------------------------------------------------------------------
Mon May 13 15:30:08 UTC 2013 - d.desai@rtsgroup.net
- Created patch 0003 to release handler on close in
TNonblockingServer
-------------------------------------------------------------------
Thu May 9 12:23:02 UTC 2013 - o.herrmann217@googlemail.com
- Fixed reset of smart pointer in patch 0002
-------------------------------------------------------------------
Thu May 9 11:16:57 UTC 2013 - o.herrmann217@googlemail.com
- Created patch 0002 to access event_base from libevent in
TNonblockingServer
-------------------------------------------------------------------
Wed Jan 23 18:26:38 UTC 2013 - d.desai@rtsgroup.net
- Removed boost version
-------------------------------------------------------------------
Wed Jan 23 14:51:46 UTC 2013 - o.herrmann217@googlemail.com
- Build with --hash-style=sysv
-------------------------------------------------------------------
Sun Dec 2 13:58:46 UTC 2012 - o.herrmann217@googlemail.com
- Striped one directory in patch 0001
-------------------------------------------------------------------
Sun Dec 2 13:41:57 UTC 2012 - o.herrmann217@googlemail.com
- Patch for missing limit headers
-------------------------------------------------------------------
Sun Dec 2 12:40:06 UTC 2012 - o.herrmann217@googlemail.com
- Call make with -j1 to fix compile problem temporally
-------------------------------------------------------------------
Tue Nov 27 21:58:37 UTC 2012 - jblunck@opensuse.org
- Update to 0.9.0
-------------------------------------------------------------------
Fri Jul 30 17:26:10 UTC 2010 - dmacvicar@novell.com
- initial package for 0.2.0

171
thrift.spec Normal file
View File

@ -0,0 +1,171 @@
%bcond_with perl
%bcond_with python
Name: thrift
Version: 0.9.0
Release: 0
Group: Development/Libraries/C and C++
License: Apache License
Source0: %{name}-%{version}.tar.bz2
Patch1: 0001-Add-missing-limits-header.patch
Patch2: 0002-TNonblockingServer-TLibEventTransport.patch
Patch3: 0003-TDenseProtocol.patch
Summary: Framework for scalable cross-language services development in C++, Java, Python, PHP, and Ruby
BuildRoot: %{_tmppath}/%{name}-%{version}-build
BuildRequires: gcc-c++
BuildRequires: boost-devel
# we build the gem separately
#BuildRequires: ruby-devel
#BuildRequires: java-devel
#%if %{with python}
BuildRequires: python-devel
#%endif
%if %{with perl}
BuildRequires: perl
%endif
BuildRequires: bison
BuildRequires: flex
BuildRequires: pkg-config
BuildRequires: openssl-devel
BuildRequires: libevent-devel
%description
Thrift is a software framework for scalable cross-language services
development. It combines a powerful software stack with a code generation
engine to build services that work efficiently and seamlessly between C++,
Java, C#, Python, Ruby, Perl, PHP, Objective C/Cocoa, Smalltalk, Erlang,
Objective Caml, and Haskell.
%package -n libthrift0_9_0
Summary: Thrift shared library
Group: System/Libraries
%description -n libthrift0_9_0
Thrift shared library
Thrift is a software framework for scalable cross-language services
development. It combines a powerful software stack with a code generation
engine to build services that work efficiently and seamlessly between C++,
Java, C#, Python, Ruby, Perl, PHP, Objective C/Cocoa, Smalltalk, Erlang,
Objective Caml, and Haskell.
%post -n libthrift0_9_0 -p /sbin/ldconfig
%postun -n libthrift0_9_0 -p /sbin/ldconfig
%package -n libthrift-devel
Summary: Thrift C++ library development files
Group: Development/Libraries
%description -n libthrift-devel
Thrift C++ library development files
Thrift is a software framework for scalable cross-language services
development. It combines a powerful software stack with a code generation
engine to build services that work efficiently and seamlessly between C++,
Java, C#, Python, Ruby, Perl, PHP, Objective C/Cocoa, Smalltalk, Erlang,
Objective Caml, and Haskell.
%if %{with perl}
%package -n perl-thrift
Summary: Thrift perl library
Group: Development/Libraries/Perl
%description -n perl-thrift
Thrift perl library
Thrift is a software framework for scalable cross-language services
development. It combines a powerful software stack with a code generation
engine to build services that work efficiently and seamlessly between C++,
Java, C#, Python, Ruby, Perl, PHP, Objective C/Cocoa, Smalltalk, Erlang,
Objective Caml, and Haskell.
%endif
%if %{with python}
%package -n python-thrift
Summary: Thrift python library
Group: Development/Libraries/Python
%description -n python-thrift
Thrift python library
Thrift is a software framework for scalable cross-language services
development. It combines a powerful software stack with a code generation
engine to build services that work efficiently and seamlessly between C++,
Java, C#, Python, Ruby, Perl, PHP, Objective C/Cocoa, Smalltalk, Erlang,
Objective Caml, and Haskell.
%endif
%prep
%setup
%patch1 -p1
%patch2 -p1
%patch3 -p1
%build
export CXXFLAGS="%{optflags} -fPIC"
export LDFLAGS="-Wl,--hash-style=sysv"
%{configure} --without-ruby
make -j1
#%{?jobs:-j%jobs}
%install
pushd compiler/cpp
%makeinstall
popd
pushd lib/cpp
%makeinstall
popd
%if %{with python}
pushd lib/py
%makeinstall
popd
%endif
%if %{with perl}
pushd lib/perl
perl Makefile.PL
%perl_make_install
%perl_process_packlist
rm -rf %{buildroot}%{perl_vendorarch}/auto
popd
%endif
%clean
%files
%defattr(-,root,root)
%doc CHANGES CONTRIBUTORS DISCLAIMER LICENSE NOTICE NEWS
%{_bindir}/thrift
%files -n libthrift0_9_0
%defattr(-,root,root)
%{_libdir}/*-0.9.0.so
%files -n libthrift-devel
%defattr(-,root,root)
#%dir %{_includedir}/thrift
%{_includedir}/thrift
%{_libdir}/*.a
%{_libdir}/*.la
%{_libdir}/*.so
%exclude %{_libdir}/*-0.9.0.so
%{_libdir}/pkgconfig/*.pc
%if %{with perl}
%files -n perl-thrift
%defattr(-,root,root)
%{perl_vendorlib}/Thrift
%{perl_vendorlib}/Thrift.pm
/var/adm/perl-modules/thrift
%endif
%if %{with python}
%files -n python-thrift
%defattr(-,root,root)
%{py_sitedir}/*
%endif
%changelog