curl-library
[PATCH] pipelining: Add CURLMOPT_PIPELINE_POLICY_FUNCTION
Date: Fri, 31 Oct 2014 21:01:57 +0100
Also adds CURLMOPT_PIPELINE_POLICY_DATA, CURL_SUPPORTS_PIPELINING,
CURL_BLACKLISTED, CURL_CAN_PIPELINE, struct curl_pipeline_policy
and curl_pipeline_policy_callback.
This allows the user to tweak the maximum number of connections
per host, the maximum number of requests in a pipeline and
to override blacklisting or implement whitelisting on a per
host basis (since bundles are only per host, not host:port - which
imho is a limitation). Finally, they can cause libcurl to start
doing pipelining immediately instead of only after the first reply
is received.
The callback is called when a bundle is constructed and when libcurl
detects (for the first time) that a bundle can do HTTP pipelining.
Without this the user can't change any part of the policy unless the
bundle is destroyed (which normally doesn't happen).
--- docs/libcurl/symbols-in-versions | 5 +++ include/curl/multi.h | 63 +++++++++++++++++++++++++++++ lib/bundles.c | 3 +- lib/bundles.h | 6 ++- lib/conncache.c | 19 ++++++++- lib/http.c | 16 +++++--- lib/multi.c | 8 ++++ lib/multihandle.h | 10 +++-- lib/pipeline.c | 85 ++++++++++++++++++++++++++++++++++++++++ lib/pipeline.h | 10 +++++ lib/url.c | 19 ++++----- packages/OS400/curl.inc.in | 7 ++++ 12 files changed, 226 insertions(+), 25 deletions(-) diff --git a/docs/libcurl/symbols-in-versions b/docs/libcurl/symbols-in-versions index 8e4ca9c..715cc80 100644 --- a/docs/libcurl/symbols-in-versions +++ b/docs/libcurl/symbols-in-versions @@ -280,6 +280,8 @@ CURLMOPT_MAXCONNECTS 7.16.3 CURLMOPT_MAX_HOST_CONNECTIONS 7.30.0 CURLMOPT_MAX_PIPELINE_LENGTH 7.30.0 CURLMOPT_MAX_TOTAL_CONNECTIONS 7.30.0 +CURLMOPT_PIPELINE_POLICY_DATA 7.40.0 +CURLMOPT_PIPELINE_POLICY_FUNCTION 7.40.0 CURLMOPT_PIPELINING 7.16.0 CURLMOPT_PIPELINING_SERVER_BL 7.30.0 CURLMOPT_PIPELINING_SITE_BL 7.30.0 @@ -627,6 +629,8 @@ CURLVERSION_FOURTH 7.16.1 CURLVERSION_NOW 7.10 CURLVERSION_SECOND 7.11.1 CURLVERSION_THIRD 7.12.0 +CURL_BLACKLISTED 7.40.0 +CURL_CAN_PIPELINE 7.40.0 CURL_CHUNK_BGN_FUNC_FAIL 7.21.0 CURL_CHUNK_BGN_FUNC_OK 7.21.0 CURL_CHUNK_BGN_FUNC_SKIP 7.21.0 @@ -722,6 +726,7 @@ CURL_SSLVERSION_TLSv1 7.9.2 CURL_SSLVERSION_TLSv1_0 7.34.0 CURL_SSLVERSION_TLSv1_1 7.34.0 CURL_SSLVERSION_TLSv1_2 7.34.0 +CURL_SUPPORTS_PIPELINING 7.40.0 CURL_TIMECOND_IFMODSINCE 7.9.7 CURL_TIMECOND_IFUNMODSINCE 7.9.7 CURL_TIMECOND_LASTMOD 7.9.7 diff --git a/include/curl/multi.h b/include/curl/multi.h index 3c4acb0..9da0869 100644 --- a/include/curl/multi.h +++ b/include/curl/multi.h @@ -289,6 +289,63 @@ CURL_EXTERN CURLMcode curl_multi_socket_action(CURLM *multi_handle, CURL_EXTERN CURLMcode curl_multi_socket_all(CURLM *multi_handle, int *running_handles); +/* + * Name: curl_pipeline_policy + * + * Desc: Contains the per site pipeline policy variables. 
The values default + * to CURLMOPT_MAX_HOST_CONNECTIONS, CURLMOPT_MAX_PIPELINE_LENGTH, + * and whether or not the site appears in CURLMOPT_PIPELINING_SITE_BL + * respectively. Initially CURL_SUPPORTS_PIPELINING is not set. + * However, these values can be overwritten by the user during the + * curl_pipeline_policy_callback set with + * CURLMOPT_PIPELINE_POLICY_FUNCTION. If CURL_SUPPORTS_PIPELINING and + * CURL_BLACKLISTED are both set then the site is blacklisted from + * pipelining. + */ + +#define CURL_SUPPORTS_PIPELINING 1 +#define CURL_BLACKLISTED 2 + +struct curl_pipeline_policy { + size_t max_host_connections; /* the maximum number of simultaneous + connections to the site */ + long max_pipeline_length; /* the maximum amount of requests in + a pipelined connection */ + int flags; /* bit-mask for CURL_SUPPORTS_PIPELINING + and CURL_BLACKLISTED */ +}; + +/* Return TRUE iff bundle supports pipelining and is not blacklisted */ +#define CURL_CAN_PIPELINE(bundle) ((bundle->policy.flags & \ + (CURL_SUPPORTS_PIPELINING|CURL_BLACKLISTED)) == CURL_SUPPORTS_PIPELINING) + +/* + * Name: curl_pipeline_policy_callback + * + * Desc: Called by libcurl whenever the library creates a new bundle for + * some hostname:port combination, or when the policy was changed + * due to pipelining capability detection or addition/removal + * of site blacklisting. The callback is not called when the + * policy is changed due to server version blacklisting. + * If the user resets CURL_SUPPORTS_PIPELINING and does not set + * CURL_BLACKLISTED then most likely the callback will happen + * again when the reply is received for the next request on + * the connection. If that is not desirable then just set + * CURL_BLACKLISTED instead. + * Note that if a connection is blacklisted due to + * CURLMOPT_PIPELINING_SERVER_BL then this callback is not + * called. 
This is to avoid repetive calling of the callback + * in the case the user would reset CURL_BLACKLISTED; + * CURLMOPT_PIPELINING_SERVER_BL overrules and cannot be tuned + * with this callback. + */ +typedef void (*curl_pipeline_policy_callback)( + char const *hostname, /* connected hostname */ + int port, /* connected port */ + struct curl_pipeline_policy* policy, /* writable policy */ + void *userp); /* private callback + pointer */ + #ifndef CURL_ALLOW_OLD_MULTI_SOCKET /* This macro below was added in 7.16.3 to push users who recompile to use the new curl_multi_socket_action() instead of the old curl_multi_socket() @@ -365,6 +422,12 @@ typedef enum { /* maximum number of open connections in total */ CINIT(MAX_TOTAL_CONNECTIONS, LONG, 13), + /* This is the pipeline policy callback function pointer */ + CINIT(PIPELINE_POLICY_FUNCTION, FUNCTIONPOINT, 14), + + /* This is the argument passed to the pipeline policy callback */ + CINIT(PIPELINE_POLICY_DATA, OBJECTPOINT, 15), + CURLMOPT_LASTENTRY /* the last unused */ } CURLMoption; diff --git a/lib/bundles.c b/lib/bundles.c index aadf026..e46b878 100644 --- a/lib/bundles.c +++ b/lib/bundles.c @@ -46,6 +46,7 @@ static void conn_llist_dtor(void *user, void *element) } CURLcode Curl_bundle_create(struct SessionHandle *data, + struct curl_pipeline_policy *policy, struct connectbundle **cb_ptr) { (void)data; @@ -55,7 +56,7 @@ CURLcode Curl_bundle_create(struct SessionHandle *data, return CURLE_OUT_OF_MEMORY; (*cb_ptr)->num_connections = 0; - (*cb_ptr)->server_supports_pipelining = FALSE; + (*cb_ptr)->policy = *policy; (*cb_ptr)->conn_list = Curl_llist_alloc((curl_llist_dtor) conn_llist_dtor); if(!(*cb_ptr)->conn_list) { diff --git a/lib/bundles.h b/lib/bundles.h index 3816c40..e418c2c 100644 --- a/lib/bundles.h +++ b/lib/bundles.h @@ -22,14 +22,16 @@ * ***************************************************************************/ +#include <curl/multi.h> + struct connectbundle { - bool server_supports_pipelining; /* TRUE if 
server supports pipelining, - set after first response */ + struct curl_pipeline_policy policy; /* connect policy */ size_t num_connections; /* Number of connections in the bundle */ struct curl_llist *conn_list; /* The connectdata members of the bundle */ }; CURLcode Curl_bundle_create(struct SessionHandle *data, + struct curl_pipeline_policy *policy, struct connectbundle **cb_ptr); void Curl_bundle_destroy(struct connectbundle *cb_ptr); diff --git a/lib/conncache.c b/lib/conncache.c index fcfb150..c068bfc 100644 --- a/lib/conncache.c +++ b/lib/conncache.c @@ -33,6 +33,7 @@ #include "rawstr.h" #include "bundles.h" #include "conncache.h" +#include "pipeline.h" #include "curl_memory.h" /* The last #include file should be: */ @@ -130,7 +131,23 @@ CURLcode Curl_conncache_add_conn(struct conncache *connc, bundle = Curl_conncache_find_bundle(data->state.conn_cache, conn->host.name); if(!bundle) { - result = Curl_bundle_create(data, &new_bundle); + struct Curl_multi *multi = data->multi; + struct curl_pipeline_policy policy = { 0, 0, 0 }; + if(multi) { + /* policy default values */ + policy.max_host_connections = Curl_multi_max_host_connections(multi); + policy.max_pipeline_length = Curl_multi_max_pipeline_length(multi); + if(Curl_pipeline_site_blacklisted(data, conn)) { + policy.flags = CURL_BLACKLISTED; + } + /* allow user to tweak the policy */ + if(multi->pipeline_policy_cb) + multi->pipeline_policy_cb(conn->host.name, + conn->remote_port, + &policy, + multi->pipeline_policy_userp); + } + result = Curl_bundle_create(data, &policy, &new_bundle); if(result) return result; diff --git a/lib/http.c b/lib/http.c index b50d00f..48b2aa3 100644 --- a/lib/http.c +++ b/lib/http.c @@ -3320,9 +3320,10 @@ CURLcode Curl_http_readwrite_headers(struct SessionHandle *data, "pipelining supported\n")); /* Activate pipelining if needed */ cb_ptr = conn->bundle; - if(cb_ptr) { - if(!Curl_pipeline_site_blacklisted(data, conn)) - cb_ptr->server_supports_pipelining = TRUE; + if(cb_ptr && 
!(cb_ptr->policy.flags & + (CURL_SUPPORTS_PIPELINING|CURL_BLACKLISTED))) { + Curl_pipeline_set_policy_flags(data->multi, conn, + CURL_SUPPORTS_PIPELINING); } } @@ -3403,10 +3404,13 @@ CURLcode Curl_http_readwrite_headers(struct SessionHandle *data, else if(checkprefix("Server:", k->p)) { char *server_name = Curl_copy_header_value(k->p); - /* Turn off pipelining if the server version is blacklisted */ - if(conn->bundle && conn->bundle->server_supports_pipelining) { + /* Check if the server version is blacklisted when + we're doing pipelining */ + if(conn->bundle && CURL_CAN_PIPELINE(conn->bundle)) { if(Curl_pipeline_server_blacklisted(data, server_name)) - conn->bundle->server_supports_pipelining = FALSE; + /* Do not call Curl_pipeline_set_policy_flags: + we do not want the policy callback to happen. */ + conn->bundle->policy.flags |= CURL_BLACKLISTED; } Curl_safefree(server_name); } diff --git a/lib/multi.c b/lib/multi.c index 7ea366c..6ac49c4 100644 --- a/lib/multi.c +++ b/lib/multi.c @@ -2381,6 +2381,8 @@ CURLMcode curl_multi_setopt(CURLM *multi_handle, case CURLMOPT_PIPELINING_SITE_BL: res = Curl_pipeline_set_site_blacklist(va_arg(param, char **), &multi->pipelining_site_bl); + if(!res) + Curl_pipeline_update_blacklist_policy(multi); break; case CURLMOPT_PIPELINING_SERVER_BL: res = Curl_pipeline_set_server_blacklist(va_arg(param, char **), @@ -2389,6 +2391,12 @@ CURLMcode curl_multi_setopt(CURLM *multi_handle, case CURLMOPT_MAX_TOTAL_CONNECTIONS: multi->max_total_connections = va_arg(param, long); break; + case CURLMOPT_PIPELINE_POLICY_FUNCTION: + multi->pipeline_policy_cb = va_arg(param, curl_pipeline_policy_callback); + break; + case CURLMOPT_PIPELINE_POLICY_DATA: + multi->pipeline_policy_userp = va_arg(param, void *); + break; default: res = CURLM_UNKNOWN_OPTION; break; diff --git a/lib/multihandle.h b/lib/multihandle.h index 1a4b1d9..a673f96 100644 --- a/lib/multihandle.h +++ b/lib/multihandle.h @@ -107,14 +107,18 @@ struct Curl_multi { long maxconnects; /* 
if >0, a fixed limit of the maximum number of entries we're allowed to grow the connection cache to */ - long max_host_connections; /* if >0, a fixed limit of the maximum number + long max_host_connections; /* if >0, the default limit of the maximum number of connections per host */ long max_total_connections; /* if >0, a fixed limit of the maximum number of connections in total */ - long max_pipeline_length; /* if >0, maximum number of requests in a - pipeline */ + /* callback function and user data pointer for the pipeline policy API */ + curl_pipeline_policy_callback pipeline_policy_cb; + void *pipeline_policy_userp; + + long max_pipeline_length; /* if >0, the default maximum number of requests + in a pipeline */ long content_length_penalty_size; /* a connection with a content-length bigger than diff --git a/lib/pipeline.c b/lib/pipeline.c index 2645fdb..67b7d5e 100644 --- a/lib/pipeline.c +++ b/lib/pipeline.c @@ -33,6 +33,7 @@ #include "sendf.h" #include "rawstr.h" #include "bundles.h" +#include "conncache.h" #include "curl_memory.h" /* The last #include file should be: */ @@ -248,6 +249,90 @@ CURLMcode Curl_pipeline_set_site_blacklist(char **sites, return CURLM_OK; } +void Curl_pipeline_set_policy_flags(struct Curl_multi *multi, + struct connectdata *conn, + int flags) +{ + struct connectbundle *cb_ptr = conn->bundle; + bool changed = (cb_ptr->policy.flags & flags) != flags; + cb_ptr->policy.flags |= flags; + /* If the policy was changed, call the policy callback */ + if(changed && multi && multi->pipeline_policy_cb) + multi->pipeline_policy_cb(conn->host.name, + conn->remote_port, + &cb_ptr->policy, + multi->pipeline_policy_userp); +} + +void Curl_pipeline_reset_policy_flags(struct Curl_multi *multi, + struct connectdata *conn, + int flags) +{ + struct connectbundle *cb_ptr = conn->bundle; + bool changed = (cb_ptr->policy.flags & flags) != 0; + cb_ptr->policy.flags &= ~flags; + /* If the policy was changed, call the policy callback */ + if(changed && multi && 
multi->pipeline_policy_cb) + multi->pipeline_policy_cb(conn->host.name, + conn->remote_port, + &cb_ptr->policy, + multi->pipeline_policy_userp); +} + +void Curl_pipeline_update_blacklist_policy(struct Curl_multi *multi) +{ + struct curl_llist *blacklist = Curl_multi_pipelining_site_bl(multi); + if(blacklist) { + struct conncache *connc = multi->conn_cache; + struct curl_hash_iterator iter; + struct curl_hash_element *he; + + /* Run over all bundles */ + Curl_hash_start_iterate(connc->hash, &iter); + he = Curl_hash_next_element(&iter); + while(he) { + struct connectbundle *bundle = he->ptr; + bool blacklisted = FALSE; + he = Curl_hash_next_element(&iter); + + /* Run over all connections (conn) in this bundle, until we + run out of connection or find one that is blacklisted. */ + struct curl_llist_element *curr_conn = bundle->conn_list->head; + while(!blacklisted && curr_conn) { + struct connectdata *conn = curr_conn->ptr; + curr_conn = curr_conn->next; + + /* Run over all sites (site) in the blacklist */ + struct curl_llist_element *curr = blacklist->head; + while(!blacklisted && curr) { + struct site_blacklist_entry *site = curr->ptr; + curr = curr->next; + + if(site->port == conn->remote_port && + Curl_raw_equal(site->hostname, conn->host.name)) { + /* Connection conn is blacklisted. Set flag. */ + Curl_pipeline_set_policy_flags(multi, conn, CURL_BLACKLISTED); + /* Read back the flag because the policy callback might have + reset it again. */ + blacklisted = (bundle->policy.flags & CURL_BLACKLISTED) != 0; + } + } + + if(!blacklisted) { + /* None of the connections in the bundle are blacklisted. + Reset the flag. */ + Curl_pipeline_reset_policy_flags(multi, conn, CURL_BLACKLISTED); + /* Read back the flag because the policy callback might have + set it again. */ + blacklisted = (bundle->policy.flags & CURL_BLACKLISTED) != 0; + } + + } /* Next conn in the bundle, unless the last one was blacklisted. 
*/ + + } /* Next bundle */ + } +} + bool Curl_pipeline_server_blacklisted(struct SessionHandle *handle, char *server_name) { diff --git a/lib/pipeline.h b/lib/pipeline.h index 96c4c33..374370c 100644 --- a/lib/pipeline.h +++ b/lib/pipeline.h @@ -35,6 +35,16 @@ bool Curl_pipeline_site_blacklisted(struct SessionHandle *handle, CURLMcode Curl_pipeline_set_site_blacklist(char **sites, struct curl_llist **list_ptr); +void Curl_pipeline_set_policy_flags(struct Curl_multi *multi, + struct connectdata *conn, + int flags); + +void Curl_pipeline_reset_policy_flags(struct Curl_multi *multi, + struct connectdata *conn, + int flags); + +void Curl_pipeline_update_blacklist_policy(struct Curl_multi *multi); + bool Curl_pipeline_server_blacklisted(struct SessionHandle *handle, char *server_name); diff --git a/lib/url.c b/lib/url.c index 5b19f89..bb7ceba 100644 --- a/lib/url.c +++ b/lib/url.c @@ -3022,25 +3022,20 @@ ConnectionExists(struct SessionHandle *data, *force_reuse = FALSE; - /* We can't pipe if the site is blacklisted */ - if(canPipeline && Curl_pipeline_site_blacklisted(data, needle)) { - canPipeline = FALSE; - } - /* Look up the bundle with all the connections to this particular host */ bundle = Curl_conncache_find_bundle(data->state.conn_cache, needle->host.name); if(bundle) { - size_t max_pipe_len = Curl_multi_max_pipeline_length(data->multi); + size_t max_pipe_len = bundle->policy.max_pipeline_length; size_t best_pipe_len = max_pipe_len; struct curl_llist_element *curr; infof(data, "Found bundle for host %s: %p\n", needle->host.name, (void *)bundle); - /* We can't pipe if we don't know anything about the server */ - if(canPipeline && !bundle->server_supports_pipelining) { + /* Check if we are pipelining */ + if(canPipeline && !CURL_CAN_PIPELINE(bundle)) { infof(data, "Server doesn't support pipelining\n"); canPipeline = FALSE; } @@ -5203,7 +5198,6 @@ static CURLcode create_conn(struct SessionHandle *data, bool prot_missing = FALSE; bool no_connections_available = 
FALSE; bool force_reuse = FALSE; - size_t max_host_connections = Curl_multi_max_host_connections(data->multi); size_t max_total_connections = Curl_multi_max_total_connections(data->multi); *async = FALSE; @@ -5546,7 +5540,8 @@ static CURLcode create_conn(struct SessionHandle *data, infof(data, "Found connection %ld, with requests in the pipe (%zu)\n", conn_temp->connection_id, pipelen); - if(conn_temp->bundle->num_connections < max_host_connections && + if(conn_temp->bundle->num_connections < + conn_temp->bundle->policy.max_host_connections && data->state.conn_cache->num_connections < max_total_connections) { /* We want a new connection anyway */ reuse = FALSE; @@ -5585,8 +5580,8 @@ static CURLcode create_conn(struct SessionHandle *data, bundle = Curl_conncache_find_bundle(data->state.conn_cache, conn->host.name); - if(max_host_connections > 0 && bundle && - (bundle->num_connections >= max_host_connections)) { + if(bundle && bundle->policy.max_host_connections > 0 && + (bundle->num_connections >= bundle->policy.max_host_connections)) { struct connectdata *conn_candidate; /* The bundle is full. Let's see if we can kill a connection. */ diff --git a/packages/OS400/curl.inc.in b/packages/OS400/curl.inc.in index 2f6d86a..69e07db 100644 --- a/packages/OS400/curl.inc.in +++ b/packages/OS400/curl.inc.in @@ -1553,6 +1553,10 @@ d c 10012 d CURLMOPT_MAX_TOTAL_CONNECTIONS... d c 00013 + d CURLMOPT_PIPELINING_POLICY_FUNCTION... + d c 20014 + d CURLMOPT_PIPELINING_POLICY_DATA... + d c 10015 * * Public API enums for RTSP requests. * @@ -1771,6 +1775,9 @@ d curl_socket_callback... d s * based(######ptr######) procptr * + d curl_pipeline_policy_callback... + d s * based(######ptr######) procptr + * d curl_opensocket_callback... 
d s * based(######ptr######) procptr * -- 2.1.3 --MP_/CCBflSxEMJ8DU3AO7/YJ9En Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: base64 Content-Disposition: inline LS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0t LS0tLS0tLS0tLQpMaXN0IGFkbWluOiBodHRwOi8vY29vbC5oYXh4LnNlL2xpc3QvbGlzdGluZm8v Y3VybC1saWJyYXJ5CkV0aXF1ZXR0ZTogIGh0dHA6Ly9jdXJsLmhheHguc2UvbWFpbC9ldGlxdWV0 dGUuaHRtbA== --MP_/CCBflSxEMJ8DU3AO7/YJ9En--Received on 2001-09-17