Memory leak when cancelling paused http 2 transfer #10971

Closed
lemourin opened this issue Apr 15, 2023 · 4 comments

@lemourin
Contributor

I did this

I wrote the following program, which at a high level does the following:

  1. fire two requests to the same URL concurrently
  2. pause each of them after 16 MB are received
  3. fire two more requests to the same URL concurrently 4s after the first request
  4. pause them after 16 MB are received
  5. cancel the requests fired in 3. 4s after they were started
  6. cancel the requests fired in 1. 11s after they were started

Essentially, it cancels requests after they have been paused.

#include <curl/curl.h>
#include <event2/event.h>

#include <iostream>
#include <list>
#include <memory>     // std::unique_ptr
#include <sstream>
#include <stdexcept>  // std::runtime_error
#include <vector>

namespace {

constexpr const char* kUrl =
    "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/"
    "BigBuckBunny.mp4";

constexpr int kRequestCount = 2;
constexpr int kHttpVersion = CURL_HTTP_VERSION_2;

void TimeoutEvent(evutil_socket_t, short, void* handle);

template <typename T>
T* CheckNotNull(T* p) {
  if (!p) {
    throw std::runtime_error("Unexpected null pointer.");
  }
  return p;
}

void Check(int code) {
  if (code < 0) {
    std::stringstream stream;
    stream << "Unexpected error code: " << code;
    throw std::runtime_error(stream.str().c_str());
  }
}

void Check(CURLcode code) {
  if (code != CURLE_OK) {
    throw std::runtime_error(curl_easy_strerror(code));
  }
}

void Check(CURLMcode code) {
  if (code != CURLM_OK) {
    throw std::runtime_error(curl_multi_strerror(code));
  }
}

struct CurlMultiDeleter {
  void operator()(CURLM* handle) const noexcept {
    Check(curl_multi_cleanup(handle));
  }
};

struct CurlHandleDeleter {
  void operator()(CURL* handle) const noexcept {
    Check(curl_multi_remove_handle(multi_handle, handle));
    curl_easy_cleanup(handle);
  }

  CURLM* multi_handle;
};

struct EventBaseDeleter {
  void operator()(event_base* event_loop) const noexcept {
    event_base_free(event_loop);
  }
};

struct EventDeleter {
  void operator()(event* event) const noexcept {
    Check(event_del(event));
    event_free(event);
  }
};

class CurlGlobalInitializer {
 public:
  CurlGlobalInitializer() { Check(curl_global_init(CURL_GLOBAL_DEFAULT)); }
  ~CurlGlobalInitializer() noexcept { curl_global_cleanup(); }
};

struct MultiContext {
  std::unique_ptr<CURLM, CurlMultiDeleter> handle{
      CheckNotNull(curl_multi_init())};
  event_base* event_loop;
  std::unique_ptr<event, EventDeleter> timeout_event{
      CheckNotNull(event_new(event_loop, /*fd=*/-1,
                             /*events=*/0, TimeoutEvent, handle.get()))};
};

struct HandleContext {
  std::unique_ptr<CURL, CurlHandleDeleter> handle;
  size_t bytes_read = 0;
  int resume_count = 0;
  event_base* event_loop;
  MultiContext* context;
};

auto MakeHandle(CURLM* multi_handle) {
  CURL* handle = CheckNotNull(curl_easy_init());
  if (CURLMcode code = curl_multi_add_handle(multi_handle, handle);
      code != CURLM_OK) {
    curl_easy_cleanup(handle);
    throw std::runtime_error(curl_multi_strerror(code));
  }
  return std::unique_ptr<CURL, CurlHandleDeleter>(
      handle, CurlHandleDeleter{multi_handle});
}

void ProcessEvents(CURLM* multi_handle) {
  CURLMsg* message;
  do {
    int message_count;
    message = curl_multi_info_read(multi_handle, &message_count);
    if (message && message->msg == CURLMSG_DONE) {
      std::cerr << "TRANSFER DONE\n";
    }
  } while (message != nullptr);
}

size_t WriteCallback(char* /*ptr*/, size_t size, size_t nmemb, void* userdata) {
  auto* context = static_cast<HandleContext*>(userdata);
  context->bytes_read += size * nmemb;
  if (context->bytes_read >= 16 * 1024 * 1024) {
    context->bytes_read = 0;
    std::cerr << "PAUSING " << context << '\n';
    return CURL_WRITEFUNC_PAUSE;
  }
  return size * nmemb;
}

void TimeoutEvent(evutil_socket_t, short, void* handle) {
  int running_handles;
  Check(curl_multi_socket_action(handle, CURL_SOCKET_TIMEOUT, 0,
                                 &running_handles));
  ProcessEvents(handle);
}

void SocketEvent(evutil_socket_t fd, short event, void* handle) {
  int running_handles;
  Check(
      curl_multi_socket_action(handle, fd,
                               ((event & EV_READ) ? CURL_CSELECT_IN : 0) |
                                   ((event & EV_WRITE) ? CURL_CSELECT_OUT : 0),
                               &running_handles));
  ProcessEvents(handle);
}

int SocketCallback(CURL*, curl_socket_t socket, int what, void* userp,
                   void* socketp) {
  auto* context = reinterpret_cast<MultiContext*>(userp);
  if (what == CURL_POLL_REMOVE) {
    auto* data = reinterpret_cast<event*>(socketp);
    if (data) {
      event_free(data);
    }
  } else {
    auto* data = reinterpret_cast<event*>(socketp);
    if (data) {
      event_free(data);
    }
    data = CheckNotNull(
        event_new(context->event_loop, socket,
                  static_cast<short>(((what & CURL_POLL_IN) ? EV_READ : 0) |
                                     ((what & CURL_POLL_OUT) ? EV_WRITE : 0) |
                                     EV_PERSIST),
                  SocketEvent, context->handle.get()));
    Check(curl_multi_assign(context->handle.get(), socket, data));
    Check(event_add(data, /*timeout=*/nullptr));
  }
  return 0;
}

int TimerCallback(CURLM*, long timeout_ms, void* userp) {
  auto* http = reinterpret_cast<MultiContext*>(userp);
  if (timeout_ms == -1) {
    Check(event_del(http->timeout_event.get()));
  } else {
    timeval tv = {
        .tv_sec = static_cast<decltype(tv.tv_sec)>(timeout_ms / 1000),
        .tv_usec = static_cast<decltype(tv.tv_usec)>(timeout_ms % 1000 * 1000)};
    Check(event_add(http->timeout_event.get(), &tv));
  }
  return 0;
}

}  // namespace

int main() {
  CurlGlobalInitializer initializer;

  std::cerr << curl_version() << '\n';

  std::unique_ptr<event_base, EventBaseDeleter> event_loop(
      CheckNotNull(event_base_new()));
  MultiContext context{.event_loop = event_loop.get()};
  Check(curl_multi_setopt(context.handle.get(), CURLMOPT_SOCKETFUNCTION,
                          SocketCallback));
  Check(curl_multi_setopt(context.handle.get(), CURLMOPT_SOCKETDATA, &context));
  Check(curl_multi_setopt(context.handle.get(), CURLMOPT_TIMERFUNCTION,
                          TimerCallback));
  Check(curl_multi_setopt(context.handle.get(), CURLMOPT_TIMERDATA, &context));

  std::list<HandleContext> handle_contexts{kRequestCount};
  for (auto& handle_context : handle_contexts) {
    handle_context.context = &context;
    handle_context.event_loop = event_loop.get();

    CURL* handle =
        (handle_context.handle = MakeHandle(context.handle.get())).get();
    Check(curl_easy_setopt(handle, CURLOPT_URL, kUrl));
    Check(curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, WriteCallback));
    Check(curl_easy_setopt(handle, CURLOPT_WRITEDATA, &handle_context));
    Check(curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, kHttpVersion));
  }
  struct Data {
    MultiContext* context;
    event_base* event_loop;
    std::list<HandleContext>* handle_contexts;
  } data = {.context = &context,
            .event_loop = event_loop.get(),
            .handle_contexts = &handle_contexts};
  {
    timeval tv{.tv_sec = 3};
    Check(event_base_once(
        event_loop.get(), /*fd=*/-1, /*events=*/EV_TIMEOUT,
        [](evutil_socket_t, short, void* userdata) {
          auto* context = static_cast<Data*>(userdata);
          for (int i = 0; i < kRequestCount; i++) {
            HandleContext& handle_context =
                context->handle_contexts->emplace_back();
            handle_context.context = context->context;
            handle_context.event_loop = context->event_loop;

            CURL* handle = (handle_context.handle =
                                MakeHandle(context->context->handle.get()))
                               .get();
            Check(curl_easy_setopt(handle, CURLOPT_URL, kUrl));
            Check(
                curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, WriteCallback));
            Check(curl_easy_setopt(handle, CURLOPT_WRITEDATA, &handle_context));
            Check(curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, kHttpVersion));
            timeval tv{.tv_sec = 4};
            Check(event_base_once(
                context->event_loop, /*fd=*/-1, /*events=*/EV_TIMEOUT,
                [](evutil_socket_t, short, void* userdata) {
                  auto* context = static_cast<HandleContext*>(userdata);
                  std::cerr << "TIMEOUT DELAYED TRANSFER " << context << '\n';
                  Check(curl_multi_remove_handle(context->context->handle.get(),
                                                 context->handle.get()));
                },
                &handle_context, &tv));
          }
        },
        &data, &tv));
  }

  for (auto& handle_context : handle_contexts) {
    timeval tv{.tv_sec = 11};
    Check(event_base_once(
        event_loop.get(), /*fd=*/-1, /*events=*/EV_TIMEOUT,
        [](evutil_socket_t, short, void* userdata) {
          auto* context = static_cast<HandleContext*>(userdata);
          std::cerr << "TIMEOUT TRANSFER " << context << '\n';
          Check(curl_multi_remove_handle(context->context->handle.get(),
                                         context->handle.get()));
        },
        &handle_context, &tv));
  }
  Check(event_base_loop(event_loop.get(), /*flags=*/0));

  return 0;
}

This results in the following memory leak when running with address sanitizer:

Detected memory leaks
Direct leak of 1048656 byte(s) in 1 object(s) allocated from:
  at 0x563f7e2385ea __interceptor_realloc.part.0 (asan_malloc_linux.cpp.o)
  at 0x563f7e293621 curl_dbg_realloc (memdebug.c:265)
  at 0x563f7e2e5aec dyn_nappend (dynbuf.c:100)
  at 0x563f7e2e58e9 Curl_dyn_addn (dynbuf.c:164)
  at 0x563f7e2a7423 pausewrite (sendf.c:236)
  at 0x563f7e2a6e15 chop_write (sendf.c:268)
  at 0x563f7e2a6b47 Curl_client_write (sendf.c:386)
  at 0x563f7e2b97aa readwrite_data (transfer.c:706)
  at 0x563f7e2b88a5 Curl_readwrite (transfer.c:1111)
  at 0x563f7e29ff52 multi_runsingle (multi.c:2436)
  at 0x563f7e2a19ba multi_socket (multi.c:3219)
  at 0x563f7e2a1b51 curl_multi_socket_action (multi.c:3340)
  at 0x563f7e283d55 (anonymous namespace)::SocketEvent(int, short, void*) (curl.cc:142)
  at 0x563f7e3512d7 event_persist_closure (event.c:1659)
  at 0x563f7e3508de event_process_active_single_queue (event.c:1718)
  at 0x563f7e34b0b4 event_process_active (event.c:1819)
  at 0x563f7e349ebb event_base_loop (event.c:2068)
  at 0x563f7e281c1e main (curl.cc:270)
  at 0x7f95d37e978f
Direct leak of 16392 byte(s) in 1 object(s) allocated from:
  at 0x563f7e2385ea __interceptor_realloc.part.0 (asan_malloc_linux.cpp.o)
  at 0x563f7e293621 curl_dbg_realloc (memdebug.c:265)
  at 0x563f7e2e5aec dyn_nappend (dynbuf.c:100)
  at 0x563f7e2e58e9 Curl_dyn_addn (dynbuf.c:164)
  at 0x563f7e2a7423 pausewrite (sendf.c:236)
  at 0x563f7e2a6f92 chop_write (sendf.c:311)
  at 0x563f7e2a6b47 Curl_client_write (sendf.c:386)
  at 0x563f7e2b97aa readwrite_data (transfer.c:706)
  at 0x563f7e2b88a5 Curl_readwrite (transfer.c:1111)
  at 0x563f7e29ff52 multi_runsingle (multi.c:2436)
  at 0x563f7e2a19ba multi_socket (multi.c:3219)
  at 0x563f7e2a1b51 curl_multi_socket_action (multi.c:3340)
  at 0x563f7e283d55 (anonymous namespace)::SocketEvent(int, short, void*) (curl.cc:142)
  at 0x563f7e3512d7 event_persist_closure (event.c:1659)
  at 0x563f7e3508de event_process_active_single_queue (event.c:1718)
  at 0x563f7e34b0b4 event_process_active (event.c:1819)
  at 0x563f7e349ebb event_base_loop (event.c:2068)
  at 0x563f7e281c1e main (curl.cc:270)
  at 0x7f95d37e978f

Interestingly enough, the memory leak doesn't occur when forcing HTTP/1.1, so the issue may be specific to HTTP/2.

I expected the following

No memory leaks.

curl/libcurl version

libcurl/7.88.1-DEV OpenSSL/3.0.8 zlib/1.2.13 c-ares/1.19.0 nghttp2/1.51.0

operating system

Linux razer 6.1.23-1-MANJARO #1 SMP PREEMPT_DYNAMIC Thu Apr 6 19:47:04 UTC 2023 x86_64 GNU/Linux

Happens on Windows as well.

@bagder
Member

bagder commented Apr 15, 2023

This is the code that is not executed for this case and thus the leak happens:

curl/lib/multi.c

Lines 722 to 727 in b32b7bb

/* if the transfer was completed in a paused state there can be buffered
   data left to free */
for(i = 0; i < data->state.tempcount; i++) {
  Curl_dyn_free(&data->state.tempwrite[i].b);
}
data->state.tempcount = 0;

The question is why not. multi_done() gets called from curl_multi_remove_handle() if there's a connection still associated with the transfer, which I presume is the case here. I suspect that the problem is this:

curl/lib/multi.c

Lines 704 to 710 in b32b7bb

if(CONN_INUSE(conn)) {
  /* Stop if still used. */
  CONNCACHE_UNLOCK(data);
  DEBUGF(infof(data, "Connection still in use %zu, "
               "no more multi_done now!",
               conn->easyq.size));
  return CURLE_OK;

... which then makes it return before the free if the connection is still in use! We should probably try moving the free calls to before that check and see whether that fixes it.
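The ordering problem described above can be pictured with a small model. This is only a sketch: `Connection`, `Transfer`, `Attach`, `BufferedBytes`, `DoneCheckThenFree` and `DoneFreeThenCheck` are hypothetical names made up for illustration, not libcurl code, and the model assumes the buffered data lives on the transfer while the "still in use" test looks at the shared connection.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-ins for libcurl internals; every name here is
// illustrative and not part of libcurl.
struct Connection {
  std::size_t users = 0;  // transfers currently attached to the connection
};

struct Transfer {
  Connection* conn = nullptr;
  std::vector<std::string> tempwrite;  // chunks buffered while paused
};

void Attach(Transfer& t, Connection& c) {
  t.conn = &c;
  ++c.users;
}

std::size_t BufferedBytes(const Transfer& t) {
  std::size_t total = 0;
  for (const std::string& chunk : t.tempwrite) total += chunk.size();
  return total;
}

// Ordering as quoted: the "still in use" return comes first, so the buffered
// data of a transfer on a shared connection is never released here.
void DoneCheckThenFree(Transfer& t) {
  Connection* conn = t.conn;
  t.conn = nullptr;
  --conn->users;
  if (conn->users > 0) return;  // other transfers still use the connection
  t.tempwrite.clear();
}

// Suggested ordering: release the buffered data before the check.
void DoneFreeThenCheck(Transfer& t) {
  t.tempwrite.clear();
  Connection* conn = t.conn;
  t.conn = nullptr;
  --conn->users;
  if (conn->users > 0) return;  // early return no longer skips the free
  // Connection-level cleanup would go here in a fuller model.
}
```

With two transfers attached to one connection, finishing the first through `DoneCheckThenFree` leaves its chunks in place, while `DoneFreeThenCheck` drops them regardless of how many users remain.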

@lemourin, is it hard to reproduce for you or would you be able to try out a patch/PR to check?

(Only h2 and h3 would do that caching, because it is due to how the stream windows work. For h1 there is no such caching done.)
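One way to picture the stream-window remark is a toy model of a single multiplexed stream. It is a sketch under stated assumptions, not libcurl or nghttp2 code: `StreamModel` and its members are hypothetical, and the model assumes the peer may keep sending until the advertised window is used up, and that the window is only re-opened when the application consumes data.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical model of one multiplexed stream; not libcurl or nghttp2 code.
class StreamModel {
 public:
  explicit StreamModel(std::size_t window) : window_(window) {}

  void Pause() { paused_ = true; }

  // Called for each payload the peer sends on this stream. The peer may send
  // until the advertised window is exhausted, whether or not the receiver is
  // paused. Returns the number of bytes accepted.
  std::size_t OnData(const std::string& payload) {
    const std::size_t accepted = std::min(payload.size(), window_);
    window_ -= accepted;
    if (paused_) {
      buffered_.append(payload, 0, accepted);  // must be kept for later
    } else {
      delivered_ += accepted;  // handed to the application right away
      window_ += accepted;     // consuming the data re-opens the window
    }
    return accepted;
  }

  std::size_t buffered() const { return buffered_.size(); }
  std::size_t delivered() const { return delivered_; }
  std::size_t window() const { return window_; }

 private:
  std::size_t window_;
  bool paused_ = false;
  std::string buffered_;
  std::size_t delivered_ = 0;
};
```

In this model a paused stream can accumulate up to one full window of data, and that data has to be released when the transfer is removed.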

@bagder bagder self-assigned this Apr 15, 2023
bagder added a commit that referenced this issue Apr 15, 2023
Before checking for more users of the connection and possibly bailing
out.

Fixes #10971
Reported-by: Paweł Wegner
@bagder
Member

bagder commented Apr 15, 2023

#10972 is an attempt to fix this. Any chance you can test this on your case?

@lemourin
Contributor Author

I tested with my test program, looks good to me. Thanks for the quick fix!

@bagder bagder closed this as completed in 81b2b57 Apr 15, 2023
@lemourin
Contributor Author

lemourin commented Apr 16, 2023

I did more testing today with my real app, coro-cloudstorage, and I noticed that the downloads are now getting stuck. I can provide more context on what the app does if necessary, or try to write a smaller repro.

Without #10972 it leaks, with #10972 the connections get stuck.

EDIT: I changed the test program a bit so that it now resumes some transfers. With the memory leak fix, one transfer gets stuck and returns "Stream error in the HTTP/2 framing layer" after ~5 minutes.

bch pushed a commit to bch/curl that referenced this issue Jul 19, 2023
Before checking for more users of the connection and possibly bailing
out.

Fixes curl#10971
Reported-by: Paweł Wegner
Closes curl#10972