Chapter 2. Architecture and flow

2020. 7. 16.


# 개요

Apache Spark의 Mental Model을 코드 예제를 통하여 알아보는 챕터입니다.

예제를 통하여 Spark, 나아가서 Big Data 처리에서 Mental Model이 어떤지 확인해 보도록 합시다. 


## 참고, 멘탈 모델이란?


멘탈 모델이란 어떠한 목적을 달성하기 위한 과정을 도식화하는 것으로 이해하면 될 것 같습니다.


책에서 소개하는 Spark에서 Mental Model은 다음과 같습니다.

In terms of software, a mental model is a conceptual map that you can use to plan, predict, diagnose, and debug your applications. 
Big Data :  overall architecture, flows, and terminology


# Summary

  • Application == Driver 입니다. 데이터는 Driver로 들어가지 않고 원격의 Worker로 들어가게 됩니다.
  • Driver는 Master와 세션(SparkSession)을 생성합니다. 세션은 Worker Node의 Life Cycle을 정의하게 됩니다.
  • Master는 Remote Cluster 이외에도 Local 모드로 실행할 수 있습니다. Local 모드로 실행하면 클러스터 구성이 필요하지 않아 간편한 개발환경을 제공해줍니다.
  • 데이터는 Worker Node에 Partition에 분할되어 처리됩니다. 여기서 Partition은 각 Worker Node Memory에 위치하게 됩니다.
  • Spark는 작업 요청이 있어야만 동작하는 lazy pattern을 지향합니다. (Chapter 4)
  • Spark API는 Chaining Method가 많습니다. (NPE 주의)


# 예제를 통한 Mental model 학습

아래의 CSV to RDB ; ETL 프로세스를 통하여 Spark의 전체 아키텍처를 살펴보도록 하겠습니다.

목표 : csv 파일에서 제공하는 성과 이름을 합쳐서 풀네임을 RDB에 저장


## Sample CSV

Maréchal,Pierre Sylvain


Abstract Mental Model


## Java code

public class CsvToRelationalDatabaseApp {

