This repository contains the code and analysis for the Advanced Database Topics project, conducted by Panagiotis Papastathis and Stavros Mitropoulos. The project leverages Apache Spark and AWS services to perform large-scale analysis on crime data in Los Angeles.
- Project Overview
- Technologies Used
- Data Sources
- Repository Structure
- Analysis & Queries
- Performance Analysis
The primary goal of this project is to analyze a large dataset of crime incidents in Los Angeles to extract meaningful insights. We explore various aspects of the data, including victim demographics, police performance, and the socio-economic factors related to crime. The project emphasizes performance optimization in a distributed environment by comparing different Spark APIs, data formats, and execution configurations.
- Cloud Platform: Amazon Web Services (AWS)
- Data Storage: AWS S3
- Processing Engine: Apache Spark
- Spark APIs: DataFrame API, Spark SQL, RDDs
- Geospatial Analysis: Apache Sedona (formerly GeoSpark)
- Language: Python (PySpark)
All datasets were sourced from public S3 buckets.
- Crime Data: Two CSV files covering incidents from 2010 to the present.
- Police Stations: A CSV file containing locations and details of LAPD stations.
- LA Income Data: A CSV file with estimated median income for 2015 by zip code.
- Census Blocks: A GeoJSON file with 2010 census block geometries and demographic data.
- Victim Descent Codes: A CSV file mapping descent codes to full descriptions.
The project's notebooks are organized by query:
- Query 1.ipynb: Analysis of victim age demographics.
- /Query2: Contains notebooks related to police precinct performance.
  - query2_dataframe.ipynb: Implementation using the DataFrame API.
  - query2_sql.ipynb: Implementation using Spark SQL.
  - query2b_upload.ipynb: Script to prepare and upload data for performance tests.
  - q2_calculating_size_of_data.ipynb: Script to calculate the size of data in different formats.
- Query 3.ipynb: Geospatial analysis of income and crime rates, with extensive join strategy performance tests.
- Query 4.ipynb: Analysis of victim profiles in high- and low-income areas.
- Query 5.ipynb: Geospatial analysis of crime proximity to police stations.
The project is structured around five key queries, each designed to answer a specific question.
Query 1: Victim Age Demographics
- Objective: To determine the age distribution of victims of assault-related crimes (crime codes 230, 235, 236).
- Implementation: Victims are categorized into age groups (children, young adults, adults, elders). The query then ranks these groups by the number of victims.
- Analysis: Compares the performance of Spark's high-level DataFrame API against the lower-level RDD API for the same aggregation task.
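The categorize-then-rank logic of Query 1 can be sketched in plain Python, independent of Spark. This is a minimal illustration, not the project's notebook code: the age thresholds below (under 18, 18 to 24, 25 to 64, over 64) are assumptions because the README does not state the group boundaries, and age_group and rank_age_groups are hypothetical names. In the DataFrame version the same steps would typically become a when/otherwise column expression followed by groupBy().count() and orderBy; in the RDD version, map plus reduceByKey and sortBy.

```python
from collections import Counter

# Assault-related crime codes listed in the README.
ASSAULT_CODES = {230, 235, 236}


def age_group(age):
    """Map a victim age to a group. Thresholds are assumed, not taken from the project."""
    if age < 18:
        return "Children"
    if age <= 24:
        return "Young adults"
    if age <= 64:
        return "Adults"
    return "Elders"


def rank_age_groups(records):
    """records: iterable of (crime_code, victim_age) pairs.

    Keeps only assault-related codes, buckets the ages, and returns
    (group, victim_count) pairs sorted by count, highest first.
    """
    counts = Counter(
        age_group(age) for code, age in records if code in ASSAULT_CODES
    )
    return counts.most_common()


if __name__ == "__main__":
    sample = [(230, 10), (235, 30), (236, 40), (624, 30), (230, 70), (230, 20), (235, 50)]
    print(rank_age_groups(sample))
```

Counter.most_common performs the final descending sort, which corresponds to the ranking step described above.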
Query 2: Police Precinct Performance
- Objective: To identify the top three police precincts with the highest "closed case rate" for each year.
- Implementation: Joins crime data with police station data to link incidents to a precinct. It calculates the percentage of cases that are not open (status is not 'IC', 'CC', or 'UNK').
- Analysis:
  - Compares the implementation and performance of the DataFrame API versus Spark SQL.
  - Investigates the performance impact of data storage formats by saving and reading the intermediate joined data as both .csv and .parquet.
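The closed-case-rate ranking can be sketched in plain Python once incidents have been linked to precincts. This is a hedged illustration rather than the project's implementation: it assumes the join has already produced (year, precinct, status) tuples, breaks rate ties alphabetically by precinct (an assumption), and top_precincts_by_closed_rate is a hypothetical name. In Spark SQL the per-year top three would typically be expressed with a window function such as ROW_NUMBER() OVER (PARTITION BY year ORDER BY rate DESC).

```python
from collections import defaultdict

# Statuses treated as "open" in the README; every other status counts as closed.
OPEN_STATUSES = {"IC", "CC", "UNK"}


def top_precincts_by_closed_rate(incidents, k=3):
    """incidents: iterable of (year, precinct, status) tuples, already joined to precincts.

    Returns {year: [(precinct, closed_rate_percent), ...]} holding the k best
    precincts per year, highest rate first (ties broken by precinct name).
    """
    totals = defaultdict(int)
    closed = defaultdict(int)
    for year, precinct, status in incidents:
        totals[(year, precinct)] += 1
        if status not in OPEN_STATUSES:
            closed[(year, precinct)] += 1

    # Group the per-(year, precinct) rates by year, like a window partitioned by year.
    by_year = defaultdict(list)
    for (year, precinct), total in totals.items():
        by_year[year].append((precinct, 100.0 * closed[(year, precinct)] / total))

    # Sort each year's precincts by rate (descending) and keep the top k.
    return {
        year: sorted(rows, key=lambda r: (-r[1], r[0]))[:k]
        for year, rows in sorted(by_year.items())
    }
```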
Query 3: Income and Crime Rates
- Objective: To investigate the relationship between the median income of a community and its crime rate.
- Implementation: This geospatial query uses Apache Sedona to join crime locations, census block geometries, and income data. It calculates the "crimes per person" and "median income per person" for each community.
- Analysis: A significant part of this query involves performance tuning. We systematically tested and timed various Spark join strategies (Broadcast Hash Join, Shuffle Hash Join, Sort Merge Join, Shuffle-Replicate Nested Loop) to find the most efficient method for both the relational and spatial joins.
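To show what the spatial join in Query 3 computes, here is a minimal plain-Python sketch of a point-in-polygon join followed by the "crimes per person" ratio. It is an illustration under stated assumptions, not the project's Sedona code: coordinates are treated as planar (x, y) pairs, each crime is assigned to the first containing block, population is assumed to be available per community, and point_in_polygon and crimes_per_person are hypothetical names. The naive nested loop checks every crime against every polygon; Sedona instead relies on spatial partitioning and indexing to avoid most of those comparisons. The income-per-person metric is omitted because the README does not define its formula.

```python
from collections import defaultdict


def point_in_polygon(x, y, polygon):
    """Ray-casting test: toggle 'inside' for each polygon edge crossed by a ray from (x, y) towards +x."""
    inside = False
    j = len(polygon) - 1
    for i in range(len(polygon)):
        xi, yi = polygon[i]
        xj, yj = polygon[j]
        # Only edges that straddle the horizontal line through y can be crossed.
        if (yi > y) != (yj > y):
            x_cross = (xj - xi) * (y - yi) / (yj - yi) + xi
            if x < x_cross:
                inside = not inside
        j = i
    return inside


def crimes_per_person(crimes, blocks, population):
    """crimes: [(x, y)]; blocks: [(community, polygon)]; population: {community: people}.

    Naive nested-loop spatial join: each crime is assigned to the first block
    whose polygon contains it, then counts are divided by community population.
    """
    counts = defaultdict(int)
    for x, y in crimes:
        for community, polygon in blocks:
            if point_in_polygon(x, y, polygon):
                counts[community] += 1
                break
    return {c: counts[c] / p for c, p in population.items() if p > 0}
```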
Query 4: Victim Profiles in High- and Low-Income Areas
- Objective: To analyze and compare the racial/ethnic background of crime victims in the wealthiest and poorest communities.
- Implementation: Building on Query 3, this query identifies the top 3 and bottom 3 communities by median income. It then filters crimes from 2015 within these areas and aggregates the victims by their descent.
- Analysis: Demonstrates the use of a broadcast join to efficiently map victim descent codes to their full descriptions. The query also includes performance tests with different Spark executor configurations.
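The selection and descent mapping in Query 4 can be sketched in plain Python. This is a hedged sketch, not the notebook's code: it assumes crimes arrive as (year, community, descent_code) tuples already matched to communities, and victims_by_descent is a hypothetical name. The small descent-code table is held in a dictionary, which mirrors the idea behind a broadcast join: the small table is available locally wherever the large data is processed, so each lookup is a local access instead of a shuffle.

```python
from collections import Counter


def victims_by_descent(crimes, income, descent_names, k=3, year=2015):
    """crimes: [(year, community, descent_code)]; income: {community: median_income};
    descent_names: small {code: description} lookup table.

    Returns victim counts by descent description for the k highest- and
    k lowest-income communities, restricted to crimes from the given year.
    """
    ranked = sorted(income, key=income.get, reverse=True)
    groups = {"top": set(ranked[:k]), "bottom": set(ranked[-k:])}
    result = {}
    for label, communities in groups.items():
        # descent_names plays the role of the broadcast table: every lookup is a
        # local dictionary access, so the large crime data never needs shuffling.
        counts = Counter(
            descent_names.get(code, "Unknown")
            for crime_year, community, code in crimes
            if crime_year == year and community in communities
        )
        result[label] = counts.most_common()
    return result
```

In a usage test, a three-entry table such as {"W": "White", "B": "Black", "H": "Hispanic"} is enough; it stands in for the full descent-code CSV and is only sample data.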
Query 5: Crime Proximity to Police Stations
- Objective: To calculate the average distance between a crime scene and its nearest police station for each police division.
- Implementation: A geospatial query using Sedona that performs a cross-join between all crimes and all police stations. It calculates the spherical distance for each pair, identifies the minimum distance for each crime, and then aggregates the results per division.
- Analysis: The performance impact of cluster configuration is explored by running the query with varying numbers of executors (2, 4, and 8) and measuring the execution time.
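The cross-join, minimum, and aggregate steps of Query 5 can be sketched in plain Python with the haversine formula for spherical distance. This is an illustration under assumptions, not the project's Sedona query (Sedona offers spherical distance functions such as ST_DistanceSphere): the Earth radius of 6371 km is an assumed constant, each crime is attributed to the division of its nearest station (the README does not say which division is used), and haversine_km and avg_distance_per_division are hypothetical names.

```python
import math
from collections import defaultdict

EARTH_RADIUS_KM = 6371.0  # assumed mean Earth radius


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def avg_distance_per_division(crimes, stations):
    """crimes: [(lat, lon)]; stations: [(division, lat, lon)].

    Returns {division: (average_distance_km, crime_count)}, assigning each
    crime to the division of its nearest station.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for lat, lon in crimes:
        # Cross join: compare this crime with every station, keep the closest.
        division, dist = min(
            ((div, haversine_km(lat, lon, slat, slon)) for div, slat, slon in stations),
            key=lambda t: t[1],
        )
        sums[division] += dist
        counts[division] += 1
    return {d: (sums[d] / counts[d], counts[d]) for d in sums}
```

The inner loop over all stations is what makes this workload compute-bound, which is why it benefits from additional executors.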
A key theme throughout the project is performance optimization and analysis. Our findings include:
- DataFrame/SQL vs. RDD: High-level APIs (DataFrame, SQL) generally offer better performance due to the Catalyst optimizer. For Query 1, the DataFrame API was significantly faster than the RDD-based implementation.
- Parquet vs. CSV: Parquet, as a columnar storage format, consistently outperformed CSV for read-heavy analytical queries. In Query 2, we observed that writing data to Parquet was faster than writing to CSV, and subsequent reads from Parquet were also more performant.
- Join Strategies: The choice of join strategy has a major impact on performance. For the complex geospatial and relational joins in Query 3, we systematically tested multiple strategies. Broadcast joins were highly effective for joining a large DataFrame with a small one. For large-to-large joins, Sort Merge Join (merge) and Shuffle Hash Join (shuffle_hash) were tested, with their performance depending on the specific data characteristics.
- Executor Configuration: Scaling out the number of executors can reduce execution time, but the gains are not always linear and depend on the nature of the workload and data partitioning. For instance, in Query 5, increasing the number of executors from 2 to 8 decreased execution time, demonstrating the benefit of parallelization for computationally intensive tasks such as distance calculations. Similarly, in Query 4, performance was benchmarked with 1, 2, and 4 cores, and execution times improved with more resources.
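The difference between the two main strategy families can be sketched with single-machine plain-Python analogues. These are illustrations, not Spark internals: a broadcast hash join builds a hash table from the small side and streams the large side through it, while a sort merge join sorts both sides by key and walks them in step. The function names are hypothetical. In PySpark, a specific strategy can be requested with DataFrame.hint using the names broadcast, merge, shuffle_hash, or shuffle_replicate_nl.

```python
from collections import defaultdict


def broadcast_hash_join(large, small, key_large, key_small):
    """Inner join: hash the small side once, then probe it for every large-side row."""
    table = defaultdict(list)
    for row in small:
        table[key_small(row)].append(row)
    # .get avoids inserting empty entries for keys that have no match.
    return [(l, s) for l in large for s in table.get(key_large(l), [])]


def sort_merge_join(left, right, key_left, key_right):
    """Inner join: sort both sides by key, then advance two cursors in step."""
    left = sorted(left, key=key_left)
    right = sorted(right, key=key_right)
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        kl, kr = key_left(left[i]), key_right(right[j])
        if kl < kr:
            i += 1
        elif kl > kr:
            j += 1
        else:
            # Find the run of right rows sharing this key.
            j_end = j
            while j_end < len(right) and key_right(right[j_end]) == kl:
                j_end += 1
            # Pair every left row with this key against that run (handles duplicates).
            while i < len(left) and key_left(left[i]) == kl:
                out.extend((left[i], r) for r in right[j:j_end])
                i += 1
            j = j_end
    return out
```

The hash variant only pays off when the small side fits comfortably in memory on every worker, while the sort-based variant scales to two large inputs at the cost of sorting (and, in Spark, shuffling) both sides.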