이야기박스
Chapter 8. Ingestion from databases 본문
이번 챕터에서는 RDB에서 데이터를 추출하는 내용을 다룸
크게 두 가지 방법
1. 전체 테이블 추출
2. DB에서 특정 동작 후, 추출 (filtering, joining, aggregating 등)
# 커넥션
JDBC 드라이버 이용
위 예제처럼 Spark가 DB에 직접 접근하는 것이 아닌 JDBC 드라이버를 통해서 접근
그러므로 JDBC 드라이버의 특징이 고대로 나타남
참고. CHANGE DATA CAPTURE
Change data capture (CDC) is a set of software design patterns used to determine (and track) the data that has changed so that action can be taken using the changed data.
CDC solutions occur most often in data-warehouse environments because capturing and preserving the state of data across time is one of the core functions of a data warehouse, but CDC can be utilized in any database or data repository system.
Although Spark is not designed for data warehousing, CDC techniques can be used from incremental analytics. Overall, adding a last_update column as a timestamp (or similar) is a good practice.
Information in this sidebar was adapted from Wikipedia.
다양한 방법으로 connection 생성하는 방법
1. read().jdbc('url', 'table', properties)
Properties props = new Properties();
props .put( "user" , "root" );
props .put( "password" , "Spark<3Java" );
props .put( "useSSL" , "false" );
Dataset<Row> df = spark .read().jdbc(
"jdbc:mysql://localhost:3306/sakila?serverTimezone=EST" ,
"actor" ,
props );
2. properties in url
String jdbcUrl = "jdbc:mysql://localhost:3306/sakila"
+ "?user=root"
+ "&password=Spark<3Java"
+ "&useSSL=false"
+ "&serverTimezone=EST" ;
Dataset<Row> df = spark .read()
.jdbc( jdbcUrl , "actor" , new Properties());
2. using options chain method and load()
Dataset<Row> df = spark .read()
.option( "url" , "jdbc:mysql://localhost:3306/sakila" )
.option( "dbtable" , "actor" )
.option( "user" , "root" )
.option( "password" , "Spark<3Java" )
.option( "useSSL" , "false" )
.option( "serverTimezone" , "EST" )
.format( "jdbc" )
.load();
어떠한 방식으로해도 lazy하게 동작하기 때문에 동일한 효과를 냄
# dialect
dataframe ~ jdbc driver 사이의 다리와 같은 역할
Note. Spark 3.0 에서 지원되는 dialects. 아래 리스트에 없다면 [SparkPackage] 참조
- IBM Db2
- Apache Derby
- MySQL
- Microsoft SQL Server
- Oracle - PostgreSQL
- Teradata Database
vendor에서 dialect를 지원해주지 않는다면? 만들어라
package net.jgp.books.spark.ch08.lab_200_informix_dialect;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;
import scala.Option;
public class InformixJdbcDialect extends JdbcDialect {
private static final long serialVersionUID = -672901;
@Override
public boolean canHandle(String url ) {
return url .startsWith( "jdbc:informix-sqli" );
}
@Override
public Option<DataType> getCatalystType(int sqlType ,
String typeName , int size , MetadataBuilder md ) {
if ( typeName .toLowerCase().compareTo( "serial" ) == 0) {
return Option.apply(DataTypes. IntegerType );
}
if ( typeName .toLowerCase().compareTo( "calendar" ) == 0) {
return Option.apply(DataTypes. BinaryType );
}
...
return Option.empty();
}
}
Scala type으로 반환이 필요. 위 케이스에서는 마지막에 empty로 반환하는 것을 볼 수 있음
# Advanced Quries and ingestion
Point 1. 테이블의 모든 row를 가져올 필요가 없는 경우, 일부 데이터만 가져오는 방법은?
--> filtering the data
--> SQL Quries를 통하여 일부 데이터 추출 (가장 위 서론에서 나온 [2. DB에서 특정 동작 후, 추출] 방법)
dataframe = spark.read().format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "( " + {Table Query} + " ) A ") \
.option("driver", "com.mysql.jdbc.Driver") \
.option("user", "storyparks") \
.option("password", "*******") \
.option("fetchSize", fs) \
.load()
Point 2. Joining Table
테이블을 각각 받아서 Dataframe에서 합칠 수도 있지만, 위의 예제처럼 DB에서 처리하는게 훨씬 효율적임.
자세한 예제는 위와 같으므로 생략
Note. SQL 구문은 JDBC 드라이버에 맞추어서 작성
## 성능과 파티션
위와 같이 하나의 파티션으로 데이터를 읽을수도 있고
이처럼 열개의 파티션으로 나눠서 읽을 수도 있음
package net.jgp.books.spark.ch08.lab320_ingestion_partinioning;
...
public class MySQLToDatasetWithPartitionApp {
...
Properties props = new Properties();
props .put( "user" , "root" );
props .put( "password" , "Spark<3Java" );
props .put( "useSSL" , "false" );
props .put( "serverTimezone" , "EST" );
props .put( "partitionColumn" , "film_id" );
props .put( "lowerBound" , "1" );
props .put( "upperBound" , "1000" );
props .put( "numPartitions" , "10" );
Dataset<Row> df = spark .read().jdbc(
"jdbc:mysql://localhost:3306/sakila" ,
"film" ,
props );
...
위 코드처럼 파티션 숫자를 지정할 수 있음
# ElasticSearch
RDBMS와 마찬가지로 ES Driver를 통하여 접근
위 드라이버를 통하여 Elasticsearch와 Spark 사이의 양방향 통신이 가능해짐
package net.jgp.books.spark.ch08.lab400_es_ingestion;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class ElasticsearchToDatasetApp {
public static void main(String[] args ) {
ElasticsearchToDatasetApp app =
new ElasticsearchToDatasetApp();
app .start();
}
private void start() {
long t0 = System.currentTimeMillis();
SparkSession spark = SparkSession.builder()
.appName( "Elasticsearch to Dataframe" )
.master( "local" )
.getOrCreate();
long t1 = System.currentTimeMillis();
System. out .println( "Getting a session took: " + ( t1 - t0 ) + " ms" );
Dataset<Row> df = spark
.read()
.format( "org.elasticsearch.spark.sql" )
.option( "es.nodes" , "localhost" )
.option( "es.port" , "9200" )
.option( "es.query" , "?q=*" )
.option( "es.read.field.as.array.include" , "Inspection_Date" )
.load( "nyc_restaurants" );
long t2 = System.currentTimeMillis();
System. out .println(
"Init communication and starting to get some results took: "
+ ( t2 - t1 ) + " ms" );
df .show(10);
long t3 = System.currentTimeMillis();
System. out .println( "Showing a few records took: " + ( t3 - t2 ) + " ms" );
df .printSchema();
long t4 = System.currentTimeMillis();
System. out .println( "Displaying the schema took: " + ( t4 - t3 ) + " ms" );
System. out .println( "The dataframe contains " +
df .count() + " record(s)." );
long t5 = System.currentTimeMillis();
System. out .println( "Counting the number of records took: " + ( t5 - t4 )
+ " ms" );
System. out .println( "The dataframe is split over " + df .rdd()
.getPartitions(). length + " partition(s)." );
long t6 = System.currentTimeMillis();
System. out .println( "Counting the # of partitions took: " + ( t6 - t5 )
+ " ms" );
}
}
일부 데이터를 가져올때는 많은 메모리를 사용하지 않음
단, count() 호출할때는 전체 데이터 사이즈만큼 메모리 사용
'Computer & Data > Big Data' 카테고리의 다른 글
Chapter 3. The majestic role of the dataframe (0) | 2020.10.22 |
---|---|
Spark Appendix. RDD vs Dataframe vs Dataset (0) | 2020.10.22 |
Chapter 7. Ingestion from files (0) | 2020.08.06 |
Chapter 6. Deploying your simple app (0) | 2020.07.30 |
Chapter 5. Building a simple app for deployment (0) | 2020.07.30 |