cURL / Mailing Lists / curl-library / Single Mail

curl-library

[PATCH] pipelining: Add CURLMOPT_PIPELINE_POLICY_FUNCTION

From: Carlo Wood <carlo_at_alinoe.com>
Date: Fri, 31 Oct 2014 21:01:57 +0100

Also adds CURLMOPT_PIPELINE_POLICY_DATA, CURL_SUPPORTS_PIPELINING,
CURL_BLACKLISTED, CURL_CAN_PIPELINE, struct curl_pipeline_policy
and curl_pipeline_policy_callback.

This allows the user to tweak the maximum number of connections
per host, the maximum number of requests in a pipeline and
to override blacklisting or implement whitelisting on a per
host basis (since bundles are only per host, not host:port - which
imho is a limitation). Finally they can cause libcurl to start
doing pipelining immediately instead of waiting until the first
reply is received.

The callback is called when a bundle is constructed and when libcurl
detects (for the first time) that a bundle can do http pipelining.
Without the latter call the user could not adjust the policy after the
bundle was created, unless the bundle is destroyed and re-created (which
normally doesn't happen).

---
 docs/libcurl/symbols-in-versions |  5 +++
 include/curl/multi.h             | 63 +++++++++++++++++++++++++++++
 lib/bundles.c                    |  3 +-
 lib/bundles.h                    |  6 ++-
 lib/conncache.c                  | 19 ++++++++-
 lib/http.c                       | 16 +++++---
 lib/multi.c                      |  8 ++++
 lib/multihandle.h                | 10 +++--
 lib/pipeline.c                   | 85 ++++++++++++++++++++++++++++++++++++++++
 lib/pipeline.h                   | 10 +++++
 lib/url.c                        | 19 ++++-----
 packages/OS400/curl.inc.in       |  7 ++++
 12 files changed, 226 insertions(+), 25 deletions(-)
diff --git a/docs/libcurl/symbols-in-versions b/docs/libcurl/symbols-in-versions
index 8e4ca9c..715cc80 100644
--- a/docs/libcurl/symbols-in-versions
+++ b/docs/libcurl/symbols-in-versions
@@ -280,6 +280,8 @@ CURLMOPT_MAXCONNECTS            7.16.3
 CURLMOPT_MAX_HOST_CONNECTIONS   7.30.0
 CURLMOPT_MAX_PIPELINE_LENGTH    7.30.0
 CURLMOPT_MAX_TOTAL_CONNECTIONS  7.30.0
+CURLMOPT_PIPELINE_POLICY_DATA   7.40.0
+CURLMOPT_PIPELINE_POLICY_FUNCTION 7.40.0
 CURLMOPT_PIPELINING             7.16.0
 CURLMOPT_PIPELINING_SERVER_BL   7.30.0
 CURLMOPT_PIPELINING_SITE_BL     7.30.0
@@ -627,6 +629,8 @@ CURLVERSION_FOURTH              7.16.1
 CURLVERSION_NOW                 7.10
 CURLVERSION_SECOND              7.11.1
 CURLVERSION_THIRD               7.12.0
+CURL_BLACKLISTED                7.40.0
+CURL_CAN_PIPELINE               7.40.0
 CURL_CHUNK_BGN_FUNC_FAIL        7.21.0
 CURL_CHUNK_BGN_FUNC_OK          7.21.0
 CURL_CHUNK_BGN_FUNC_SKIP        7.21.0
@@ -722,6 +726,7 @@ CURL_SSLVERSION_TLSv1           7.9.2
 CURL_SSLVERSION_TLSv1_0         7.34.0
 CURL_SSLVERSION_TLSv1_1         7.34.0
 CURL_SSLVERSION_TLSv1_2         7.34.0
+CURL_SUPPORTS_PIPELINING        7.40.0
 CURL_TIMECOND_IFMODSINCE        7.9.7
 CURL_TIMECOND_IFUNMODSINCE      7.9.7
 CURL_TIMECOND_LASTMOD           7.9.7
diff --git a/include/curl/multi.h b/include/curl/multi.h
index 3c4acb0..9da0869 100644
--- a/include/curl/multi.h
+++ b/include/curl/multi.h
@@ -289,6 +289,63 @@ CURL_EXTERN CURLMcode curl_multi_socket_action(CURLM *multi_handle,
 CURL_EXTERN CURLMcode curl_multi_socket_all(CURLM *multi_handle,
                                             int *running_handles);
 
+/*
+ * Name:    curl_pipeline_policy
+ *
+ * Desc:    Contains the per site pipeline policy variables. The values default
+ *          to CURLMOPT_MAX_HOST_CONNECTIONS, CURLMOPT_MAX_PIPELINE_LENGTH,
+ *          and whether or not the site appears in CURLMOPT_PIPELINING_SITE_BL
+ *          respectively. Initially CURL_SUPPORTS_PIPELINING is not set.
+ *          However, these values can be overwritten by the user during the
+ *          curl_pipeline_policy_callback set with
+ *          CURLMOPT_PIPELINE_POLICY_FUNCTION. If CURL_SUPPORTS_PIPELINING and
+ *          CURL_BLACKLISTED are both set then the site is blacklisted from
+ *          pipelining.
+ */
+
+#define CURL_SUPPORTS_PIPELINING 1
+#define CURL_BLACKLISTED         2
+
+struct curl_pipeline_policy {
+  size_t max_host_connections;  /* the maximum number of simultaneous
+                                   connections to the site */
+  long max_pipeline_length;     /* the maximum number of requests in
+                                   a pipelined connection */
+  int  flags;                   /* bit-mask for CURL_SUPPORTS_PIPELINING
+                                   and CURL_BLACKLISTED */
+};
+
+/* Return TRUE iff bundle supports pipelining and is not blacklisted */
+#define CURL_CAN_PIPELINE(bundle) ((bundle->policy.flags & \
+   (CURL_SUPPORTS_PIPELINING|CURL_BLACKLISTED)) == CURL_SUPPORTS_PIPELINING)
+
+/*
+ * Name:    curl_pipeline_policy_callback
+ *
+ * Desc:    Called by libcurl whenever the library creates a new bundle for
+ *          a host (bundles are per hostname, not per hostname:port), or when
+ *          the policy was changed due to pipelining capability detection or
+ *          addition/removal of site blacklisting. The callback is not called
+ *          when the policy is changed due to server version blacklisting.
+ *          If the user resets CURL_SUPPORTS_PIPELINING and does not set
+ *          CURL_BLACKLISTED then most likely the callback will happen
+ *          again when the reply is received for the next request on
+ *          the connection. If that is not desirable then just set
+ *          CURL_BLACKLISTED instead.
+ *          Note that if a connection is blacklisted due to
+ *          CURLMOPT_PIPELINING_SERVER_BL then this callback is not
+ *          called. This is to avoid repetitive calling of the callback
+ *          in the case the user would reset CURL_BLACKLISTED;
+ *          CURLMOPT_PIPELINING_SERVER_BL overrules and cannot be tuned
+ *          with this callback.
+ */
+typedef void (*curl_pipeline_policy_callback)(
+              char const *hostname,                /* connected hostname */
+              int port,                            /* connected port */
+              struct curl_pipeline_policy* policy, /* writable policy */
+              void *userp);                        /* private callback
+                                                      pointer */
+
 #ifndef CURL_ALLOW_OLD_MULTI_SOCKET
 /* This macro below was added in 7.16.3 to push users who recompile to use
    the new curl_multi_socket_action() instead of the old curl_multi_socket()
@@ -365,6 +422,12 @@ typedef enum {
   /* maximum number of open connections in total */
   CINIT(MAX_TOTAL_CONNECTIONS, LONG, 13),
 
