Skip to main content

Command Palette

Search for a command to run...

Real-Time Data Engineering: Streaming Unstructured Data to AWS with Apache Spark

Updated
15 min read
Real-Time Data Engineering: Streaming Unstructured Data to AWS with Apache Spark

Introduction

In the era of big data, organizations generate vast amounts of unstructured data in formats such as JSON, CSV, and TXT files. Processing and analyzing this data efficiently is crucial for real-time decision-making. This project demonstrates an end-to-end data streaming pipeline using Apache Spark Streaming, Docker, and AWS services to handle unstructured data seamlessly. The pipeline ingests JSON, CSV, and TXT files via Spark Streaming, processes them in a distributed Spark cluster, and stores them in an S3 Data Lake. An AWS Glue Crawler extracts metadata and updates the Glue Data Catalog, enabling Athena to query the data efficiently. The goal is to demonstrate how data engineers can automate real-time processing of unstructured data and seamlessly integrate it with AWS services for storage, metadata management, and analytics.

Architecture

About Unstructured Data

Unstructured data refers to information that does not have a predefined schema or organized format, making it difficult to store, process, and analyze using traditional relational databases. Unlike structured data, which is neatly arranged in rows and columns (e.g., SQL databases), unstructured data lacks a consistent structure and can exist in various formats such as text, images, videos, audio, and logs. With the rapid growth of data-driven applications, organizations generate massive volumes of unstructured data from social media, IoT sensors, emails, customer reviews, and machine logs. Data engineers play a crucial role in extracting valuable insights from this data by designing scalable pipelines that ingest, process, and analyze it efficiently. Big data technologies such as Apache Spark, Hadoop, and AWS services help process unstructured data and transform it into a usable format for analytics and decision-making.

Unstructured data accounts for nearly 80-90% of the world’s data and continues to grow exponentially. Some real-world examples include:

  • Text Data: Emails, chat messages, customer feedback, PDFs, and reports.

  • Multimedia: Images, videos, and audio recordings from platforms like YouTube or surveillance systems.

  • Logs and Machine Data: Server logs, application logs, IoT sensor data, and cybersecurity logs.

  • Social Media Content: Tweets, Facebook posts, Instagram captions, and TikTok videos.

Here in this project, I am using unstructured data sources as source which include text files, and some semistructured data including CSV and JSON files. The examples of files in the source are shown below:

  1. TXT file

     Pr111
     Product: Parle-G Biscuit
    
     Category: Food
    
     Costs Rs.20
    
     Best From 2024-05-01 to 2025-05-01
    

    About 100 such files are generated in the source by using a simple Python script as shown below:

     import random
     import datetime
    
     def generate_random_product(index):
         categories = {
             "Food": [
                 "Parle-G Biscuit", "Dairy Milk Chocolate", "Lay's Chips", "Oreo Cookies", "Maggie Noodles", 
                 "Kellogg's Corn Flakes", "KitKat", "Bournvita", "Tropicana Juice", "Pepsi"
             ],
             "Personal Care": [
                 "Colgate Toothpaste", "Dove Shampoo", "Lifebuoy Soap", "Nivea Cream", "Vaseline Lotion", 
                 "Garnier Face Wash", "Old Spice Deodorant", "Himalaya Face Pack", "Gillette Razor", "Head & Shoulders Shampoo"
             ],
             "Footwear": [
                 "Nike Running Shoes", "Adidas Sneakers", "Puma Sandals", "Reebok Sports Shoes", "Skechers Loafers", 
                 "Woodland Boots", "Bata Formal Shoes", "Crocs Slippers", "Fila Trainers", "New Balance Sneakers"
             ],
             "Electronics": [
                 "Samsung Galaxy S21", "Apple iPhone 14", "Sony Headphones", "JBL Bluetooth Speaker", "Dell Laptop", 
                 "HP Printer", "LG Smart TV", "Canon DSLR Camera", "Fitbit Smartwatch", "Logitech Keyboard"
             ]
         }
    
         category = random.choice(list(categories.keys()))
         product = random.choice(categories[category])
         product_code = f"Pr{1000 + index}"
         cost = random.randint(1000, 10000)  # Random cost between Rs.10 and Rs.500
    
         start_date = datetime.date.today() + datetime.timedelta(days=random.randint(0, 365))
         end_date = start_date + datetime.timedelta(days=365)
    
         content = f"""
     {product_code}
     Product: {product}
    
     Category: {category}
    
     Costs Rs.{cost}
    
     Best From {start_date} to {end_date}
     """.strip()
    
         return content
    
     def save_to_file(content, filename):
         with open(filename, "w") as file:
             file.write(content)
         print(f"File '{filename}' has been created with random product details.")
    
     if __name__ == "__main__":
         for i in range(1, 101):  # Generate 100 files with unique product codes
             product_details = generate_random_product(i)
             file_path = f"./source_files/txt_source/product_details_{i}.txt"
             save_to_file(product_details, file_path)
    
  1. CSV file

     p_id,p_name,p_category,p_price,p_created_date,p_expiry_date
     Pr1,Power Motors,Electronics,2500,2025-01-01,2025-09-30
    
  2. JSON file

     {
         "p_id": "Pr11",
         "p_name": "Gaming Mouse",
         "p_category": "Computer",
         "p_price": 1000,
         "p_created_date": "2023-09-10",
         "p_expiry_date": "2026-05-10"
     }
    

