Skip to content

convert : write tensors in parallel #12837

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

compilade
Copy link
Collaborator

@compilade compilade commented Apr 8, 2025

This makes GGUFWriter.write_tensors_to_file use a thread pool to write the tensors in parallel.

This should help make conversion faster, since otherwise nothing else happened when reading, processing or writing the tensors. Now that can all happen at once.

convert_hf_to_gguf.py has the new argument --threads (or -t, for short) to specify how many threads to use when writing the tensors.

A queue of all the tensors, their offset and their respective file (so that sharding should work) is used to distribute the work in a small threadpool.

This was inspired by the discussion with @ngxson in #12820 (comment).

@ngxson I'm not sure how this will interact with multithreaded remote tensor fetching in #12820; there may be a lot of threads at once.

This should be compatible with #12820, although it will conflict because of the added arguments to the Model initializer in convert_hf_to_gguf.py (which is easy to resolve).

TODO


Make sure to read the contributing guidelines before submitting a PR

@compilade compilade added performance Speed related topics python python script changes labels Apr 8, 2025
@compilade compilade requested a review from ngxson April 8, 2025 21:04
@ngxson
Copy link
Collaborator

ngxson commented Apr 8, 2025

@ngxson I'm not sure how this will interact with multithreaded remote tensor fetching in #12820; there may be a lot of threads at once.

Yes, I suspect that will be the case. To test, you can decrease the threshold in SafetensorRemote to 1MB so the multithread condition will always be triggered.

I can't think of any good strategy to make this better, probably just remove the multithread in my download logic. WDYT?

@compilade
Copy link
Collaborator Author

compilade commented Apr 8, 2025

@ngxson I'm not sure how this will interact with multithreaded remote tensor fetching in #12820; there may be a lot of threads at once.

Yes, I suspect that will be the case. To test, you can decrease the threshold in SafetensorRemote to 1MB so the multithread condition will always be triggered.

I can't think of any good strategy to make this better, probably just remove the multithread in my download logic. WDYT?

I think it can still be relevant to multithread the download, especially in the case where individual download streams are slow and the biggest tensor ends up being the last to be written in full because of that.