+  /* This is the pipeline policy callback function pointer */
+  CINIT(PIPELINE_POLICY_FUNCTION, FUNCTIONPOINT, 14),
+
+  /* This is the argument passed to the pipeline policy callback */
+  CINIT(PIPELINE_POLICY_DATA, OBJECTPOINT, 15),
+
   CURLMOPT_LASTENTRY /* the last unused */
 } CURLMoption;
 
diff --git a/lib/bundles.c b/lib/bundles.c
index aadf026..e46b878 100644
--- a/lib/bundles.c
+++ b/lib/bundles.c
@@ -46,6 +46,7 @@ static void conn_llist_dtor(void *user, void *element)
 }
 
 CURLcode Curl_bundle_create(struct SessionHandle *data,
+                            struct curl_pipeline_policy *policy,
                             struct connectbundle **cb_ptr)
 {
   (void)data;
@@ -55,7 +56,7 @@ CURLcode Curl_bundle_create(struct SessionHandle *data,
     return CURLE_OUT_OF_MEMORY;
 
   (*cb_ptr)->num_connections = 0;
-  (*cb_ptr)->server_supports_pipelining = FALSE;
+  (*cb_ptr)->policy = *policy;
 
   (*cb_ptr)->conn_list = Curl_llist_alloc((curl_llist_dtor) conn_llist_dtor);
   if(!(*cb_ptr)->conn_list) {
diff --git a/lib/bundles.h b/lib/bundles.h
index 3816c40..e418c2c 100644
--- a/lib/bundles.h
+++ b/lib/bundles.h
@@ -22,14 +22,16 @@
  *
  ***************************************************************************/
 
+#include <curl/multi.h>
+
 struct connectbundle {
-  bool server_supports_pipelining; /* TRUE if server supports pipelining,
-                                      set after first response */
+  struct curl_pipeline_policy policy; /* connect policy */
   size_t num_connections;       /* Number of connections in the bundle */
   struct curl_llist *conn_list; /* The connectdata members of the bundle */
 };
 
 CURLcode Curl_bundle_create(struct SessionHandle *data,
+                            struct curl_pipeline_policy *policy,
                             struct connectbundle **cb_ptr);
 
 void Curl_bundle_destroy(struct connectbundle *cb_ptr);
diff --git a/lib/conncache.c b/lib/conncache.c
index fcfb150..c068bfc 100644
--- a/lib/conncache.c
+++ b/lib/conncache.c
@@ -33,6 +33,7 @@
 #include "rawstr.h"
 #include "bundles.h"
 #include "conncache.h"
+#include "pipeline.h"
 
 #include "curl_memory.h"
 /* The last #include file should be: */
@@ -130,7 +131,23 @@ CURLcode Curl_conncache_add_conn(struct conncache *connc,
   bundle = Curl_conncache_find_bundle(data->state.conn_cache,
                                       conn->host.name);
   if(!bundle) {
-    result = Curl_bundle_create(data, &new_bundle);
+    struct Curl_multi *multi = data->multi;
+    struct curl_pipeline_policy policy = { 0, 0, 0 };
+    if(multi) {
+      /* policy default values */
+      policy.max_host_connections = Curl_multi_max_host_connections(multi);
+      policy.max_pipeline_length = Curl_multi_max_pipeline_length(multi);
+      if(Curl_pipeline_site_blacklisted(data, conn)) {
+        policy.flags = CURL_BLACKLISTED;
+      }
+      /* allow user to tweak the policy */
+      if(multi->pipeline_policy_cb)
+        multi->pipeline_policy_cb(conn->host.name,
+                                  conn->remote_port,
+                                  &policy,
+                                  multi->pipeline_policy_userp);
+    }
+    result = Curl_bundle_create(data, &policy, &new_bundle);
     if(result)
       return result;
 
diff --git a/lib/http.c b/lib/http.c
index b50d00f..48b2aa3 100644
--- a/lib/http.c
+++ b/lib/http.c
@@ -3320,9 +3320,10 @@ CURLcode Curl_http_readwrite_headers(struct SessionHandle *data,
                        "pipelining supported\n"));
           /* Activate pipelining if needed */
           cb_ptr = conn->bundle;
-          if(cb_ptr) {
-            if(!Curl_pipeline_site_blacklisted(data, conn))
-              cb_ptr->server_supports_pipelining = TRUE;
+          if(cb_ptr && !(cb_ptr->policy.flags &
+                         (CURL_SUPPORTS_PIPELINING|CURL_BLACKLISTED))) {
+            Curl_pipeline_set_policy_flags(data->multi, conn,
+                                           CURL_SUPPORTS_PIPELINING);
           }
         }
 
@@ -3403,10 +3404,13 @@ CURLcode Curl_http_readwrite_headers(struct SessionHandle *data,
     else if(checkprefix("Server:", k->p)) {
       char *server_name = Curl_copy_header_value(k->p);
 
-      /* Turn off pipelining if the server version is blacklisted */
-      if(conn->bundle && conn->bundle->server_supports_pipelining) {
+      /* Check if the server version is blacklisted when
+         we're doing pipelining */
+      if(conn->bundle && CURL_CAN_PIPELINE(conn->bundle)) {
         if(Curl_pipeline_server_blacklisted(data, server_name))
-          conn->bundle->server_supports_pipelining = FALSE;
+          /* Do not call Curl_pipeline_set_policy_flags:
+             we do not want the policy callback to happen. */
+          conn->bundle->policy.flags |= CURL_BLACKLISTED;
       }
       Curl_safefree(server_name);
     }
diff --git a/lib/multi.c b/lib/multi.c
index 7ea366c..6ac49c4 100644
--- a/lib/multi.c
+++ b/lib/multi.c
@@ -2381,6 +2381,8 @@ CURLMcode curl_multi_setopt(CURLM *multi_handle,
   case CURLMOPT_PIPELINING_SITE_BL:
     res = Curl_pipeline_set_site_blacklist(va_arg(param, char **),
                                            &multi->pipelining_site_bl);
+    if(!res)
+      Curl_pipeline_update_blacklist_policy(multi);
     break;
   case CURLMOPT_PIPELINING_SERVER_BL:
     res = Curl_pipeline_set_server_blacklist(va_arg(param, char **),
@@ -2389,6 +2391,12 @@ CURLMcode curl_multi_setopt(CURLM *multi_handle,
   case CURLMOPT_MAX_TOTAL_CONNECTIONS:
     multi->max_total_connections = va_arg(param, long);
     break;
+  case CURLMOPT_PIPELINE_POLICY_FUNCTION:
+    multi->pipeline_policy_cb = va_arg(param, curl_pipeline_policy_callback);
+    break;
+  case CURLMOPT_PIPELINE_POLICY_DATA:
+    multi->pipeline_policy_userp = va_arg(param, void *);
+    break;
   default:
     res = CURLM_UNKNOWN_OPTION;
     break;
diff --git a/lib/multihandle.h b/lib/multihandle.h
index 1a4b1d9..a673f96 100644
--- a/lib/multihandle.h
+++ b/lib/multihandle.h
@@ -107,14 +107,18 @@ struct Curl_multi {
   long maxconnects; /* if >0, a fixed limit of the maximum number of entries
                        we're allowed to grow the connection cache to */
 
-  long max_host_connections; /* if >0, a fixed limit of the maximum number
+  long max_host_connections; /* if >0, the default limit of the maximum number
                                 of connections per host */
 
   long max_total_connections; /* if >0, a fixed limit of the maximum number
                                  of connections in total */
 
-  long max_pipeline_length; /* if >0, maximum number of requests in a
-                               pipeline */
+  /* callback function and user data pointer for the pipeline policy API */
+  curl_pipeline_policy_callback pipeline_policy_cb;
+  void *pipeline_policy_userp;
+
+  long max_pipeline_length; /* if >0, the default maximum number of requests
+                               in a pipeline */
 
   long content_length_penalty_size; /* a connection with a
                                        content-length bigger than
diff --git a/lib/pipeline.c b/lib/pipeline.c
index 2645fdb..67b7d5e 100644
--- a/lib/pipeline.c
+++ b/lib/pipeline.c
@@ -33,6 +33,7 @@
 #include "sendf.h"
 #include "rawstr.h"
 #include "bundles.h"
+#include "conncache.h"
 
 #include "curl_memory.h"
 /* The last #include file should be: */
@@ -248,6 +249,90 @@ CURLMcode Curl_pipeline_set_site_blacklist(char **sites,
   return CURLM_OK;
 }
 
+void Curl_pipeline_set_policy_flags(struct Curl_multi *multi,
+                                    struct connectdata *conn,
+                                    int flags)
+{
+  struct connectbundle *cb_ptr = conn->bundle;
+  bool changed = (cb_ptr->policy.flags & flags) != flags;
+  cb_ptr->policy.flags |= flags;
+  /* If the policy was changed, call the policy callback */
+  if(changed && multi && multi->pipeline_policy_cb)
+    multi->pipeline_policy_cb(conn->host.name,
+                              conn->remote_port,
+                              &cb_ptr->policy,
+                              multi->pipeline_policy_userp);
+}
+
+void Curl_pipeline_reset_policy_flags(struct Curl_multi *multi,
+                                      struct connectdata *conn,
+                                      int flags)
+{
+  struct connectbundle *cb_ptr = conn->bundle;
+  bool changed = (cb_ptr->policy.flags & flags) != 0;
+  cb_ptr->policy.flags &= ~flags;
+  /* If the policy was changed, call the policy callback */
+  if(changed && multi && multi->pipeline_policy_cb)
+    multi->pipeline_policy_cb(conn->host.name,
+                              conn->remote_port,
+                              &cb_ptr->policy,
+                              multi->pipeline_policy_userp);
+}
+
+void Curl_pipeline_update_blacklist_policy(struct Curl_multi *multi)
+{
+  struct curl_llist *blacklist = Curl_multi_pipelining_site_bl(multi);
+  if(blacklist) {
+    struct conncache *connc = multi->conn_cache;
+    struct curl_hash_iterator iter;
+    struct curl_hash_element *he;
+
+    /* Run over all bundles */
+    Curl_hash_start_iterate(connc->hash, &iter);
+    he = Curl_hash_next_element(&iter);
+    while(he) {
+      struct connectbundle *bundle = he->ptr;
+      bool blacklisted = FALSE;
+      he = Curl_hash_next_element(&iter);
+
+      /* Run over all connections (conn) in this bundle, until we
+         run out of connection or find one that is blacklisted. */
+      struct curl_llist_element *curr_conn = bundle->conn_list->head;
+      while(!blacklisted && curr_conn) {
+        struct connectdata *conn = curr_conn->ptr;
+        curr_conn = curr_conn->next;
+
+        /* Run over all sites (site) in the blacklist */
+        struct curl_llist_element *curr = blacklist->head;
+        while(!blacklisted && curr) {
+          struct site_blacklist_entry *site = curr->ptr;
+          curr = curr->next;
+
+          if(site->port == conn->remote_port &&
+             Curl_raw_equal(site->hostname, conn->host.name)) {
+            /* Connection conn is blacklisted. Set flag. */
+            Curl_pipeline_set_policy_flags(multi, conn, CURL_BLACKLISTED);
+            /* Read back the flag because the policy callback might have
+               reset it again. */
+            blacklisted = (bundle->policy.flags & CURL_BLACKLISTED) != 0;
+          }
+        }
+
+        if(!blacklisted) {
+          /* None of the connections in the bundle are blacklisted.
+             Reset the flag. */
+          Curl_pipeline_reset_policy_flags(multi, conn, CURL_BLACKLISTED);
+          /* Read back the flag because the policy callback might have
+             set it again. */
+          blacklisted = (bundle->policy.flags & CURL_BLACKLISTED) != 0;
+        }
+
+      } /* Next conn in the bundle, unless the last one was blacklisted. */
+
+    } /* Next bundle */
+  }
+}
+
 bool Curl_pipeline_server_blacklisted(struct SessionHandle *handle,
                                       char *server_name)
 {
diff --git a/lib/pipeline.h b/lib/pipeline.h
index 96c4c33..374370c 100644
--- a/lib/pipeline.h
+++ b/lib/pipeline.h
@@ -35,6 +35,16 @@ bool Curl_pipeline_site_blacklisted(struct SessionHandle *handle,
 CURLMcode Curl_pipeline_set_site_blacklist(char **sites,
                                            struct curl_llist **list_ptr);
 
+void Curl_pipeline_set_policy_flags(struct Curl_multi *multi,
+                                    struct connectdata *conn,
+                                    int flags);
+
+void Curl_pipeline_reset_policy_flags(struct Curl_multi *multi,
+                                      struct connectdata *conn,
+                                      int flags);
+
+void Curl_pipeline_update_blacklist_policy(struct Curl_multi *multi);
+
 bool Curl_pipeline_server_blacklisted(struct SessionHandle *handle,
                                       char *server_name);
 
diff --git a/lib/url.c b/lib/url.c
index 5b19f89..bb7ceba 100644
--- a/lib/url.c
+++ b/lib/url.c
@@ -3022,25 +3022,20 @@ ConnectionExists(struct SessionHandle *data,
 
   *force_reuse = FALSE;
 
-  /* We can't pipe if the site is blacklisted */
-  if(canPipeline && Curl_pipeline_site_blacklisted(data, needle)) {
-    canPipeline = FALSE;
-  }
-
   /* Look up the bundle with all the connections to this
      particular host */
   bundle = Curl_conncache_find_bundle(data->state.conn_cache,
                                       needle->host.name);
   if(bundle) {
-    size_t max_pipe_len = Curl_multi_max_pipeline_length(data->multi);
+    size_t max_pipe_len = bundle->policy.max_pipeline_length;
     size_t best_pipe_len = max_pipe_len;
     struct curl_llist_element *curr;
 
     infof(data, "Found bundle for host %s: %p\n",
           needle->host.name, (void *)bundle);
 
-    /* We can't pipe if we don't know anything about the server */
-    if(canPipeline && !bundle->server_supports_pipelining) {
+    /* Check if we are pipelining */
+    if(canPipeline && !CURL_CAN_PIPELINE(bundle)) {
       infof(data, "Server doesn't support pipelining\n");
       canPipeline = FALSE;
     }
@@ -5203,7 +5198,6 @@ static CURLcode create_conn(struct SessionHandle *data,
   bool prot_missing = FALSE;
   bool no_connections_available = FALSE;
   bool force_reuse = FALSE;
-  size_t max_host_connections = Curl_multi_max_host_connections(data->multi);
   size_t max_total_connections = Curl_multi_max_total_connections(data->multi);
 
   *async = FALSE;
@@ -5546,7 +5540,8 @@ static CURLcode create_conn(struct SessionHandle *data,
       infof(data, "Found connection %ld, with requests in the pipe (%zu)\n",
             conn_temp->connection_id, pipelen);
 
-      if(conn_temp->bundle->num_connections < max_host_connections &&
+      if(conn_temp->bundle->num_connections <
+          conn_temp->bundle->policy.max_host_connections &&
          data->state.conn_cache->num_connections < max_total_connections) {
         /* We want a new connection anyway */
         reuse = FALSE;
@@ -5585,8 +5580,8 @@ static CURLcode create_conn(struct SessionHandle *data,
 
     bundle = Curl_conncache_find_bundle(data->state.conn_cache,
                                         conn->host.name);
-    if(max_host_connections > 0 && bundle &&
-       (bundle->num_connections >= max_host_connections)) {
+    if(bundle && bundle->policy.max_host_connections > 0 &&
+       (bundle->num_connections >= bundle->policy.max_host_connections)) {
       struct connectdata *conn_candidate;
 
       /* The bundle is full. Let's see if we can kill a connection. */
diff --git a/packages/OS400/curl.inc.in b/packages/OS400/curl.inc.in
index 2f6d86a..69e07db 100644
--- a/packages/OS400/curl.inc.in
+++ b/packages/OS400/curl.inc.in
@@ -1553,6 +1553,10 @@
      d                 c                   10012
      d  CURLMOPT_MAX_TOTAL_CONNECTIONS...
      d                 c                   00013
+     d  CURLMOPT_PIPELINE_POLICY_FUNCTION...
+     d                 c                   20014
+     d  CURLMOPT_PIPELINE_POLICY_DATA...
+     d                 c                   10015
       *
       *  Public API enums for RTSP requests.
       *
@@ -1771,6 +1775,9 @@
      d curl_socket_callback...
      d                 s               *   based(######ptr######) procptr
       *
+     d curl_pipeline_policy_callback...
+     d                 s               *   based(######ptr######) procptr
+      *
      d curl_opensocket_callback...
      d                 s               *   based(######ptr######) procptr
       *
-- 
2.1.3
--MP_/CCBflSxEMJ8DU3AO7/YJ9En
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: base64
Content-Disposition: inline
LS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0tLS0t
LS0tLS0tLS0tLQpMaXN0IGFkbWluOiBodHRwOi8vY29vbC5oYXh4LnNlL2xpc3QvbGlzdGluZm8v
Y3VybC1saWJyYXJ5CkV0aXF1ZXR0ZTogIGh0dHA6Ly9jdXJsLmhheHguc2UvbWFpbC9ldGlxdWV0
dGUuaHRtbA==
--MP_/CCBflSxEMJ8DU3AO7/YJ9En--
Received on 2001-09-17