About Apache Spark and Spark Streaming

Apache Spark is an open-source, distributed computing framework designed for fast and scalable big data processing. Unlike traditional batch processing systems, Spark performs computations in memory, significantly improving performance. It follows a Master-Slave architecture, where a Spark Master manages the cluster, and multiple Spark Workers (Executors) execute tasks in parallel. Spark Streaming is a real-time data processing engine that enables micro-batch processing of streaming data. It can ingest data from various sources like Kafka, Flume, AWS S3, and file systems, process it using Spark’s DAG (Directed Acyclic Graph) execution model, and output results to databases, dashboards, or cloud storage making it ideal for log analysis, fraud detection and IOT applications

To simplify Spark deployment, Docker is used to containerize Spark components, ensuring portability, scalability, and ease of configuration. Using Docker Compose, we define Spark’s cluster setup, including one master node and three worker nodes, making the environment easily replicable across different systems. The docker-compose file for spark configuration is shown below:

version: '3.8'

x-spark-common: &spark-common
  image: bitnami/spark:3.5.1
  volumes:
    - ./jobs:/opt/bitnami/spark/jobs
  command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
  depends_on:
    - spark-master
  environment:
    SPARK_MODE: worker
    SPARK_WORKER_CORES: 1
    SPARK_WORKER_MEMORY: 1g
    SPARK_MASTER_URL: spark://spark-master:7077
  networks:
    - spark-network
  restart: always

services:
  spark-master:
    image: bitnami/spark:3.5.1
    volumes:
      - ./jobs:/opt/bitnami/spark/jobs
    command: bin/spark-class org.apache.spark.deploy.master.Master
    ports:
      - "9090:8080"
      - "7077:7077"
    networks:
      - spark-network
    restart: always

  spark-worker-1:
    <<: *spark-common

  spark-worker-2:
    <<: *spark-common

  spark-worker-3:
    <<: *spark-common

networks:
  spark-network:
    driver: bridge

The provided Docker Compose file configures Spark with:

  • Spark Master: Runs on port 7077 and manages the cluster.

  • Three Spark Workers: Each with 1 CPU core and 1GB memory (SPARK_WORKER_CORES: 1, SPARK_WORKER_MEMORY: 1g).

  • Volumes: Mounted to share Spark jobs (./jobs:/opt/bitnami/spark/jobs). This jobs folder includes the actual spark job which will read the unstructured data from source files, parse them, and store them in the AWS S3 data lake.

  • Network Bridge: spark-network enables inter-coAntainer communication.

  • Auto Restart: Ensures the services restart automatically in case of failure.

