Pipeline

This notebook is from the official Quantopian Guide on Pipelines. Make sure to visit their documentation for many more great resources!

Many trading algorithms have the following structure:

  1. For each asset in a known (large) set, compute N scalar values for the asset based on a trailing window of data.
  2. Select a smaller tradeable set of assets based on the values computed in (1).
  3. Calculate desired portfolio weights on the set of assets selected in (2).
  4. Place orders to move the algorithm’s current portfolio allocations to the desired weights computed in (3).
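The four steps above can be sketched in plain Python to make the data flow concrete. This is a toy illustration, not Quantopian code: the price history, the 3-day window, the "keep the top 2" rule and the equal weighting are all hypothetical choices made for the example.

```python
from statistics import mean

# Hypothetical trailing closes per asset, oldest first.
history = {
    "AAA": [10.0, 10.5, 11.0, 11.5],
    "BBB": [20.0, 19.0, 18.0, 17.0],
    "CCC": [5.0, 5.2, 5.1, 5.6],
}
# Hypothetical current portfolio weights.
current_weights = {"AAA": 0.0, "BBB": 0.5, "CCC": 0.5}
WINDOW = 3  # trailing window length (an arbitrary choice for this sketch)

# 1. One scalar per asset from its trailing window: last close relative to the window mean.
scores = {a: closes[-1] / mean(closes[-WINDOW:]) - 1.0 for a, closes in history.items()}

# 2. Select a smaller set: the two highest-scoring assets.
selected = sorted(scores, key=scores.get, reverse=True)[:2]

# 3. Desired weights: equal weight across the selected assets.
target_weights = {a: 1.0 / len(selected) for a in selected}

# 4. Orders: weight change needed for every asset held now or wanted later.
orders = {
    a: target_weights.get(a, 0.0) - current_weights.get(a, 0.0)
    for a in set(current_weights) | set(target_weights)
}
```

Here CCC and AAA are selected, so the sketch buys AAA up to 50%, sells BBB down to 0% and leaves CCC unchanged. Pipeline is aimed at steps 1 and 2, done robustly over thousands of assets.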

There are several technical challenges with doing this robustly. These include:

  • efficiently querying large sets of assets
  • performing computations on large sets of assets
  • handling adjustments (splits and dividends)
  • asset delistings

Pipeline exists to solve these challenges by providing a uniform API for expressing computations on a diverse collection of datasets.

Factors

A factor is a function from an asset and a moment in time to a numerical value.

A simple example of a factor is the most recent price of a security. Given a security and a specific point in time, the most recent price is a number. Another example is the 10-day average trading volume of a security. Factors are most commonly used to assign values to securities which can then be used in a number of ways. A factor can be used in each of the following procedures:

  • computing target weights
  • generating alpha signals
  • constructing other, more complex factors
  • constructing filters
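As a rough model of this definition, a factor can be written as an ordinary function of (asset, date) that returns a number. The sketch below assumes a hypothetical in-memory table of daily bars (on calendar days, for simplicity) and mirrors the two examples in the text: the most recent price and a trailing 10-day average volume. The names `bars`, `latest_price` and `avg_volume` are illustrative only, not Quantopian API.

```python
import datetime as dt
from statistics import mean

# Hypothetical daily bars for one asset: (date, close, volume), oldest first.
first_day = dt.date(2017, 1, 1)
bars = {
    "AAA": [(first_day + dt.timedelta(days=i), 10.0 + i, 1000 * (i + 1)) for i in range(15)],
}

def latest_price(asset, when):
    """Factor: the most recent close on or before `when`."""
    closes = [close for day, close, _ in bars[asset] if day <= when]
    return closes[-1]

def avg_volume(asset, when, window=10):
    """Factor: mean volume over the last `window` bars on or before `when`."""
    volumes = [volume for day, _, volume in bars[asset] if day <= when]
    return mean(volumes[-window:])
```

For example, `latest_price("AAA", dt.date(2017, 1, 15))` is 24.0 and `avg_volume("AAA", dt.date(2017, 1, 15))` is 10500: the same asset evaluated at a different date gives a different number, which is exactly the "asset and moment in time" signature.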

Filters

A filter is a function from an asset and a moment in time to a boolean. An example of a filter is a function indicating whether a security's price is below $10. Given a security and a point in time, this evaluates to either True or False. Filters are most commonly used for describing sets of assets to include or exclude for some particular purpose.
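A filter has the same shape as a factor but returns a boolean. Using a hypothetical snapshot of latest closes for a single date, the "$10" example becomes a predicate, and applying it across the universe describes a set of assets to include. This is a plain-Python illustration; the names are not Quantopian API.

```python
# Hypothetical latest closes for a single date.
latest_close = {"AAA": 8.25, "BBB": 42.10, "CCC": 9.99, "DDD": 10.00}

def price_below_10(asset):
    """Filter: True when the asset's latest close is below $10."""
    return latest_close[asset] < 10.0

# The filter describes a set of assets to include.
included = {asset for asset in latest_close if price_below_10(asset)}
```

Note that DDD, priced at exactly $10.00, is excluded because the comparison is strict.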

Classifiers

A classifier is a function from an asset and a moment in time to a categorical output. More specifically, a classifier produces a string or an int that doesn't represent a numerical value (e.g. an integer label such as a sector code). Classifiers are most commonly used for grouping assets for complex transformations on Factor outputs. An example of a classifier is the exchange on which an asset is currently being traded.
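The typical use of a classifier is grouping. As a sketch, with hypothetical integer sector labels and hypothetical factor values, the code below demeans a factor within each group, one example of a per-group transformation on factor output. The labels are treated purely as categories; averaging them would be meaningless.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical classifier output: asset -> integer sector label (a category, not a quantity).
sector = {"AAA": 311, "BBB": 311, "CCC": 206, "DDD": 206}
# Hypothetical factor output for the same assets.
score = {"AAA": 1.0, "BBB": 3.0, "CCC": 10.0, "DDD": 14.0}

# Group the factor values by classifier label.
groups = defaultdict(list)
for asset, label in sector.items():
    groups[label].append(score[asset])

# Subtract each group's mean from the scores of that group's members.
demeaned = {asset: score[asset] - mean(groups[sector[asset]]) for asset in score}
```

After demeaning, AAA and BBB become -1.0 and 1.0 around their group mean of 2.0, and CCC and DDD become -2.0 and 2.0 around 12.0.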

Imports

In [54]:
from quantopian.pipeline import Pipeline

Create a Pipeline Object

In [55]:
def make_pipeline():
    return Pipeline()
In [56]:
pipe = make_pipeline()

Call run_pipeline()

In [57]:
from quantopian.research import run_pipeline
In [58]:
result = run_pipeline(pipe,'2017-01-01','2017-01-01')
In [59]:
result.head(10)
Out[59]:
2017-01-03 00:00:00+00:00 Equity(2 [ARNC])
Equity(21 [AAME])
Equity(24 [AAPL])
Equity(25 [ARNC_PR])
Equity(31 [ABAX])
Equity(39 [DDC])
Equity(41 [ARCB])
Equity(52 [ABM])
Equity(53 [ABMD])
Equity(62 [ABT])
In [60]:
result.columns
Out[60]:
Index([], dtype='object')

Note that because 2017-01-01 is not a trading day, the output date has been adjusted automatically to 2017-01-03, the next trading day. The result is a DataFrame with a two-level MultiIndex (date, asset) and no columns, since this pipeline does not define any.
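The date adjustment amounts to "roll forward to the first trading session on or after the requested date". A minimal sketch, assuming a hypothetical hard-coded session list (the real calendar is handled by Quantopian), also shows the (date, asset) keys that form the MultiIndex:

```python
import bisect
import datetime as dt

# Hypothetical trading calendar for early January 2017. 2017-01-01 (a Sunday) and
# 2017-01-02 (the observed New Year's Day holiday) are not sessions.
sessions = [dt.date(2017, 1, 3), dt.date(2017, 1, 4), dt.date(2017, 1, 5), dt.date(2017, 1, 6)]

def next_session(day):
    """Return the first trading session on or after `day`."""
    i = bisect.bisect_left(sessions, day)
    if i == len(sessions):
        raise ValueError(f"no session on or after {day}")
    return sessions[i]

# Rows keyed like the pipeline output: (date, asset) pairs with no column values.
assets = ["ARNC", "AAME", "AAPL"]
index = [(next_session(dt.date(2017, 1, 1)), asset) for asset in assets]
```

`bisect_left` returns the position of the first session that is not earlier than `day`, so a date that already is a session maps to itself.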

In [61]:
result.info()
<class 'pandas.core.frame.DataFrame'>
MultiIndex: 8343 entries, (2017-01-03 00:00:00+00:00, Equity(2 [ARNC])) to (2017-01-03 00:00:00+00:00, Equity(50569 [OUSM]))
Empty DataFrame

Data

In [62]:
from quantopian.pipeline.data.builtin import USEquityPricing

Factors

Remember, Factors take in an asset sid and a timestamp and return some numerical value.

The examples below build two factors: "30 Day Mean Close", a 30-day simple moving average of the close, and "Latest Close", the most recent close.
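For a single asset on a single day, the two factors reduce to simple arithmetic on the trailing window of closes. The plain-Python sketch below uses hypothetical prices and ignores split and dividend adjustments and missing data, which Pipeline handles; refusing a partial window is a choice of this sketch, not documented Quantopian behavior.

```python
from statistics import mean

def simple_moving_average(closes, window_length):
    """Mean of the last `window_length` closes (oldest first)."""
    if len(closes) < window_length:
        # This sketch refuses to average a partial window.
        raise ValueError("not enough history for the requested window_length")
    return mean(closes[-window_length:])

def latest(closes):
    """The most recent close in the trailing window."""
    return closes[-1]

# Hypothetical closes for one asset: 1.0, 2.0, ..., 40.0 (oldest first).
closes = [float(price) for price in range(1, 41)]
mean_close_30 = simple_moving_average(closes, 30)  # averages 11.0 .. 40.0
latest_close = latest(closes)
```

Here `mean_close_30` is 25.5 and `latest_close` is 40.0; `window_length` controls how many trailing values enter the average.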

In [63]:
from quantopian.pipeline.factors import BollingerBands,SimpleMovingAverage,EWMA
In [64]:
SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
Out[64]:
SimpleMovingAverage((USEquityPricing.close::float64,), window_length=30)
In [65]:
def make_pipeline():
    
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=30)
    
    return Pipeline(columns={
        '30 Day Mean Close':mean_close_30
    })
In [66]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
In [67]:
results.head(20)
Out[67]:
30 Day Mean Close
2017-01-03 00:00:00+00:00 Equity(2 [ARNC]) 20.110500
Equity(21 [AAME]) 3.899241
Equity(24 [AAPL]) 113.368433
Equity(25 [ARNC_PR]) 86.796111
Equity(31 [ABAX]) 52.498394
Equity(39 [DDC]) 9.523000
Equity(41 [ARCB]) 29.969167
Equity(52 [ABM]) 42.138239
Equity(53 [ABMD]) 114.030167
Equity(62 [ABT]) 38.664333
Equity(64 [ABX]) 15.117736
Equity(66 [AB]) 23.119167
Equity(67 [ADSK]) 75.960667
Equity(69 [ACAT]) 15.839500
Equity(70 [VBF]) 18.208480
Equity(76 [TAP]) 97.727924
Equity(84 [ACET]) 20.722753
Equity(100 [IEP]) 59.281167
Equity(106 [ACU]) 22.628433
Equity(110 [ACXM]) 26.800333
In [68]:
def make_pipeline():
    
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
    latest_close = USEquityPricing.close.latest
    
    return Pipeline(columns={
        '30 Day Mean Close':mean_close_30,
        'Latest Close':latest_close
    })
In [69]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
In [70]:
results.head(10)
Out[70]:
                                                30 Day Mean Close  Latest Close
2017-01-03 00:00:00+00:00 Equity(2 [ARNC])              20.110500         18.55
                          Equity(21 [AAME])              3.899241          4.10
                          Equity(24 [AAPL])            113.368433        115.84
                          Equity(25 [ARNC_PR])          86.796111           NaN
                          Equity(31 [ABAX])             52.498394         52.74
                          Equity(39 [DDC])               9.523000          9.69
                          Equity(41 [ARCB])             29.969167         27.75
                          Equity(52 [ABM])              42.138239         40.68
                          Equity(53 [ABMD])            114.030167        112.70
                          Equity(62 [ABT])              38.664333         38.42

Combining Factors

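Factors can be combined with arithmetic operators to build new factors. Here the percent difference between the 10-day and 30-day moving averages is added to the pipeline as a new column.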
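Factor arithmetic is applied asset by asset. With hypothetical 10-day and 30-day means for two assets, the "Percent Difference" column works out as in this plain-Python sketch: a positive value means the short-term average is above the long-term one.

```python
def percent_difference(mean_10, mean_30):
    """(10-day mean - 30-day mean) / 30-day mean, the formula used in make_pipeline()."""
    return (mean_10 - mean_30) / mean_30

# Hypothetical (10-day mean, 30-day mean) pairs per asset.
means = {"AAA": (105.0, 100.0), "BBB": (19.0, 20.0)}
pct = {asset: percent_difference(m10, m30) for asset, (m10, m30) in means.items()}
```

AAA comes out at 0.05 (short-term average 5% above long-term) and BBB at -0.05.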

In [71]:
def make_pipeline():
    
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
    latest_close = USEquityPricing.close.latest
    
    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30
    
    return Pipeline(columns={
        'Percent Difference':percent_difference,
        '30 Day Mean Close':mean_close_30,
        'Latest Close':latest_close
    })
In [72]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
In [73]:
results.head()
Out[73]:
                                                30 Day Mean Close  Latest Close  Percent Difference
2017-01-03 00:00:00+00:00 Equity(2 [ARNC])              20.110500         18.55           -0.022749
                          Equity(21 [AAME])              3.899241          4.10           -0.005499
                          Equity(24 [AAPL])            113.368433        115.84            0.028481
                          Equity(25 [ARNC_PR])          86.796111           NaN           -0.000474
                          Equity(31 [ABAX])             52.498394         52.74           -0.007665

Filters and Screens

Filters take in an asset and a timestamp and return a boolean. Comparing a factor with a value, as below, produces a filter.

Create a Filter

In [74]:
last_close_price = USEquityPricing.close.latest
close_price_filter = last_close_price > 20
In [75]:
close_price_filter
Out[75]:
NumExprFilter(expr='x_0 > (20.0)', bindings={'x_0': Latest((USEquityPricing.close::float64,), window_length=1)})
In [76]:
def make_pipeline():
    
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
    latest_close = USEquityPricing.close.latest
    
    percent_difference = (mean_close_10-mean_close_30) / mean_close_30
    
    perc_diff_check = percent_difference > 0 
    
    return Pipeline(columns={
        'Percent Difference':percent_difference,
        '30 Day Mean Close':mean_close_30,
        'Latest Close':latest_close,
        'Positive Percent Diff': perc_diff_check
    })
In [77]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()
Out[77]:
                                                30 Day Mean Close  Latest Close  Percent Difference  Positive Percent Diff
2017-01-03 00:00:00+00:00 Equity(2 [ARNC])              20.110500         18.55           -0.022749                  False
                          Equity(21 [AAME])              3.899241          4.10           -0.005499                  False
                          Equity(24 [AAPL])            113.368433        115.84            0.028481                   True
                          Equity(25 [ARNC_PR])          86.796111           NaN           -0.000474                  False
                          Equity(31 [ABAX])             52.498394         52.74           -0.007665                  False

Screens

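Apply a filter to the output by passing it as the screen argument of Pipeline(). Only the assets for which the filter is True on a given day appear in the results.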
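Conceptually, a screen drops every output row whose filter value is False and leaves the columns untouched. A plain-Python sketch with hypothetical rows modeled as dictionaries:

```python
# Hypothetical pipeline rows for one date: asset -> {column name: value}.
rows = {
    "AAA": {"Percent Difference": 0.03, "Latest Close": 100.0},
    "BBB": {"Percent Difference": -0.02, "Latest Close": 20.0},
    "CCC": {"Percent Difference": 0.01, "Latest Close": 30.0},
}

# The filter: one boolean per asset.
perc_diff_check = {asset: row["Percent Difference"] > 0 for asset, row in rows.items()}

# The screen: keep only rows whose filter value is True; the columns are unchanged.
screened = {asset: row for asset, row in rows.items() if perc_diff_check[asset]}
```

Only AAA and CCC survive, which mirrors why every row in the screened output below shows True in the "Positive Percent Diff" column.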

In [78]:
def make_pipeline():
    
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
    latest_close = USEquityPricing.close.latest
    
    percent_difference = (mean_close_10-mean_close_30) / mean_close_30
    
    perc_diff_check = percent_difference > 0 
    
    return Pipeline(columns={
                            'Percent Difference':percent_difference,
                            '30 Day Mean Close':mean_close_30,
                            'Latest Close':latest_close,
                            'Positive Percent Diff': perc_diff_check},
                    screen=perc_diff_check)
In [79]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()
Out[79]:
                                             30 Day Mean Close  Latest Close  Percent Difference  Positive Percent Diff
2017-01-03 00:00:00+00:00 Equity(24 [AAPL])         113.368433        115.84            0.028481                   True
                          Equity(66 [AB])            23.119167         23.45            0.004578                   True
                          Equity(69 [ACAT])          15.839500         15.02            0.009375                   True
                          Equity(70 [VBF])           18.208480         18.49            0.011814                   True
                          Equity(84 [ACET])          20.722753         21.97            0.039630                   True

Reverse a screen

Invert a filter with the ~ operator (logical NOT). Screening on ~perc_diff_check keeps only the assets whose percent difference is not positive.
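Per asset, ~ on a filter is a logical NOT. In plain Python the scalar equivalent is `not`, not `~`: applied to a bool, `~` performs integer bitwise inversion (`~True` evaluates to -2). A sketch with hypothetical values, where NaN stands for a missing value:

```python
import math

# Hypothetical percent differences for one date; NaN marks a missing value.
pct_diff = {"AAA": 0.03, "BBB": -0.02, "CCC": 0.0, "DDD": math.nan}

# Filter: percent difference is strictly positive (any comparison with NaN is False).
positive = {asset: value > 0 for asset, value in pct_diff.items()}

# Logical NOT per asset, using `not` on each plain Python bool.
not_positive = {asset: not flag for asset, flag in positive.items()}

kept = sorted(asset for asset, flag in not_positive.items() if flag)
```

In this sketch the inverted set is BBB, CCC and DDD. DDD lands there only because `nan > 0` is False, so missing values are worth checking whenever a screen is inverted.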

In [80]:
def make_pipeline():
    
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
    latest_close = USEquityPricing.close.latest
    
    percent_difference = (mean_close_10-mean_close_30) / mean_close_30
    
    perc_diff_check = percent_difference > 0 
    
    return Pipeline(columns={
                            'Percent Difference':percent_difference,
                            '30 Day Mean Close':mean_close_30,
                            'Latest Close':latest_close,
                            'Positive Percent Diff': perc_diff_check},
                    screen=~perc_diff_check)
In [81]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()
Out[81]:
                                                30 Day Mean Close  Latest Close  Percent Difference  Positive Percent Diff
2017-01-03 00:00:00+00:00 Equity(2 [ARNC])              20.110500         18.55           -0.022749                  False
                          Equity(21 [AAME])              3.899241          4.10           -0.005499                  False
                          Equity(25 [ARNC_PR])          86.796111           NaN           -0.000474                  False
                          Equity(31 [ABAX])             52.498394         52.74           -0.007665                  False
                          Equity(39 [DDC])               9.523000          9.69           -0.015436                  False

Combine Filters

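Create a second filter, small_price (latest close below $5), and combine it with perc_diff_check using & (logical AND). Filters can also be combined with | (logical OR).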
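Combining filters is a per-asset boolean operation. A plain-Python sketch with hypothetical values shows the AND used for final_filter next to the corresponding OR:

```python
# Hypothetical per-asset values for one date.
pct_diff = {"AAA": 0.03, "BBB": -0.02, "CCC": 0.01, "DDD": 0.05}
latest_close = {"AAA": 3.00, "BBB": 2.50, "CCC": 25.00, "DDD": 4.50}

perc_diff_check = {a: pct_diff[a] > 0 for a in pct_diff}
small_price = {a: latest_close[a] < 5 for a in latest_close}

# `&` between filters is a per-asset logical AND; `|` is a per-asset logical OR.
final_filter = {a: perc_diff_check[a] and small_price[a] for a in pct_diff}
either_filter = {a: perc_diff_check[a] or small_price[a] for a in pct_diff}

passing = sorted(a for a, ok in final_filter.items() if ok)
```

Only AAA and DDD pass both conditions, while every asset passes at least one of them.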

In [82]:
def make_pipeline():
    
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
    latest_close = USEquityPricing.close.latest
    
    percent_difference = (mean_close_10-mean_close_30) / mean_close_30
    
    perc_diff_check = percent_difference > 0 
    small_price = latest_close < 5
    
    # Combine Filters
    final_filter = perc_diff_check & small_price
    
    return Pipeline(columns={
                            'Percent Difference':percent_difference,
                            '30 Day Mean Close':mean_close_30,
                            'Latest Close':latest_close,
                            'Positive Percent Diff': perc_diff_check},
                    screen=final_filter)
In [83]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()
Out[83]:
                                              30 Day Mean Close  Latest Close  Percent Difference  Positive Percent Diff
2017-01-03 00:00:00+00:00 Equity(535 [ARTW])           3.097778          3.40            0.013271                   True
                          Equity(677 [AXAS])           2.265333          2.56            0.145527                   True
                          Equity(1144 [BTX])           3.531167          3.62            0.065795                   True
                          Equity(1323 [CAW])           2.541333          2.60            0.016002                   True
                          Equity(1546 [CIF])           2.500370          2.57            0.015579                   True

Masking

Sometimes we want to ignore certain assets when computing pipeline expressions. There are two common cases where ignoring assets is useful:

  • We want to compute an expression that's computationally expensive, and we know we only care about results for certain assets.
  • We want to compute an expression that performs comparisons between assets, but we only want those comparisons to be performed against a subset of all assets.

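To do this, pass a filter as the mask argument when constructing a factor. The factor is then computed only for assets where the mask is True, before any screen is applied to the output.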
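A rough plain-Python model of mask=: the trailing mean is evaluated only for assets where the mask is True, and masked-out assets are reported as NaN. Treating masked-out assets as NaN is an assumption of this sketch; the data and the 3-day window are hypothetical.

```python
import math
from statistics import mean

# Hypothetical closes (oldest first) and the mask computed from the latest close.
closes = {
    "AAA": [4.0, 4.2, 4.4],
    "BBB": [50.0, 51.0, 52.0],
    "CCC": [2.0, 2.1, 1.9],
}
small_price = {asset: series[-1] < 5 for asset, series in closes.items()}

def masked_mean(window_length):
    """Trailing mean for masked-in assets; masked-out assets are never computed (NaN)."""
    return {
        asset: mean(series[-window_length:]) if small_price[asset] else math.nan
        for asset, series in closes.items()
    }

mean_close_3 = masked_mean(3)
```

BBB is never averaged because its latest close is above $5, which is where the savings come from when the masked computation is expensive.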

In [84]:
def make_pipeline():
    
    # Create Filters for Masks First
    latest_close = USEquityPricing.close.latest
    small_price = latest_close < 5
    
    # Pass in the mask
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10,mask=small_price)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30,mask=small_price)
    
    
    percent_difference = (mean_close_10-mean_close_30) / mean_close_30
    
    perc_diff_check = percent_difference > 0 
    
    
    final_filter = perc_diff_check
    
    return Pipeline(columns={
                            'Percent Difference':percent_difference,
                            '30 Day Mean Close':mean_close_30,
                            'Latest Close':latest_close,
                            'Positive Percent Diff': perc_diff_check},
                    screen=final_filter)
In [85]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()
Out[85]:
                                              30 Day Mean Close  Latest Close  Percent Difference  Positive Percent Diff
2017-01-03 00:00:00+00:00 Equity(535 [ARTW])           3.097778          3.40            0.013271                   True
                          Equity(677 [AXAS])           2.265333          2.56            0.145527                   True
                          Equity(1144 [BTX])           3.531167          3.62            0.065795                   True
                          Equity(1323 [CAW])           2.541333          2.60            0.016002                   True
                          Equity(1546 [CIF])           2.500370          2.57            0.015579                   True
In [86]:
len(results)
Out[86]:
391

Classifiers

A classifier is a function from an asset and a moment in time to a categorical output such as a string or integer label. The examples below use Morningstar fundamental data: the built-in Sector classifier and each asset's exchange ID.

In [87]:
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.classifiers.morningstar import Sector
In [88]:
morningstar_sector = Sector()

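Get fundamental data from morningstar.share_class_reference. Taking .latest of the exchange_id column gives a classifier holding each asset's most recent exchange ID.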

In [89]:
exchange = morningstar.share_class_reference.exchange_id.latest
In [90]:
exchange
Out[90]:
Latest((share_class_reference.exchange_id::object,), window_length=1)

Classifier Methods

  • eq (equals)
  • isnull
  • startswith
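Each of these methods turns a classifier's labels into a per-asset boolean, i.e. a filter. The plain-Python sketch below only mimics their meaning on a dictionary of labels; the exchange codes other than 'NYS' are hypothetical, and using None for a missing label is an assumption of the sketch.

```python
# Hypothetical exchange labels per asset; None stands for a missing label.
exchange = {"AAA": "NYS", "BBB": "NAS", "CCC": None, "DDD": "NYX"}

# eq: label equals a given value.
is_nyse = {a: label == "NYS" for a, label in exchange.items()}
# isnull: label is missing.
is_missing = {a: label is None for a, label in exchange.items()}
# startswith: label begins with a prefix (a missing label never matches).
starts_ny = {a: label is not None and label.startswith("NY") for a, label in exchange.items()}

def members(flags):
    """Sorted list of assets whose filter value is True."""
    return sorted(a for a, flag in flags.items() if flag)
```

`members(is_nyse)` is `["AAA"]`, `members(is_missing)` is `["CCC"]` and `members(starts_ny)` is `["AAA", "DDD"]`; the eq case corresponds to `exchange.eq('NYS')` in the next cell.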
In [91]:
nyse_filter = exchange.eq('NYS')
In [92]:
def make_pipeline():
    
    # Create Filters for Masks First
    latest_close = USEquityPricing.close.latest
    small_price = latest_close < 5
    
    # Classifier
    nyse_filter = exchange.eq('NYS')
    
    # Pass in the mask
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10,mask=small_price)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30,mask=small_price)
    
    
    percent_difference = (mean_close_10-mean_close_30) / mean_close_30
    
    perc_diff_check = percent_difference > 0 
    
    
    final_filter = perc_diff_check & nyse_filter
    
    return Pipeline(columns={
                            'Percent Difference':percent_difference,
                            '30 Day Mean Close':mean_close_30,
                            'Latest Close':latest_close,
                            'Positive Percent Diff': perc_diff_check},
                    screen=final_filter)
In [93]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()
Out[93]:
                                              30 Day Mean Close  Latest Close  Percent Difference  Positive Percent Diff
2017-01-03 00:00:00+00:00 Equity(2586 [EQS])           1.960533         2.020            0.022120                   True
                          Equity(3265 [GLF])           1.576367         1.725            0.162420                   True
                          Equity(3645 [HOV])           2.406667         2.735            0.176939                   True
                          Equity(4577 [LUB])           4.292333         4.270            0.004116                   True
                          Equity(4971 [RT])            3.244000         3.240            0.009094                   True
In [94]:
len(results)
Out[94]:
66

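Pipelines in Quantopian IDE

The code below is meant to run in the Quantopian algorithm IDE. The quantopian.algorithm module is not available in the research environment, so running this cell in a notebook raises an ImportError.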

In [53]:
from quantopian.pipeline import Pipeline
from quantopian.algorithm import attach_pipeline, pipeline_output

def initialize(context):
    my_pipe = make_pipeline()
    attach_pipeline(my_pipe, 'my_pipeline')

def make_pipeline():
    return Pipeline()

def before_trading_start(context, data):
    # Store our pipeline output DataFrame in context.
    context.output = pipeline_output('my_pipeline')

ImportErrorTraceback (most recent call last)
<ipython-input-53-2baad17b853e> in <module>()
      1 from quantopian.pipeline import Pipeline
----> 2 from quantopian.algorithm import attach_pipeline, pipeline_output
      3 
      4 def initialize(context):
      5     my_pipe = make_pipeline()

/build/src/qexec_repo/qexec/algo/safety.py in __call__(self, name, globals, locals, fromlist, level)
    265         # this is a whitelisted import
    266         return self._import_safety.make_safe(
--> 267             self._import(name, globals, locals, fromlist, level),
    268         )
    269 

ImportError: No module named algorithm