Description
Since the update to the MLflow integration with Hyperopt, where names are automatically assigned to experiments (such as smiling-worm-674), I have been consistently getting the following error when running a previously working MLflow experiment with SparkTrials().
ERROR:hyperopt-spark:trial task 0 failed, exception is
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 405.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 405.0 (TID 1472) (10.143.252.81 executor 0):
org.apache.spark.api.python.PythonException: '_pickle.PicklingError: Could not serialize object: IndexError: tuple index out of range'
However, my code does no explicit pickling and is not referenced anywhere in the full traceback, so I am not sure what the issue is. I can confirm that the same experiment works when using hyperopt.Trials() instead of hyperopt.SparkTrials(). Apologies for such a lengthy issue, and sorry if this turns out to be a simple mistake on my end!
Here is the full traceback:
Full Traceback
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
return Pickler.dump(self, obj)
File "/databricks/python/lib/python3.9/site-packages/patsy/origin.py", line 117, in __getstate__
raise NotImplementedError
NotImplementedError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/serializers.py", line 527, in dumps
return cloudpickle.dumps(obj, pickle_protocol)
File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 604, in dump
if "recursion" in e.args[0]:
IndexError: tuple index out of range
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 876, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 868, in process
serializer.dump_stream(out_iter, outfile)
File "/databricks/spark/python/pyspark/serializers.py", line 329, in dump_stream
bytes = self.serializer.dumps(vs)
File "/databricks/spark/python/pyspark/serializers.py", line 537, in dumps
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: IndexError: tuple index out of range
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:692)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:902)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:884)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:645)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1029)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:96)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:889)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1692)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:892)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:747)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3257)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3189)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3180)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3180)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1414)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1414)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1414)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3466)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3407)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3395)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1166)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2702)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1027)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:411)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1025)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:282)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.GeneratedMethodAccessor282.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
at java.lang.Thread.run(Thread.java:748)
The following is the code that is being run in the experiments:
mlflow.start_run()
spark_trials = SparkTrials(parallelism=16)
with mlflow.start_run(run_name='test_experiment'):
    best_result = fmin(
        fn=objective,
        space=space,
        algo=tpe.suggest,
        max_evals=1024,
        trials=spark_trials)
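For comparison, the variant that completes without errors is the same call with the single-machine Trials() backend; a minimal sketch, with everything else unchanged:

from hyperopt import Trials, fmin, tpe

trials = Trials()  # single-machine backend; no cross-process serialization of trial tasks
with mlflow.start_run(run_name='test_experiment'):
    best_result = fmin(
        fn=objective,
        space=space,
        algo=tpe.suggest,
        max_evals=1024,
        trials=trials)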
Hyperopt Optimization Function
def objective(args):
    # Initialize model pipeline
    pipe = Pipeline(steps=[
        ('selection', args['selection'])
    ])
    pipe.set_params(**args['params'])  # Model parameters will be set here
    pipe.fit(X, y)
    penalty = pipe['selection'].penalty_
    try:
        residual = np.sum(pipe['selection']._resid) / len(pipe['selection']._resid)
    except AttributeError:
        residual = -10000
    r2 = r2_score(y, pipe.predict(X))
    score = 1 - r2
    mean_square = mean_squared_error(y, pipe.predict(X))
    mlflow.log_metric('avg_residual', residual)
    mlflow.log_metric('mean_squared_error', mean_square)
    mlflow.log_metric('penalty', penalty)
    mlflow.log_metric('r2', r2)
    print(f"Model Name: {args['selection']}: ", score)
    # Since fmin minimizes, we return 1 - r2 as the loss.
    return {'loss': score, 'status': STATUS_OK}
Here are the parameters and parameter space:
Params and Parameter Space
params = {
    'selection__fixed': hp.choice('selection.fixed', fixed_arrs),
    'selection__random': hp.choice('selection.random', random_arrs),
    'selection__intercept': hp.choice('selection.intercept', (0, 1)),
    'selection__cov': hp.choice('selection.cov', (0, 1))
}

space = hp.choice('regressors', [
    {
        'selection': LMEBaseRegressor(group=['panel'],
                                      dependent=dependent,
                                      media=media_cols),
        'params': params
    }
])
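For reference, the trial logic itself runs fine outside Spark (consistent with the Trials() behaviour above). A sketch of drawing one point from this space and calling the objective directly on the driver, assuming the same X and y used in fit():

from hyperopt.pyll import stochastic

# Sample a single candidate from the search space and evaluate it locally,
# bypassing SparkTrials entirely.
candidate = stochastic.sample(space)
result = objective(candidate)
print(result)  # e.g. {'loss': ..., 'status': 'ok'}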
And finally, here is the regressor I am using (included because it's a custom class built on top of scikit-learn):
LMEBaseRegressor Class
class LMEBaseRegressor(BaseEstimator, RegressorMixin):
    """Implementation of an LME Regression for scikit."""

    def __init__(self, random=None, fixed=None,
                 group=['panel'], dependent=None,
                 intercept=0, cov=0, media=None):
        self.random = random
        self.fixed = fixed
        self.group = group
        self.dependent = dependent
        self.intercept = intercept
        self.cov = cov
        self.media = media

    def fit(self, X, y):
        """Fit the model with LME."""
        str_dep = self.dependent[0]
        str_fixed = ' + '.join(self.fixed)
        str_random = ' + '.join(self.random)
        data = pd.concat([X, y], axis=1)
        self.penalty_ = 0
        print(f"{str_dep} ~ {self.intercept} + {str_fixed}")
        print(f"{self.cov} + {str_random}")
        try:
            mixed = smf.mixedlm(f"{str_dep} ~ {self.intercept} + {str_fixed}",
                                data,
                                re_formula=f"~ {self.cov} + {str_random}",
                                groups=data['panel'],
                                use_sqrt=True)\
                       .fit(method=['lbfgs'])
            self._model = mixed
            self._resid = mixed.resid
            self.coef_ = mixed.params[0:len(self.fixed)]
        except ValueError:
            print("Cannot predict random effects from singular covariance structure.")
            self.penalty_ = 100
        except np.linalg.LinAlgError:
            print("Linear Algebra Error: recheck base model fit or try using fewer variables.")
            self.penalty_ = 100
        return self

    def predict(self, X):
        """Take the coefficients provided from fit and multiply them by X."""
        if self.penalty_ != 0:
            return np.ones(len(X)) * -100 * self.penalty_
        return self._model.predict(X)
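Given that the first frame of the traceback is patsy's Origin.__getstate__ raising NotImplementedError, my guess is that the failure happens when the Spark worker tries to cloudpickle something that carries the fitted statsmodels/patsy objects this regressor keeps in self._model and self._resid. A rough way I could check that hypothesis outside Spark, using the standalone cloudpickle package and toy data rather than my real columns, would be something like:

import cloudpickle            # same serializer family PySpark uses on the workers
import numpy as np
import pandas as pd
import statsmodels.formula.api as smf

# Toy panel data standing in for my real frame (illustrative only).
rng = np.random.default_rng(0)
toy = pd.DataFrame({
    'panel': np.repeat(['a', 'b', 'c'], 20),
    'x': rng.normal(size=60),
})
toy['y'] = 2.0 * toy['x'] + rng.normal(size=60)

fitted = smf.mixedlm("y ~ 1 + x", toy, groups=toy['panel']).fit(method=['lbfgs'])

try:
    cloudpickle.dumps(fitted)  # mirrors what the worker's dump_stream attempts
except Exception as exc:       # if the hypothesis is right, this trips on patsy's Origin
    print(type(exc).__name__, exc)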