Let’s say we need to fetch analyst earnings estimates for all symbols in our universe at the start of the day, before the market opens. We rely on a data provider that restricts us to 100 API calls per minute. With a universe of 4500 symbols and each query taking 1-2 seconds, it would take 75-150 minutes to gather all the data sequentially, which is impractical. If we need to access additional data sources from the same vendor, we risk not finishing before the market opens.
To streamline our data retrieval, we need to use the full API rate limit, assuming we have only one API key to work with. The simplest way to do that is with multithreading: enough concurrent workers to keep requests flowing at the limit. At 100 calls per minute, 4500 symbols can be gathered in as little as 45 minutes. Python offers solid support for multithreading and multiprocessing, along with packages such as ratelimit for enforcing API request quotas, which makes it a natural fit for this job.
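As a quick back-of-the-envelope check of those figures (4500 symbols, 1-2 seconds per call, 100 calls per minute):

n_symbols = 4500
seq_minutes = (n_symbols * 1 / 60, n_symbols * 2 / 60)  # sequential: 75 to 150 minutes
floor_minutes = n_symbols / 100  # 45 minutes: the best case a 100-calls/min limit allows
safe_minutes = n_symbols / 95    # ~47.4 minutes at the 95-calls/min safety margin used below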
Below is a Python pseudo-code snippet that outlines the process: fetching our daily universe, storing it in a working list, and executing the data queries concurrently across a thread pool. This keeps the entire ingestion within roughly the targeted 45-minute window. Note that the limiter is deliberately set to 95 calls per minute rather than the full 100, to guard against the vendor counting requests slightly differently than we do. I would rather be safe than spend time rerunning or debugging a failed job.
from ratelimit import limits, sleep_and_retry
from multiprocessing.dummy import Pool as ThreadPool

def get_data(symbols):
    # a small pool of threads is enough: the jobs are I/O-bound,
    # and the rate limit, not CPU, is the bottleneck
    pool = ThreadPool(8)

    # unpack each job's positional and keyword arguments
    # before handing them to the rate-limited function
    def worker_wrapper(arg):
        args, kwargs = arg
        return call_rate_func(*args, **kwargs)

    # our rate-limited API function
    @sleep_and_retry
    @limits(calls=95, period=60)  # throttle to 95 calls/min, safely under the vendor's 100
    def call_rate_func(idx, **kwargs):
        symbol = ''
        for key, value in kwargs.items():
            if key == 'symbol':
                symbol = value
            else:
                raise ValueError(f"Unknown key: {key}")
        try:
            # REQUEST DATA
            # SAVE DATA
            pass
        except Exception as err:
            raise RuntimeError(f"Error fetching {symbol}: {err}") from err

    # build one job per symbol in the universe
    jobs = [((idx,), {"symbol": symbol}) for idx, symbol in enumerate(symbols)]
    pool.map(worker_wrapper, jobs)
    pool.close()
    pool.join()
def get_symbols():
    # FUNCTION TO PULL TODAY'S SYMBOL UNIVERSE AS A LIST
    symbols = ['AAPL', 'META', 'GOOGL']
    return symbols
def main():
    symbols = get_symbols()
    get_data(symbols)

if __name__ == '__main__':
    main()

We won't cover scenarios where a vendor imposes no API rate limit or delivers data in single-file bundles, since those cases don't demand much ingenuity. If you have any questions, don't hesitate to reach out.
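One design note on the snippet above: multiprocessing.dummy exposes the familiar Pool API but backs it with threads instead of processes. Since each job spends almost all of its time waiting on the network (during which Python releases the GIL), a handful of threads is enough to keep the limiter saturated, and full processes would only add overhead. The ratelimit decorator guards its call counter with a lock, so the 95-calls-per-minute budget is shared across all worker threads rather than applied per thread.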
Coming up next
Now that we've configured our rate-limited data ingestion functions, let's shift our focus to real-time websocket data ingestion. Specifically, we'll be loading minute bar data from Alpaca using their real-time websocket service.