curl-library
Re: Active FTP uploads
Date: Wed, 07 Jun 2006 12:02:37 +0200
Daniel Stenberg wrote:
> On Wed, 7 Jun 2006, Patrick Heeb wrote:
>
>> I am trying to do parallel active mode FTP uploads with the multi
>> interface and detected a strange behaviour. The first upload of each
>> connection timeout's waiting for the server to connect. Retrying with
>> the same easy handle then starts the upload immediately.
>
> Using what libcurl version on what operating system and would it be
> possible to show us a complete source code for a program that shows
> this problem?
>
Im using libcurl 7.13.2 on Debian Sarge (installed from the package
libcurl3 7.13.2-2)
I wrote this small class to handle the uploads using a max of parallel
connections.
connect() sets up an array of easy_handles and a multi_handle
upload() does the actual upload.
setupItemHandles() is called by upload, and completes the easy_handles
with the needed data (source file, destination path).
#include <sstream>
#include <log4cpp/Category.hh>
#include "FTPTransferConnection.h"
/**
* Create a FTPTransferConnection Instance
*/
FTPTransferConnection::FTPTransferConnection(int parallelConnections,
string hostname, string user, string password, string rootPath)
{
this->loginInfo = user + ':' + password;
this->hostname = hostname;
this->rootPath = rootPath;
this->item_handle_count = 0;
this->item_handles = new CURL*[parallelConnections];
this->multi_handle = NULL;
this->commands = NULL;
this->numConnections = parallelConnections;
}
/**
* FTPTransferConnection Destructor
*/
FTPTransferConnection::~FTPTransferConnection()
{
delete[] item_handles;
}
/**
* Upload a list of TransferItems, return 0 on error, 1 on success
*/
int FTPTransferConnection::upload(TransferItem** items, int offset, int
itemCount)
{
if (!connected())
{
return 0;
}
int itemPos = offset;
int retry = 0;
while (itemPos < (offset+itemCount) && retry < 3)
{
int loopItems = min(numConnections, itemCount + offset -
itemPos);
int numItems = setupItemHandles(items, loopItems, itemPos);
if (numItems <= 0)
{
return 0;
}
int still_running = 1;
int msgs_left = 0;
CURLMsg* msg = NULL;
bool failed = false;
while (still_running)
{
while (CURLM_CALL_MULTI_PERFORM ==
curl_multi_perform(multi_handle, &still_running))
{
// do loop
}
while ((msg = curl_multi_info_read(multi_handle,
&msgs_left)))
{
if (msg->msg == CURLMSG_DONE)
{
int idx = 0;
int found = 0;
// Find out which handle this
message is about
for (; (!found && (idx <
loopItems)); idx++)
{
found =
(msg->easy_handle == item_handles[idx]);
}
if (found)
{
idx--;
TransferItem *item =
items[itemPos + idx];
item->closeFile();
if (msg->data.result != 0)
{
stringstream log;
log.str("upload failed: ");
log <<
item->getSourcePath() << " " << curl_easy_strerror(msg->data.result);
logger.error(log.str());
failed = true;
}
else
{
item->setTransferred();
curl_multi_remove_handle(multi_handle, item_handles[idx]);
}
}
}
}
}
if (!failed)
{
itemPos += loopItems;
retry = 0;
}
else
{
retry++;
}
for(int i = 0; i < loopItems; i++)
{
curl_multi_remove_handle(multi_handle,
item_handles[i]);
}
}
if (retry > 0)
{
return 0;
}
return 1;
}
int FTPTransferConnection::connect()
{
if (multi_handle != NULL)
{
return 1;
}
if (CURLE_OK != curl_global_init(CURL_GLOBAL_ALL))
{
logger.error("curl_global_init failed");
return 0;
}
multi_handle = curl_multi_init();
if (multi_handle == NULL)
{
logger.error("curl_multi_init failed");
return 0;
}
for(int i = 0; i < numConnections; i++)
{
item_handles[i] = curl_easy_init();
if (item_handles[i] == NULL)
{
logger.error("curl_easy_init failed");
return 0;
}
curl_easy_setopt(item_handles[i], CURLOPT_UPLOAD, 1);
curl_easy_setopt(item_handles[i], CURLOPT_USERPWD,
loginInfo.c_str());
curl_easy_setopt(item_handles[i],
CURLOPT_FTP_CREATE_MISSING_DIRS, 1);
curl_easy_setopt(item_handles[i], CURLOPT_FTPPORT,
"192.168.2.224");
curl_easy_setopt(item_handles[i], CURLOPT_FTP_USE_EPRT, 0);
curl_easy_setopt(item_handles[i], CURLOPT_TIMEOUT, 30);
curl_easy_setopt(item_handles[i],
CURLOPT_FTP_RESPONSE_TIMEOUT, 30);
curl_easy_setopt(item_handles[i], CURLOPT_VERBOSE, 1);
}
int still_running = 0;
return 1;
}
int FTPTransferConnection::disconnect()
{
if (multi_handle == NULL)
{
return 1;
}
for(int i = 0; i < numConnections; i++)
{
curl_easy_cleanup(item_handles[i]);
}
curl_multi_cleanup(multi_handle);
curl_global_cleanup();
return 1;
}
int FTPTransferConnection::connected()
{
return multi_handle != NULL;
}
string FTPTransferConnection::buildDestinationPath(string
itemDestinationPath)
{
stringstream sDstPath;
sDstPath.str("");
sDstPath << "ftp://" << hostname << "/" << rootPath;
char rootPathEnd = rootPath[rootPath.length()-1];
char dstPathBegin = itemDestinationPath[0];
if (rootPathEnd != '/' && dstPathBegin != '/')
{
sDstPath << "/";
sDstPath << itemDestinationPath;
}
else if (rootPathEnd == '/' && dstPathBegin == '/')
{
string tmp = itemDestinationPath.substr(1,
itemDestinationPath.length()-1);
sDstPath << tmp;
}
else
{
sDstPath << itemDestinationPath;
}
return sDstPath.str();
}
int FTPTransferConnection::setupItemHandles(TransferItem** items, int
loopItems, int itemPos)
{
item_handle_count = 0;
for(int i = 0; i < loopItems; i++)
{
TransferItem* item = items[itemPos+i];
if (item->isTransferred())
{
continue;
}
FILE* file = item->getSourceFile();
if (file == NULL)
{
logger.error(string("source file not
found: ") + item->getSourcePath());
return (itemPos - 1)*-1;
}
item_handle_count++;
string itemDestinationPath =
buildDestinationPath(item->getDestinationPath());
item->setProtocolDestinationPath(itemDestinationPath);
curl_easy_setopt(item_handles[i], CURLOPT_URL,
itemDestinationPath.c_str());
curl_easy_setopt(item_handles[i],
CURLOPT_READDATA, file);
curl_easy_setopt(item_handles[i],
CURLOPT_VERBOSE, 1);
curl_multi_add_handle(multi_handle,
item_handles[i]);
}
return item_handle_count;
}
Received on 2006-06-07