From cc424293f3f5c39a623e0ee54b65ecd51fa7c629 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Tue, 22 Feb 2022 14:04:09 +0900
Subject: [PATCH 1/2] Add SparkContext.addArchive in PySpark

---
 python/docs/source/reference/pyspark.rst |  1 +
 python/pyspark/context.py                | 47 ++++++++++++++++++++++++
 2 files changed, 48 insertions(+)

diff --git a/python/docs/source/reference/pyspark.rst b/python/docs/source/reference/pyspark.rst
index 6d4d0b55477c..bf4e66ee3353 100644
--- a/python/docs/source/reference/pyspark.rst
+++ b/python/docs/source/reference/pyspark.rst
@@ -53,6 +53,7 @@ Spark Context APIs
 
     SparkContext.PACKAGE_EXTENSIONS
     SparkContext.accumulator
+    SparkContext.addArchive
     SparkContext.addFile
     SparkContext.addPyFile
     SparkContext.applicationId
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 68f748e68faa..d19d8e229bf8 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1278,6 +1278,53 @@ def addPyFile(self, path: str) -> None:
 
         importlib.invalidate_caches()
 
+    def addArchive(self, path: str) -> None:
+        """
+        Add an archive to be downloaded with this Spark job on every node.
+        The `path` passed can be either a local file, a file in HDFS
+        (or other Hadoop-supported filesystems), or an HTTP, HTTPS or
+        FTP URI.
+
+        To access the file in Spark jobs, use :meth:`SparkFiles.get` with the
+        filename to find its download/unpacked location. The given path should
+        be one of .zip, .tar, .tar.gz, .tgz and .jar.
+
+        .. versionadded:: 3.3.0
+
+        Notes
+        -----
+        A path can be added only once. Subsequent additions of the same path are ignored.
+
+        Examples
+        --------
+
+        Creates a zipped file that contains a text file written '100'.
+
+        >>> import zipfile
+        >>> from pyspark import SparkFiles
+        >>> path = os.path.join(tempdir, "test.txt")
+        >>> zip_path = os.path.join(tempdir, "test.zip")
+        >>> with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipped:
+        ...     with open(path, "w") as f:
+        ...         _ = f.write("100")
+        ...     zipped.write(path, os.path.basename(path))
+        >>> sc.addArchive(zip_path)
+
+        Read '100' as an integer and process with the data in the RDD.
+
+        >>> def func(iterator):
+        ...     with open("%s/test.txt" % SparkFiles.get("test.zip")) as f:
+        ...         v = int(f.readline())
+        ...     return [x * int(v) for x in iterator]
+        >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
+        [100, 200, 300, 400]
+
+        Notes
+        -----
+        This API is experimental.
+        """
+        self._jsc.sc().addArchive(path)
+
     def setCheckpointDir(self, dirName: str) -> None:
         """
         Set the directory under which RDDs are going to be checkpointed. The

From c40ee2c299bb8a786ba852926b9a6494f8099018 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Tue, 22 Feb 2022 17:15:29 +0900
Subject: [PATCH 2/2] Address comments

---
 python/pyspark/context.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index d19d8e229bf8..2f1746b0a434 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1294,10 +1294,10 @@ def addArchive(self, path: str) -> None:
         Notes
         -----
         A path can be added only once. Subsequent additions of the same path are ignored.
+        This API is experimental.
 
         Examples
         --------
-
         Creates a zipped file that contains a text file written '100'.
 
         >>> import zipfile
@@ -1310,7 +1310,8 @@ def addArchive(self, path: str) -> None:
         ...     zipped.write(path, os.path.basename(path))
         >>> sc.addArchive(zip_path)
 
-        Read '100' as an integer and process with the data in the RDD.
+        Reads the '100' as an integer in the zipped file, and processes
+        it with the data in the RDD.
 
         >>> def func(iterator):
         ...     with open("%s/test.txt" % SparkFiles.get("test.zip")) as f:
@@ -1318,10 +1319,6 @@ def addArchive(self, path: str) -> None:
         ...         v = int(f.readline())
         ...     return [x * int(v) for x in iterator]
         >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
         [100, 200, 300, 400]
-
-        Notes
-        -----
-        This API is experimental.
         """
         self._jsc.sc().addArchive(path)
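
For reference, below is a minimal, self-contained sketch of how the new API is exercised end to end, mirroring the doctest added by this patch. It assumes a Spark build that already includes this change; the local master, application name, and temporary file names are illustrative only and not part of the patch.

    import os
    import tempfile
    import zipfile

    from pyspark import SparkContext, SparkFiles

    sc = SparkContext("local[2]", "addArchiveExample")

    # Build a small zip archive containing a text file with the value "100".
    tmp_dir = tempfile.mkdtemp()
    txt_path = os.path.join(tmp_dir, "test.txt")
    zip_path = os.path.join(tmp_dir, "test.zip")
    with open(txt_path, "w") as f:
        f.write("100")
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipped:
        zipped.write(txt_path, os.path.basename(txt_path))

    # Ship the archive to every node; per the docstring, SparkFiles.get("test.zip")
    # returns the location where the archive is downloaded/unpacked.
    sc.addArchive(zip_path)

    def multiply_by_archived_value(iterator):
        # On the executor, read the unpacked file from the archive's directory.
        with open(os.path.join(SparkFiles.get("test.zip"), "test.txt")) as f:
            v = int(f.readline())
        return [x * v for x in iterator]

    print(sc.parallelize([1, 2, 3, 4]).mapPartitions(multiply_by_archived_value).collect())
    # [100, 200, 300, 400]

    sc.stop()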