b7d299183e
* Fix hangs accessing folders ("Retrieving folder contents" going on infinitely) OBS-URL: https://build.opensuse.org/package/show/KDE:Applications/akonadi-server?expand=0&rev=93
297 lines
12 KiB
Diff
297 lines
12 KiB
Diff
From d42f94701f38c26cf3439727b50ee73cf0d09bee Mon Sep 17 00:00:00 2001
|
|
From: David Faure <faure@kde.org>
|
|
Date: Wed, 15 Feb 2017 08:43:09 +0100
|
|
Subject: Fix ItemRetriever in case of concurrent requests for the same item(s)
|
|
|
|
Summary:
|
|
- ItemRetrievalManager must also emit requestFinished() for the other pending
|
|
requests for the same item(s), otherwise the list of pending requests in ItemRetriever is never emptied
|
|
|
|
- ItemRetriever must not assume that a signal being emitted by
|
|
ItemRetrievalManager is necessarily about the request it's waiting for; it
|
|
could be for another one. So it must first check in its list of pending
|
|
requests to determine whether it should react or not.
|
|
|
|
With multithreaded unittest, checked for races with clang+tsan.
|
|
(There is one race: the connect to ItemRetrievalRequest vs. the emit
|
|
in other threads; we should lock mLock before connect...)
|
|
|
|
Test Plan: new unittest
|
|
|
|
Reviewers: dvratil
|
|
|
|
Reviewed By: dvratil
|
|
|
|
Subscribers: #kde_pim
|
|
|
|
Tags: #kde_pim
|
|
|
|
Differential Revision: https://phabricator.kde.org/D4618
|
|
---
|
|
autotests/server/itemretrievertest.cpp | 174 ++++++++++++++++++++--------
|
|
src/server/storage/itemretrievalmanager.cpp | 1 +
|
|
src/server/storage/itemretriever.cpp | 21 ++--
|
|
3 files changed, 135 insertions(+), 61 deletions(-)
|
|
|
|
diff --git a/autotests/server/itemretrievertest.cpp b/autotests/server/itemretrievertest.cpp
|
|
index fb4468d..1206b35 100644
|
|
--- a/autotests/server/itemretrievertest.cpp
|
|
+++ b/autotests/server/itemretrievertest.cpp
|
|
@@ -20,6 +20,7 @@
|
|
#include <QObject>
|
|
#include <QTest>
|
|
#include <QTimer>
|
|
+#include <QMutex>
|
|
|
|
#include "storage/itemretriever.h"
|
|
#include "storage/itemretrievaljob.h"
|
|
@@ -144,6 +145,52 @@ private:
|
|
QMultiHash<qint64, JobResult> mJobResults;
|
|
};
|
|
|
|
+using RequestedParts = QVector<QByteArray /* FQ name */>;
|
|
+
|
|
+class ClientThread : public QThread
|
|
+{
|
|
+public:
|
|
+ ClientThread(Entity::Id itemId, const RequestedParts &requestedParts)
|
|
+ : m_itemId(itemId), m_requestedParts(requestedParts)
|
|
+ {}
|
|
+
|
|
+ void run() Q_DECL_OVERRIDE
|
|
+ {
|
|
+ // ItemRetriever should...
|
|
+ ItemRetriever retriever;
|
|
+ retriever.setItem(m_itemId);
|
|
+ retriever.setRetrieveParts(m_requestedParts);
|
|
+ QSignalSpy spy(&retriever, &ItemRetriever::itemsRetrieved);
|
|
+
|
|
+ const bool success = retriever.exec();
|
|
+
|
|
+ QMutexLocker lock(&m_mutex);
|
|
+ m_results.success = success;
|
|
+ m_results.signalsCount = spy.count();
|
|
+ if (m_results.signalsCount > 0) {
|
|
+ m_results.emittedItems = spy.at(0).at(0).value<QList<qint64>>();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ struct Results
|
|
+ {
|
|
+ bool success;
|
|
+ int signalsCount;
|
|
+ QList<qint64> emittedItems;
|
|
+ };
|
|
+ Results results() const {
|
|
+ QMutexLocker lock(&m_mutex);
|
|
+ return m_results;
|
|
+ }
|
|
+
|
|
+private:
|
|
+ const Entity::Id m_itemId;
|
|
+ const RequestedParts m_requestedParts;
|
|
+
|
|
+ mutable QMutex m_mutex; // protects results below
|
|
+ Results m_results;
|
|
+};
|
|
+
|
|
class ItemRetrieverTest : public QObject
|
|
{
|
|
Q_OBJECT
|
|
@@ -151,7 +198,6 @@ class ItemRetrieverTest : public QObject
|
|
|
|
using ExistingParts = QVector<QPair<QByteArray /* name */, QByteArray /* data */>>;
|
|
using AvailableParts = QVector<QPair<QByteArray /* name */, QByteArray /* data */>>;
|
|
- using RequestedParts = QVector<QByteArray /* FQ name */>;
|
|
|
|
public:
|
|
ItemRetrieverTest()
|
|
@@ -252,63 +298,89 @@ private Q_SLOTS:
|
|
|
|
|
|
// Setup
|
|
- DbInitializer dbInitializer;
|
|
- FakeItemRetrievalJobFactory factory(dbInitializer);
|
|
- ItemRetrievalManager mgr(&factory);
|
|
- QTest::qWait(100);
|
|
-
|
|
- // Given a PimItem with existing parts
|
|
- Resource res = dbInitializer.createResource("testresource");
|
|
- Collection col = dbInitializer.createCollection("col1");
|
|
- PimItem item = dbInitializer.createItem("1", col);
|
|
- Q_FOREACH (const auto &existingPart, existingParts) {
|
|
- dbInitializer.createPart(item.id(), existingPart.first, existingPart.second);
|
|
- }
|
|
+ for (int step = 0; step < 2; ++step) {
|
|
+ DbInitializer dbInitializer;
|
|
+ FakeItemRetrievalJobFactory factory(dbInitializer);
|
|
+ ItemRetrievalManager mgr(&factory);
|
|
+ QTest::qWait(100);
|
|
+
|
|
+ // Given a PimItem with existing parts
|
|
+ Resource res = dbInitializer.createResource("testresource");
|
|
+ Collection col = dbInitializer.createCollection("col1");
|
|
+
|
|
+ // step 0: do it in the main thread, for easier debugging
|
|
+ PimItem item = dbInitializer.createItem("1", col);
|
|
+ Q_FOREACH (const auto &existingPart, existingParts) {
|
|
+ dbInitializer.createPart(item.id(), existingPart.first, existingPart.second);
|
|
+ }
|
|
|
|
- Q_FOREACH (const auto &availablePart, availableParts) {
|
|
- factory.addJobResult(item.id(), availablePart.first, availablePart.second);
|
|
- }
|
|
+ Q_FOREACH (const auto &availablePart, availableParts) {
|
|
+ factory.addJobResult(item.id(), availablePart.first, availablePart.second);
|
|
+ }
|
|
|
|
- // ItemRetriever should...
|
|
- ItemRetriever retriever;
|
|
- retriever.setItem(item.id());
|
|
- retriever.setRetrieveParts(requestedParts);
|
|
- QSignalSpy spy(&retriever, &ItemRetriever::itemsRetrieved);
|
|
+ if (step == 0) {
|
|
+ ClientThread thread(item.id(), requestedParts);
|
|
+ thread.run();
|
|
+
|
|
+ const ClientThread::Results results = thread.results();
|
|
+ // ItemRetriever should ... succeed
|
|
+ QVERIFY(results.success);
|
|
+ // Emit exactly one signal ...
|
|
+ QCOMPARE(results.signalsCount, expectedSignals);
|
|
+ // ... with that one item
|
|
+ if (expectedSignals > 0) {
|
|
+ QCOMPARE(results.emittedItems, QList<qint64>{ item.id() });
|
|
+ }
|
|
|
|
- // Succeed
|
|
- QVERIFY(retriever.exec());
|
|
- // Run exactly one retrieval job
|
|
- QCOMPARE(factory.jobsCount(), expectedRetrievalJobs);
|
|
- // Emit exactly one signal ...
|
|
- QCOMPARE(spy.count(), expectedSignals);
|
|
- // ... with that one item
|
|
- if (expectedSignals > 0) {
|
|
- QCOMPARE(spy.at(0).at(0).value<QList<qint64>>(), QList<qint64>{ item.id() });
|
|
- }
|
|
+ // Check that the factory had exactly one retrieval job
|
|
+ QCOMPARE(factory.jobsCount(), expectedRetrievalJobs);
|
|
|
|
- // and the part exists in the DB
|
|
- const auto parts = item.parts();
|
|
- QCOMPARE(parts.count(), expectedParts);
|
|
- Q_FOREACH (const Part &dbPart, item.parts()) {
|
|
- const QString fqname = dbPart.partType().ns() + QLatin1Char(':') + dbPart.partType().name();
|
|
- if (!requestedParts.contains(fqname.toLatin1())) {
|
|
- continue;
|
|
+ } else {
|
|
+ QVector<ClientThread *> threads;
|
|
+ for (int i = 0; i < 20; ++i) {
|
|
+ threads.append(new ClientThread(item.id(), requestedParts));
|
|
+ }
|
|
+ for (int i = 0; i < threads.size(); ++i) {
|
|
+ threads.at(i)->start();
|
|
+ }
|
|
+ for (int i = 0; i < threads.size(); ++i) {
|
|
+ threads.at(i)->wait();
|
|
+ }
|
|
+ for (int i = 0; i < threads.size(); ++i) {
|
|
+ const ClientThread::Results results = threads.at(i)->results();
|
|
+ QVERIFY(results.success);
|
|
+ QCOMPARE(results.signalsCount, expectedSignals);
|
|
+ if (expectedSignals > 0) {
|
|
+ QCOMPARE(results.emittedItems, QList<qint64>{ item.id() });
|
|
+ }
|
|
+ }
|
|
+ qDeleteAll(threads);
|
|
}
|
|
|
|
- auto it = std::find_if(availableParts.constBegin(), availableParts.constEnd(),
|
|
- [dbPart](const QPair<QByteArray, QByteArray> &p) {
|
|
- return dbPart.partType().name().toLatin1() == p.first;
|
|
- });
|
|
- if (it == availableParts.constEnd()) {
|
|
- it = std::find_if(existingParts.constBegin(), existingParts.constEnd(),
|
|
- [fqname](const QPair<QByteArray, QByteArray> &p) {
|
|
- return fqname.toLatin1() == p.first;
|
|
- });
|
|
- QVERIFY(it != existingParts.constEnd());
|
|
- }
|
|
+ // Check that the parts now exist in the DB
|
|
+ const auto parts = item.parts();
|
|
+ QCOMPARE(parts.count(), expectedParts);
|
|
+ Q_FOREACH (const Part &dbPart, item.parts()) {
|
|
+ const QString fqname = dbPart.partType().ns() + QLatin1Char(':') + dbPart.partType().name();
|
|
+ if (!requestedParts.contains(fqname.toLatin1())) {
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ auto it = std::find_if(availableParts.constBegin(), availableParts.constEnd(),
|
|
+ [dbPart](const QPair<QByteArray, QByteArray> &p) {
|
|
+ return dbPart.partType().name().toLatin1() == p.first;
|
|
+ });
|
|
+ if (it == availableParts.constEnd()) {
|
|
+ it = std::find_if(existingParts.constBegin(), existingParts.constEnd(),
|
|
+ [fqname](const QPair<QByteArray, QByteArray> &p) {
|
|
+ return fqname.toLatin1() == p.first;
|
|
+ });
|
|
+ QVERIFY(it != existingParts.constEnd());
|
|
+ }
|
|
|
|
- QCOMPARE(dbPart.data(), it->second);
|
|
- QCOMPARE(dbPart.datasize(), it->second.size());
|
|
+ QCOMPARE(dbPart.data(), it->second);
|
|
+ QCOMPARE(dbPart.datasize(), it->second.size());
|
|
+ }
|
|
}
|
|
}
|
|
};
|
|
diff --git a/src/server/storage/itemretrievalmanager.cpp b/src/server/storage/itemretrievalmanager.cpp
|
|
index 65f5e78..382bb9e 100644
|
|
--- a/src/server/storage/itemretrievalmanager.cpp
|
|
+++ b/src/server/storage/itemretrievalmanager.cpp
|
|
@@ -232,6 +232,7 @@ void ItemRetrievalManager::retrievalJobFinished(ItemRetrievalRequest *request, c
|
|
qCDebug(AKONADISERVER_LOG) << "someone else requested item" << request->ids << "as well, marking as processed";
|
|
(*it)->errorMsg = errorMsg;
|
|
(*it)->processed = true;
|
|
+ Q_EMIT requestFinished(*it);
|
|
it = mPendingRequests[request->resourceId].erase(it);
|
|
} else {
|
|
++it;
|
|
diff --git a/src/server/storage/itemretriever.cpp b/src/server/storage/itemretriever.cpp
|
|
index 0ae3e91..c57c860 100644
|
|
--- a/src/server/storage/itemretriever.cpp
|
|
+++ b/src/server/storage/itemretriever.cpp
|
|
@@ -317,17 +317,18 @@ bool ItemRetriever::exec()
|
|
QEventLoop eventLoop;
|
|
connect(ItemRetrievalManager::instance(), &ItemRetrievalManager::requestFinished,
|
|
this, [&](ItemRetrievalRequest *finishedRequest) {
|
|
- if (!finishedRequest->errorMsg.isEmpty()) {
|
|
- mLastError = finishedRequest->errorMsg.toUtf8();
|
|
- eventLoop.exit(1);
|
|
- } else {
|
|
- requests.removeOne(finishedRequest);
|
|
- Q_EMIT itemsRetrieved(finishedRequest->ids);
|
|
- if (requests.isEmpty()) {
|
|
- eventLoop.quit();
|
|
- }
|
|
+ if (requests.removeOne(finishedRequest)) {
|
|
+ if (!finishedRequest->errorMsg.isEmpty()) {
|
|
+ mLastError = finishedRequest->errorMsg.toUtf8();
|
|
+ eventLoop.exit(1);
|
|
+ } else {
|
|
+ Q_EMIT itemsRetrieved(finishedRequest->ids);
|
|
+ if (requests.isEmpty()) {
|
|
+ eventLoop.quit();
|
|
}
|
|
- }, Qt::UniqueConnection);
|
|
+ }
|
|
+ }
|
|
+ }, Qt::UniqueConnection);
|
|
|
|
auto it = requests.begin();
|
|
while (it != requests.end()) {
|
|
--
|
|
cgit v0.11.2
|
|
|