About AWS Services Involved

Amazon S3 as a Data Lake for Streaming Unstructured Data**

Amazon Simple Storage Service (S3) is a highly scalable and durable object storage service that serves as the foundation for building data lakes in AWS. A data lake is a centralized repository that allows storing structured, semi-structured, and unstructured data at any scale. In this project, Spark Streaming reads unstructured files (CSV, JSON, TXT), processes them into structured data using Spark DataFrames, and writes the output as Parquet files to S3, making it ready for analytics.

For configuring S3 as a data lake, firstly new access keys were set up using AWS IAM. The following were the steps taken

  1. Firstly navigate to AWS Console and then search for IAM

  2. Then select Users > user_name > Create Access Key. You will be redirected to the screen as shown below

    Select the command line interface and tick on confirmation, this will then create new access keys and you will be redirected to a new screen as shown below

    Here you can copy the access key and secret access key and store them in a safe location as these keys will later be used for accessing S3 buckets for storing the parsed data. Alternatively, the keys can also be downloaded as a CSV file.

    The next step is creating a new S3 bucket which will be used to store the parsed data.

    1. For this search for AWS S3 and navigate to AWS S3. Click on Create New Bucket.

    2. Here configure the bucket as shown below, choose an appropriate name that should be globally unique.

Initially, the S3 bucket will be empty, but once we run the spark job, this bucket will be populated by folders like checkpoints and data which is dynamically created by the spark job.

AWS Glue

AWS Glue is a fully managed ETL (Extract, Transform, Load) service that enables seamless data discovery, cataloging, and transformation in the AWS ecosystem. In this project, Glue plays a critical role in making the unstructured data queryable and analyzable after being processed by Spark Streaming and stored in Amazon S3 as Parquet files.

Key Components of AWS Glue in This Project

  1. Glue Crawler

    • A Glue Crawler scans the S3 bucket detects new Parquet files, and extracts metadata such as column names, data types, and table structure.

    • It automatically updates the Glue Data Catalog, ensuring that any new or updated data is discoverable.

    • The crawler runs periodically to detect new files added to S3 as part of the Spark Streaming pipeline.

  2. Glue Data Catalog

    • Acts as a centralized metadata repository, storing schema details for all discovered datasets.

    • Enables seamless integration with AWS Athena, Redshift, and other AWS analytics services.

    • Provides a unified view of structured data extracted from unstructured files stored in S3.

  3. Glue Database and Tables

    • The Glue Crawler creates a new table inside a designated Glue Database based on the schema detected in the S3 data lake.

    • Each time new files arrive in S3, the table is automatically updated to reflect the changes.

    • The Glue database acts as a logical grouping for related datasets, making it easy to manage and query structured data.

    • Here the crawler will create a table named bronze.

To configure AWS Glue, the following steps are taken

  1. Firstly search for AWS Glue in the IAM console and navigate to Glue

  2. Then navigate to the Crawlers page to create a new glue crawler

  3. Configure a new crawler as shown below.

    Here we need to add the S3 bucket path of the source folder as the data source as shown below.

    Also, we need to create a new IAM role to allow the crawler to create a new table.

    After creating IAM role, we also have to create new glue database to store the tables as shown below.

    Once the crawler is created, we can run the crawler to crawl the data from the S3 data lake and to create a new table in the AWS Glue database.

AWS Athena

AWS Athena is a serverless, interactive query service that allows users to analyze structured data stored in Amazon S3 using standard SQL. In this project, Athena plays a crucial role in querying the processed unstructured data after it has been streamed, structured, and cataloged by Apache Spark and AWS Glue.

After the Glue crawler creates a new table, AWS Athena can be used to query this newly created table. But before querying we need to do some configuration in AWS Athena.

