原文:Demand forecasting with BigQuery and TensorFlow
在本notebook中,我们将开发一个机器学习模型,用于预测对纽约的出租汽车需求。
要开发该模型,我们将需要获得出租汽车使用的历史数据。该数据存在于BigQuery中。让我们通过查看模式开始吧。
In [25]:
import gcp.bigquery as bq
import pandas as pd
import numpy as np
In [26]:
%%bigquery schema --table "nyc-tlc:green.trips_2015"
Out[26]:
让我们拉取在2015年数据集中的每天出游数我们可以使用BigQuery的内建Date-and-time函数DAYOFYEAR (见BigQuery查询参考以获得这样的函数的完整列表)。
In [27]:
%%sql
SELECT DAYOFYEAR(pickup_datetime) AS daynumber FROM [nyc-tlc:green.trips_2015] LIMIT 10
Out[27]:
120
120
120
120
149
150
150
150
89
89
(rows: 10, time: 1.6s, cached, job: job_awmH7VUQHbnwGhY2Tsa3ElMwsvk)
让我们使用出游数的总数作为出租车请求的代理(其他合理的替代方案是总trip_distance或者总fare_amount)。可以使用Tensorflow预测多个变量,但是为了简单起见,我们将坚持只预测出游数。
我们将把我们的查询命名为'taxiquery',并让他使用一个输入变量'$YEAR'。然后,可以通过提供一个YEAR来调用'taxiquery'。to_dataframe()将BigQuery结果转换成一个Pandas dataframe。
In [28]:
%%sql --module taxiquery
SELECT daynumber, COUNT(*) AS numtrips FROM
(SELECT DAYOFYEAR(pickup_datetime) AS daynumber FROM [nyc-tlc:green.trips_$YEAR])
GROUP BY daynumber ORDER BY daynumber
In [29]:
trips = bq.Query(taxiquery, YEAR=2015).to_dataframe()
trips[:5]
Out[29]:
| daynumber | numtrips
---|---|---
0 | 1 | 62943
1 | 2 | 43410
2 | 3 | 53866
3 | 4 | 41602
4 | 5 | 41923
通常情况下,某事物的合理估计是其历史平均水平。因此,我们可以将我们的机器学习模型与历史平均水平进行比较。
In [30]:
avg = np.mean(trips['numtrips'])
print 'Just using average={0} has RMSE of {1}'.format(avg, np.sqrt(np.mean((trips['numtrips'] - avg)**2)))
Just using average=54674.0994475 has RMSE of 10163.4654442
这里的平均值约为55,000,而在这种情况下的根均方误差(RMSE)约为10,000。换句话说,如果我们估计在任意一天中都有55,000次出租车出游,那么在任一方向将平均有约为10,000的误差。
让我们看看是否可以做的比这更好 —— 我们的目的是进行出租车请求预测,使得RMSE少于10,000。
什么样的因素影响了人们对出租车的使用?
我们怀疑天气影响人们使用出租车的频率。也许在极冷或阴雨的情况下,那些通常走路去上班会乘坐出租车。
Google用户Felipe Hoffa已经公开他从美国国家海洋和大气管理局做出的气象观测到BigQuery中。让我们使用该数据集,并找到对应纽约La Guardia机场的站号。
In [31]:
%%sql
SELECT * FROM [fh-bigquery:weather_gsod.stations]
WHERE state = 'NY' AND wban != '99999' AND name contains 'LA GUARDIA'
Out[31]:
| usaf | wban | name | country | fips | state | call | lat | lon | elev | begin | end |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 725030 | 14732 | NEW YORK/LA GUARDIA | US | US | NY | KLGA | 40779.0 | -73880.0 | 94 | ||
| 19350222 | 20120619 |
(rows: 1, time: 0.9s, cached, job: job_UpFXBnE1mCNSKcjG9VzAGXRsuZc)
让我们拉取La Guardia机场的最低和最高气温(华氏温度),以及降雨量(英寸)。
In [32]:
%%sql --module wxquery
SELECT DAYOFYEAR(TIMESTAMP('$YEAR'+mo+da)) daynumber,
FIRST(DAYOFWEEK(TIMESTAMP('$YEAR'+mo+da))) dayofweek,
MIN(min) mintemp, MAX(max) maxtemp, MAX(IF(prcp=99.99,0,prcp)) rain
FROM [fh-bigquery:weather_gsod.gsod$YEAR]
WHERE stn='725030' GROUP BY 1 ORDER BY daynumber DESC
In [33]:
weather = bq.Query(wxquery, YEAR=2015).to_dataframe()
weather[:5]
Out[33]:
| daynumber | dayofweek | mintemp | maxtemp | rain
---|---|---|---|---|---
0 | 365 | 5 | 46.0 | 48.2 | 0.17
1 | 364 | 4 | 34.0 | 48.0 | 0.13
2 | 363 | 3 | 33.8 | 46.9 | 0.37
3 | 362 | 2 | 39.0 | 62.1 | 0.02
4 | 361 | 1 | 46.0 | 62.6 | 0.14
使用Pandas,按天合并出租车和气象数据集。
In [34]:
data = pd.merge(weather, trips, on='daynumber')
data[:5]
Out[34]:
| daynumber | dayofweek | mintemp | maxtemp | rain | numtrips
---|---|---|---|---|---|---
0 | 181 | 3 | 64.0 | 82.0 | 0.00 | 42978
1 | 180 | 2 | 62.1 | 78.1 | 0.26 | 41397
2 | 179 | 1 | 60.1 | 73.9 | 1.32 | 54632
3 | 178 | 7 | 62.1 | 80.1 | 0.00 | 74883
4 | 177 | 6 | 69.1 | 84.9 | 0.00 | 58371
在最高气温和出游数之间是否存在关系呢?
In [35]:
j = data.plot(kind='scatter', x='maxtemp', y='numtrips')
上面的散点图看上去并不十分乐观。
一周内的某天与出游数是否存在关系呢?
In [36]:
j = data.plot(kind='scatter', x='dayofweek', y='numtrips')
哇,我们似乎已经找到了一个预测点。看起来,一周中越往后使用出租车越频繁。也许纽约人每周做出决定要多走路,再后来逐渐失去决心,或者它反映了纽约市旅游动态。
也许如果我们拿出一周内天的_混杂_影响,最高气温将会开始发挥效果。让我们看看是否是这样:
In [37]:
j = data[data['dayofweek'] == 7].plot(kind='scatter', x='maxtemp', y='numtrips')
移除混杂因素似乎反映了关于温度的基本趋势。但是……你不觉得数据有点稀疏吗?有些事情你必须铭记在心 —— 你开始考虑越多的预测点(这里我们使用两个:一周中的天以及最高气温),则需要越多的行,从而避免_过度拟合_模型。
让我们添加2014年的数据到Pandas dataframe中。注意,对我们来说,对YEAR模块化查询是多么有用。
In [38]:
trips = bq.Query(taxiquery, YEAR=2014).to_dataframe()
weather = bq.Query(wxquery, YEAR=2014).to_dataframe()
data2014 = pd.merge(weather, trips, on='daynumber')
data2014[:5]
Out[38]:
| daynumber | dayofweek | mintemp | maxtemp | rain | numtrips
---|---|---|---|---|---|---
0 | 365 | 4 | 27.0 | 35.1 | 0.0 | 67518
1 | 364 | 3 | 28.9 | 44.1 | 0.0 | 43173
2 | 363 | 2 | 37.9 | 52.0 | 0.1 | 37979
3 | 362 | 1 | 42.1 | 55.0 | 0.0 | 43055
4 | 361 | 7 | 39.9 | 55.0 | 0.0 | 46961
In [39]:
data2 = pd.concat([data, data2014])
data2.describe()
Out[39]:
| daynumber | dayofweek | mintemp | maxtemp | rain | numtrips
---|---|---|---|---|---|---
count | 546.000000 | 546.000000 | 546.000000 | 546.000000 | 546.000000 |
546.000000
mean | 152.501832 | 4.000000 | 43.511355 | 61.806410 | 0.130568 | 47130.009158
std | 101.099356 | 2.001834 | 18.598263 | 19.169493 | 0.360620 | 11613.865885
min | 1.000000 | 1.000000 | 3.000000 | 21.000000 | 0.000000 | 13436.000000
25% | 69.000000 | 2.000000 | 28.225000 | 44.100000 | 0.000000 | 39150.250000
50% | 137.000000 | 4.000000 | 44.100000 | 64.000000 | 0.000000 | 46720.500000
75% | 228.750000 | 6.000000 | 61.000000 | 79.000000 | 0.070000 | 53855.500000
max | 365.000000 | 7.000000 | 75.900000 | 93.000000 | 4.880000 | 81574.000000
In [40]:
j = data2[data2['dayofweek'] == 7].plot(kind='scatter', x='maxtemp', y='numtrips')
这些数据似乎更稳健一点。如果我们有更多的数据,当然会更好。但是,在这种情况下,我们只有2014年和2015年的数据,所以这就是我们将处理的。
我们将使用训练数据集的80%和20%的数据对我们已经训练的模型进行测试。让我们打乱Pandas dataframe的行,以使划分是随机的。预测(或输入)列将是数据库中的每一列,而不是出游数(这是我们的目标,或者我们所期望的预测)。
我们将使用的机器学习模型 —— 线性回归和神经网络 —— 都要求输入变量在本质上是数字。
然而,一周中的天是一个分类变量(例如,周二并不是真的大于周一)。因此,我们应该为它是否是周一(值为0或1)、周二等创建单独的列。
在这种情况下,我们确实有有限的数据(记住:你使用作为输入特性的列越多,在训练数据集中需要的行越多),并且对于一周的天,似乎有清晰的线性趋势。所以这里,为了简单起见,我们将选择它,并使用数据原样。如果你对这种简化的影响感到好奇,那么尝试取消为了对一周的天创建单独的列的代码的注释,然后重新运行notebook。
In [41]:
import tensorflow as tf
shuffled = data2.sample(frac=1)
# It would be a good idea, if we had more data, to treat the days as categorical variables
# with the small amount of data, we have though, the model tends to overfit
#predictors = shuffled.iloc[:,2:5]
#for day in xrange(1,8):
# matching = shuffled['dayofweek'] == day
# predictors.loc[matching, 'day_' + str(day)] = 1
# predictors.loc[~matching, 'day_' + str(day)] = 0
predictors = shuffled.iloc[:,1:5]
predictors[:5]
Out[41]:
| dayofweek | mintemp | maxtemp | rain
---|---|---|---|---
118 | 5 | 71.1 | 86.0 | 0.00
15 | 2 | 62.6 | 79.0 | 0.79
360 | 1 | 26.1 | 35.6 | 0.00
114 | 2 | 64.9 | 80.1 | 0.00
354 | 7 | 27.0 | 55.0 | 0.22
In [42]:
shuffled[:5]
Out[42]:
| daynumber | dayofweek | mintemp | maxtemp | rain | numtrips
---|---|---|---|---|---|---
118 | 247 | 5 | 71.1 | 86.0 | 0.00 | 43408
15 | 166 | 2 | 62.6 | 79.0 | 0.79 | 46807
360 | 5 | 1 | 26.1 | 35.6 | 0.00 | 20560
114 | 251 | 2 | 64.9 | 80.1 | 0.00 | 37058
354 | 11 | 7 | 27.0 | 55.0 | 0.22 | 29799
In [43]:
targets = shuffled.iloc[:,5]
targets[:5]
Out[43]:
118 43408
15 46807
360 20560
114 37058
354 29799
Name: numtrips, dtype: int64让我们基于80-20划分和更大的数据集来更新我们的基准。
In [44]:
trainsize = int(len(shuffled['numtrips']) * 0.8)
avg = np.mean(shuffled['numtrips'][:trainsize])
rmse = np.sqrt(np.mean((shuffled['numtrips'][trainsize:] - avg)**2))
print 'Just using average={0} has RMSE of {1}'.format(avg, rmse)
Just using average=47147.5206422 has RMSE of 12732.1343487
这里的关键语句是我们正在创建的模型:
model = (tf.matmul(feature_data, weights) + biases) * SCALE_NUM_TRIPS
这是一种线性模型和我们最小化的误差测量:
cost = tf.nn.l2_loss(model - target_data)
这是方误差(所以我们正在做的是最小二乘线性回归)。
剩下的部分就相当样板式了。缩放使得优化好得多(当权重是小数字时,更容易收敛,也就是说,不大可能放大)。我们保存权重到/tmp/trained_model,并且早测试数据集上展示均方根误差。
In [45]:
SCALE_NUM_TRIPS = 100000
trainsize = int(len(shuffled['numtrips']) * 0.8)
testsize = len(shuffled['numtrips']) - trainsize
npredictors = len(predictors.columns)
noutputs = 1
numiter = 10000
modelfile = '/tmp/trained_model'
with tf.Session() as sess:
feature_data = tf.placeholder("float", [None, npredictors])
target_data = tf.placeholder("float", [None, noutputs])
weights = tf.Variable(tf.truncated_normal([npredictors, noutputs], stddev=0.01))
biases = tf.Variable(tf.ones([noutputs]))
model = (tf.matmul(feature_data, weights) + biases) * SCALE_NUM_TRIPS
cost = tf.nn.l2_loss(model - target_data)
training_step = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(cost)
init = tf.initialize_all_variables()
sess.run(init)
saver = tf.train.Saver({'weights' : weights, 'biases' : biases})
for iter in xrange(0, numiter):
sess.run(training_step, feed_dict = {
feature_data : predictors[:trainsize].values,
target_data : targets[:trainsize].values.reshape(trainsize, noutputs)
})
if iter%1000 == 0:
print '{0} error={1}'.format(iter, np.sqrt(cost.eval(feed_dict = {
feature_data : predictors[:trainsize].values,
target_data : targets[:trainsize].values.reshape(trainsize, noutputs)
}) / trainsize))
filename = saver.save(sess, modelfile, global_step=numiter)
print 'Model written to {0}'.format(filename)
print 'testerror={0}'.format(np.sqrt(cost.eval(feed_dict = {
feature_data : predictors[trainsize:].values,
target_data : targets[trainsize:].values.reshape(testsize, noutputs)
}) / testsize))
0 error=37226.769573
1000 error=11397.6734661
2000 error=10759.6089285
3000 error=10037.6474454
4000 error=9306.57428869
5000 error=8646.24931995
6000 error=8103.11729799
7000 error=7699.99690218
8000 error=7440.12137614
9000 error=7305.52026543
Model written to /tmp/trained_model-10000
testerror=8386.6236006
大约8400的测试误差表明,较之只是用历史平均值(基准),使用机器学习模型会得到更好的结果。
要从线性回归变成神经网络回归,修改模型以拥有更多层次,并且插入线性组合的功能性转变(这里,我们进行热鲁(relu),但我们也可以进行正切(tanh)或者sigmoid):
model = (tf.matmul(tf.nn.relu(tf.matmul(feature_data, weights1) + biases1), weights2) + biases2) * SCALE_NUM_TRIPS
还有跟多的权重,需要保存和检索,但是样板代码几乎跟线性回归的例子相同。
In [46]:
SCALE_NUM_TRIPS = 100000
trainsize = int(len(shuffled['numtrips']) * 0.8)
testsize = len(shuffled['numtrips']) - trainsize
npredictors = len(predictors.columns)
noutputs = 1
nhidden = 5
numiter = 10000
modelfile = '/tmp/trained_model'
with tf.Session() as sess:
feature_data = tf.placeholder("float", [None, npredictors])
target_data = tf.placeholder("float", [None, noutputs])
weights1 = tf.Variable(tf.truncated_normal([npredictors, nhidden], stddev=0.01))
weights2 = tf.Variable(tf.truncated_normal([nhidden, noutputs], stddev=0.01))
biases1 = tf.Variable(tf.ones([nhidden]))
biases2 = tf.Variable(tf.ones([noutputs]))
model = (tf.matmul(tf.nn.relu(tf.matmul(feature_data, weights1) + biases1), weights2) + biases2) * SCALE_NUM_TRIPS
cost = tf.nn.l2_loss(model - target_data)
training_step = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(cost)
init = tf.initialize_all_variables()
sess.run(init)
saver = tf.train.Saver({'weights1' : weights1, 'biases1' : biases1, 'weights2' : weights2, 'biases2' : biases2})
for iter in xrange(0, numiter):
sess.run(training_step, feed_dict = {
feature_data : predictors[:trainsize].values,
target_data : targets[:trainsize].values.reshape(trainsize, noutputs)
})
if iter%1000 == 0:
print '{0} error={1}'.format(iter, np.sqrt(cost.eval(feed_dict = {
feature_data : predictors[:trainsize].values,
target_data : targets[:trainsize].values.reshape(trainsize, noutputs)
}) / trainsize))
filename = saver.save(sess, modelfile, global_step=numiter)
print 'Model written to {0}'.format(filename)
print 'testerror={0}'.format(np.sqrt(cost.eval(feed_dict = {
feature_data : predictors[trainsize:].values,
target_data : targets[trainsize:].values.reshape(testsize, noutputs)
}) / testsize))
0 error=38225.6868073
1000 error=10581.3908453
2000 error=8598.12454908
3000 error=7775.54995489
4000 error=7289.19077071
5000 error=7166.95208592
6000 error=7115.87436393
7000 error=7073.102492
8000 error=7059.94672122
9000 error=7048.97842816
Model written to /tmp/trained_model-10000
testerror=8181.71200578
使用神经网络似乎让我们在性能上有了额外的提升(当我运行它时,是从8400到8200 —— 由于随机种子,你实际的结果可能有所不同)。
所以,我们训练了一个模型,将其保存到一个文件中。给定三天的预期天气,让我们使用这个模型来预测出租车需求。
这里,根据那些输入,我们创建了一个Dataframe,加载保存的模型(注意,我们必须知道模型方程 —— 它并不保存在模型文件中),并用它来预测出租车需求。
In [47]:
input = pd.DataFrame.from_dict(data =
{'dayofweek' : [4, 5, 6],
'mintemp' : [50, 36, 54],
'maxtemp' : [80, 40, 75],
'rain' : [0, 0.8, 0.3]})
with tf.Session() as sess:
filename = modelfile + '-' + str(numiter)
saver = tf.train.Saver({'weights1' : weights1, 'biases1' : biases1, 'weights2' : weights2, 'biases2' : biases2})
saver.restore(sess, filename)
feature_data = tf.placeholder("float", [None, npredictors])
predict_operation = (tf.matmul(tf.nn.relu(tf.matmul(feature_data, weights1) + biases1), weights2) + biases2) * SCALE_NUM_TRIPS
predicted = sess.run(predict_operation, feed_dict = {
feature_data : input.values
})
print predicted
[[ 32465.81054688]
[ 49777.13671875]
[ 41207.85546875]]
看起来,我们应该告诉我们的一些出租车司机在周三(day=4)放一天假,并且在周四(day=5)全体出动。不足为奇 —— 预报显示周四有冷的要死的雨!
注意,周四通常是“慢”日子(周末出现出租车需求高峰),但是机器学习模型告诉我们,由于天气,这个特殊的周四会出现大量的需求。