Skip to content

Commit

Permalink
Upload: assynchronious operations
Browse files Browse the repository at this point in the history
  • Loading branch information
ogoffart committed Jun 25, 2018
1 parent a18b6bb commit 82ff1b5
Show file tree
Hide file tree
Showing 9 changed files with 332 additions and 44 deletions.
13 changes: 7 additions & 6 deletions src/libsync/owncloudpropagator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1001,12 +1001,12 @@ void CleanupPollsJob::start()
auto info = _pollInfos.first();
_pollInfos.pop_front();
SyncJournalFileRecord record;
if (_journal->getFileRecord(info._file, &record) && record.isValid()) {
SyncFileItemPtr item = SyncFileItem::fromSyncJournalFileRecord(record);
PollJob *job = new PollJob(_account, info._url, item, _journal, _localPath, this);
connect(job, &PollJob::finishedSignal, this, &CleanupPollsJob::slotPollFinished);
job->start();
}
SyncFileItemPtr item(new SyncFileItem);
item->_file = info._file;
item->_modtime = info._modtime;
PollJob *job = new PollJob(_account, info._url, item, _journal, _localPath, this);
connect(job, &PollJob::finishedSignal, this, &CleanupPollsJob::slotPollFinished);
job->start();
}

void CleanupPollsJob::slotPollFinished()
Expand All @@ -1028,6 +1028,7 @@ void CleanupPollsJob::slotPollFinished()
deleteLater();
return;
}
_journal->setUploadInfo(job->_item->_file, SyncJournalDb::UploadInfo());
}
// Continue with the next entry, or finish
start();
Expand Down
29 changes: 18 additions & 11 deletions src/libsync/propagateupload.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void PollJob::start()
QUrl finalUrl = QUrl::fromUserInput(accountUrl.scheme() + QLatin1String("://") + accountUrl.authority()
+ (path().startsWith('/') ? QLatin1String("") : QLatin1String("/")) + path());
sendRequest("GET", finalUrl);
connect(reply(), &QNetworkReply::downloadProgress, this, &AbstractNetworkJob::resetTimeout);
connect(reply(), &QNetworkReply::downloadProgress, this, &AbstractNetworkJob::resetTimeout, Qt::UniqueConnection);
AbstractNetworkJob::start();
}

Expand All @@ -123,31 +123,38 @@ bool PollJob::finished()
emit finishedSignal();
return true;
}
start();
QTimer::singleShot(8 * 1000, this, &PollJob::start);
return false;
}

QByteArray jsonData = reply()->readAll().trimmed();
qCInfo(lcPollJob) << ">" << jsonData << "<" << reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
QJsonParseError jsonParseError;
QJsonObject status = QJsonDocument::fromJson(jsonData, &jsonParseError).object();
QJsonObject json = QJsonDocument::fromJson(jsonData, &jsonParseError).object();
qCInfo(lcPollJob) << ">" << jsonData << "<" << reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt() << json << jsonParseError.errorString();
if (jsonParseError.error != QJsonParseError::NoError) {
_item->_errorString = tr("Invalid JSON reply from the poll URL");
_item->_status = SyncFileItem::NormalError;
emit finishedSignal();
return true;
}