Having the download thread pool across downloads instead of per download might help with that, although it might require having a max range size per unit of work done by the download thread pool (but that may also cause way to many requests to happen per tensor if that max size doesn't also consider the size of the tensors). Not sure what's the best way to schedule those threads.

Maybe it's fine to keep it per download like you did. The decision of how many download threads to use mostly depends on the network bandwidth per stream. There might be a way to detect when it's not faster to use more than N download streams.

@compilade
Copy link
Collaborator Author

compilade commented Apr 9, 2025

@ngxson

I've thought about some situations regarding multi-threaded download as in #12820. The numbered top-level statements are the two situations I'm considering, and they each get split further into two sub-consideration, one with a single thread per tensor downloaded, and the other with multiple threads per download.

This all assumes that there are multiple tensor-writing threads.

  1. A model with one big tensor and hundreads of much smaller tensors
    • One thread per tensor
      • The download for the big tensor progresses as the smaller ones are downloaded and written, all probably have the same priority network-wise
        • The threads are busy downloading and writing their tensors as long as the pending tensor queue is not empty
      • If all the smaller tensors are downloaded and written before the big one finishes, then only the big one is being downloaded
        • Assuming TCP does traffic shaping correctly this should be fine
        • If all the download streams have the same bandwidth, this should only happen when the biggest tensor is bigger than 1 / (number of write threads) (as a fraction of the total model size). The only models where that may be true are small models so downloading them should not take that long anyway.
        • On some networks (which?) a single download stream is slower than multiple
    • Multiple download threads per tensor
      • Multiple streams on the big tensor might slow down the download for the smaller tensors in favor of the big one if the smaller ones are too small to be multi-threaded.
      • More requests for the bigger tensor
  2. A model with a lot of big tensors
    • One thread per tensor
      • One request per tensor
      • There should be enough simultaneous requests to saturate the bandwidth
        • The number of simultaneous requests is configurable with the number of write threads (--threads or -t
      • There will be a last tensor, and that will be single-threaded (which is slow on some networks)
    • Multiple download threads per tensor
      • Lots of download threads at the same time, maybe too many; there may be extra overhead
      • Multiple requests for all of the big tensors
      • The last tensor (if big) will still download with multiple streams (which is good).

So after writing the above, I think having a single download thread per write thread should be simpler and sufficient, at least when writing the tensors can be multi-threaded (as introduced here).

@ngxson
Copy link
Collaborator

ngxson commented Apr 9, 2025

So after writing the above, I think having a single download thread per write thread should be simpler and sufficient, at least when writing the tensors can be multi-threaded (as introduced here).

Yes I agree, I think the multithread is only relevant to models having many big tensors, which is the case for all models > 3B. For smaller models, having it or not won't change much (I mean the time it takes to finish still change, but 20s vs 30s are not a big difference in this context)

I assume that for very big models like Scout or Maverick, either having multithread per tensor or multithread per write, the time it takes will be the same, because most tensors are very big (on each layer, the expert FFN tensor alone is 21GB)

I'm reverting my change in the other PR.

@compilade
Copy link
Collaborator Author

I assume that for very big models like Scout or Maverick, either having multithread per tensor or multithread per write, the time it takes will be the same, because most tensors are very big (on each layer, the expert FFN tensor alone is 21GB)

I think the main difference in time would be for the last tensor which is more likely to be big for very big models.

The worst case with a single download thread per tensor is when every other tensor has finished downloading and the last one is huge.

@ngxson
Copy link
Collaborator

ngxson commented Apr 9, 2025

Should I merge this now? (Given that you still have some TODOs unchecked)

@compilade
Copy link
Collaborator Author

compilade commented Apr 9, 2025

@ngxson I've tested more cases, and it seems to still give identical resulting files as master, even when outputting sharded models or when there's definitely padding (all tested with 4 writing threads).

I think I'll keep the default as 2 threads for now, because the convert script is still mostly I/O bound, and this at least allows interleaving some work when busy reshaping and/or quantizing/converting the types of the tensors. (and also the GIL limits some of the performance benefits for higher thread counts)

@compilade
Copy link
Collaborator Author

compilade commented Apr 9, 2025

Note that I did not yet test on a slow HDD, only an external SSD.

I don't know to what extent slower drives are affected by read/write contention from multiple threads. This is probably relevant to test with different numbers of threads.

@ngxson
Copy link
Collaborator

ngxson commented Apr 9, 2025

Yeah that's a good question. I suspect that will vary depending on the OS and hardware config. But anw I think most modern OS will write to cache firstly anyway, so the performance difference won't be significant between multiple vs single thread writing

Also I'm pretty sure that the write to file descriptor function in python is single thread

Edit: no it's not single threaded, people still need to use Lock

@compilade
Copy link
Collaborator Author

@ngxson I've tried this with the remote lazy tensors from #12820, and when there's a connection error the thread which does the download raises an exception but that doesn't stop the program, so it continues converting until it can't, then then hangs (since not all tensors were written (the queue is not empty)).

Retries will need to be handled in the RemoteTensor.data() function, but even then I think exceptions should be able to stop the conversion.

I'll rewrite the thread pool part with concurrent.futures.ThreadPoolExecutor so that exceptions are easier to propagate to the main thread.

@ngxson
Copy link
Collaborator

ngxson commented May 8, 2025

hey @compilade , is there any chance we can accelerate this a bit? I need this feature because many models on HF has been migrated to Xet backend, which introduce quite a bit more delay when requesting the byte range (kinda expected, because they need to go through a gateway). Having parallel write support can speed up a lot!

@compilade
Copy link
Collaborator Author

hey @compilade , is there any chance we can accelerate this a bit?

@ngxson

The main remaining concern is that threads in Python make error handling more difficult. I did not yet figure out how to cancel non-cooperative threads (since they are blocking on download), and so when there's a failure, the running threads will continue which means if one fails (and the retries fail), then the remaining threads will still try to finish their download before being ignored from the failure.

I'm not sure if the retries should be infinite or not (currently they are limited to 8 retries with progressively more delay (but maybe not enough delay)).

I might make -t 1 completely not use a thread pool (to also avoid the interactions with Ctrl+C, which can only interrupt lock waits on unix-like OSes (I think; never tested on something else than Linux, where spamming Ctrl+C does work to interrupt)).

@ngxson
Copy link
Collaborator

ngxson commented May 8, 2025

I did not yet figure out how to cancel non-cooperative threads (since they are blocking on download)

I think it probably fine to kill the whole process if one of the thread is KO (which will kill all other running threads), right? (Same idea for Ctrl+C handler)

I'm not sure if the retries should be infinite or not (currently they are limited to 8 retries with progressively more delay (but maybe not enough delay)).

Hmm I think we can just probably go with no retry at all, to make your life a bit easier. Btw, I think the retry logic should be handled at HTTP level (i.e. RemoteTensor), so if someone ever need it, I think I'll make a follow up PR.

@compilade
Copy link
Collaborator Author

compilade commented May 8, 2025

I think it probably fine to kill the whole process if one of the thread is KO

@ngxson

Right, but ideally graceful error handling would be preferred (in this case I guess it's fine to exit).

Also it seems like ThreadPoolExecutor doesn't use daemon threads and so exiting will still wait for all the running threads to finish...

So I still don't know how to cancel non-cooperative Python threads. Maybe making them cooperative would help, although it would be hard to coordinate with the rest of the lazy evaluation stuff.

Or maybe using multiprocessing instead of threads would fix cancellation, but that also complicates data sharing (especially when a tensor is split into multiple written tensors (hmm, actually this might not be thread-safe in the current version...)).

Hmm I think we can just probably go with no retry at all, to make your life a bit easier. Btw, I think the retry logic should be handled at HTTP level (i.e. RemoteTensor), so if someone ever need it, I think I'll make a follow up PR.

It's already handled in RemoteTensor here:

class RemoteTensor:
name: str
dtype: str
shape: tuple[int, ...]
offset_start: int
size: int
url: str
def data(self) -> bytearray:
data = None
MAX_RETRIES = 8
for i in range(MAX_RETRIES):
try:
# NOTE: using a bytearray, otherwise PyTorch complains the buffer is not writeable
data = bytearray(
SafetensorRemote.get_data_by_range(
url=self.url, start=self.offset_start, size=self.size
)
)
except (
requests.exceptions.ChunkedEncodingError,
requests.exceptions.ContentDecodingError,
requests.exceptions.ConnectionError,
) as e:
if i == MAX_RETRIES - 1:
raise RuntimeError(f"Failed to download tensor {self.name}") from e
logger.warning(f"Retry ({i + 1}/{MAX_RETRIES}) downloading tensor {self.name} because of {e}")
time.sleep(2 * i + 1) # 1 3 5 7 9 11 13
continue
if data is None:
raise RuntimeError(f"Failed to download tensor {self.name}")
return data

But the delays might not be long enough.

The problem is mostly with failures, and retries partially help with avoiding transient failures.

@ngxson
Copy link
Collaborator

ngxson commented May 8, 2025

Hmm yeah you're right, I was hoping that I can register a signal handler in python thread to bypass all the blocking stuff. But turns out, python does not allow this.

I looked at the second option is to use threading.get_ident and signal.pthread_kill to kill each thread by its thread ID. It works but Windows does not support that.

The last-ditch option is to signal.raise_signal(signal.SIGKILL):

def signal_handler(sig, frame):
  signal.raise_signal(signal.SIGKILL)

signal.signal(signal.SIGINT, signal_handler)

This is hacky but works 😂

I haven't looked into other functions in threading module, but I feel like this should be quite trivial from low-level perspective. A thread and a process kinda the same in the POV of system, only the memory space are different. So if we can kill a process, then why can't we kill a thread.

I think python make it complicated because they don't want people to do unsafe stuff with multithreading.


Re your point about cooperative vs non-cooperative, IIUC the only part that need to be converted into cooperative would be the RemoteTensor, or more specifically, requests.get. I think the most intuitive way would be to convert it to a stream, so we can add a check like if interrupted for each iteration of the stream

@ngxson
Copy link
Collaborator

ngxson commented May 8, 2025

Or maybe using multiprocessing instead of threads would fix cancellation, but that also complicates data sharing (especially when a tensor is split into multiple written tensors (hmm, actually this might not be thread-safe in the current version...)).

Multiprocess sounds a bit overkill IMO, I think multiprocess is mostly useful in the case of isolating memory space, like for example browser spawns different processes for different origin (for security reason).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
performance Speed related topics python python script changes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants