
Commit 7f6f8b2

Start working on adding previous run data (#114)
* Start working on adding previous run data
* Fix up load previous run a bit
* Drop large target size ex.
* line breaks on log ops make sense to me.
* in CI we don't have previous jobs
* Use modern spark-testing-base for Python and use session not SQLCtx.
1 parent 61c0b31 · commit 7f6f8b2

File tree

4 files changed: +53, -8 lines

4 files changed

+53
-8
lines changed
load_previous_run_data.py

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+import os
+import tempfile
+
+
+class LoadPreviousRunData(object):
+    def __init__(self, session):
+        self.session = session
+
+    def find_oldest_id(self, local_path):
+        """Find the oldest Spark job since it's probably not being updated."""
+        directories = os.listdir(local_path)
+        return min(directories, key=lambda x: os.path.getmtime(f"{local_path}/{x}"))
+
+    def do_magic(self):
+        local_path = "/tmp/spark-events"
+        event_log_path = f"file://{local_path}"
+        application_id = self.find_oldest_id(local_path)
+        return self.load_json_records(event_log_path, application_id)
+
+    # tag::load[]
+    def load_json_records(self, event_log_path, application_id):
+        print(f"Loading {application_id}")
+        full_log_path = f"{event_log_path}/{application_id}"
+        df = self.session.read.json(full_log_path)
+        special_events = df.filter(
+            (df["Event"] == "SparkListenerExecutorAdded")
+            | (df["Event"] == "SparkListenerJobEnd")
+        )
+        special_events.show()
+
+    # end::load[]
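For anyone trying the loader end to end: Spark only writes event logs when event logging is enabled, and it expects the log directory to already exist. Below is a minimal driver sketch, assuming the new module is importable as load_previous_run_data and a local PySpark 3.5 install; spark.eventLog.enabled and spark.eventLog.dir are standard Spark settings, everything else here is illustrative.

import os

from pyspark.sql import SparkSession

from load_previous_run_data import LoadPreviousRunData  # assumed import path

local_path = "/tmp/spark-events"
os.makedirs(local_path, exist_ok=True)  # Spark won't create the log dir itself

session = (
    SparkSession.builder.master("local[2]")
    .appName("event-log-demo")  # illustrative app name
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", f"file://{local_path}")
    .getOrCreate()
)

# Run a trivial job so this application shows up in the event log.
session.range(10).count()

# Load the oldest logged application and show its executor-added/job-end events.
LoadPreviousRunData(session).do_magic()

Note that on a machine with no prior runs, the oldest entry may be the still-running application's own in-progress log, which read.json can still parse.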
Lines changed: 15 additions & 0 deletions

@@ -0,0 +1,15 @@
+from pyspark.sql.session import SparkSession
+import os
+import tempfile
+
+from sparktestingbase.sqltestcase import SQLTestCase
+from .load_previous_run_data import LoadPreviousRunData
+
+
+class TestLoadPreviousRunData(SQLTestCase):
+    def test_do_magic(self):
+        lprd = LoadPreviousRunData(self.session)
+        try:
+            lprd.do_magic()
+        except FileNotFoundError:
+            print("No previous jobs")

python/requirements.txt

Lines changed: 1 addition & 2 deletions

@@ -1,11 +1,10 @@
 spark-testing-base
 pandas
 pyarrow
-pyspark<3.5
+pyspark==3.5.0
 pyspark-asyncactions
 pandera
 pandera[pyspark]
 spark-expectations>=1.0
 venv-pack
-delta-spark
 requests
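Since the examples now target an exact pin rather than a <3.5 range, a quick interpreter-side check (a sketch, not part of the commit) can catch a stale environment before running anything:

# Fail fast if the environment drifted from the pyspark==3.5.0 pin.
import pyspark

assert pyspark.__version__ == "3.5.0", f"unexpected PySpark {pyspark.__version__}"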

python/tox.ini

Lines changed: 6 additions & 6 deletions

@@ -4,7 +4,7 @@ isolated_build = True
 requires = tox-conda
 envlist =
     isort
-    py39
+    py310
     black
     mypy
     flake8
@@ -13,9 +13,9 @@ skip_missing_interpeters = true
 
 [gh-actions]
 python =
-    3.9: py39
+    # 3.9: py39
     # We need a new version of PySpark w/3.10 support.
-    # 3.10: py310
+    3.10: py310
 
 [testenv]
 setenv =
@@ -29,9 +29,9 @@ extras =
 deps =
     pytest
     isort==4.3.21
-    pyspark
+    pyspark==3.5.0
     flake8
-    spark-testing-base
+    spark-testing-base>=0.11.1
     -rrequirements.txt
 commands =
     pytest examples \
@@ -56,7 +56,7 @@ deps =
 [testenv:flake8]
 extras = tests
 skipsdist = True
-commands = flake8 --ignore=F403,E402,F401,F405 examples
+commands = flake8 --ignore=F403,E402,F401,F405,W503 examples
 allowlist_externals = flake8
 
 [testenv:mypy]
