Add SparkContext.addArchive in PySpark
HyukjinKwon committed Feb 22, 2022
commit cc424293f3f5c39a623e0ee54b65ecd51fa7c629
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.rst
@@ -53,6 +53,7 @@ Spark Context APIs

SparkContext.PACKAGE_EXTENSIONS
SparkContext.accumulator
SparkContext.addArchive
SparkContext.addFile
SparkContext.addPyFile
SparkContext.applicationId
47 changes: 47 additions & 0 deletions python/pyspark/context.py
@@ -1278,6 +1278,53 @@ def addPyFile(self, path: str) -> None:

importlib.invalidate_caches()

def addArchive(self, path: str) -> None:
"""
Add an archive to be downloaded with this Spark job on every node.
The `path` passed can be either a local file, a file in HDFS
(or other Hadoop-supported filesystems), or an HTTP, HTTPS or
FTP URI.

To access the archive in Spark jobs, use :meth:`SparkFiles.get` with the
archive's filename to find its unpacked location. The given path should
end with one of .zip, .tar, .tar.gz, .tgz or .jar.

.. versionadded:: 3.3.0

Notes
-----
A path can be added only once. Subsequent additions of the same path are ignored.
This API is experimental.

Examples
--------

Creates a zip archive that contains a text file with the content '100'.

>>> import zipfile
>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> zip_path = os.path.join(tempdir, "test.zip")
>>> with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipped:
... with open(path, "w") as f:
... _ = f.write("100")
... zipped.write(path, os.path.basename(path))
>>> sc.addArchive(zip_path)

Reads the '100' back as an integer and uses it to process the data in the RDD.

>>> def func(iterator):
... with open("%s/test.txt" % SparkFiles.get("test.zip")) as f:
... v = int(f.readline())
... return [x * int(v) for x in iterator]
>>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
[100, 200, 300, 400]

"""
self._jsc.sc().addArchive(path)

def setCheckpointDir(self, dirName: str) -> None:
"""
Set the directory under which RDDs are going to be checkpointed. The
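For anyone trying the change locally, a minimal end-to-end sketch of the new API outside the doctest harness might look like the following. It assumes a build that includes this patch (3.3.0+) and a local SparkSession; the archive name data.zip, the file value.txt, and the helper multiply_by_archived_value are illustrative only, not part of this change.

import os
import tempfile
import zipfile

from pyspark import SparkFiles
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

tmp_dir = tempfile.mkdtemp()
txt_path = os.path.join(tmp_dir, "value.txt")
zip_path = os.path.join(tmp_dir, "data.zip")

# Write a small text file and pack it into a zip archive.
with open(txt_path, "w") as f:
    _ = f.write("100")
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipped:
    zipped.write(txt_path, os.path.basename(txt_path))

# Distribute the archive; on each node it is unpacked under the directory
# returned by SparkFiles.get("data.zip").
sc.addArchive(zip_path)

def multiply_by_archived_value(iterator):
    # Read the distributed value on the executor side and scale the partition.
    with open(os.path.join(SparkFiles.get("data.zip"), "value.txt")) as f:
        v = int(f.readline())
    return [x * v for x in iterator]

print(sc.parallelize([1, 2, 3, 4]).mapPartitions(multiply_by_archived_value).collect())
# Expected output: [100, 200, 300, 400]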