from questdb.ingress import Sender, TimestampNanos

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
    sender.dataframe(df, table_name='trades', at=TimestampNanos.now())
It takes about 45 sec to upload a dataframe with ~5M rows and 50 columns. I am using the default parameters, as in the example. I was expecting much faster load performance.
How much data are we talking about? Can you measure the transfer rate, or at least the dataframe size?
The pandas streaming is zero-copy, but due to ILP limitations the data has to be expanded into UTF-8 text for transfer, so more bytes go over the wire than the in-memory size of your dataframe.
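To give a sense of the expansion, each row travels as one line of ILP text, something like this (table and column names made up for illustration):

trades,symbol=BTC-USD price=39269.98,amount=0.001 1646762637609765000

Every 8-byte binary double turns into a decimal string and the column names repeat on every row, so the bytes on the wire can be a multiple of the dataframe's in-memory size.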
We have some backlog work to replace the text protocol with a binary protocol, to alleviate this friction.
In the meantime, a quick way to speed this up is to slice the data into chunks and use multiple senders in parallel. You should be able to saturate your connection with a few extra senders. I may be able to get you an example script tomorrow!
Another option is to export Parquet files, then use the read_parquet function and an INSERT INTO SELECT query to load them into a table.
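A rough sketch of that route. Assumptions: the Parquet path here is hypothetical and must be readable by the server (read_parquet resolves paths server-side, per your QuestDB config), and a trades table with matching columns already exists.

import requests

# 1) Export the dataframe to Parquet somewhere the server can see it
df.to_parquet('/var/lib/questdb/import/trades.parquet')

# 2) Load it server-side via the REST /exec endpoint
query = "INSERT INTO trades SELECT * FROM read_parquet('trades.parquet')"
resp = requests.get('http://localhost:9000/exec', params={'query': query})
resp.raise_for_status()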
The data per load is around 5M rows, each row with about 50 columns (mostly numeric) and consuming about 350 bytes. I like the idea of slicing and multiprocessing and will need to try it; an example would be helpful. As for Parquet, it's not possible, as the data originates on a remote machine. Thanks.
This is the kind of idea - you’ll need to work it into your code. It's ripped from a script written by a colleague!
Take your dataframe and split it into batches:
import numpy as np
from collections import deque

# len(df) counts rows; df.size would count rows * columns
batches = deque()
for chunk in np.array_split(df, max(1, len(df) // batch_size)):
    batches.append(chunk)
Define a sending function:
def send_batch(conf_str, table_name, batches, timestamp_name):
    # One sender (i.e. one connection) per worker; auto_flush is off so we
    # control when each batch goes over the wire
    with Sender.from_conf(conf_str, auto_flush=False, init_buf_size=100_000_000) as qdb_sender:
        while True:
            try:
                # deque.pop() is atomic, so workers can safely share the deque
                chunk = batches.pop()
            except IndexError:
                break  # no batches left
            qdb_sender.dataframe(chunk, table_name=table_name, at=timestamp_name)
            qdb_sender.flush()
Then feed the workers as futures into a thread pool:
from concurrent.futures import ThreadPoolExecutor

# Threads work well here since the job is mostly network I/O
with ThreadPoolExecutor(max_workers=parallel) as executor:
    futures = []
    for _ in range(parallel):
        futures.append(executor.submit(send_batch, conf_str, table_name, batches, timestamp_name))
    for future in futures:
        future.result()  # re-raises any exception from a worker
Hopefully you can read around the edges on this!
Fwiw, some of the slowdown is caused by expanding binary data into ILP (a text protocol) and then sending it over the network.
We will be replacing the text protocol in the future, but it's worth keeping in mind if your throughput is lower than expected (when measured as the dataframe size compared to network bandwidth).
Thanks for the example using threads. I experimented with multiprocessing as well and found it a bit faster (though it loads all CPUs). It now takes about 8 sec to persist the same table. Thanks.
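For anyone landing here later, a minimal sketch of such a multiprocessing variant, under a few assumptions: a deque can't be shared across processes, so the chunks are dealt out up front; df, batch_size, conf_str, table_name and timestamp_name are defined as in the threaded example; and the worker count of 8 is made up - tune it for your machine.

from concurrent.futures import ProcessPoolExecutor
import numpy as np

def send_chunks(conf_str, table_name, chunks, timestamp_name):
    # Each process opens its own connection; its chunks are pickled over to it
    from questdb.ingress import Sender
    with Sender.from_conf(conf_str, auto_flush=False, init_buf_size=100_000_000) as sender:
        for chunk in chunks:
            sender.dataframe(chunk, table_name=table_name, at=timestamp_name)
            sender.flush()

if __name__ == '__main__':
    parallel = 8  # hypothetical worker count
    chunks = np.array_split(df, max(1, len(df) // batch_size))
    per_worker = [chunks[i::parallel] for i in range(parallel)]  # deal round-robin
    with ProcessPoolExecutor(max_workers=parallel) as executor:
        futures = [executor.submit(send_chunks, conf_str, table_name, work, timestamp_name)
                   for work in per_worker]
        for future in futures:
            future.result()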