if (status["unfinished"].toBool()) {
start();
auto status = json["status"].toString();
if (status == QLatin1String("init") || status == QLatin1String("started")) {
QTimer::singleShot(5 * 1000, this, &PollJob::start);
return false;
}

_item->_errorString = status["error"].toString();
_item->_status = _item->_errorString.isEmpty() ? SyncFileItem::Success : SyncFileItem::NormalError;
_item->_fileId = status["fileid"].toString().toUtf8();
_item->_etag = status["etag"].toString().toUtf8();
_item->_responseTimeStamp = responseTimestamp();
_item->_httpErrorCode = json["errorCode"].toInt();

if (status == QLatin1String("finished")) {
_item->_status = SyncFileItem::Success;
_item->_fileId = json["fileId"].toString().toUtf8();
_item->_etag = parseEtag(json["ETag"].toString().toUtf8());
} else { // error
_item->_status = classifyError(QNetworkReply::UnknownContentError, _item->_httpErrorCode);
_item->_errorString = json["errorMessage"].toString();
}

SyncJournalDb::PollInfo info;
info._file = _item->_file;
Expand Down Expand Up @@ -571,7 +578,7 @@ void PropagateUploadFileCommon::abortWithError(SyncFileItem::Status status, cons
QMap<QByteArray, QByteArray> PropagateUploadFileCommon::headers()
{
QMap<QByteArray, QByteArray> headers;
headers["OC-Async"] = "1";
headers["OC-LazyOps"] = "true";
headers["Content-Type"] = "application/octet-stream";
headers["X-OC-Mtime"] = QByteArray::number(qint64(_item->_modtime));

Expand Down
6 changes: 3 additions & 3 deletions src/libsync/propagateupload.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ class PUTFileJob : public AbstractNetworkJob
/**
* @brief This job implements the asynchronous PUT
*
* If the server replies to a PUT with a OC-Finish-Poll url, we will query this url until the server
* replies with an etag. https://github.com/owncloud/core/issues/12097
* If the server replies to a PUT with a OC-JobStatus-Location path, we will query this url until the server
* replies with an etag.
* @ingroup libsync
*/
class PollJob : public AbstractNetworkJob
Expand Down Expand Up @@ -285,7 +285,7 @@ private slots:
*/
static void adjustLastJobTimeout(AbstractNetworkJob *job, quint64 fileSize);

// Bases headers that need to be sent with every chunk
/** Bases headers that need to be sent on the PUT, or in the MOVE for chunking-ng */
QMap<QByteArray, QByteArray> headers();
};

Expand Down
12 changes: 12 additions & 0 deletions src/libsync/propagateuploadng.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,18 @@ void PropagateUploadFileNG::slotMoveJobFinished()
commonErrorHandling(job);
return;
}

if (_item->_httpErrorCode == 202) {
QString path = QString::fromUtf8(job->reply()->rawHeader("OC-JobStatus-Location"));
if (path.isEmpty()) {
done(SyncFileItem::NormalError, tr("Poll URL missing"));
return;
}
_finished = true;
startPollJob(path);
return;
}

if (_item->_httpErrorCode != 201 && _item->_httpErrorCode != 204) {
abortWithError(SyncFileItem::NormalError, tr("Unexpected return code from server (%1)").arg(_item->_httpErrorCode));
return;
Expand Down
2 changes: 1 addition & 1 deletion src/libsync/propagateuploadv1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void PropagateUploadFileV1::slotPutFinished()

// The server needs some time to process the request and provide us with a poll URL
if (_item->_httpErrorCode == 202) {
QString path = QString::fromUtf8(job->reply()->rawHeader("OC-Finish-Poll"));
QString path = QString::fromUtf8(job->reply()->rawHeader("OC-JobStatus-Location"));
if (path.isEmpty()) {
done(SyncFileItem::NormalError, tr("Poll URL missing"));
return;
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ owncloud_add_test(SyncConflict "syncenginetestutils.h")
owncloud_add_test(SyncFileStatusTracker "syncenginetestutils.h")
owncloud_add_test(Download "syncenginetestutils.h")
owncloud_add_test(ChunkingNg "syncenginetestutils.h")
owncloud_add_test(AsyncOp "syncenginetestutils.h")
owncloud_add_test(UploadReset "syncenginetestutils.h")
owncloud_add_test(AllFilesDeleted "syncenginetestutils.h")
owncloud_add_test(Blacklist "syncenginetestutils.h")
Expand Down
89 changes: 67 additions & 22 deletions test/syncenginetestutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ inline QString getFilePathFromUrl(const QUrl &url) {


inline QString generateEtag() {
return QString::number(QDateTime::currentDateTimeUtc().toMSecsSinceEpoch(), 16);
return QString::number(QDateTime::currentDateTimeUtc().toMSecsSinceEpoch(), 16) + QByteArray::number(qrand(), 16);
}
inline QByteArray generateFileId() {
return QByteArray::number(qrand(), 16);
Expand Down Expand Up @@ -233,7 +233,7 @@ class FileInfo : public FileModifier
auto file = it->find(pathComponents.subComponents(), invalidateEtags);
if (file && invalidateEtags)
// Update parents on the way back
etag = file->etag;
etag = generateEtag();
return file;
}
return 0;
Expand Down Expand Up @@ -301,7 +301,6 @@ class FileInfo : public FileModifier
QMap<QString, FileInfo> children;
QString parentPath;

private:
FileInfo *findInvalidatingEtags(const PathComponents &pathComponents) {
return find(pathComponents, true);
}
Expand Down Expand Up @@ -421,24 +420,25 @@ class FakePutReply : public QNetworkReply
setUrl(request.url());
setOperation(op);
open(QIODevice::ReadOnly);
fileInfo = perform(remoteRootFileInfo, request, putPayload);
QMetaObject::invokeMethod(this, "respond", Qt::QueuedConnection);
}

static FileInfo *perform(FileInfo &remoteRootFileInfo, const QNetworkRequest &request, const QByteArray &putPayload)
{
QString fileName = getFilePathFromUrl(request.url());
Q_ASSERT(!fileName.isEmpty());
if ((fileInfo = remoteRootFileInfo.find(fileName))) {
FileInfo *fileInfo = remoteRootFileInfo.find(fileName);
if (fileInfo) {
fileInfo->size = putPayload.size();
fileInfo->contentChar = putPayload.at(0);
} else {
// Assume that the file is filled with the same character
fileInfo = remoteRootFileInfo.create(fileName, putPayload.size(), putPayload.at(0));
}

if (!fileInfo) {
abort();
return;
}
fileInfo->lastModified = OCC::Utility::qDateTimeFromTime_t(request.rawHeader("X-OC-Mtime").toLongLong());
remoteRootFileInfo.find(fileName, /*invalidate_etags=*/true);
QMetaObject::invokeMethod(this, "respond", Qt::QueuedConnection);
return fileInfo;
}

Q_INVOKABLE virtual void respond()
Expand Down Expand Up @@ -627,7 +627,16 @@ class FakeChunkMoveReply : public QNetworkReply
setUrl(request.url());
setOperation(op);
open(QIODevice::ReadOnly);
fileInfo = perform(uploadsFileInfo, remoteRootFileInfo, request);
if (!fileInfo) {
QTimer::singleShot(0, this, &FakeChunkMoveReply::respondPreconditionFailed);
} else {
QTimer::singleShot(0, this, &FakeChunkMoveReply::respond);
}
}

static FileInfo *perform(FileInfo &uploadsFileInfo, FileInfo &remoteRootFileInfo, const QNetworkRequest &request)
{
QString source = getFilePathFromUrl(request.url());
Q_ASSERT(!source.isEmpty());
Q_ASSERT(source.endsWith("/.file"));
Expand All @@ -653,17 +662,16 @@ class FakeChunkMoveReply : public QNetworkReply
} while(true);

Q_ASSERT(count > 1); // There should be at least two chunks, otherwise why would we use chunking?
QCOMPARE(sourceFolder->children.count(), count); // There should not be holes or extra files
Q_ASSERT(sourceFolder->children.count() == count); // There should not be holes or extra files

QString fileName = getFilePathFromUrl(QUrl::fromEncoded(request.rawHeader("Destination")));
Q_ASSERT(!fileName.isEmpty());

if ((fileInfo = remoteRootFileInfo.find(fileName))) {
QVERIFY(request.hasRawHeader("If")); // The client should put this header
FileInfo *fileInfo = remoteRootFileInfo.find(fileName);
if (fileInfo) {
Q_ASSERT(request.hasRawHeader("If")); // The client should put this header
if (request.rawHeader("If") != QByteArray("<" + request.rawHeader("Destination") +
"> ([\"" + fileInfo->etag.toLatin1() + "\"])")) {
QMetaObject::invokeMethod(this, "respondPreconditionFailed", Qt::QueuedConnection);
return;
return nullptr;
}
fileInfo->size = size;
fileInfo->contentChar = payload;
Expand All @@ -672,15 +680,10 @@ class FakeChunkMoveReply : public QNetworkReply
// Assume that the file is filled with the same character
fileInfo = remoteRootFileInfo.create(fileName, size, payload);
}

if (!fileInfo) {
abort();
return;
}
fileInfo->lastModified = OCC::Utility::qDateTimeFromTime_t(request.rawHeader("X-OC-Mtime").toLongLong());
remoteRootFileInfo.find(fileName, /*invalidate_etags=*/true);

QTimer::singleShot(0, this, &FakeChunkMoveReply::respond);
return fileInfo;
}

Q_INVOKABLE virtual void respond()
Expand Down Expand Up @@ -710,6 +713,48 @@ class FakeChunkMoveReply : public QNetworkReply
};


class FakePayloadReply : public QNetworkReply
{
Q_OBJECT
public:
FakePayloadReply(QNetworkAccessManager::Operation op, const QNetworkRequest &request,
const QByteArray &body, QObject *parent)
: QNetworkReply{ parent }
, _body(body)
{
setRequest(request);
setUrl(request.url());
setOperation(op);
open(QIODevice::ReadOnly);
QTimer::singleShot(10, this, &FakePayloadReply::respond);
}

void respond()
{
setAttribute(QNetworkRequest::HttpStatusCodeAttribute, 200);
setHeader(QNetworkRequest::ContentLengthHeader, _body.size());
emit metaDataChanged();
emit readyRead();
setFinished(true);
emit finished();
}

void abort() override {}
qint64 readData(char *buf, qint64 max) override
{
max = qMin<qint64>(max, _body.size());
memcpy(buf, _body.constData(), max);
_body = _body.mid(max);
return max;
}
qint64 bytesAvailable() const override
{
return _body.size();
}
QByteArray _body;
};


class FakeErrorReply : public QNetworkReply
{
Q_OBJECT
Expand Down
Loading

0 comments on commit 82ff1b5

Please sign in to comment.