This article focuses on Alpaca's real-time stock market WebSocket feed for bar data. I'm steering clear of order book data because of limits on storage, computing power, and network capacity, especially with approximately 4,500 symbols to manage. For this task, I used the Python libraries alpaca-py, asyncio, apscheduler, threading, psutil, and signal.
The program works as follows: it starts two tasks, reading and writing, each running continuously on its own thread. The reading task connects to the Alpaca WebSocket and appends every incoming bar to a shared list protected by a thread lock. The writing task runs roughly once a minute (the scheduler interval in the code is 1 minute 10 seconds), dumps the contents of that list to a CSV file, and then clears it. I also built a kill switch into the writing task: once the machine's local clock reaches 23:58, after the post-close trading session, it terminates the whole program.
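The hand-off between the two threads can be sketched in isolation. The snippet below is a minimal illustration, not the program itself: `BarBuffer`, `append`, `drain`, and `produce` are hypothetical names, and the producer threads merely stand in for the WebSocket handler. It shows the pattern the program relies on: one side appends under a lock, and the other takes the collected bars and resets the list under the same lock, so no bar is lost or written twice.

```python
import threading


class BarBuffer:
    """Lock-protected list shared by a producer and a consumer thread."""

    def __init__(self):
        self._lock = threading.Lock()
        self._bars = []

    def append(self, bar):
        # Called by the reader for every incoming bar.
        with self._lock:
            self._bars.append(bar)

    def drain(self):
        # Called by the writer: take everything collected so far and
        # leave an empty list behind, all while holding the lock.
        with self._lock:
            drained, self._bars = self._bars, []
        return drained


def produce(buffer, symbol, count):
    # Stand-in for the WebSocket handler: push `count` fake bars.
    for i in range(count):
        buffer.append(f"{symbol},{i}")


buffer = BarBuffer()
producers = [
    threading.Thread(target=produce, args=(buffer, symbol, 1000))
    for symbol in ("AAA", "BBB", "CCC")
]
for t in producers:
    t.start()
for t in producers:
    t.join()

first = buffer.drain()   # everything the producers pushed
second = buffer.drain()  # nothing left after the first drain
print(len(first), len(second))
```

Because `drain` only swaps list references, the lock is held very briefly, and anything slow (such as writing a file) can happen on the returned list without blocking the reader.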
After months of running this program live, I have yet to encounter an error. If it ever does crash unexpectedly, I'll debug the issue then. I'm not overly concerned about data loss, because I have several end-of-day price data jobs that retrieve the day's minute bars after the market closes. The primary purpose of this program is to evaluate real-time market data and lay the groundwork for using it later to monitor and manage our live portfolio.
# standard library
import asyncio
import csv
import datetime
import os
import signal
import sys
import threading
import time
# third-party libraries
from alpaca.data.enums import DataFeed
from alpaca.data.live import CryptoDataStream
from alpaca.data.live import StockDataStream
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import psutil
# Global (threadlocked) variable to be shared
bars = []
bars_lock = threading.Lock()
# how I want my data to arrive from Alpaca WebSocket (thread 1 subfunction)
async def bar_data_handler(data):
    global bars
    # flatten one bar into a comma-separated row
    bar = "{0},{1},{2},{3},{4},{5},{6},{7},{8}".format(
        data.symbol,
        data.timestamp,
        data.open,
        data.high,
        data.low,
        data.close,
        data.volume,
        data.trade_count,
        data.vwap,
    )
    # append under the lock so the writer never sees a half-updated list
    with bars_lock:
        bars.append(bar)
# the alpaca reading (thread 1) job
def stream_job():
    websocket_params = {
        "ping_interval": 1,
        "ping_timeout": 180,
        "max_queue": 100000,
    }
    # replace the placeholder keys with your own credentials
    wss_client = StockDataStream(
        api_key='ABC123',
        secret_key='DEF456',
        feed=DataFeed.SIP,
        websocket_params=websocket_params,
    )
    # "*" subscribes to bars for every symbol
    wss_client.subscribe_bars(bar_data_handler, "*")
    wss_client.run()
# the data dump function (thread 2 subfunction) with kill switch
async def async_write_to_file():
    global bars
    # take one UTC reading so the file name and the date folder always agree
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    timestamp = now_utc.strftime("%Y%m%dT%H%M%S")
    datestamp = now_utc.strftime("%Y%m%d")
    base_dir = "/US_EQUITY/live/prices"
    out_dir = os.path.join(base_dir, datestamp, "alpaca")
    os.makedirs(out_dir, exist_ok=True)  # create the per-day folder if it is missing
    filename = "market_stream_{0}.csv".format(timestamp)
    header = ["symbol", "timestamp", "open", "high", "low", "close", "volume", "trade_count", "vwap"]
    # hold the lock for the dump and the reset so no incoming bar is dropped
    with bars_lock:
        with open(os.path.join(out_dir, filename), "w", newline="") as file:
            writer = csv.writer(file)
            writer.writerow(header)
            for bar in bars:
                writer.writerow(bar.split(','))
        bars = []
    # Get the current (local) time
    current_time = datetime.datetime.now()
    current_system_pid = os.getpid()
    # Kill switch: at end of day (local time 23:58 or later), terminate the process
    if current_time.hour == 23 and current_time.minute >= 58:
        #pid = psutil.Process(current_system_pid)
        #pid.terminate()
        os.kill(current_system_pid, signal.SIGTERM)
    else:
        print(f"CONTINUE ... {current_system_pid}")
# the writing job (thread 2)
def write_job():
    # this thread needs its own event loop for the asyncio scheduler
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    scheduler = AsyncIOScheduler()
    async def async_write_task():
        await async_write_to_file()
    # minutes=1 and seconds=10 add up: the dump runs every 70 seconds
    scheduler.add_job(async_write_task, "interval", minutes=1, seconds=10)
    scheduler.start()
    try:
        loop.run_forever()
    except asyncio.CancelledError:
        pass
    finally:
        scheduler.shutdown()
        loop.close()
# main function starts both threads to run indefinitely
def main():
    # Create threads for each function
    thread_function1 = threading.Thread(target=stream_job, args=())
    thread_function2 = threading.Thread(target=write_job, args=())
    # Start the threads
    thread_function1.start()
    thread_function2.start()
    # Optionally, wait for the threads to finish
    thread_function1.join()
    thread_function2.join()
if __name__ == '__main__':
    main()

If you have any constructive suggestions to enhance this program, please don't hesitate to share them.
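One possible refinement, offered as a sketch under assumptions rather than as the code running in production: copy the bars out of the shared list while holding `bars_lock`, then do the slower file write afterwards so the WebSocket handler is not kept waiting on disk I/O. In the snippet below, `write_bars` is a hypothetical helper, the sample rows are made up, and a temporary directory stands in for `/US_EQUITY/live/prices`. It also creates the per-day folder on first use.

```python
import csv
import datetime
import os
import tempfile

HEADER = ["symbol", "timestamp", "open", "high", "low",
          "close", "volume", "trade_count", "vwap"]


def write_bars(rows, base_dir, now):
    """Write already-copied bar strings to <base_dir>/<date>/alpaca/<file>.csv."""
    out_dir = os.path.join(base_dir, now.strftime("%Y%m%d"), "alpaca")
    os.makedirs(out_dir, exist_ok=True)  # create the per-day folder on first use
    filename = "market_stream_{0}.csv".format(now.strftime("%Y%m%dT%H%M%S"))
    path = os.path.join(out_dir, filename)
    with open(path, "w", newline="") as file:
        writer = csv.writer(file)
        writer.writerow(HEADER)
        writer.writerows(row.split(",") for row in rows)
    return path


# Made-up bars; in the real program `rows` would be the list copied out
# of `bars` while holding `bars_lock`.
rows = [
    "AAA,2024-01-16 15:30:00+00:00,10.0,10.5,9.9,10.2,1200,35,10.1",
    "BBB,2024-01-16 15:30:00+00:00,20.0,20.1,19.8,20.0,800,12,19.95",
]
now = datetime.datetime(2024, 1, 16, 15, 31, 10)
with tempfile.TemporaryDirectory() as base_dir:
    path = write_bars(rows, base_dir, now)
    with open(path, newline="") as file:
        written = list(csv.reader(file))

print(os.path.basename(path), len(written))
```

Keeping the write in a plain function like this also makes it easy to test without a live WebSocket connection.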
Coming up next
Now that both data ingestion programs are written, it's time to configure the real-time data ingestion jobs. In the next post, I'll showcase and discuss the jobs currently running on my computer.