   * main() is your entry point to the application.
   * @param args
  public static void main(String[] args) {
    CsvToRelationalDatabaseApp app = new CsvToRelationalDatabaseApp();

   * The processing code.
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("CSV to DB")

    // Step 1: Ingestion
    // ---------

    // Reads a CSV file with header, called authors.csv, stores it in a
    // dataframe
    Dataset<Row> df = spark.read()
        .option("header", "true")

    // Step 2: Transform
    // ---------

    // Creates a new column called "name" as the concatenation of lname, a
    // virtual column containing ", " and the fname column
    df = df.withColumn(
        concat(df.col("lname"), lit(", "), df.col("fname")));

    // Step 3: Save
    // ---------

    // The connection URL, assuming your PostgreSQL instance runs locally on
    // the
    // default port, and the database we use is "spark_labs"
    String dbConnectionUrl = "jdbc:postgresql://localhost/spark_labs";

    // Properties to connect to the database, the JDBC driver is part of our
    // pom.xml
    Properties prop = new Properties();
    prop.setProperty("driver", "org.postgresql.Driver");
    prop.setProperty("user", "jgp");
    prop.setProperty("password", "Spark<3Java");

    // Write in a table called ch02
        .jdbc(dbConnectionUrl, "ch02", prop);

    System.out.println("Process complete");


## Step 0. Master 연결

Driver(Application)는 Master와 연결하여 SparkSession을 맺습니다.

아래 예시는 local 모드로 접속을 하는 예시이고, 다른 master 사용하는 내용은 Chapter 5,6에서 다루게 됩니다.

SparkSession spark = SparkSession.builder()
         .appName( "CSV to DB" )
         .master( "local" )


Step 0. SparkSession



## Step 1. Ingesting (Extract)

ETL의 Extract, 즉 데이터를 추출하는 과정입니다.

Loading, Ingesting, Reading 등등.. 비슷한 의미의 용어로 많이 불리기도 합니다.


Dataset<Row> df = spark .read()
         .format( "csv" )
         .option( "header" , "true" )
         .load( "data/authors.csv" );

Spark에서 데이터는 클러스터의 Workers(Slaves)라 불리는 노드에서 분할하여 읽을 수 있습니다.


Step 1. Ingestion

위와 같이 분산되어 파일을 읽으려면 원본 파일이 분산 파일 시스템 혹은 공유 파일 시스템에 파일이 위치하여야 합니다. 그렇지 않으면 단일 노드에서 파일을 읽고 처리하게 됩니다.


만약 분산으로 처리하게 되는 경우, 각 워커별 할당량은 워커 노드의 메모리에 의해 결정됩니다.


Step 1-1. Create Tasks



이후, 데이터를 각 Task에서 읽고 특정 파티션에 저장을 하게 됩니다.


Step 1-2. Partitioning

[R > P] : Record to Partition

참고로 위 그림의 [Rec 1]은 csv 파일의 첫 번째 row인 'Pascal, Blaise'입니다.


여러 DataSet을 합치는 것은 비용이 꽤 큼. 초기에 데이터를 분할하여 가져올 수 있다면 효율적으로 돌 수 있음

* DataSet 통합 : Chapter 12, 13
* Repartition : Chapter 17


## Step 2. Transform

ETL의 Transform, 데이터 가공을 하는 과정입니다.

아래는 'lname', 'fname'이라는 필드를 합쳐서 Fool Name을 출력하는 Transform 작업을 하는 코드입니다.

df = df .withColumn("name" ,concat( df .col( "lname" ), lit( ", " ), df .col( "fname" )));


Step 2. Transforming


Spark is Lazy
동작의 최적화를 위해서 Lazy 한 코드가 많습니다. Chapter 4에서 다시 나오니 그때 자세하게 살펴보도록 합시다.


## Step 3. Save (Loading)

ETL의 Load, 데이터 저장을 하는 과정입니다.

Transforming된 데이터를 postgresql에 저장하는 코드입니다.

String dbConnectionUrl = "jdbc:postgresql://localhost/spark_labs" ;
     Properties prop = new Properties();
     prop .setProperty( "driver" , "org.postgresql.Driver" );
     prop .setProperty( "user" , "jgp" );
     prop .setProperty( "password" , "Spark<3Java" );
     df .write()
         .mode(SaveMode. Overwrite )
         .jdbc( dbConnectionUrl , "ch02" , prop );

Step 3. Saving

[P > D] : Partition to Database


Question. returns to the application: no data has ever been transferred from the worker to the application.
==> write하고 flush 되어서 메모리에 데이터가 없어지는 건가??



## 전체 프로세스 정리

  • Dataset은 Application(Driver)에 들어가는 것이 아닌 Worker의 Partition으로 들어가게 됩니다.
  • 모든 프로세스는 Worker에서 돌아가게 됩니다.
  • Task 숫자만큼 DB에 Connection이 생성이 될 텐데, 이를 컨트롤하기 위한 수단은 Chapter 17에서 배울 수 있습니다.


# 참고, Python 및 Scala 코드

## Python

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

current_dir = os.path.dirname(__file__)
relative_path = "../../../../data/authors.csv"
absolute_file_path = os.path.join(current_dir, relative_path)

# Creates a session on a local master
spark = SparkSession.builder.appName("CSV to DB").master("local").getOrCreate()

#  Step 1: Ingestion
#  ---------
#  Reads a CSV file with header, called authors.csv, stores it in a dataframe
df = spark.read.csv(header=True, inferSchema=True, path=absolute_file_path)

# Step 2: Transform
# ---------
# Creates a new column called "name" as the concatenation of lname, a
# virtual column containing ", " and the fname column
df = df.withColumn("name", F.concat(F.col("lname"), F.lit(", "), F.col("fname")))

# Step 3: Save
# ----
# The connection URL, assuming your PostgreSQL instance runs locally on the
# default port, and the database we use is "spark_labs"
dbConnectionUrl = "jdbc:postgresql://localhost/spark_labs"

# Properties to connect to the database, the JDBC driver is part of our pom.xml
prop = {"driver":"org.postgresql.Driver", "user":"jgp", "password":"Spark<3Java"}

# Write in a table called ch02
df.write.jdbc(mode='overwrite', url=dbConnectionUrl, table="ch02", properties=prop)

# Good to stop SparkSession at the end of the application

## Scala

  def main(args: Array[String]): Unit = {

    // Creates a session on a local master
    val spark = SparkSession.builder
      .appName("CSV to DB")

    // Step 1: Ingestion
    // ---------
    // Reads a CSV file with header, called authors.csv, stores it in a
    // dataframe
    var df = spark.read.format("csv")
      .option("header", "true")

    // Step 2: Transform
    // ---------
    // Creates a new column called "name" as the concatenation of lname, a
    // virtual column containing ", " and the fname column
    df = df.withColumn("name", concat(col("lname"), lit(", "), col("fname")))

    // Step 3: Save
    // ----
    // The connection URL, assuming your PostgreSQL instance runs locally on
    // the
    // default port, and the database we use is "spark_labs"
    val dbConnectionUrl = "jdbc:postgresql://localhost/spark_labs"

    // Properties to connect to the database, the JDBC driver is part of our
    // pom.xml
    val prop = new Properties
    prop.setProperty("driver", "org.postgresql.Driver")
    prop.setProperty("user", "jgp")
    prop.setProperty("password", "Spark<3Java")

    // Write in a table called ch02
    df.write.mode(SaveMode.Overwrite).jdbc(dbConnectionUrl, "ch02", prop)

    // Good to stop SparkSession at the end of the application

    println("Process complete")