One - One Code All

Blog Content

大猩猩联手羊驼算法策略在python中的实现

智能投研   2015-12-05 09:38:03

大猩猩联手羊驼算法策略在python中的实现

1.羊驼指数
羊驼指数到底是个什么鬼?这还要从美国《旧金山纪事报》曾做过大猩猩选股实验说起。他们让大猩猩随机选择写有股票代码的纸板,一共选择了5支。然后,用大猩猩挑选的股票组合与《华尔街日报》8位知名分析师精心计算分析挑选的5只股票相比较,在持有一段时间之后,大猩猩随机抽取购买的股票,票面价值竟超过操盘手的股票。

在2014年,甘肃卫视的《马上知道》节目也向观众展示了用羊驼来选股,即每天卖掉持有的股票中收益率最差的一只,然后让羊驼随机选入一只股票来买。结果得到了很可观的收益率。根据这种情况,人们开始尝试使用羊驼来选股。

羊驼选股其实与华尔街大猩猩选股是一个道理,在A股市场的使用方法如下:

选股范围为A股市场上所有非ST类个股,由羊驼随机选出纳入指数的股票;

羊驼首次选择10只股票进入股票池,开始计算羊驼指数收益;

每周换股两次,每次原则上只剔除收益最差的一只股票,并由羊驼另选一只入池;

羊驼指数以2014年7月30日为基日,基日指数为1000。

简单说就是随机选取10只股,每周把收益最低的两只股票剔除出去,然后再随机选两只进来,不断重复这个过程。

羊驼指数为什么如此神奇?美国学者在1993年发现,买入过去表现比较好的股票,卖出过去表现比较差的股票,在3~12个月里有高于市场的超额收益!这就是所谓的动量理论,其本质是“追溯趋势”,如同一辆汽车,行驶的速度越快,具有前进的惯性就会越大。

羊驼指数就是用相反的动量原理来选择股票,不是持有最好的股票,而是卖出最差的股票。通过主动不断地淘汰劣质股票,被动留下最好的股票,从而达到筛选股票的目的!

所以说羊驼指数的神奇并非因为羊驼是神兽,而在于其背后蕴含的量化投资理论。

2.算法策略
将股票按收益率排序,卖掉买入股中最差收益率的,加入未买股中收益率最高的

3.代码

import random
import numpy as np
import pandas as pd
from pandas import Series,DataFrame
import scipy.stats as stats
import math
# 股票池为所有沪深300的股票
stocks = get_index_stocks('000300.XSHG')
#股票池中有多少股票
num=len(stocks)
set_universe(stocks)
set_commission(PerTrade(buy_cost=0.0008, sell_cost=0.0015, min_cost=5))
set_slippage(FixedSlippage(0))
#初始买入15只股票
num_of_stocks=15
#每次更新时替换2只股票
num_of_change=2
#计算1日收益率
period=1
#用一个列表来保存每天持有的股票代码
stockshold=[]
#判断参数输入是否符合条件,如果不符合,则重置为默认值
if num_of_stocks>num:
    log.info("too large num_of_stocks")
    num_of_stocks=10
elif num_of_change>num_of_stocks:
    log.info("too large num_of_change")
    num_of_change=1
#预处理数据,将没有数据的股票剔除,同时加入收益率
#构成一个列索引为股票名,收益率一行的索引为
#'return'的dataframe,并返回这个dataframe
def process():
    #取出每只股票period天的收盘价格
    stocks_info=history(period,'1d','close')
    #去除信息不全的数据
    stocks_info.dropna(axis=0,how='any',thresh=None)
    #取出昨天和period天之前的收盘价,计算收益率
    a1=list(stocks_info.iloc[0])
    a2=list(stocks_info.iloc[period-1])
    a1=np.array(a1)
    a2=np.array(a2)
    #用一个dataframe来保存所有股票的收益率信息
    stocks_return=DataFrame(a2/a1,columns=['return'],index=stocks_info.columns)
    stocks_info=stocks_info.T
    #把收益率的数据加到相应的列
    stocks_info=pd.concat([stocks_info,stocks_return],axis=1)
    #将股票信息按照收益率从大到小来存储
    stocks_info=stocks_info.sort(columns=['return'],ascending=[False])
    #返回处理好的dataframe
    return stocks_info

#股票入池
def BuyStocks(stocks_info,cash):
    #计算现在持有的股票数
    current_num=len(stockshold)
    stocks_info=stocks_info.T
    #将已持有的股票从股票池中剔除
    for i in range(0,current_num):
        if stockshold[i] in stocks_info.columns:
            del stocks_info[stockshold[i]]
    stocks_info=stocks_info.T
    #计算在每只股票上可以支付的现金
    if num_of_stocks-current_num>0:
        cash=cash/(num_of_stocks-current_num)
    for i in range(0,num_of_stocks-current_num):
        #取得股票当前的价格
        current_price=stocks_info['current_price'][i]
        #判断是否有价格数据
        if math.isnan(current_price)==False:
            #计算可以每只股票可以购买的数量
            num_of_shares=int(cash/current_price)
            if num_of_shares>0:
                order(stocks_info.index[i],+num_of_shares)
                log.info("buying %s" %(stocks_info.index[i]))
                #将购买的股票代码加到stockhold中
                stockshold.append(stocks_info.index[i])
#股票出池
def SellStocks(stocks_info):
    stocks_hold=DataFrame()
    current_num=len(stockshold)
    #用一个dataframe来保存持有的股票的信息
    for i in range(0,current_num):
        stocks_hold=pd.concat([stocks_hold,stocks_info.loc[stockshold[i]]],axis=1)
    stocks_hold=stocks_hold.T
    #在持有的股票数不为0时,将持有的股票信息按照收益率大小从小到大排序
    if current_num>0:
        stocks_hold=stocks_hold.sort(columns=['return'])
     #把收益率最低的股票卖空
    for k in range(0,min(num_of_change,current_num)):
        #判断是否停牌
        if stocks_hold['paused'][k]=='False':
            order_target(stocks_hold.index[k],0)
            log.info("Selling %s" % (stocks_hold.index[k]))
            #在stockshold中去除已经卖空的股票的信息
            stockshold.remove(stocks_hold.index[k])

# 每个单位时间(如果按天回测,则每天调用一次,如果按分钟,则每分钟调用一次)调用一次
def handle_data(context, data):
    stocks_info=process()
    stocks_num=len(stocks_info.index)
    #用一个列表来保存所有股票是否停牌的信息
    pause=[]
    for i in range(0,stocks_num):
        if data[stocks_info.index[i]].paused==True:
            pause.append('True')
        else:
            pause.append('False')
    #将列表转换成dataframe以便加入到stocks_info中
    paused=DataFrame(pause,columns=['paused'],index=stocks_info.index)
    stocks_info=pd.concat([stocks_info,paused],axis=1)
    #用一个列表来保存所有股票当前的价格信息
    currentprice=[]
    for i in range(0,stocks_num):
        currentprice.append(data[stocks_info.index[i]].price)
    current_price=DataFrame(currentprice,columns=['current_price'],index=stocks_info.index)
    #将股票是否停牌,当前价格的信息添加到stocks_info中
    stocks_info=pd.concat([stocks_info,current_price],axis=1)
    #取得当前现金
    cash=context.portfolio.cash
    #执行卖出股票的函数
    SellStocks(stocks_info)
    #执行买入股票的函数
    BuyStocks(stocks_info,cash)



上一篇:量化入门书籍推荐
下一篇:pandas的dataframe计算连续日期均线、移动平均线rolling方法

The minute you think of giving up, think of the reason why you held on so long.