Installation for macOS

source: https://medium.com/@MinatoNamikaze02/installing-hadoop-on-macos-m1-m2-2023-d963abeab38e

brew install hadoop
cd /opt/homebrew/Cellar/hadoop/3.4.1/libexec/etc/hadoop

Configure core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml and hadoop-env.sh
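
For a single-node (pseudo-distributed) setup, the values below are the usual minimum; cross-check them against the guide linked above, since the exact properties and ports can differ:

core-site.xml

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

hdfs-site.xml

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

mapred-site.xml

<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

yarn-site.xml

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>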

brew install --cask adoptopenjdk8   # deprecated
brew install --cask temurin@8       # use this instead; on Apple Silicon it also installs Rosetta 2, which is hard to remove from macOS later, but that is not really a problem
export JAVA_HOME="/Library/Java/JavaVirtualMachines/temurin-8.jdk/Contents/Home"
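
To make this persist for Hadoop's own scripts, the same path can also be set in hadoop-env.sh (one of the files configured above):

export JAVA_HOME="/Library/Java/JavaVirtualMachines/temurin-8.jdk/Contents/Home"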

If start-all.sh throws an error like Permission denied (publickey,password,keyboard-interactive), generate a passwordless SSH key and authorize it:

ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

(On macOS, Remote Login also has to be enabled in the Sharing settings so that sshd is running at all.)

hdfs namenode -format

Start

start-all.sh

If anything goes wrong, just reformat the NameNode (note that this wipes everything stored in HDFS) and start again:

hdfs namenode -format

Don’t forget to stop it

stop-all.sh

Uninstall

Hadoop

brew uninstall --ignore-dependencies hadoop
 
brew autoremove
 
brew cleanup --prune=all
 
rm -rf ~/hadoop ~/.hadoop
 
nano ~/.zshrc  # or ~/.bashrc, and remove Hadoop-related exports
 
brew uninstall --force openjdk@11 mysql qemu python@3.13 gettext glib cairo
 
brew list   # and remove anything else we do not want

Temurin (java)

brew uninstall --cask temurin@8

SSH key

> ~/.ssh/authorized_keys   # truncate the file to drop the key added earlier

MapReduce with Python

a.k.a. the main MISSION of this assignment

I created hadoop_test/, which contains mapper.py and reducer.py, plus two input files with the following contents: echo "Hello World Bye World" > file01.txt and echo "Hello Hadoop Bye Hadoop" > file02.txt.

"""
mapper.py:
"""
#!/usr/bin/env python3
import sys
 
for line in sys.stdin:
    for word in line.strip().split():
        print(f"{word}\t1")
 
"""
reducer.py
"""
#!/usr/bin/env python3
import sys

# Hadoop sorts the mapper output by key before it reaches the reducer,
# so identical words arrive consecutively and a running count per word is enough.
current_word = None
current_count = 0
for line in sys.stdin:
    word, count = line.strip().split("\t")
    count = int(count)
    if word == current_word:
        current_count += count
    else:
        if current_word:
            print(f"{current_word}\t{current_count}")
        current_word = word
        current_count = count
if current_word:
    print(f"{current_word}\t{current_count}")

I made both of them executable like this: chmod +x mapper.py reducer.py.
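
Before involving Hadoop at all, the two scripts can be sanity-checked with a plain shell pipeline, where sort stands in for Hadoop's shuffle-and-sort phase (a quick local check, not one of the original steps):

cat file01.txt file02.txt | ./mapper.py | sort | ./reducer.py

This should print the same counts as the part-00000 output shown further down.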

Run the Hadoop Streaming Job

hadoop fs -mkdir -p /user/costinchitic/input
hadoop fs -put file01.txt file02.txt /user/costinchitic/input

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming*.jar \
  -input /user/costinchitic/input \
  -output /user/costinchitic/output \
  -mapper mapper.py \
  -reducer reducer.py
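
If the job fails because the task side cannot find mapper.py or reducer.py, the scripts can be shipped with the job via the -files option (same command, assuming both scripts sit in the current directory):

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming*.jar \
  -files mapper.py,reducer.py \
  -input /user/costinchitic/input \
  -output /user/costinchitic/output \
  -mapper mapper.py \
  -reducer reducer.py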

From terminal:

costinchitic@Costins-MacBook-Pro hadoop_test % hdfs dfs -ls /user/costinchitic/input

Found 2 items

-rw-r--r--   1 costinchitic supergroup         22 2025-05-10 17:40 /user/costinchitic/input/file01.txt

-rw-r--r--   1 costinchitic supergroup         24 2025-05-10 17:40 /user/costinchitic/input/file02.txt

costinchitic@Costins-MacBook-Pro hadoop_test % hdfs dfs -ls /user/costinchitic/output

Found 2 items

-rw-r--r--   1 costinchitic supergroup          0 2025-05-10 17:48 /user/costinchitic/output/_SUCCESS

-rw-r--r--   1 costinchitic supergroup         31 2025-05-10 17:48 /user/costinchitic/output/part-00000

costinchitic@Costins-MacBook-Pro hadoop_test % hdfs dfs -cat /user/costinchitic/input/file01.txt

Hello World Bye World

costinchitic@Costins-MacBook-Pro hadoop_test % hdfs dfs -cat /user/costinchitic/input/file02.txt

Hello Hadoop Bye Hadoop

costinchitic@Costins-MacBook-Pro hadoop_test % hdfs dfs -cat /user/costinchitic/output/part-00000

Bye 2
Hadoop 2
Hello 2
World 2

After browsing to http://localhost:9870, I can view the files inside /user/costinchitic/input and /user/costinchitic/output, as well as the result!
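
The result can also be copied back out of HDFS (just a convenience, not part of the original steps):

hdfs dfs -getmerge /user/costinchitic/output ./wordcount_result.txt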

Air Quality

Next, for the Air Quality application, I have a dataset from https://www.epa.gov/outdoor-air-quality-data/download-daily-data.

These are daily readings of CO concentration from five sites in California for the year 2023, among them Oakland, Oakland West, Laney College and Berkeley- Aquatic Park.

The goal is to process the data from a CSV/TXT file and compute monthly averages, and then filter the entries by a pattern, e.g. keep only the months whose average CO concentration is higher than the station's annual average.

I created hadoop_co_avg/ where I put the .txt file together with mapper_co_avg.py and reducer_co_avg.py. Again we have to call chmod +x mapper_co_avg.py reducer_co_avg.py.

Filter Monthly

"""
mapper_co_avg.py
"""
#!/usr/bin/env python3
import sys
import csv
reader = csv.reader(sys.stdin)
 
for fields in reader:
    if not fields or len(fields) < 8:  # Skip empty or short rows
        continue
    if fields[0].startswith("Date"):  # Skip header
        continue
    try:
        station_id = fields[2]
        station_name = fields[7]
        date = fields[0]
        co_value = float(fields[4])
 
        date_parts = date.split("/")
        if len(date_parts) < 3:
            continue
        month = date_parts[0].zfill(2)
        year = date_parts[2]
        month_year = f"{month}/{year}"
 
        key = f"{station_id}_{station_name}_{month_year}"
        print(f"{key}\t{co_value}")
    except Exception:
        continue
 
"""
reducer_co_avg.py
"""
#!/usr/bin/env python3
import sys

current_key = None
total = 0.0
count = 0
for line in sys.stdin:
    key, value = line.strip().split('\t')
    value = float(value)
    if key == current_key:
        total += value
        count += 1
    else:
        if current_key:
            avg = total / count
            print(f"{current_key}\t{avg:.2f}")
        current_key = key
        total = value
        count = 1
if current_key:
    avg = total / count
    print(f"{current_key}\t{avg:.2f}")

Run the Hadoop Streaming Job

hdfs dfs -mkdir -p /user/costinchitic/air_quality_input
hdfs dfs -put air_quality/airquality.txt /user/costinchitic/air_quality_input

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming*.jar \
  -input /user/costinchitic/air_quality_input \
  -output /user/costinchitic/air_quality_output2 \
  -mapper mapper_co_avg.py \
  -reducer reducer_co_avg.py
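
Note that Hadoop refuses to write into an output directory that already exists, so a re-run needs either a fresh name (such as air_quality_output2 above) or an hdfs dfs -rm -r of the old directory first.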

After a successful run, we can see:

costinchitic@Costins-MacBook-Pro hadoop_co_avg % hdfs dfs -ls /user/costinchitic/air_quality_output

Found 2 items

-rw-r--r--   1 costinchitic supergroup          0 2025-05-11 01:07 /user/costinchitic/air_quality_output/_SUCCESS

-rw-r--r--   1 costinchitic supergroup          0 2025-05-11 01:07 /user/costinchitic/air_quality_output/part-00000

costinchitic@Costins-MacBook-Pro hadoop_co_avg % hdfs dfs -cat /user/costinchitic/air_quality_output/part-00000

StationID_StationName_Month Average CO Concentration

60010009_Oakland_01/2023 0.46

60010009_Oakland_02/2023 0.51

60010013_Berkeley- Aquatic Park_12/2023 0.76

...

Filter by monthly and annual averages

Filtering in this case is done in two MapReduce steps:

  • Step 1 computes the monthly CO average per station and writes it to an intermediate output.
  • Step 2 computes the annual CO average per station from that intermediate output and filters it into a final output that keeps only the months at or above the annual average.
"""
mapper_step1.py
"""
#!/usr/bin/env python3
import sys
import csv
 
reader = csv.reader(sys.stdin)
 
for fields in reader:
    if not fields or len(fields) < 8:
        continue
    if fields[0].startswith("Date"):
        continue
    try:
        station_id = fields[2]
        station_name = fields[7]
        date = fields[0]
        co_value = float(fields[4])
 
        date_parts = date.split("/")
        if len(date_parts) < 3:
            continue
        month = date_parts[0].zfill(2)
        year = date_parts[2]
        month_year = f"{month}/{year}"
 
        key = f"{station_id}_{station_name}_{month_year}"
        print(f"{key}\t{co_value}")
    except:
        continue
 
"""
reducer_step1.py
"""
 
#!/usr/bin/env python3
import sys
 
current_key = None
sum_val = 0.0
count = 0
 
print("StationID_StationName_Month\tAverage CO Concentration")
 
for line in sys.stdin:
    line = line.strip()
    if not line or "\t" not in line:
        continue
    try:
        key, value = line.split("\t")
        value = float(value)
 
        if key == current_key:
            sum_val += value
            count += 1
        else:
            if current_key:
                print(f"{current_key}\t{sum_val / count:.2f}")
            current_key = key
            sum_val = value
            count = 1
    except:
        continue
 
if current_key and count > 0:
    print(f"{current_key}\t{sum_val / count:.2f}")
 
"""
mapper2.py
* Parses the generated intermediate output file with {StationID_StationName_Month,AvgCO} pairs
* Creates new key with StationID_StationNamefor
the reducer to add up and compute annual average for the station
* The values are the monthand the monthly CO concentration
"""
#!/usr/bin/env python3
import sys
 
for line in sys.stdin:
    line = line.strip()
    if not line or line.startswith("StationID_StationName_Month"):
        continue
    try:
        key, value = line.split("\t")
        station_id, station_name, month_year = key.split("_", 2)
        co_value = float(value)
 
        station_key = f"{station_id}_{station_name}"
        print(f"{station_key}\t{month_year}_{co_value}")
    except:
        continue
 
"""
reducer2.py
• Takes the previously created mapping and creates a list of month/CO_value for every month on a station
• Computes the annual average from each monthly average
• Filter the months over the annual averages and outputs the Station, Month, Monthly Average and Annual Average
"""
#!/usr/bin/env python3
import sys
 
current_station = None
monthly_data = []
sum_val = 0.0
count = 0
 
print("StationID_StationName\tMonth\tAverage CO Concentration\tAnnual Average CO Concentration")
 
def emit_filtered():
    if count == 0:
        return
    annual_avg = sum_val / count
    for data in monthly_data:
        month_year, co_val = data
        if co_val >= annual_avg:
            print(f"{current_station}\t{month_year}\t{co_val:.2f}\t{annual_avg:.2f}")
 
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    try:
        key, value = line.split("\t")
        month_year, co_val = value.rsplit("_", 1)
        co_val = float(co_val)
 
        if key == current_station:
            monthly_data.append((month_year, co_val))
            sum_val += co_val
            count += 1
        else:
            if current_station:
                emit_filtered()
            current_station = key
            monthly_data = [(month_year, co_val)]
            sum_val = co_val
            count = 1
    except:
        continue
 
if current_station:
    emit_filtered()
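
Run the Hadoop Streaming Jobs
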
# STEP 1: Monthly average
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming*.jar \
  -input /user/costinchitic/air_quality_input \
  -output /user/costinchitic/intermediate_output \
  -mapper mapper_step1.py \
  -reducer reducer_step1.py

# STEP 2: Annual filtering
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming*.jar \
  -input /user/costinchitic/intermediate_output \
  -output /user/costinchitic/final_output \
  -mapper mapper_step2.py \
  -reducer reducer_step2.py
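
Once the second job finishes, the filtered result can be inspected the same way as before:

hdfs dfs -cat /user/costinchitic/final_output/part-00000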