This notebook is from the official Quantopian Guide on Pipelines. Make sure to visit their documentation for many more great resources!
Many trading algorithms have the following structure:
There are several technical challenges with doing this robustly. These include:
Pipeline exists to solve these challenges by providing a uniform API for expressing computations on a diverse collection of datasets.
A factor is a function from an asset and a moment in time to a numerical value.
A simple example of a factor is the most recent price of a security. Given a security and a specific point in time, the most recent price is a number. Another example is the 10-day average trading volume of a security. Factors are most commonly used to assign values to securities which can then be used in a number of ways. A factor can be used in each of the following procedures:
A filter is a function from an asset and a moment in time to a boolean. An example of a filter is a function indicating whether a security's price is below $10. Given a security and a point in time, this evaluates to either True or False. Filters are most commonly used for describing sets of assets to include or exclude for some particular purpose.
A classifier is a function from an asset and a moment in time to a categorical output. More specifically, a classifier produces a string or an int that doesn't represent a numerical value (e.g. an integer label such as a sector code). Classifiers are most commonly used for grouping assets for complex transformations on Factor outputs. An example of a classifier is the exchange on which an asset is currently being traded.
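To make the three definitions concrete, here is a minimal plain-Python sketch that models a factor, a filter, and a classifier as ordinary functions of an asset and a date. It is not the Pipeline API. The tickers AAA and BBB, their prices, and their exchange codes are hypothetical toy data.

```python
from datetime import date

# Hypothetical toy data: closing prices and exchange listings keyed by asset.
# These values are made up for illustration; they are not Quantopian data.
CLOSES = {
    "AAA": {date(2017, 1, 3): 8.0, date(2017, 1, 4): 8.5},
    "BBB": {date(2017, 1, 3): 42.0, date(2017, 1, 4): 41.0},
}
EXCHANGES = {"AAA": "NAS", "BBB": "NYS"}


def latest_price(asset, when):
    """Factor: (asset, time) -> number. Most recent close at or before `when`."""
    history = CLOSES[asset]
    return history[max(d for d in history if d <= when)]


def price_below_10(asset, when):
    """Filter: (asset, time) -> bool."""
    return latest_price(asset, when) < 10


def exchange_of(asset, when):
    """Classifier: (asset, time) -> categorical label (time is unused in this toy)."""
    return EXCHANGES[asset]


day = date(2017, 1, 4)
for asset in CLOSES:
    print(asset, latest_price(asset, day), price_below_10(asset, day), exchange_of(asset, day))
```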
from quantopian.pipeline import Pipeline
def make_pipeline():
    return Pipeline()
pipe = make_pipeline()
from quantopian.research import run_pipeline
result = run_pipeline(pipe,'2017-01-01','2017-01-01')
result.head(10)
result.columns
Note that because 2017-01-01 is not a trading day, the output date in result is automatically adjusted to 2017-01-03. The result has a two-level index (date and asset) and no columns, because this pipeline defines none.
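The date adjustment described in the note can be reproduced with a small sketch that rolls a calendar date forward past weekends and holidays. The holiday set is a hypothetical, hand-written stand-in for a real exchange calendar. It lists only Monday 2017-01-02, when U.S. markets observed the New Year's Day holiday.

```python
from datetime import date, timedelta

# Hypothetical hand-written holiday set; a real system would use a full
# exchange calendar. U.S. markets were closed on Monday 2017-01-02.
HOLIDAYS = {date(2017, 1, 2)}


def is_trading_day(d, holidays=HOLIDAYS):
    """Weekdays (Mon-Fri) that are not listed holidays."""
    return d.weekday() < 5 and d not in holidays


def next_trading_day(d, holidays=HOLIDAYS):
    """Roll `d` forward to the first trading day on or after it."""
    while not is_trading_day(d, holidays):
        d += timedelta(days=1)
    return d


print(next_trading_day(date(2017, 1, 1)))  # 2017-01-03
```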
result.info()
from quantopian.pipeline.data.builtin import USEquityPricing
Remember, factors take in an asset sid and a timestamp and return a numerical value.
The examples below compute two factors: "30 Day Mean Close" (the simple moving average of the closing price over 30 days) and "Latest Close" (the most recent closing price).
from quantopian.pipeline.factors import BollingerBands,SimpleMovingAverage,EWMA
SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
def make_pipeline():
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=30)
    return Pipeline(columns={
        '30 Day Mean Close': mean_close_30
    })
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head(20)
def make_pipeline():
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=30)
    latest_close = USEquityPricing.close.latest
    return Pipeline(columns={
        '30 Day Mean Close': mean_close_30,
        'Latest Close': latest_close
    })
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head(10)
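Both columns are simple computations over each asset's trailing price history. The sketch below mirrors their definitions in plain Python, using a made-up list of closes for a single asset. The helper names are hypothetical. Returning NaN when the history is too short is this sketch's own choice and is not a claim about Pipeline.

```python
import math

# Made-up closing prices for one asset, oldest first.
closes = [10.0, 11.0, 12.0, 13.0, 14.0]


def simple_moving_average(values, window_length):
    """Mean of the trailing `window_length` values (the simple moving average definition)."""
    if len(values) < window_length:
        return math.nan  # this sketch's choice when history is too short
    window = values[-window_length:]
    return sum(window) / window_length


def latest(values):
    """Most recent value, analogous to USEquityPricing.close.latest."""
    return values[-1]


print(simple_moving_average(closes, 3))  # 13.0
print(latest(closes))  # 14.0
```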
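Factors can be combined with arithmetic operators to build new factors. Here, the percent difference between the 10-day and 30-day mean closes is added as a column.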
def make_pipeline():
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=30)
    latest_close = USEquityPricing.close.latest
    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30
    return Pipeline(columns={
        'Percent Difference': percent_difference,
        '30 Day Mean Close': mean_close_30,
        'Latest Close': latest_close
    })
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()
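Arithmetic on factors is applied asset by asset. To check what the Percent Difference column means, this plain-Python sketch evaluates the same expression on two made-up price histories. A rising series gives a positive value, because its 10-day mean sits above its 30-day mean. A falling series gives a negative value.

```python
# Made-up 30-day close histories, oldest first: AAA rises, BBB falls.
history = {
    "AAA": [float(p) for p in range(1, 31)],      # 1.0, 2.0, ..., 30.0
    "BBB": [float(p) for p in range(30, 0, -1)],  # 30.0, 29.0, ..., 1.0
}


def sma(values, window_length):
    """Mean of the trailing window_length values."""
    return sum(values[-window_length:]) / window_length


percent_difference = {}
for asset, closes in history.items():
    mean_close_10 = sma(closes, 10)
    mean_close_30 = sma(closes, 30)
    # Same expression as in make_pipeline, evaluated for one asset at a time.
    percent_difference[asset] = (mean_close_10 - mean_close_30) / mean_close_30

print(percent_difference)  # AAA positive, BBB negative
```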
Filters take in an asset and a timestamp and return a boolean. Comparing a factor with a value, as below, produces a filter.
last_close_price = USEquityPricing.close.latest
close_price_filter = last_close_price > 20
close_price_filter
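Comparing a factor with a number yields one boolean per asset. The plain-Python sketch below uses hypothetical latest closes. Note that `> 20` is strict, so an asset that closed at exactly 20 fails the filter.

```python
# Made-up latest closes.
latest_close = {"AAA": 18.5, "BBB": 20.0, "CCC": 35.2}

# Plain-Python analogue of close_price_filter = last_close_price > 20
close_price_filter = {asset: price > 20 for asset, price in latest_close.items()}

print(close_price_filter)  # {'AAA': False, 'BBB': False, 'CCC': True}
```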
def make_pipeline():
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=30)
    latest_close = USEquityPricing.close.latest
    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30
    perc_diff_check = percent_difference > 0
    return Pipeline(columns={
        'Percent Difference': percent_difference,
        '30 Day Mean Close': mean_close_30,
        'Latest Close': latest_close,
        'Positive Percent Diff': perc_diff_check
    })
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()
To apply a filter to the output, pass it to Pipeline() as the screen argument. Only assets for which the filter is True are kept.
def make_pipeline():
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=30)
    latest_close = USEquityPricing.close.latest
    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30
    perc_diff_check = percent_difference > 0
    return Pipeline(
        columns={
            'Percent Difference': percent_difference,
            '30 Day Mean Close': mean_close_30,
            'Latest Close': latest_close,
            'Positive Percent Diff': perc_diff_check,
        },
        screen=perc_diff_check,
    )
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()
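The screen argument acts as a row filter on the output: only assets whose screen value is True appear in the result. Below is a minimal sketch of that behavior, using made-up rows and a hypothetical apply_screen helper.

```python
# Made-up pipeline rows keyed by asset.
rows = {
    "AAA": {"Percent Difference": 0.05, "Latest Close": 12.0},
    "BBB": {"Percent Difference": -0.02, "Latest Close": 48.0},
    "CCC": {"Percent Difference": 0.10, "Latest Close": 3.5},
}

# The filter: one boolean per asset.
perc_diff_check = {asset: row["Percent Difference"] > 0 for asset, row in rows.items()}


def apply_screen(rows, screen):
    """Keep only the rows whose screen value is True (hypothetical helper)."""
    return {asset: row for asset, row in rows.items() if screen[asset]}


screened = apply_screen(rows, perc_diff_check)
print(sorted(screened))  # ['AAA', 'CCC']
```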
Invert a filter with the ~ operator (logical NOT). Here the screen keeps only the assets whose percent difference is not positive.
def make_pipeline():
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=30)
    latest_close = USEquityPricing.close.latest
    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30
    perc_diff_check = percent_difference > 0
    return Pipeline(
        columns={
            'Percent Difference': percent_difference,
            '30 Day Mean Close': mean_close_30,
            'Latest Close': latest_close,
            'Positive Percent Diff': perc_diff_check,
        },
        screen=~perc_diff_check,
    )
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()
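Inverting a filter flips every boolean. In this plain-Python sketch, an asset whose percent difference is missing (NaN) fails `> 0`, so it passes the inverted filter. It is worth checking whether your pipeline output treats missing values the same way. The values are made up.

```python
import math

# Made-up percent differences; CCC has no value (NaN).
percent_difference = {"AAA": 0.05, "BBB": -0.02, "CCC": math.nan}

perc_diff_check = {asset: value > 0 for asset, value in percent_difference.items()}
# Logical NOT of every entry, the plain-Python analogue of ~perc_diff_check.
inverted = {asset: not flag for asset, flag in perc_diff_check.items()}

kept = sorted(asset for asset, flag in inverted.items() if flag)
print(kept)  # ['BBB', 'CCC']
```

To exclude missing values in this sketch, compare directly with `value <= 0` instead of inverting, since NaN fails both comparisons.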
Create another filter, small_price, and combine it with the existing filter using & (logical AND) to build the screen.
def make_pipeline():
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=30)
    latest_close = USEquityPricing.close.latest
    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30
    perc_diff_check = percent_difference > 0
    small_price = latest_close < 5
    # Combine Filters (& is logical AND)
    final_filter = perc_diff_check & small_price
    return Pipeline(
        columns={
            'Percent Difference': percent_difference,
            '30 Day Mean Close': mean_close_30,
            'Latest Close': latest_close,
            'Positive Percent Diff': perc_diff_check,
        },
        screen=final_filter,
    )
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()
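Combining filters with & keeps an asset only when both filters are True; | would keep it when either filter is True. Here is a plain-Python sketch of the same logic on made-up values.

```python
# Made-up latest closes and percent differences.
latest_close = {"AAA": 3.0, "BBB": 4.5, "CCC": 12.0}
percent_difference = {"AAA": 0.04, "BBB": -0.01, "CCC": 0.08}

perc_diff_check = {a: v > 0 for a, v in percent_difference.items()}
small_price = {a: p < 5 for a, p in latest_close.items()}

# Element-wise AND, the plain-Python analogue of perc_diff_check & small_price.
final_filter = {a: perc_diff_check[a] and small_price[a] for a in latest_close}

selected = sorted(a for a, ok in final_filter.items() if ok)
print(selected)  # ['AAA']
```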
Sometimes we want to ignore certain assets when computing pipeline expressions. There are two common cases where ignoring assets is useful:
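To do this, pass a filter to a factor through the mask= argument. The factor is then computed only for assets that pass the filter, and this happens when the factor is computed, before the screen is applied to the output.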
def make_pipeline():
    # Create Filters for Masks First
    latest_close = USEquityPricing.close.latest
    small_price = latest_close < 5
    # Pass in the mask
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=10, mask=small_price)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=30, mask=small_price)
    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30
    perc_diff_check = percent_difference > 0
    final_filter = perc_diff_check
    return Pipeline(
        columns={
            'Percent Difference': percent_difference,
            '30 Day Mean Close': mean_close_30,
            'Latest Close': latest_close,
            'Positive Percent Diff': perc_diff_check,
        },
        screen=final_filter,
    )
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()
len(results)
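The point of mask is that the masked factor is computed only for assets that pass the mask, instead of being computed for every asset and trimmed later by the screen. The sketch below imitates this in plain Python. It records which assets the moving average is actually computed for and gives masked-out assets NaN, which then fails the `> 0` check. The data, window lengths, and helper names are hypothetical.

```python
import math

# Made-up closes, oldest first. Only AAA trades below $5.
history = {
    "AAA": [2.0, 2.5, 3.0, 3.5, 4.0],
    "BBB": [40.0, 41.0, 42.0, 43.0, 44.0],
}
computed_for = []  # records every asset the average is actually computed for


def masked_sma(window_length, mask):
    """Trailing mean for assets where mask is True; NaN (not computed) elsewhere."""
    result = {}
    for asset, closes in history.items():
        if mask[asset]:
            computed_for.append(asset)
            result[asset] = sum(closes[-window_length:]) / window_length
        else:
            result[asset] = math.nan
    return result


small_price = {asset: closes[-1] < 5 for asset, closes in history.items()}
mean_short = masked_sma(2, small_price)  # stands in for the 10-day mean
mean_long = masked_sma(5, small_price)   # stands in for the 30-day mean

percent_difference = {a: (mean_short[a] - mean_long[a]) / mean_long[a] for a in history}
perc_diff_check = {a: v > 0 for a, v in percent_difference.items()}

print(computed_for)     # ['AAA', 'AAA']
print(perc_diff_check)  # {'AAA': True, 'BBB': False}
```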
A classifier is a function from an asset and a moment in time to a categorical output, such as a string or integer label. The examples below use Morningstar fundamentals data: the built-in Sector classifier and each asset's latest exchange ID.
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.classifiers.morningstar import Sector
morningstar_sector = Sector()
exchange = morningstar.share_class_reference.exchange_id.latest
exchange
nyse_filter = exchange.eq('NYS')
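A classifier's eq method turns a label comparison into a filter. Classifiers are also used to group assets when transforming factor outputs, for example by demeaning a factor within each group, which is roughly what Factor.demean(groupby=...) does in Pipeline. Below is a plain-Python sketch of both ideas, using made-up exchange labels and factor values.

```python
from collections import defaultdict

# Made-up exchange labels (the classifier) and factor values.
exchange = {"AAA": "NYS", "BBB": "NAS", "CCC": "NYS"}
percent_difference = {"AAA": 0.10, "BBB": 0.02, "CCC": -0.04}

# Label comparison -> filter, the plain-Python analogue of exchange.eq('NYS').
nyse_filter = {asset: label == "NYS" for asset, label in exchange.items()}

# Grouping: subtract each exchange's mean from its members' values.
groups = defaultdict(list)
for asset, label in exchange.items():
    groups[label].append(percent_difference[asset])
group_mean = {label: sum(values) / len(values) for label, values in groups.items()}
demeaned = {asset: percent_difference[asset] - group_mean[exchange[asset]] for asset in exchange}

print(nyse_filter)  # {'AAA': True, 'BBB': False, 'CCC': True}
```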
def make_pipeline():
    # Create Filters for Masks First
    latest_close = USEquityPricing.close.latest
    small_price = latest_close < 5
    # Classifier
    nyse_filter = exchange.eq('NYS')
    # Pass in the mask
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=10, mask=small_price)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=30, mask=small_price)
    percent_difference = (mean_close_10 - mean_close_30) / mean_close_30
    perc_diff_check = percent_difference > 0
    final_filter = perc_diff_check & nyse_filter
    return Pipeline(
        columns={
            'Percent Difference': percent_difference,
            '30 Day Mean Close': mean_close_30,
            'Latest Close': latest_close,
            'Positive Percent Diff': perc_diff_check,
        },
        screen=final_filter,
    )
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()
len(results)
from quantopian.pipeline import Pipeline
from quantopian.algorithm import attach_pipeline, pipeline_output
def initialize(context):
    # Called once when the algorithm starts: register the pipeline by name.
    my_pipe = make_pipeline()
    attach_pipeline(my_pipe, 'my_pipeline')


def make_pipeline():
    return Pipeline()


def before_trading_start(context, data):
    # Called once per trading day before the market opens.
    # Store our pipeline output DataFrame in context.
    context.output = pipeline_output('my_pipeline')