Creating a serverless pipeline for real-time market data

Editor’s note: This is the second part in our special series on working with market data in Google Cloud. This post highlights how we leveraged serverless components to build a flexible data ingestion pipeline. Check out the first post on visualizing real-time market data in the cloud.

Capital markets firms must rapidly extract insights from unbounded real-time datasets. A firm's market data pipelines should weigh the end user's data access requirements as a central design consideration, but oftentimes the rigidity of delivery mechanisms and analytical tools blocks this goal. Serverless data offerings can solve this problem by removing operational friction when introducing a new, well-suited tool. This makes it simple for one data pipeline to serve separate user goals: say, one for training a real-time machine learning model and another for analyzing historical data.

With a serverless data pipeline, capital markets firms can focus on insights, not infrastructure, and stay ahead in a fast-moving industry. In this post, you’ll see best practices for real-time data ingestion using CME Group’s Smart Stream top-of-book (ToB) JSON feed. The reference architecture considers data usage patterns and schema when selecting storage and transport options. It also encapsulates business logic in Cloud Functions to increase the speed of development and to offload operational complexities. These patterns and designs can be applied to a wide variety of use cases.

Figure 1 depicts the ingestion pipeline’s reference architecture.

Figure 1: Reference architecture

How we set up real-time data ingestion

The source of the real-time data we used is Smart Stream, a service available on Google Cloud from CME Group. The data originates with the CME Globex trading platform as a multicast stream running over UDP. The instrument price data is forwarded over an interconnect to different Pub/Sub topics, each corresponding to a single product, like silver or orange juice concentrate futures.

Pub/Sub is serverless and tightly integrated with other Google Cloud services. It’s also fully managed by Google, alleviating users from many scaling, planning, and reliability concerns. Google provides open-source Dataflow templates to ingest data from Pub/Sub to various sinks such as BigQuery and Cloud Storage.
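
If you want to inspect a Smart Stream topic without standing up a pipeline, the Pub/Sub client library makes it easy to pull messages directly. The Python sketch below (project and subscription IDs are placeholders) prints JSON quote messages as they arrive:

import json
from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1

PROJECT_ID = "your-project-id"                 # placeholder
SUBSCRIPTION_ID = "your-smartstream-sub"       # placeholder

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

def callback(message):
    quote = json.loads(message.data)  # Smart Stream payloads are JSON
    print(quote)
    message.ack()

# subscribe() returns a streaming-pull future; block on it to keep receiving.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
with subscriber:
    try:
        streaming_pull_future.result(timeout=30)
    except TimeoutError:
        streaming_pull_future.cancel()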

We used Bigtable as the real-time data store, serving the most recent data and features to a prediction endpoint. (The endpoint passes this data to machine learning models hosted on Google Cloud's AI Platform.) In parallel, we used BigQuery as a scalable analytics warehouse. Pub/Sub data was streamed to both sinks with separate Dataflow jobs.

Figure 2 is a shell snippet that launches a Dataflow job with a Google-provided template for Pub/Sub-to-BigQuery pipelines:

Figure 2: Launching a Dataflow template to ingest messages from Pub/Sub to BigQuery

JOB_NAME="smartstream-json-bq-ingest-$(date +%Y%m%d%H%M%S)"
PROJECT=<PROJECT>
TEMPLATE=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
SERVICE_ACCOUNT=<service_account_email>
REGION=us-central1
SUB=<SUBSCRIPTION>
DEST=<PROJECT>:<DATASET>.<TABLE>

gcloud dataflow jobs run ${JOB_NAME} \
    --project ${PROJECT} \
    --gcs-location ${TEMPLATE} \
    --service-account-email ${SERVICE_ACCOUNT} \
    --region ${REGION} \
    --parameters inputSubscription=${SUB},outputTableSpec=${DEST}
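
The same template can also be launched programmatically. Here's an illustrative Python sketch using the Dataflow REST API via google-api-python-client; the project, region, and parameter values are placeholders:

from googleapiclient.discovery import build

dataflow = build("dataflow", "v1b3")
response = dataflow.projects().locations().templates().launch(
    projectId="your-project-id",                     # placeholder
    location="us-central1",
    gcsPath="gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery",
    body={
        "jobName": "smartstream-json-bq-ingest",
        "parameters": {
            # Placeholder subscription and table values
            "inputSubscription": "projects/your-project-id/subscriptions/your-sub",
            "outputTableSpec": "your-project-id:your_dataset.your_table",
        },
    },
).execute()

print(response["job"]["id"])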

Figure 3 depicts a Dataflow pipeline with three input Pub/Sub topics (one per trading instrument) and Bigtable as a sink:

Figure 3: Dataflow job graph

The class in Figure 4 defines an Apache Beam pipeline to ingest data from a single topic (i.e., product code) and write to Bigtable:

Figure 4: Apache Beam ingestion pipeline to Bigtable

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// SmartStreamIngestOptions, MutationTransformDoFn, DeadLetterError, and
// ExtractPubSubMessageDoFn are project-specific classes assumed to live in
// the same package.
public class SmartStreamIngest {

  private static final Logger LOG = LoggerFactory.getLogger(SmartStreamIngest.class);

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {

    // Parse and validate SmartStreamIngest options
    SmartStreamIngestOptions smartStreamIngestOptions = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(SmartStreamIngestOptions.class);

    // Create the main pipeline and read from the instrument's Pub/Sub subscription
    Pipeline pipeline = Pipeline.create(smartStreamIngestOptions);
    PCollection<PubsubMessage> instrument = pipeline
        .apply("Read Instrument PubSub",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(smartStreamIngestOptions.getInstrumentPubSubSubscription()));

    // Transform each message into Bigtable mutations, tagging failures
    // for dead-letter handling
    PCollectionTuple parseMsg = instrument
        .apply("Transform to Bigtable",
            ParDo.of(new MutationTransformDoFn()).withOutputTags(
                MutationTransformDoFn.successTag,
                TupleTagList.of(DeadLetterError.DeadLetterTag)));

    // Write successfully parsed records to Bigtable
    parseMsg
        .get(MutationTransformDoFn.successTag)
        .apply("Write to Bigtable",
            BigtableIO.write()
                .withProjectId(smartStreamIngestOptions.getBigtableProjectId())
                .withInstanceId(smartStreamIngestOptions.getBigtableInstanceId())
                .withTableId(smartStreamIngestOptions.getBigtableTableId()));

    // Route records that failed parsing to a dead-letter Pub/Sub topic
    parseMsg
        .get(DeadLetterError.DeadLetterTag)
        .setCoder(AvroCoder.of(DeadLetterError.class))
        .apply("Extract failed element", ParDo.of(new ExtractPubSubMessageDoFn()))
        .apply("Write Bad records to PubSub",
            PubsubIO.writeMessages().to(smartStreamIngestOptions.getOutputPubSubTopic()));

    // Execute the pipeline
    pipeline.run();
  }
}

At first glance, the app's predictive models and web front-end charts are similar in their demand for data freshness. On further inspection, however, a difference emerges: the charts can use the Smart Stream price data directly, without an intervening data store. So for front-end delivery, we settled on Pub/Sub via WebSockets.

Using Pub/Sub with serverless ingestion components offered architectural flexibility and removed operational complexity as a constraint. Data coming from one Pub/Sub topic can be stored in Bigtable for machine learning or in BigQuery for analytics, in addition to being sent directly over WebSockets to power rapidly changing visualizations.
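
To illustrate the front-end delivery path, the sketch below relays Pub/Sub messages to browser clients over WebSockets. It assumes a recent version of the open-source websockets package and placeholder project and subscription IDs; a production relay would also need authentication and flow control:

import asyncio

import websockets
from google.cloud import pubsub_v1

PROJECT_ID = "your-project-id"                 # placeholder
SUBSCRIPTION_ID = "your-smartstream-sub"       # placeholder

connected = set()  # currently open browser connections

async def handler(websocket):
    # Register each chart client and hold the connection open until it closes.
    connected.add(websocket)
    try:
        await websocket.wait_closed()
    finally:
        connected.discard(websocket)

async def main():
    loop = asyncio.get_running_loop()

    def on_message(message):
        # Pub/Sub callbacks fire on background threads, so hop back onto the
        # event loop before writing to the sockets.
        payload = message.data.decode("utf-8")
        for ws in list(connected):  # copy to tolerate concurrent joins/leaves
            asyncio.run_coroutine_threadsafe(ws.send(payload), loop)
        message.ack()

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)
    subscriber.subscribe(subscription_path, callback=on_message)

    async with websockets.serve(handler, "0.0.0.0", 8080):
        await asyncio.Future()  # run forever

asyncio.run(main())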

Storage and schema considerations

Ideally, the time spent managing data is minimal compared to the time spent using it. If schema design and storage architecture are executed properly, users will feel that the data is working for them, rather than the other way around.

Row key design is critical to any Bigtable pipeline. Our key concatenates a product symbol with a reverse timestamp, which is optimized for our access pattern (“fetch N most recent records”) while avoiding hotspotting.

To reverse the timestamp, we subtract it from the programming language's maximum value for long integers (such as Java's java.lang.Long.MAX_VALUE). This forms the key: <SYMBOL>#<INVERTED_TIMESTAMP>. A product code's most recent events appear at the start of the table, speeding up query response time. This approach accommodates our primary access pattern (which queries multiple recent product symbols), but may yield poor performance for others. A post on Bigtable schema design for time series data has additional concepts, patterns, and examples.
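
To make the key scheme concrete, here's a minimal Python sketch (project, instance, and table IDs are placeholders) that builds the <SYMBOL>#<INVERTED_TIMESTAMP> key and performs the "fetch N most recent records" read:

from google.cloud import bigtable

JAVA_LONG_MAX = 9223372036854775807  # java.lang.Long.MAX_VALUE

def make_row_key(symbol, timestamp_ms):
    # Inverting the timestamp makes a symbol's newest events sort first.
    return f"{symbol}#{JAVA_LONG_MAX - timestamp_ms}".encode("utf-8")

client = bigtable.Client(project="your-project-id")   # placeholder
table = client.instance("your-instance").table("your-table")

# "Fetch the N most recent records" becomes a short forward scan, because
# inverted timestamps place the newest rows at the head of the symbol's prefix.
symbol = "NQH0"
rows = table.read_rows(
    start_key=f"{symbol}#".encode("utf-8"),
    end_key=f"{symbol}$".encode("utf-8"),  # '$' sorts immediately after '#'
    limit=10,
)
for row in rows:
    print(row.row_key.decode("utf-8"))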

Figure 5 shows a sample data point ingested into Bigtable:

Figure 5: Representation of a market data record within Bigtable

  NQH0#9223370452974523035
  marketdata:askLevelPrice                 
    "829350"
  marketdata:askLevelQty                   
    "1"
  marketdata:bidLevelPrice                 
    "829125"
  marketdata:bidLevelQty                   
    "2"
  marketdata:exchangeMic                   
    "XCME"
  marketdata:lastUpdateTimeInstant         
    "1583880252772"
  marketdata:productType                   
    "FUT"
  marketdata:sendingTime                   
    "1583880252774"

While Bigtable is well-suited to deliver low-latency data to machine learning models, we also needed a more analytically tuned query engine for longer-lookback insights. BigQuery was a natural fit because of its serverless and scalable nature, as well as its integration with tools such as AutoML.

When designing, we considered three options for preparing the BigQuery data for visualization as classic OHLC “candlesticks.” First, we could store the nested Pub/Sub JSON in BigQuery and write a complex SQL query to unnest and aggregate it. Second, we could write a view that unnests, and then write a simpler SQL query that aggregates against it. Third, we could develop and run a Dataflow job that unnests Pub/Sub records into a “flat” format for storage in BigQuery, and then aggregate with a simple SQL query.

Though the third option may represent a superior design in the longer term, time constraints steered us toward the second. The BigQuery view was simple to set up, and the team became productive quickly when querying against the flattened schema. Thanks to a DATE partition filter, the SQL view definition scans only the most recent day from the underlying table storing the quotes, which dramatically improves the performance of queries against the view.

The data transformation and views for this chart, using the second approach, are shown in Figures 6 and 7.

Figure 6: SQL view definition to flatten original source records

SELECT
  payload[OFFSET(0)].bidLevel.lastUpdateTime AS timestamp,
  payload[OFFSET(0)].instrument.symbol,
  payload[OFFSET(0)].bidLevel.price AS bid_price,
  payload[OFFSET(0)].askLevel.price AS ask_price,
  (payload[OFFSET(0)].bidLevel.price + payload[OFFSET(0)].askLevel.price) / 2 AS mid_price
FROM
  `DATASET_NAME.btc`
WHERE
  _PARTITIONDATE >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  OR _PARTITIONDATE IS NULL

Figure 7: SQL view definition that generates OHLC bars (“candlesticks”)

SELECT
  symbol,
  EXTRACT(HOUR FROM timestamp) AS hour,
  EXTRACT(MINUTE FROM timestamp) AS minute,
  TIMESTAMP_TRUNC(timestamp, MINUTE) AS event_minute,
  ARRAY_AGG(mid_price ORDER BY timestamp LIMIT 1)[SAFE_OFFSET(0)] AS open,
  MAX(mid_price) AS high,
  MIN(mid_price) AS low,
  ARRAY_AGG(mid_price ORDER BY timestamp DESC LIMIT 1)[SAFE_OFFSET(0)] AS close
FROM
  `DATASET_NAME.BTC_FLAT`
GROUP BY
  symbol, hour, minute, event_minute
ORDER BY
  symbol, event_minute DESC

Because both Bigtable and BigQuery are serverless, the maintenance burden of each storage solution is minimal, and more time can be spent deriving and delivering value from the data rather than on capacity planning, storage procurement, and other operational complexities.

Market data microservices

Cloud Functions offer developers two primary benefits. First, they let developers bypass non-differentiating, low-level implementation details in favor of business-specific code. Second, they support flexible use of data by encapsulating business logic outside of the database. In our pipeline, we accordingly used Cloud Functions for isolated, task-specific chunks of code.

One example is our pipeline’s Fetch Prediction function, which retrieves trade records from Bigtable and extracts first-order features (mean, sum, max, etc.) as input to the machine learning model. This enables rapid predictions that the bots use to make algorithmic trading decisions in near real time.

This is demonstrated in Figure 8.

Figure 8: Python routine to fetch predictions at runtime

# Fetch features for the input symbol from Bigtable. `Connector`/`connector`
# refer to a project-specific Bigtable helper class and instance.
end_time = int(float(Connector.fetch_time_central()[0]) * 1000)  # current time in ms
start_time = end_time - 1000                                     # look back one second

latest_record_fetched = False
smartstream_data = connector.query_smartstream_data(symbol, start_time, end_time)

# If no data arrived in the last second, fall back to the most recent record
if not smartstream_data:
    logger.debug("No smartstream data found")
    latest_record_fetched = True
    smartstream_data = connector.query_smartstream_data(symbol)

last_second_aggregates = connector.extract_entry_from_smartstream_data(
    smartstream_data, end_time, latest_record_fetched)

# Assemble the model's input features from the aggregates
if last_second_aggregates:
    features_key_list = [
        "mean_ask_price",
        "mean_bid_price",
        "spread",
        "sum_volume_bid",
        "sum_volume_ask",
        "max_price",
        "min_price",
        "timestamp",
        "number_of_messages",
    ]
    model_inputs = {key: last_second_aggregates.get(key) for key in features_key_list}
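
From there, the assembled model_inputs can be submitted for scoring. The following illustrative sketch calls an AI Platform online prediction endpoint through google-api-python-client; the model and version names are placeholders:

from googleapiclient.discovery import build

ml = build("ml", "v1")
name = "projects/your-project-id/models/your_model/versions/v1"  # placeholder
response = ml.projects().predict(
    name=name,
    body={"instances": [model_inputs]},  # model_inputs assembled above
).execute()

if "error" in response:
    raise RuntimeError(response["error"])
prediction = response["predictions"][0]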

Fetch Candles is a Cloud Function that fetches a recent price summary from BigQuery, detailing the opening, closing, highest, and lowest prices observed for each minute. To improve request-response performance, we enabled HTTP(S) load balancing with a serverless network endpoint group for our app, and then optimized delivery with Cloud CDN. Configured this way, Fetch Candles queries BigQuery only for the first request for a given minute and product code; subsequent requests are served from the Cloud CDN cache until the maximum cache TTL of one minute is reached. This can significantly reduce the overall volume of query executions as web client traffic scales up. Since the data covers a fixed lookback window, there’s no functional need for BigQuery to calculate the aggregates more than once per interval.
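
As an illustration of this pattern, a Fetch Candles-style HTTP Cloud Function might look like the sketch below. The dataset, view, and query shape are placeholders; the important detail is the Cache-Control header, which caps the Cloud CDN cache at one minute:

import json

from google.cloud import bigquery

bq_client = bigquery.Client()

def fetch_candles(request):
    # Placeholder view name and parameterized symbol filter
    symbol = request.args.get("symbol", "BTC")
    query = """
        SELECT event_minute, open, high, low, close
        FROM `DATASET_NAME.CANDLES_VIEW`
        WHERE symbol = @symbol
        ORDER BY event_minute DESC
        LIMIT 60
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("symbol", "STRING", symbol)]
    )
    rows = [dict(row) for row in bq_client.query(query, job_config=job_config)]
    headers = {
        "Content-Type": "application/json",
        "Cache-Control": "public, max-age=60",  # let Cloud CDN cache for one minute
    }
    return (json.dumps(rows, default=str), 200, headers)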

Figure 9: OHLC “candlestick” chart visualization

By enabling a microservices architecture, Cloud Functions allowed each team member to work in their preferred language, and to develop, test, and debug individual functions in isolation.

Figure 10 shows an inventory of the principal functions used in our market data pipelines.

Figure 10: Sample inventory of Cloud Functions

Many of these functions provide input to a machine learning model, while others fetch data from BigQuery for visualizations of trader performance in a real-time Profit/Loss ledger.

Conclusion

A data pipeline built from serverless components allows capital markets firms to focus on developing valuable insights and offerings, not on managing infrastructure. In a serverless environment, the end users’ data access patterns can strongly influence the pipeline architecture and schema design. This, in conjunction with a microservices architecture, minimizes code complexity and reduces coupling. As organizations continue to add data sources and information tools to their operations, serverless computing models enable them to focus on the value-added work of using data to make better decisions.

Learn more about Google Cloud for financial services.