For this navigate to AWS Athena by searching it in AWS Management console. Here we need to configure query result location where we configure S3 location to store query results.

End-to-End Workflow Explanation

The end-to-end workflow for this project follows these steps:

  1. Uploading Source Files

    • Firstly, Input files (TXT, CSV, JSON) are uploaded into designated S3 source folders.

  1. Spark Job Processing

    • Spark reads and processes these files using appropriate methods:

      • CSV & JSON: Directly loaded into Spark DataFrames.

      • TXT Files: Processed using regular expressions and user-defined functions (UDFs) to extract required fields.

    • For example, UDFs extract details like product_id, product_name, category, price, and dates.

Spark Job Code

    from pyspark.sql import SparkSession, DataFrame
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType
    from pyspark.sql.functions import udf, regexp_replace
    from config.config import configuration

    from datetime import datetime
    import re

    def extract_p_id(file_content):
        file_content = file_content.strip()
        # First word of the file is the product id
        match = re.search(r'^(\w+)', file_content)
        p_id =  match.group(1) if match else None
        return p_id

    def extract_p_name(file_content):
        # search for Product: <product_name>
        match = re.search(r'Product:\s*(.+)', file_content)
        p_name = match.group(1).strip() if match else None
        return p_name

    def extract_p_category(file_content):
        # search for Category: <product_category>
        match = re.search(r'Category:\s*(.+)', file_content)
        p_category = match.group(1).strip() if match else None
        return p_category

    def extract_p_price(file_content):
        # Search for Costs Rs.<price>
        match = re.search(r'Costs\s*Rs\.(\d+)', file_content)
        price = float(match.group(1)) if match else None
        return price

    def extract_dates(file_content):
        match = re.search(r'Best From\s*(\d{4}-\d{2}-\d{2})\s*to\s*(\d{4}-\d{2}-\d{2})', file_content)
        if match:
            start_date = datetime.strptime(match.group(1), '%Y-%m-%d')
            end_date = datetime.strptime(match.group(2), '%Y-%m-%d')
            return start_date, end_date
        return None, None

    def define_udfs():
        return {
            'extract_p_id_udf': udf(extract_p_id, StringType()),
            'extract_p_name_udf': udf(extract_p_name, StringType()),
            'extract_p_category_udf': udf(extract_p_category, StringType()),
            'extract_p_price_udf': udf(extract_p_price, DoubleType()),
            'extract_dates_udf': udf(extract_dates, StructType([
                StructField('p_created_date', DateType(), True),
                StructField('p_expiry_date', DateType(), True)
            ])),
        }

    def streamWriter(input: DataFrame, checkpointFolder, output):
        return (
            input
            .writeStream
            .outputMode('append')
            .format('parquet')
            .option('path', output)
            .option('checkpointLocation', checkpointFolder)
            .trigger(processingTime='5 seconds')
            .start()
        )

    if __name__ == '__main__':
        # Initialize the Spark session
        spark = (
            SparkSession.builder.appName('Unstructured_Data_Streaming_Spark_AWS')
            .config("spark.jars.packages", 
                    'org.apache.hadoop:hadoop-aws:3.3.1,'
                    'com.amazonaws:aws-java-sdk:1.11.469')
            .config('spark.hadoop.fs.s3a.impl','org.apache.hadoop.fs.s3a.S3AFileSystem')
            .config('spark.hadoop.fs.s3a.access.key',configuration.get('AWS_ACCESS_KEY'))
            .config('spark.hadoop.fs.s3a.secret.key',configuration.get('AWS_SECRET_KEY'))
            .config('spark.hadoop.fs.s3a.aws.credentials.provider','org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
            .getOrCreate()
        )

        # Variables for source files
        txt_src_dir = 'file:///opt/bitnami/spark/jobs/source_files/txt_source'
        csv_src_dir = 'file:///opt/bitnami/spark/jobs/source_files/csv_source'
        json_src_dir = 'file:///opt/bitnami/spark/jobs/source_files/json_source'

        # define data schema
        # p_id,p_name,p_category,p_price,p_create_date,p_expiry_date
        data_schema = StructType([
            StructField('p_id', StringType(), True),
            StructField('p_name', StringType(), True),
            StructField('p_category', StringType(), True),
            StructField('p_price', DoubleType(), True),
            StructField('p_created_date', DateType(), True),
            StructField('p_expiry_date', DateType(), True),
        ])

        # define udfs
        udfs = define_udfs()

        # reading from a file stream
        txt_file_df = (
            spark.readStream
            .format('text')
            .option('wholetext','true')
            .load(txt_src_dir)
        )

        txt_file_df = txt_file_df.withColumn('p_id', regexp_replace(udfs['extract_p_id_udf']('value'), r'\r',' '))
        txt_file_df = txt_file_df.withColumn('p_name', udfs['extract_p_name_udf']('value'))
        txt_file_df = txt_file_df.withColumn('p_category', udfs['extract_p_category_udf']('value'))
        txt_file_df = txt_file_df.withColumn('p_price', udfs['extract_p_price_udf']('value'))
        txt_file_df = txt_file_df.withColumn('p_create_date', udfs['extract_dates_udf']('value').getField('p_created_date'))
        txt_file_df = txt_file_df.withColumn('p_expiry_date', udfs['extract_dates_udf']('value').getField('p_expiry_date'))

        txt_parsed_df = txt_file_df.select('p_id', 'p_name', 'p_category', 'p_price', 'p_create_date', 'p_expiry_date')

        # reading from json source
        json_df = (
            spark.readStream
            .json(json_src_dir,schema=data_schema,multiLine=True)
        )

        # reading from csv source
        csv_df = (
            spark.readStream
            .format('csv')
            .option('header', 'true')
            .schema(data_schema)
            .load(csv_src_dir)
        )

        # union all the dataframes
        src_union_df = txt_parsed_df.union(json_df).union(csv_df)

        # to write the dataframe into s3 data lake in parquet format
        query = streamWriter(src_union_df, checkpointFolder='s3a://anish-shilpakar-bucket/checkpoints', output='s3a://anish-shilpakar-bucket/data/spark_unstructured/bronze')

        #to display the read stream in console as output
        # query = (
        #     src_union_df
        #     .writeStream
        #     .outputMode('append')
        #     .format('console')
        #     .option('truncate', False)
        #     .start()
        # ) 

        query.awaitTermination()

        spark.stop()

This Spark application is designed to process unstructured text, CSV, and JSON files, extract meaningful information, and store the structured data in AWS S3 as Parquet files. Code Explanation

  1. Importing Required Libraries:

    • Uses pyspark.sql for Spark DataFrames, UDFs, and schema definitions.

    • Imports re and datetime for text processing and date parsing.

  2. Text Processing Functions:

    • Defines UDFs (User-Defined Functions) using Regular Expressions (RegEx) to extract:

      • p_id: Extract the product ID (first word).

      • p_name: Extract the product name (after "Product:").

      • p_category: Extract the product category (after "Category:").

      • p_price: Extract the product price (after "Costs Rs.").

      • p_created_date & p_expiry_date: Extract validity dates (after "Best From").

  3. Spark Streaming Initialization:

    • Creates a SparkSession with AWS S3 configurations for accessing the data lake.

    • Uses credentials from a config file (configuration.get('AWS_ACCESS_KEY')).

    • The configuration.py stores required access keys for S3 buckets as shown below

        configuration = {
            'AWS_ACCESS_KEY': '...',
            'AWS_SECRET_KEY': '...'
        }
      
  4. Reading Input Files (TXT, CSV, JSON) as Streams:

    • Text Files: Read as raw text and processed using UDFs.

    • JSON & CSV: Read using Spark's built-in schema inference.

  5. Data Transformation:

    • Extracts required fields from TXT files using UDFs.

    • Ensures data consistency by applying the same schema across JSON, CSV, and parsed TXT data.

    • Combines (unions) data from all sources into a single data frame.

  6. Streaming Data to S3:

    • Uses writeStream to continuously write the structured data into AWS S3 in Parquet format.

    • A checkpoint directory is maintained in S3 for fault tolerance.

  7. Execution and Monitoring:

    • The query runs continuously, processing new files every 5 seconds (trigger(processingTime='5 seconds')).

    • Optionally, data can be displayed in the console for debugging.

    • The spark session is stopped after execution.

  1. Executing the Spark Job

    To run the job locally in local spark

    spark-submit main.py

    To run the job inside a docker-based distributed spark cluster

    docker exec -it docker_setup-spark-master-1 spark-submit --master spark://spark-master:7077 jobs/main.py

    Once the spark job starts running, we can see that it reads the source files, creates dataframes and start streaming the results as parquet files in the s3 data lake.

    On inspecting logs, we can see results like shown below:

    Once reading all files is completed and when no new files are added the logs show that the query has been idle as shown below

  2. Inspecting S3 Storage

    The Spark job creates two folders in S3:

    • Checkpoints Folder:

      • Stores metadata required for Spark Structured Streaming to track processed data and ensure fault tolerance.

      • Contains subfolders like:

        • commits/: Stores commit logs for processed batches.

        • metadata/: Maintains streaming query metadata.

        • offsets/: Tracks data read offsets to prevent duplicate processing.

        • sources/: Stores source-related metadata for tracking input data streams.

    • Data Folder (Bronze Layer): Stores processed Parquet files.

      • Contains the processed and structured data in Parquet format stored in an S3 Data Lake.

        • The Bronze layer holds raw ingested data before further transformations.

        • Includes _spark_metadata/ for tracking written files and multiple .parquet files containing structured records.

  1. Running AWS Glue Crawler

    Once the parquet files are created and stored by the spark job inside the S3 data lake, we will now run the Glue crawler which we have previously configured

    Once the Glue crawler run is completed, we can check the tables in glue database and we can see that a new table has been created.

  2. Querying with AWS Athena

    Finally, AWS Athena can be used to query the Glue table for structured data analysis.

    We can see the query runs successfully and we obtain the desired result

Test Real-Time Streaming

To test whether the real-time streaming is working properly or not, we keep the spark job running by running the docker container and then adding a new file inside one of the source folders. Running spark as a docker container and checking spark UI.

Newly added CSV file with 5 new records

Now if we check the logs of the spark job we can see that it is reading the content from this new file, parsing the content, and storing the resulting parquet files inthe S3 bucket

By inspecting the S3 data lake we can see the newly added file by checking the last modified column

Then we run the glue crawler to update the Glue database

Once the Glue crawler succeeds, we then again query the glue table like before, and this time we can see that the count has increased from 105 to 110 indicating the addition of new records from the newly added CSV file

Conclusion

This project showcases how Apache Spark, AWS S3, Glue, and Athena can work together to build a real-time streaming pipeline for processing unstructured data. We enable seamless data analysis by continuously ingesting TXT, CSV, and JSON files, transforming them into a structured format, and storing them efficiently in Parquet on S3. With Glue crawlers automating schema detection and Athena providing instant querying, this setup ensures a smooth, scalable, and efficient data processing workflow. Whether for real-time analytics, reporting, or further transformations, this approach lays a solid foundation for handling streaming data in the cloud. This project can now be further enhanced to transform the data in the Bronze layer, store processed data in a data warehouse like Redshift, and integrate with BI tools like AWS QuickSight or Power BI for deeper insights and visualization.

References

  1. (3798) End to End Realtime Streaming with Unstructured Data | Get Hired as an Experienced Data Engineer - YouTube

  2. Welcome to AWS Documentation

More from this blog

Anish Shilpakar

15 posts

Passionate Developer and Enthusiastic Learner. I am writing about all new things I learned as I move forward in my developer journey.