为什么PandasUDF没有被并行化?
我有来自许多物联网传感器的数据。对于每个特定的传感器,数据帧中只有大约 100 行:数据没有倾斜。我正在为每个传感器训练一个单独的机器学习模型。
我正在pandas udf成功地使用并行训练和记录不同模型的 mlflow 指标(据说),如这里所教。
在 Azure 上使用 Databricks 和单节点集群(Standard_DS3_v2 - 14GB 内存 - 4 核)我能够在大约 23分钟内完成所有训练。
因为pandas udf,据说,为每个组并行计算,我认为我可以通过使用具有更多内核的单节点集群或使用具有更多工作人员的集群更快地完成训练。所以我尝试运行相同的笔记本:
- 一组计算机:1 个 master + 3 个 worker,全部(Standard_DS3_v2 - 14GB 内存 - 4 核)
- 具有(Standard_DS5_v2 - 56GB 内存 - 16 核)的单节点集群
对于我惊讶的是,训练时间没有减少:23分钟的选项1,然后26.5min选项2
我尝试使用较新的 applyInPandas,但结果大致相同。
注意:@Chris 回答后,查看 Web UI 上的 Stage Detail 页面(对于具有 1 个 master + 3 个 worker 的集群),我看到我只有一个 stage 负责 udf pandas 培训。花了 20 分钟。访问这个阶段的细节,我看到它只有一个任务,用{'Locality Level': 'PROCESS_LOCAL'},花了整整 20 分钟。截图如下。
所以@Chris 已经确定了问题:训练没有被并行化。
为了理解为什么applyInPandas(or udf pandas) 没有被并行化,我把我的代码放在下面(和applyInPandas版本一起)。请注意,我的目标只是用 记录训练后的模型指标mlflow,因此函数返回的只是它收到的原始 df。
另请注意,代码按预期工作。mlflow 正在成功记录培训。我唯一的问题是为什么它没有被并行化。
我有一种感觉,问题出在for loop,因为它与教程不同。
import pyspark.sql.functions as f
import mlflow
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
import pmdarima as pm
from statsmodels.tsa.statespace.sarimax import SARIMAX
def train_model(df_pandas):
'''
Trains a model on grouped instances
'''
original_df = df_pandas.copy() #the original df will be returned in the end
PA = df_pandas['Princípio_Ativo'].iloc[0]
run_id = df_pandas['run_id'].iloc[0] # Pulls run ID to do a nested run
observacoes_no_teste = 12
horizonte = 1
observacoes_total = len(df_pandas.index)
observacoes_no_train = len(df_pandas.index) - observacoes_no_teste
try:
#train test split
X = df_pandas[:observacoes_no_train]['Demanda']
y = df_pandas[observacoes_no_train:]['Demanda']
# Train the model
model = pm.auto_arima(X, seasonal=True, m=12)
order = model.get_params()['order']
seasonal_order = model.get_params()['seasonal_order']
except:
pass
# Resume the top-level training
with mlflow.start_run(run_id=run_id, experiment_id=1333367041812290):
# Create a nested run for the specific device
with mlflow.start_run(run_name=str(PA), nested=True, experiment_id=1333367041812290) as run:
mae_list = []
mse_list = []
previsoes_list = []
teste_list = []
predictions_list = []
try:
#the purpose of the following loop is to do backtesting: the model is trained with n observations, and the (n+1)th is predicted. n is increased by 1 on each iteration.
for i in range(observacoes_total-observacoes_no_train-horizonte+1):
#train test split
X = df_pandas[:observacoes_no_train+i]['Demanda']
y = df_pandas[observacoes_no_train+i:observacoes_no_train+i+horizonte]['Demanda']
#train model
model = SARIMAX(X, order=order, seasonal_order=seasonal_order)
model = model.fit()
#make predictions
predictions = model.predict(start=observacoes_no_train + i, end=(observacoes_no_train + i + horizonte-1))
predictions_list.append(predictions)
mse = round(mean_squared_error(y, predictions),2)
mae = round(mean_absolute_error(y, predictions),2)
mse_list.append(mse)
mae_list.append(mae)
#series with predictions
in_sample_predictions = pd.concat(predictions_list)
in_sample_predictions.name = 'in_sample'
#out of sample predictions
hp = 3
out_of_sample_predictions = model.predict(start=observacoes_total, end=(observacoes_total + hp - 1))
out_of_sample_predictions.name = 'out_sample'
#in sample + out of sample predictions
df_predictions = pd.concat([df_pandas.drop('run_id',axis=1), in_sample_predictions,out_of_sample_predictions], axis=1)
#save df with predictions to be logged as an artifact my mlflow.
df_predictions.to_csv('df_predictions.csv')
#mlflow logging
mlflow.log_param("Princípio_Ativo", PA)
mlflow.log_param("mae_list", str(mae_list))
mlflow.log_param("mse_list", str(mse_list))
mlflow.log_param("status_sucesso", 'sim')
mlflow.log_artifact('df_predictions.csv')
except:
mlflow.log_param("status_falha", 'sim')
return original_df.drop('run_id', axis=1)
with mlflow.start_run(run_name="SARIMA", experiment_id=1333367041812290) as run:
run_id = run.info.run_uuid
modelDirectoriesDF = (df
.withColumn("run_id", f.lit(run_id)) # Add run_id
.groupby("Princípio_Ativo")
.applyInPandas(train_model, schema=df.schema)
)
combinedDF = (df
.join(modelDirectoriesDF, on="Princípio_Ativo", how="left")
)
display(combinedDF)
Spark UI 的屏幕截图: