cURL / Mailing Lists / curl-users / Single Mail

curl-users

Problems with multi-threaded client and curl easy interface

From: Stephen Hawkins <shawkins_at_axcient.com>
Date: Wed, 6 May 2015 18:06:13 -0700

Hello,

I am developing a Windows application that utilizes multiple threads to upload chunks of data to a receiver. For this purpose we have utilized the easy interface since it was very easy to get started and works well with the synchronous nature of what we are trying to accomplish with this. Basically, we want each thread to read a chunk of data from disk, upload it to our “receiver” (which is a Debian linux box with apache pre-fork) which does some integrity checks to verify that what we uploaded did indeed arrive intact, writes it to disk, and then sends a 200 back to our application to notify it that the selected chunk we uploaded was successful and we can move on to the next one.

The problem I am having is that when I run this application with multiple threads, my transfer speed grinds to a near halt, with sporadic bursts being sent every 30 seconds. I can look at the logs on the receiver and consistently see periods of inactivity that last for 30 seconds. Then I see a burst of transfers, which quickly dies back down. The debugger shows that all of the threads are blocked on a call to select() inside curl_poll(). It seems like select() continuously returns 0, which causes curl_poll() to be called until select() finally does return something other than 0. If I run this application with just 1 thread, everything is fine and dandy. The single thread performs as quickly as it can. Not surprisingly, if I run this application with threads enabled and I wrap the call to curl_easy_perform() with a mutex, then everything is okay as well. Any ideas? I am using version 7.42 of libcurl. I am using a separate easy handle per thread. We are NOT using SSL in this instance, though I have taken care to register the SSL locking callbacks needed as the application will eventually do these transfers over SSL. The following is the code that is performing the transfer:

CURLcode LocalCAxCurl::UpdateData(const unsigned char *bpBuf,

  size_t nBufSize,

  uint64_t nSendOffset,

  const char *pstrFileName,

  long *plResponseCode,

  CURL * ext_curl_handle,

  AxCurlMemoryStruct * UpdateDataServerResponse)

