|
35 | 35 | SONAR_SERVER = "sonarqube" |
36 | 36 | GCP_SA_CRED = "gcp-sa" |
37 | 37 |
|
38 | | - // Optional override (leave empty) — if you know a working path, set it to one of: |
| 38 | + // Optional override (leave empty). Accepts: |
39 | 39 | // - gs://hadoop-lib/hadoop-streaming/hadoop-streaming.jar |
40 | 40 | // - gs://<your-bucket>/lib/hadoop-streaming-3.3.6.jar |
41 | 41 | // - file:///usr/lib/hadoop-mapreduce/hadoop-streaming.jar |
@@ -112,28 +112,23 @@ spec: |
112 | 112 | echo "== Describe Dataproc cluster ==" && gcloud dataproc clusters describe "${CLUSTER_NAME}" --region "${REGION}" >/dev/null |
113 | 113 | echo "== Probe GCS bucket ==" && gsutil ls "gs://${BUCKET}/" || true |
114 | 114 |
|
115 | | - # Helper: robust downloader (curl -> wget -> python) |
| 115 | + # Helper downloader (curl -> wget -> python3) |
116 | 116 | dl() { |
117 | 117 | local url="$1" out="$2" |
118 | | - if command -v curl >/dev/null 2>&1; then |
119 | | - curl -fSL "$url" -o "$out" && return 0 |
120 | | - fi |
121 | | - if command -v wget >/dev/null 2>&1; then |
122 | | - wget -O "$out" "$url" && return 0 |
123 | | - fi |
| 118 | + if command -v curl >/dev/null 2>&1; then curl -fSL "$url" -o "$out" && return 0; fi |
| 119 | + if command -v wget >/dev/null 2>&1; then wget -O "$out" "$url" && return 0; fi |
124 | 120 | if command -v python3 >/dev/null 2>&1; then |
125 | 121 | python3 - "$url" "$out" << 'PY' |
126 | 122 | import sys, urllib.request |
127 | | -u,o=sys.argv[1],sys.argv[2] |
128 | | -urllib.request.urlretrieve(u,o) |
| 123 | +u,o=sys.argv[1],sys.argv[2]; urllib.request.urlretrieve(u,o) |
129 | 124 | PY |
130 | 125 | return 0 |
131 | 126 | fi |
132 | 127 | echo "No downloader available (curl/wget/python3)"; return 1 |
133 | 128 | } |
134 | 129 |
|
135 | 130 | # Resolve streaming jar |
136 | | - HSJ="${HADOOP_STREAMING_JAR:-}" # safe default avoids 'unbound variable' |
| 131 | + HSJ="${HADOOP_STREAMING_JAR:-}" |
137 | 132 | RESOLVED_JAR="" |
138 | 133 |
|
139 | 134 | # 1) Use provided env if valid |
|
146 | 141 | echo "Provided HADOOP_STREAMING_JAR not found: $HSJ" |
147 | 142 | fi |
148 | 143 | else |
149 | | - # allow file:/// (cannot preflight) |
150 | | - RESOLVED_JAR="$HSJ" |
| 144 | + RESOLVED_JAR="$HSJ" # allow file:/// |
151 | 145 | echo "Using provided non-GCS jar path: $RESOLVED_JAR" |
152 | 146 | fi |
153 | 147 | fi |
154 | 148 |
|
155 | | - # 2) Try public GCS locations |
| 149 | + # 2) Try public GCS |
156 | 150 | if [[ -z "$RESOLVED_JAR" ]]; then |
157 | 151 | for C in \ |
158 | 152 | "gs://hadoop-lib/hadoop-streaming/hadoop-streaming.jar" \ |
|
166 | 160 | done |
167 | 161 | fi |
168 | 162 |
|
169 | | - # 3) Fallback to cluster local path (will still stage a known-good jar next) |
| 163 | + # 3) Fallback to cluster local (then stage a known-good jar) |
170 | 164 | if [[ -z "$RESOLVED_JAR" ]]; then |
171 | 165 | RESOLVED_JAR="file:///usr/lib/hadoop-mapreduce/hadoop-streaming.jar" |
172 | 166 | echo "Fallback to cluster-local path: $RESOLVED_JAR" |
173 | 167 | fi |
174 | 168 |
|
175 | 169 | # 4) Stage known-good jar to your bucket and switch to it |
176 | | - # (ensures success even if cluster-local path doesn't exist) |
177 | 170 | if [[ "$RESOLVED_JAR" == file://* ]]; then |
178 | 171 | HVER="3.3.6" |
179 | 172 | LOCAL="hadoop-streaming-${HVER}.jar" |
|
192 | 185 | echo "Resolved jar (staged): $RESOLVED_JAR" |
193 | 186 | fi |
194 | 187 |
|
195 | | - # Persist for next stage |
196 | 188 | echo "export HADOOP_STREAMING_RESOLVED_JAR=\"$RESOLVED_JAR\"" > .resolved_jar.env |
197 | 189 | echo "Preflight OK. Using streaming jar: $RESOLVED_JAR" |
198 | 190 | ''' |
|
201 | 193 | } |
202 | 194 | } |
203 | 195 |
|
204 | | - stage('Prep inputs (upload .py to GCS)') { |
| 196 | + stage('Stage code (mapper/reducer) & data to GCS') { |
205 | 197 | steps { |
206 | 198 | container('cloud-sdk') { |
207 | 199 | withCredentials([file(credentialsId: env.GCP_SA_CRED, variable: 'GOOGLE_APPLICATION_CREDENTIALS')]) { |
|
212 | 204 | fi |
213 | 205 | gcloud config set project "${PROJECT_ID}" |
214 | 206 |
|
215 | | - INPUT_PATH="gs://${BUCKET}/inputs/${JOB_NAME}/${BUILD_NUMBER}" |
| 207 | + JOB_ROOT="gs://${BUCKET}/${JOB_NAME}/${BUILD_NUMBER}" |
| 208 | + CODE_PREFIX="${JOB_ROOT}/code" |
| 209 | + DATA_PREFIX="${JOB_ROOT}/data" |
| 210 | +
|
| 211 | + # discover mapper / reducer within repo |
| 212 | + MAP="${MAP:-}" |
| 213 | + RED="${RED:-}" |
| 214 | + if [[ -z "$MAP" ]]; then |
| 215 | + if [[ -f mapper.py ]]; then MAP=mapper.py; else MAP="$(git ls-files | grep -E '^mapper\\.py$|/?mapper\\.py$' | head -n1)"; fi |
| 216 | + fi |
| 217 | + if [[ -z "$RED" ]]; then |
| 218 | + if [[ -f reducer.py ]]; then RED=reducer.py; else RED="$(git ls-files | grep -E '^reducer\\.py$|/?reducer\\.py$' | head -n1)"; fi |
| 219 | + fi |
| 220 | + [[ -n "$MAP" && -n "$RED" ]] || { echo "mapper.py/reducer.py not found in repo"; exit 1; } |
| 221 | + echo "Mapper: $MAP" |
| 222 | + echo "Reducer: $RED" |
| 223 | +
|
| 224 | + # clean and upload ONLY mapper & reducer under code/ |
| 225 | + gsutil -m rm -r "${CODE_PREFIX}" >/dev/null 2>&1 || true |
| 226 | + gsutil -m cp "$MAP" "${CODE_PREFIX}/" |
| 227 | + gsutil -m cp "$RED" "${CODE_PREFIX}/" |
216 | 228 |
|
217 | | - gsutil -m rm -r "${INPUT_PATH}" >/dev/null 2>&1 || true |
| 229 | + # pick data files from repo and flatten to basenames – .txt/.csv/.log by default |
| 230 | + gsutil -m rm -r "${DATA_PREFIX}" >/dev/null 2>&1 || true |
| 231 | + mkdir -p /tmp/upload_data |
218 | 232 |
|
219 | | - mkdir -p /tmp/upload_py |
| 233 | + found=0 |
220 | 234 | while IFS= read -r f; do |
221 | | - mkdir -p "/tmp/upload_py/$(dirname "$f")" |
222 | | - cp "$f" "/tmp/upload_py/$f" |
223 | | - done < <(git ls-files '*.py') |
| 235 | + cp "$f" "/tmp/upload_data/$(basename "$f")" |
| 236 | + found=1 |
| 237 | + done < <(git ls-files | grep -Ei '\\.(txt|csv|log)$' || true) |
| 238 | +
|
| 239 | + # if no data files in repo, create a tiny sample |
| 240 | + if [[ "$found" -eq 0 ]]; then |
| 241 | + echo "No data files found (*.txt, *.csv, *.log). Creating sample..." |
| 242 | + cat > /tmp/upload_data/sample.txt <<EOF |
| 243 | +alpha |
| 244 | +beta |
| 245 | +gamma |
| 246 | +alpha |
| 247 | +beta |
| 248 | +alpha |
| 249 | +EOF |
| 250 | + fi |
| 251 | +
|
| 252 | + gsutil -m cp /tmp/upload_data/* "${DATA_PREFIX}/" |
224 | 253 |
|
225 | | - (cd /tmp/upload_py && gsutil -m cp -r . "${INPUT_PATH}/") |
226 | | - echo "Uploaded inputs to ${INPUT_PATH}" |
| 254 | + # persist paths for submit stage |
| 255 | + { |
| 256 | + echo "export CODE_PREFIX='${CODE_PREFIX}'" |
| 257 | + echo "export DATA_PREFIX='${DATA_PREFIX}'" |
| 258 | + echo "export MAP_BASENAME='$(basename "$MAP")'" |
| 259 | + echo "export RED_BASENAME='$(basename "$RED")'" |
| 260 | + } >> .resolved_jar.env |
| 261 | +
|
| 262 | + echo "Staged code -> ${CODE_PREFIX}" |
| 263 | + echo "Staged data -> ${DATA_PREFIX}" |
227 | 264 | ''' |
228 | 265 | } |
229 | 266 | } |
|
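For reference, the staging and submit stages assume the repo carries a `mapper.py` and `reducer.py` that follow the Hadoop streaming contract: read lines on stdin, write tab-separated key/value pairs on stdout. The word-count-style pair below is a minimal sketch of such scripts, not the repo's actual code; any scripts honoring that contract will work with the `-mapper`/`-reducer` invocation further down.

```python
#!/usr/bin/env python3
# mapper.py -- hypothetical example, not the repo's actual script.
# Hadoop streaming pipes each input line to stdin; emit "word<TAB>1" per token.
import sys

for line in sys.stdin:
    for word in line.split():
        print(f"{word}\t1")
```

```python
#!/usr/bin/env python3
# reducer.py -- hypothetical example; Hadoop delivers map output sorted by key,
# so equal keys arrive adjacent and can be summed in a single pass.
import sys

current_key, count = None, 0
for line in sys.stdin:
    key, _, value = line.rstrip("\n").partition("\t")
    if key != current_key:
        if current_key is not None:
            print(f"{current_key}\t{count}")
        current_key, count = key, 0
    count += int(value or 0)
if current_key is not None:
    print(f"{current_key}\t{count}")
```

Because the submit stage runs them as `python3 ${MAP_BASENAME}` and `python3 ${RED_BASENAME}`, the scripts do not need to be executable in GCS; the shebang is only a convenience for local runs.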
242 | 279 | gcloud config set project "${PROJECT_ID}" |
243 | 280 | gcloud config set dataproc/region "${REGION}" |
244 | 281 |
|
245 | | - # load resolved jar |
| 282 | + # load resolved vars |
246 | 283 | source .resolved_jar.env |
247 | | - echo "Submitting with streaming JAR: ${HADOOP_STREAMING_RESOLVED_JAR}" |
| 284 | + echo "Streaming JAR: ${HADOOP_STREAMING_RESOLVED_JAR}" |
| 285 | + echo "CODE_PREFIX : ${CODE_PREFIX}" |
| 286 | + echo "DATA_PREFIX : ${DATA_PREFIX}" |
248 | 287 |
|
249 | | - INPUT_PREFIX="gs://${BUCKET}/inputs/${JOB_NAME}/${BUILD_NUMBER}" |
250 | 288 | OUT="gs://${BUCKET}/results/${JOB_NAME}/${BUILD_NUMBER}" |
251 | | -
|
252 | | - # discover mapper / reducer |
253 | | - MAP="${MAP:-}" |
254 | | - RED="${RED:-}" |
255 | | - if [[ -z "$MAP" ]]; then |
256 | | - if [[ -f mapper.py ]]; then MAP=mapper.py; else MAP="$(git ls-files | grep -E '/?mapper\\.py$' | head -n1)"; fi |
257 | | - fi |
258 | | - if [[ -z "$RED" ]]; then |
259 | | - if [[ -f reducer.py ]]; then RED=reducer.py; else RED="$(git ls-files | grep -E '/?reducer\\.py$' | head -n1)"; fi |
260 | | - fi |
261 | | - [[ -n "$MAP" && -n "$RED" ]] || { echo "mapper.py/reducer.py not found"; exit 1; } |
262 | | -
|
263 | | - echo "Using mapper: $MAP" |
264 | | - echo "Using reducer: $RED" |
265 | | -
|
266 | | - MAP_GS="${INPUT_PREFIX}/${MAP}" |
267 | | - RED_GS="${INPUT_PREFIX}/${RED}" |
268 | | -
|
269 | 289 | gsutil -m rm -r "${OUT}" >/dev/null 2>&1 || true |
270 | 290 |
|
| 291 | + # Use files from flat data prefix only (avoid directories) |
| 292 | + # Ship mapper/reducer via -files from code prefix |
271 | 293 | gcloud dataproc jobs submit hadoop \ |
272 | 294 | --cluster="${CLUSTER_NAME}" \ |
273 | 295 | --region="${REGION}" \ |
274 | 296 | --jar="${HADOOP_STREAMING_RESOLVED_JAR}" \ |
275 | 297 | -- \ |
276 | | - -D mapreduce.job.maps=4 \ |
277 | | - -D mapreduce.job.reduces=2 \ |
278 | | - -files "${MAP_GS},${RED_GS}" \ |
279 | | - -mapper "python3 $(basename "${MAP}")" \ |
280 | | - -reducer "python3 $(basename "${RED}")" \ |
281 | | - -input "${INPUT_PREFIX}" \ |
| 298 | + -D mapreduce.job.maps=2 \ |
| 299 | + -D mapreduce.job.reduces=1 \ |
| 300 | + -files "${CODE_PREFIX}/${MAP_BASENAME},${CODE_PREFIX}/${RED_BASENAME}" \ |
| 301 | + -mapper "python3 ${MAP_BASENAME}" \ |
| 302 | + -reducer "python3 ${RED_BASENAME}" \ |
| 303 | + -input "${DATA_PREFIX}/*" \ |
282 | 304 | -output "${OUT}" |
283 | 305 |
|
284 | 306 | gsutil cat "${OUT}"/part-* | tee line_counts.txt |
|
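A cheap sanity check before the Dataproc submit is to emulate the map-sort-reduce wiring locally; it catches Python syntax errors and key/value protocol mistakes without spending a cluster run. The helper below is an assumption and not part of the pipeline; `mapper.py`, `reducer.py`, and `sample.txt` are the hypothetical names used above.

```python
#!/usr/bin/env python3
# local_check.py -- hypothetical helper for smoke-testing the streaming scripts
# before submitting to Dataproc. Emulates: cat data | mapper | sort | reducer.
import subprocess
import sys

def run_streaming_locally(data_path, mapper="mapper.py", reducer="reducer.py"):
    with open(data_path, "rb") as data:
        mapped = subprocess.run([sys.executable, mapper], stdin=data,
                                capture_output=True, check=True).stdout
    # Hadoop sorts map output by key between the phases; sorted() emulates that.
    shuffled = b"\n".join(sorted(mapped.splitlines())) + b"\n"
    reduced = subprocess.run([sys.executable, reducer], input=shuffled,
                             capture_output=True, check=True).stdout
    return reduced.decode()

if __name__ == "__main__":
    path = sys.argv[1] if len(sys.argv) > 1 else "sample.txt"
    print(run_streaming_locally(path), end="")
```

Run it as `python3 local_check.py sample.txt` from the repo root and compare the output against the `part-*` files the job writes to `${OUT}`.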