[Python] Only convert in parallel for the ConsolidatedBlockCreator class for large data #40301

Closed
anjakefala opened this issue Feb 29, 2024 · 22 comments

Comments

@anjakefala
Collaborator

Describe the enhancement requested

The ConsolidatedBlockCreator runs the column conversion in parallel, creating a ScalarMemoTable for each column, with up to pa.cpu_count() threads running at once.

For performance reasons, jemalloc and mimalloc maintain allocations on a per-memory segment level to reduce contention between threads.

What this means is that if a user calls table.to_pandas(split_blocks=False) on a small table, a disproportionately large amount of memory gets allocated to build the ScalarMemoTables. Both jemalloc and mimalloc will essentially allocate a chunk of memory per column.

Here is some code:

import pyarrow as pa

def test_memory_usage():
    # 7 columns; for the 26-column runs below, extend the dict with:
    # 'H': 'h', 'I': 'i', 'J': 'j', 'K': 'k', 'L': 'l', 'M': 'm', 'N': 'n',
    # 'O': 'o', 'P': 'p', 'Q': 'q', 'R': 'r', 'S': 's', 'T': 't', 'U': 'u',
    # 'V': 'v', 'W': 'w', 'X': 'x', 'Y': 'y', 'Z': 'z'
    table = pa.table({'A': 'a', 'B': 'b', 'C': 'c', 'D': 'd',
                      'E': 'e', 'F': 'f', 'G': 'g'})
    df = table.to_pandas()

if __name__ == '__main__':
    test_memory_usage()
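
A minimal sketch of how such a run can be captured with memray's Python API (the output file name here is illustrative; memray stats on the capture file then prints summaries like the ones below):

import memray
import pyarrow as pa

def test_memory_usage():
    table = pa.table({'A': 'a', 'B': 'b', 'C': 'c', 'D': 'd',
                      'E': 'e', 'F': 'f', 'G': 'g'})
    df = table.to_pandas()

if __name__ == '__main__':
    # Track allocations for the whole conversion; inspect afterwards with
    # `memray stats to_pandas_profile.bin`.
    with memray.Tracker("to_pandas_profile.bin"):
        test_memory_usage()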

Here are the resulting memory allocations summarised with memray:

jemalloc with 7 columns:

📦 Total memory allocated:
        178.127MB

📊 Histogram of allocation size:
        min: 1.000B
        --------------------------------------------
        < 4.000B   :  3548 ▇▇
        < 24.000B  :  1253 ▇
        < 119.000B : 14686 ▇▇▇▇▇▇▇
        < 588.000B :  3746 ▇▇
        < 2.827KB  : 58959 ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇
        < 13.924KB :  2550 ▇▇
        < 68.569KB :   403 ▇
        < 337.661KB:    84 ▇
        < 1.624MB  :    24 ▇
        <=7.996MB  :    20 ▇
        --------------------------------------------
        max: 7.996MB

jemalloc with 26 columns:

📦 Total memory allocated:
        238.229MB

📊 Histogram of allocation size:
        min: 1.000B
        --------------------------------------------
        < 4.000B   :  3545 ▇▇
        < 24.000B  :  1627 ▇
        < 119.000B : 15086 ▇▇▇▇▇▇▇
        < 588.000B :  3882 ▇▇
        < 2.828KB  : 58971 ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇
        < 13.929KB :  2552 ▇▇
        < 68.593KB :   403 ▇
        < 337.794KB:    84 ▇
        < 1.625MB  :    24 ▇
        <=8.000MB  :    47 ▇
        --------------------------------------------
        max: 8.000MB

mimalloc with 7 columns:

📦 Total memory allocated:
        380.166MB

📊 Histogram of allocation size:
        min: 1.000B
        --------------------------------------------
        < 6.000B   :  3548 ▇▇
        < 36.000B  :  7470 ▇▇▇▇
        < 222.000B :  9524 ▇▇▇▇▇
        < 1.319KB  : 57271 ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇
        < 7.999KB  :  6492 ▇▇▇
        < 48.503KB :   775 ▇
        < 294.066KB:   150 ▇
        < 1.741MB  :    26 ▇
        < 10.556MB :     1 ▇
        <=64.000MB :     4 ▇
        --------------------------------------------
        max: 64.000MB

mimalloc with 26 columns:

📦 Total memory allocated:
        1.434GB

📊 Histogram of allocation size:
        min: 1.000B
        --------------------------------------------
        < 6.000B   :  3545 ▇▇
        < 36.000B  :  7845 ▇▇▇▇
        < 222.000B : 10001 ▇▇▇▇▇
        < 1.319KB  : 57332 ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇
        < 7.999KB  :  6501 ▇▇▇
        < 48.503KB :   794 ▇
        < 294.066KB:   150 ▇
        < 1.741MB  :    26 ▇
        < 10.556MB :     1 ▇
        <=64.000MB :    21 ▇
        --------------------------------------------
        max: 64.000MB

You can see how dramatically the memory increases even for a very small table.

My proposal is that we only do the conversion in parallel when it is likely to make a substantial performance difference, i.e. only above a certain table size. I'm not yet sure what that threshold should be, but once the code has been refactored, we can run experiments and come to a data-informed decision.
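
For context, a caller can already opt out of the parallel conversion on a per-call basis via the existing use_threads option of to_pandas; a minimal sketch:

import pyarrow as pa

table = pa.table({'A': 'a', 'B': 'b', 'C': 'c'})
# Converting single-threaded avoids fanning the columns out across
# pa.cpu_count() threads; the proposal above is to make a similar
# decision automatically for small tables.
df = table.to_pandas(use_threads=False)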

Component(s)

C++

@jorisvandenbossche jorisvandenbossche changed the title Only convert in parallel for the ConsolidatedBlockCreator class for large data [Python] Only convert in parallel for the ConsolidatedBlockCreator class for large data Mar 1, 2024
@felipecrv
Contributor

felipecrv commented Mar 1, 2024

I’ve taken a look at the code.

OptionalParallelFor is used when PandasOptions::use_threads is set. Internally, it uses the default CPU thread pool, which can run up to CPU-count threads at a time; that is why the conversion ends up using CPU-count threads, as @anjakefala described.

~My first solution/idea: add another option to PandasOptions, `int threads`. Then, if that's different from 0, create a separate ThreadPool with that as its capacity when calling the parallel-for.

Easy, but it’s an extra option that users have to remember to set.~

EDIT 1: I had a better idea.

Alternative

Add more parameters to parallel-for(columns) that modify the loop that submits tasks to the thread-pool:

Start with a low minimum number of submitted tasks (e.g. 2; each column is a task). After that, tasks are added with a small delay (microseconds) so as to give the initially submitted tasks a chance to finish (because the columns are small) and let the threads already in use be re-used.

This might require internal ThreadPool changes to avoid waits that wouldn’t lead to a global reduction in time taken to convert all columns.

EDIT 2: the better solution is to allow the same thread to keep converting more columns while they are available, with a loop in the main thread increasing the number of threads used only if necessary

@anjakefala
Collaborator Author

@felipecrv Is modifying ThreadPool better than an option where we use an approach similar to the SplitBlockCreator class for tables under a certain size? That's more along the lines of what I was thinking of.

However, if you think work-stealing would be the most robust solution, one that other functions would benefit from, I'd be game for approaching it that way.

I prefer the work-stealing approach because, ideally, we wouldn't require the user to know about the existence of an option to set. Folks might not know that the memory usage has to do with the spawning of individual threads. They might not even know why to_pandas spawns multiple threads.

pitrou added a commit that referenced this issue Mar 20, 2024
### Rationale for this change

Mimalloc and jemalloc can allocate a [relatively large amount of memory for the ScalarMemoTable](#40301). For this reason, the ScalarMemoTable should only be allocated when it is used (when `options.deduplicate_objects=True`).

I tested this change, and for small tables it does improve memory allocation.

`options.deduplicate_objects=False`

After this change:

📦 Total memory allocated:
        174.422MB

📊 Histogram of allocation size:
        min: 1.000B
        --------------------------------------------
        < 6.000B   :  3064 ▇▇
        < 36.000B  :  7533 ▇▇▇▇
        < 222.000B :  9974 ▇▇▇▇▇
        < 1.319KB  : 53264 ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇
        < 7.999KB  :  5188 ▇▇▇
        < 48.503KB :   742 ▇
        < 294.066KB:   102 ▇
        < 1.741MB  :    22 ▇
        < 10.556MB :     1 ▇
        <=64.000MB :     1 ▇
        --------------------------------------------
        max: 64.000MB

Before this change:

📦 Total memory allocated:
        1.295GB

📊 Histogram of allocation size:
        min: 1.000B
        --------------------------------------------
        < 6.000B   :  3064 ▇▇
        < 36.000B  :  7543 ▇▇▇▇
        < 222.000B : 10009 ▇▇▇▇▇
        < 1.319KB  : 53269 ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇
        < 7.999KB  :  5192 ▇▇▇
        < 48.503KB :   761 ▇
        < 294.066KB:   102 ▇
        < 1.741MB  :    22 ▇
        < 10.556MB :     1 ▇
        <=64.000MB :    19 ▇
        --------------------------------------------
        max: 64.000MB

### What changes are included in this PR?

The allocations of `memo_table` and `unique_values` have been moved under an `if (options.deduplicate_objects)` block. Since they are used within a lambda, they have been changed to shared pointers so that their values live for as long as needed.
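
For context, the option is exposed through to_pandas on the Python side; a small usage sketch (not code from this PR):

import pyarrow as pa

table = pa.table({'A': ['a'] * 1000})
# With deduplicate_objects=False, the conversion no longer needs the
# per-column ScalarMemoTable whose allocation this PR makes conditional.
df = table.to_pandas(deduplicate_objects=False)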

### Are these changes tested?

`deduplicate_objects` has extensive existing tests: https://github.com/apache/arrow/blob/b235f83ed10bcad174b267113479a24ca045def5/python/pyarrow/tests/test_pandas.py#L3211 and https://github.com/apache/arrow/blob/b235f83ed10bcad174b267113479a24ca045def5/python/benchmarks/convert_pandas.py#L71

### Are there any user-facing changes?

Nope.
* GitHub Issue: #40316

Lead-authored-by: anjakefala <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
@felipecrv
Contributor

felipecrv commented Apr 8, 2024

Sharing some progress here in the open:

@anjakefala will test a variant of ParallelFor [1] that doesn't create one Future per task/column: instead, each worker keeps going by popping the next task (column index) from a shared std::atomic<int> called next_task. This atomic integer plays the same role as the multiple task queues in a generic work-stealing algorithm implementation.

In ParallelForWithBackoff [1], each worker starts from a task/column it should complete and, when done, keeps going as long as next_task.fetch_add(1, std::memory_order_acq_rel) returns a task index smaller than num_tasks. Fetching and incrementing that atomic is how a worker "steals" the task from all the other workers. This maximizes throughput and reduces thread-local allocations, because the same thread can very quickly go through many columns.

[1] https://github.com/felipecrv/arrow/blob/b54cb408a8c73af0c4fc76cdceadca0c0b240cb0/cpp/src/arrow/util/parallel.h#L104-L229
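
A rough Python illustration of the shared-counter idea (an analogy only, not Arrow's C++ implementation; all names here are made up):

import itertools
import threading

def parallel_for_shared_counter(num_tasks, func, num_workers):
    # next(counter) plays the role of next_task.fetch_add(1): each worker
    # atomically claims the next unprocessed column index instead of being
    # handed exactly one task.
    counter = itertools.count()

    def worker():
        while True:
            i = next(counter)
            if i >= num_tasks:
                return
            func(i)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

# Example: "convert" 26 columns with 4 workers.
parallel_for_shared_counter(26, lambda i: print(f"converted column {i}"), 4)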

@pitrou
Member

pitrou commented Apr 9, 2024

I'm honestly not sure what this has to do with work stealing.

If I read correctly:

  • a new ScalarMemoTable is created when converting each column (either eagerly or lazily, this might or might not matter)
  • columns are converted in parallel for smaller execution times
  • a lot of memory allocations may occur because of this

What I would like to know is:

  1. What is the concern? The total number of allocations? The max allocated working set during the workload?
  2. Is it a practical concern?

@jorisvandenbossche
Member

Is it a practical concern?

I think the original reported case (converting a tiny table of a few kilobytes to pandas can give a spike of several hundred MBs in memory usage) is something people can certainly run into. And although it will often not be concerning (typically when working with smaller data, memory usage is not an issue, and when actually working with larger tables and memory usage becomes relevant, this overhead will disappear), it is definitely surprising and can lead to confusion. So I think it is worth "fixing".

But the potential fix I was thinking of could also be something much simpler, like using some heuristic to decide to just not do the conversion in parallel for smaller data.
For the conversion the other way around (pandas -> pyarrow), we actually have such a heuristic currently (in Python):

# NOTE(wesm): If nthreads=None, then we use a heuristic to decide whether
# using a thread pool is worth it. Currently the heuristic is whether the
# nrows > 100 * ncols and ncols > 1.
if nthreads is None:
    nrows, ncols = len(df), len(df.columns)
    if nrows > ncols * 100 and ncols > 1:
        nthreads = pa.cpu_count()
    else:
        nthreads = 1

@pitrou
Member

pitrou commented Apr 9, 2024

Yes, I agree this could just be fixed with a heuristic, especially as parallelizing will not be performant on very small data.

@felipecrv
Contributor

felipecrv commented Apr 9, 2024

I'm honestly not sure what this has to do with work stealing.

It's the inspiration for this, but if the comparison makes things harder to understand, just ignore it.

What is the concern? The total number of allocations? The max allocated working set during the workload?

The practical concern is that jemalloc and mimalloc allocate significant amounts of thread-local data:

For performance reasons, jemalloc and mimalloc (microsoft/mimalloc#351) maintain allocations on a per-memory segment level to reduce contention between threads.


Yes, I agree this could just be fixed with a heuristic, especially as parallelizing will not be performant on very small data.

And that is exactly what I'm doing, but instead of looking at the number of rows, I look at how long the columns take to convert on average, while the conversions are running; never blocking [1] or sleeping.

It's an adaptive solution: you pass a duration, if it takes more than that duration, a few more threads are allowed to start.

      // If the average wall-clock time per task is greater than the
      // small_task_duration_secs, grow the number of workers geometrically.
      if (static_cast<double>(elapsed_time.count() * steady_clock::period::num) >
          (small_task_duration_secs * steady_clock::period::den) * num_tasks_completed) {
        num_workers = std::min(
            ((num_workers * GrowthRatio::num - 1) / GrowthRatio::den) + 1, max_workers);
      }

And instead of having these issues for every kind of parallelization we do in the future, this function can be used on any problem where the size of the tasks can range from trivial to very expensive.

This is what the fix looks like at the call site:

-    return OptionalParallelFor(options_.use_threads, num_columns_, WriteColumn);
+    if (options_.use_threads) {
+      const double kSmallTaskDurationSecs = 0.008;  // 8ms
+      return ::arrow::internal::ParallelForWithBackOff(num_columns_, WriteColumn,
+                                                       kSmallTaskDurationSecs);
+    } else {
+      for (int i = 0; i < num_columns_; ++i) {
+        RETURN_NOT_OK(WriteColumn(i));
+      }
+      return Status::OK();
+    }

[1] except for the lock-free fetch/add of the next_task atomic

@pitrou
Member

pitrou commented Apr 10, 2024

For the concrete example here, it is still surprising that allocating 26 memo tables, all with only one (!) element, would result in 200+MB of memory allocations on jemalloc, and 1+GB on mimalloc.

Our hash table is presized for 32 elements, which probably shouldn't result in so much memory being allocated.

(caveat: I don't know what memray is accounting exactly here)

So before trying to add heuristics for parallelization, perhaps we should investigate this?

@pitrou
Member

pitrou commented Apr 10, 2024

A quick experiment (passing deduplicate_objects=False) shows that the memo table is probably not the culprit, since not allocating a memo table seems to result in the same memory consumption.

Edit: this is wrong, I was testing with 15.0.2 but #40316 is only in git main.

@pitrou
Member

pitrou commented Apr 10, 2024

Ok, so this is really the initial pre-sizing of the memo tables.

However, the problem here is a misinterpretation of the observations. The 1+GB is not so much "allocated" as pre-reserved by the allocator (mimalloc or jemalloc). Those empty pages are most probably not backed by physical memory until they are actually used by the application (Arrow). The only thing allocated is virtual address space, and virtual address space is virtually infinite (up to 2**48 bytes per process on x86-64).

The actual amount of allocated physical memory is the amount necessary for each 32-element memo table, and that is bound to be very small.
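
One way to see the distinction from Python (a hedged sketch: pa.total_allocated_bytes() and resource.getrusage are real APIs, the comparison itself is only illustrative):

import resource
import pyarrow as pa

table = pa.table({c: c.lower() for c in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"})
df = table.to_pandas()

# Bytes currently handed out by Arrow's default memory pool...
print("arrow allocated:", pa.total_allocated_bytes())
# ...versus the peak resident set size of the whole process (KiB on Linux),
# which stays far below the reserved figures reported by the profiler.
print("peak RSS (KiB):", resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)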

@pitrou
Member

pitrou commented Apr 10, 2024

Users tend to worry about such figures because they can give a hint that something is wrong (a leak? a performance issue?) while everything is actually operating as normal. Ideally, profilers such as memray would be able to detect the different kinds of "allocations" but that's a non-trivial problem, especially when private copies of e.g. jemalloc or mimalloc are involved.

@pitrou
Member

pitrou commented Apr 10, 2024

I've opened a feature request issue (probably a bit pie-in-the-sky) on the memray side: bloomberg/memray#577

@pitrou
Member

pitrou commented Apr 10, 2024

Also a couple more observations:

  • on Linux (Ubuntu 22.04), if you switch to the system allocator (ARROW_DEFAULT_MEMORY_POOL=system) the behavior disappears, which is not better in itself, but may reassure users
  • if you choose mimalloc, MIMALLOC_SHOW_STATS=1 gives you summary stats from mimalloc's PoV at shutdown.

We can therefore confirm that the amount of memory committed (not just reserved) is reasonably small:

$ ARROW_DEFAULT_MEMORY_POOL=mimalloc MIMALLOC_SHOW_STATS=1 python -c "import pyarrow as pa; table = pa.table({'A': 'a', 'B': 'b', 'C': 'c', 'D': 'd', 'E': 'e', 'F': 'f', 'G': 'g', 'H':'h', 'I': 'i', 'J': 'j', 'K':'k', 'L':'l', 'M':'m', 'N':'n', 'O':'o', 'P':'p', 'Q':'q', 'R':'r', 'S':'s', 'T':'t', 'U':'u', 'V':'v', 'W':'w', 'X':'x','Y':'y','Z':'z'}); table.to_pandas()"
heap stats:    peak      total      freed    current       unit      count   
  reserved:    1.5 GiB    1.5 GiB  128.5 KiB    1.5 GiB                        not all freed!
 committed:  112.1 MiB  115.1 MiB  115.1 MiB   64.2 KiB                        not all freed!
     reset:      0          0          0          0                            ok
   touched:    3.1 MiB    3.1 MiB    3.2 MiB -100.3 KiB                        ok
  segments:     25         25         25          0                            ok
-abandoned:      0          0          0          0                            ok
   -cached:      0          0          0          0                            ok
     pages:      0          0         25        -25                            ok
-abandoned:      0          0          0          0                            ok
 -extended:      0    
 -noretire:      0    
     mmaps:      0    
   commits:     48    
   threads:     24         24         48        -24                            ok
  searches:     0.0 avg
numa nodes:       1
   elapsed:       0.255 s
   process: user: 0.232 s, system: 0.038 s, faults: 2, rss: 90.6 MiB, commit: 112.1 MiB

@pitrou
Member

pitrou commented Apr 10, 2024

We further discussed this in the aforementioned memray issue, and I created a PR that exposes dynamic symbols for memray to interpose: #41128

We'll see whether the memray devs decide they want to support them, since this is Arrow-specific. Ideally, there would be a "standard" way for libraries and applications to signal that certain dynamic symbols are really malloc-like calls.

@felipecrv
Contributor

So before trying to add heuristics for parallelization, perhaps we should investigate this?

Different people take different approaches to problems. I saw ParallelFor very naively posting small tasks to a ThreadPool and came up with a version that doesn't do that.

You saw this problem and decided to fix how the profiler measures memory consumption, which is a valid strategy, but not the only one, and certainly an even bigger rabbit hole for me personally if I were to take it on.

@anjakefala
Collaborator Author

Different people take different approaches to problems. I saw ParallelFor very naively posting small tasks to a ThreadPool and came up with a version that doesn't do that.

I think there's still value in Felipe's approach, and I'd like to support testing it. I did misinterpret the results, but it still seems better to ramp up allocation as the need grows instead of spinning up a thread per column by default.

@pitrou
Member

pitrou commented Apr 10, 2024

Note that it's not really spinning up new threads, unless it's the first time the thread pool is being used :-)

@felipecrv
Contributor

felipecrv commented Apr 10, 2024

Note that it's not really spinning up new threads, unless it's the first time the thread pool is being used :-)

I tried to understand the behavior of ThreadPool and my (not very confident) conclusion was that each Submit posts the task to a new thread and thread re-use only really starts if the thread pool capacity has been reached.

Is that the case?

@pitrou
Member

pitrou commented Apr 10, 2024

It's a bit more complicated. Off the top of my head:

  • worker threads are created on demand; if a task is submitted and there is at least one idle worker, no new worker thread is created;
  • there are never more threads than the desired capacity, which defines an upper limit;
  • tasks are not submitted to a specific thread; they are appended to a shared queue which all existing workers pop from.
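
Python's concurrent.futures.ThreadPoolExecutor follows a broadly similar model and can serve as a rough analogy (it is not Arrow's C++ ThreadPool):

import time
from concurrent.futures import ThreadPoolExecutor

def convert_column(i):
    time.sleep(0.001)  # stand-in for a small per-column conversion
    return i

# Worker threads are created lazily, never exceed max_workers, and all of
# them pop work from one shared queue, mirroring the behaviour described above.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(convert_column, i) for i in range(26)]
    print([f.result() for f in futures])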

@felipecrv
Contributor

tasks are not submitted to a specific thread; they are appended to a shared queue which all existing workers pop from.

Seems fair to assume that a very eager loop like

  for (int i = 0; i < num_tasks; ++i) {
    ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i));
  }

will very quickly reach the pool's full capacity of threads, unless completing a task takes less time than making capacity iterations of this loop.

@pitrou
Member

pitrou commented Apr 10, 2024

Yes, indeed. I was answering the more general question.

@anjakefala
Collaborator Author

anjakefala commented Apr 22, 2024

We ran benchmarks on @felipecrv's ParallelFor code and unfortunately the results weren't promising. Given @pitrou's explanation of how to interpret the memory allocation results, and what we learned about how the ThreadPool works, having a dedicated ParallelFor for this one step of the Arrow conversion doesn't significantly reduce memory usage. And the memory usage isn't as high as I previously thought it was.

I'm going to go ahead and close this issue for now! Thanks everyone for helping me look into this. @jorisvandenbossche @felipecrv and @pitrou ❤️