{

long lResponseCode = 0;

int numAttempts = 0;

bool perform = true;

bool setWriteCallback = true;

bool sleep = false;

CURLcode rc = CURLE_OK;

char sendOffsetStr[256];

snprintf(sendOffsetStr, sizeof(sendOffsetStr), "%I64u", nSendOffset);

if (plResponseCode == NULL)

plResponseCode = &lResponseCode;

*plResponseCode = 0;

if (ext_curl_handle == NULL)

return CURLE_FAILED_INIT;

if(bpBuf == NULL || nBufSize == 0)

return CURLE_READ_ERROR; /* can't continue */

curl_easy_reset(ext_curl_handle);

/* Set global Curl call timeout */

curl_easy_setopt(ext_curl_handle, CURLOPT_TIMEOUT, g_lCurlTimeoutVal);

/* Set maximum number of connections */

//curl_easy_setopt(ext_curl_handle, CURLOPT_MAXCONNECTS, 1);

/* tell it to "upload" to the URL */

curl_easy_setopt(ext_curl_handle, CURLOPT_UPLOAD, 1L);

        /* HTTP PUT please */

        curl_easy_setopt(ext_curl_handle, CURLOPT_PUT, 1L);

curl_easy_setopt(ext_curl_handle, CURLOPT_COOKIEFILE, m_strCookiesFile.c_str());

        curl_easy_setopt(ext_curl_handle, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);

        if(m_bUseHttps)

        {

    curl_easy_setopt(ext_curl_handle, CURLOPT_SSL_VERIFYPEER, FALSE);

        }

        curl_easy_setopt(ext_curl_handle, CURLOPT_FOLLOWLOCATION, 1L);

std::string strUrl;

ConstructUrl(op_update_file, strUrl);

        rc = curl_easy_setopt(ext_curl_handle, CURLOPT_URL, strUrl.c_str());

        if (rc != CURLE_OK)

            return rc;

/* Read memory */

AxCurlMemoryStruct read_mem;

read_mem.memory = (char *)bpBuf;

read_mem.size = nBufSize;

/* set where to read from (on Windows you need to use READFUNCTION too) */

curl_easy_setopt(ext_curl_handle, CURLOPT_READDATA, &read_mem);

/* we want to use our own read function */

        curl_easy_setopt(ext_curl_handle, CURLOPT_READFUNCTION, read_memory_callback);

SetDebug(m_bVerbose);

char szBuf[MAX_PATH << 1];

char *outptr = NULL;

size_t outlen = 0;

        struct curl_slist *slist = NULL;

// add "Axcient-Receiver-Path: %s" to the header

rc = Curl_base64_encode(NULL, pstrFileName, strlen(pstrFileName), &outptr, &outlen);

if (outptr)

{

sprintf(szBuf, g_szFmtHeaderAxReceiverPath, outptr);

slist = curl_slist_append(slist, szBuf);

free(outptr);

outptr = NULL;

}

// add "File-Write-Offset: %llu" to the header

rc = Curl_base64_encode(NULL, sendOffsetStr, strlen(sendOffsetStr), &outptr, &outlen);

if (outptr)

{

sprintf(szBuf, g_szFmtHeaderSendOffset, outptr);

slist = curl_slist_append(slist, szBuf);

free(outptr);

outptr = NULL;

}

// add "Sender-Log-Sizes" to the header

slist = curl_slist_append(slist, g_szFmtHeaderSenderLogSizes);

unsigned char digest[2 * MD5_DIGEST_LEN];

memset(digest, 0, sizeof(digest));

rc = calc_md5_hash(bpBuf, (int)nBufSize, digest, MD5_DIGEST_LEN);

std::string strDigest = ToHexNoSpace(digest, MD5_DIGEST_LEN);

        if (rc != CURLE_OK)

        {

curl_slist_free_all(slist);

        return rc;

        }

// add "Content-MD5: %s" to the header

rc = Curl_base64_encode(NULL, strDigest.c_str(), strDigest.length(), &outptr, &outlen);

if (outptr)

{

sprintf(szBuf, g_szFmtHeaderContentMd5, outptr);

slist = curl_slist_append(slist, szBuf);

free(outptr);

outptr = NULL;

}

rc = curl_easy_setopt(ext_curl_handle, CURLOPT_HTTPHEADER, slist);

        if (rc != CURLE_OK)

{

    curl_slist_free_all(slist);

        return rc;

}

while (perform && numAttempts < m_nSendRetriesCount) {

/* We don't always want to register a write callback. Only when

* we encounter error 403*/

if (setWriteCallback)

{

/* set where to write our server reponse to */

curl_easy_setopt(ext_curl_handle, CURLOPT_WRITEDATA, UpdateDataServerResponse);

/* we want to use our own write function */

curl_easy_setopt(ext_curl_handle, CURLOPT_WRITEFUNCTION, write_last_response_callback);

}

//WaitForSingleObject(h_mPerformMutex, INFINITE);

/* Perform the Curl call */

rc = curl_easy_perform(ext_curl_handle);

//ReleaseMutex(h_mPerformMutex);

/* Check the Curl return call. This does NOT mean that the

* HTTP request was successful...*/

if (rc == CURLE_OK)

{

/* Get the http return code */

GetHttpResponseCode(rc, plResponseCode, ext_curl_handle);

switch (*plResponseCode) {

case AX_RSP_CODE_SUCCESS:

perform = false;

break;

case AX_RSP_CODE_BAD_CREDENTIALS:

perform = false;

break;

case AX_RSP_CODE_FORBIDDEN:

/* If we get a "session not initialized" 403, do not retry. We must exit,

* have the agent call start_snapshot, and be reinvoked for the job to continue*/

if (!setWriteCallback)

{

setWriteCallback = true;

sleep = true;

// decrement once, since we are "wasting" a retry

// to assign the callback function

--numAttempts;

break;

}

else /* write callback has been set, this is the second time in a row we

  * have gotten 403. Let's check the response. */

{

if (UpdateDataServerResponse->memory &&

strcmp(UpdateDataServerResponse->memory, g_UpdateFileRSP_NotInit) == 0)

{

/* Session not initialized 403. do not retry */

perform = false;

}

else

{

/* This is an integrity check failure 403. Retry */

sleep = true;

}

break;

}

case AX_RSP_CODE_NOT_FOUND:

perform = false;

break;

case AX_RSP_CODE_INTERNAL_SERVER_ERROR:

// received internal server error - will retry

sleep = true;

break;

default:

/* Unknown http return code. Do not retry */

perform = false;

break;

}

} else {

/* curl_easy_perform() did not return CURLE_OK. Sleep and retry */

sleep = true;

}

if (sleep) {

m_tLastAttemptTime = time(NULL);

while (time(NULL) - m_tLastAttemptTime < m_lSendRetriesInterval) {

#ifdef _WIN32

Sleep(100);

#else

sleep(1);

#endif

}

sleep = false;

}

++numAttempts;

} // while

        curl_slist_free_all(slist);

return rc;

}

This function is invoked by all sending threads, but each thread passes in its own easy handle.

-------------------------------------------------------------------
List admin: http://cool.haxx.se/list/listinfo/curl-users
FAQ: http://curl.haxx.se/docs/faq.html
Etiquette: http://curl.haxx.se/mail/etiquette.html
Received on 2015-05-07