GCP 교육 2일차 - DataFlow

박스님 2019. 2. 20. 15:22
# Elastic Data Processing
Streaming 데이터 같은 경우는, 시간에 대한 데이터 사이즈 변동이 심함.
==> 필요 컴퓨팅 리소스도 변동이 있음
==> Elastic 하다고 함

# Pipeline 구성 요소
  • Source
  • Transform (Filtering)
  • Sink
  • PCollection (Parallel Collection)
    • 파이프라인에서 움직이는 데이터

# What is Dataflow?
Autoscaling data processing pipelines

# Apache Beam

Dataflow는 Apache Beam을 사용할 수 있는 공간이라고 생각하면 쉬움

# java code samples
  • ParDo ; Parallel Do
  • run() ; Lazy Evaluation --> Resource 할당 정보 제공

# ITL ; Ingest, Transform, Load
ETL과 같은 의미

  • windowed : 일정 간격을 규정. (시간이라던가, 사이즈라던가) --> tcp windowing과 비슷한 개념

# Dataflow Concept

# Pipeline


# PCollection

  • "Length" : Transform 이름 (Step)
  • new DoFn< input type, output type >


# Ingesting
Source에서 데이터 가져오는 것

# Load

# Executing


# Dataflow 러너 엔진 언어
  • Apache Beam에서 지원되는 언어는 모두 지원될 것으로 예상됨

# Key-Value Pair

# Map & FlatMap

# ParDo Parallel Processing

# Example
( Code )

/*
 * Copyright (C) 2016 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.cloud.training.dataanalyst.javahelp;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

* A dataflow pipeline that finds Java packages on github that are: (a) used a
* lot by other projects (we count the number of times this package appears in
* imports elsewhere) (b) needs help (count the number of times this package has
* the words FIXME or TODO in its source)
* @author vlakshmanan
public class JavaProjectsThatNeedHelp {

    private static final int TOPN = 1000; // how many packages to write out

    public static interface MyOptions extends PipelineOptions {
        @Description("Output prefix")
        String getOutputPrefix();

        void setOutputPrefix(String s);

    public static void main(String[] args) {
        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
        Pipeline p = Pipeline.create(options);

        String javaQuery = "SELECT content FROM [fh-bigquery:github_extracts.contents_java_2016]";
        PCollection<String[]> javaContent = p.apply("GetJava", BigQueryIO.read().fromQuery(javaQuery)) //
                .apply("ToLines", ParDo.of(new DoFn<TableRow, String[]>() {
                    public void processElement(ProcessContext c) throws Exception {
                        TableRow row = c.element();
                        String content = (String) row.get("content");
                        if (content != null) {
                            String[] lines = content.split("\n");
                            if (lines.length > 0) {

        // packages that need help
        PCollectionView<Map<String, Integer>> packagesThatNeedHelp = javaContent
                .apply("NeedsHelp", ParDo.of(new DoFn<String[], KV<String, Integer>>() {

                    public void processElement(ProcessContext c) throws Exception {
                        String[] lines = c.element();
                        String[] packages = parsePackageStatement(lines);
                        int numHelpNeeded = countCallsForHelp(lines);
                        if (numHelpNeeded > 0) {
                            for (String packageName : packages) {
                                c.output(KV.of(packageName, numHelpNeeded));

                })) //
                .apply(Sum.integersPerKey()) // package -> number-of-help-wanted
                .apply("ToView", View.asMap());

        // packages in terms of use and which need help
        javaContent //
                .apply("IsPopular", ParDo.of(new DoFn<String[], KV<String, Integer>>() {
                    public void processElement(ProcessContext c) throws Exception {
                        String[] lines = c.element();
                        String[] packages = parseImportStatement(lines);
                        for (String packageName : packages) {
                            c.output(KV.of(packageName, 1)); // used once
                })) //
                .apply(Sum.integersPerKey()) // package -> number-of-uses
                .apply("CompositeScore", ParDo //
                        .of(new DoFn<KV<String, Integer>, KV<String, Double>>() {

                            public void processElement(ProcessContext c) throws Exception {
                                String packageName = c.element().getKey();
                                int numTimesUsed = c.element().getValue();
                                Integer numHelpNeeded = c.sideInput(packagesThatNeedHelp).get(packageName);
                                if (numHelpNeeded != null) {
                                    // multiply to get composite score
                                    // log() because these measures are subject to tournament effects
                                    c.output(KV.of(packageName, Math.log(numTimesUsed) * Math.log(numHelpNeeded)));

                        }).withSideInputs(packagesThatNeedHelp)) //
                .apply("Top_" + TOPN, Top.of(TOPN, new KV.OrderByValue<>())) //
                .apply("ToString", ParDo.of(new DoFn<List<KV<String, Double>>, String>() {

                    public void processElement(ProcessContext c) throws Exception {
                        List<KV<String, Double>> sorted = new ArrayList<>(c.element());
                        Collections.sort(sorted, new KV.OrderByValue<>());
                        StringBuffer sb = new StringBuffer();
                        for (KV<String, Double> kv : c.element()) {
                            sb.append(kv.getKey() + "," + kv.getValue() + '\n');

                })) //


    protected static String[] parseImportStatement(String[] lines) {
        final String keyword = "import";
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            if (line.startsWith(keyword)) {
                result.addAll(getPackages(line, keyword));
        return result.toArray(new String[0]);

    // e.g: import java.util.List; --> java.util.List, java.util, java
    private static List<String> getPackages(String line, String keyword) {
        int start = line.indexOf(keyword) + keyword.length();
        int end = line.indexOf(";", start);
        if (start < end) {
            String packageName = line.substring(start, end).trim();
            return splitPackageName(packageName);
        return new ArrayList<String>();

    private static int countCallsForHelp(String[] lines) {
        int count = 0;
        for (String line : lines) {
            if (line.contains("FIXME") || line.contains("TODO")) {
        return count;

    private static String[] parsePackageStatement(String[] lines) {
        final String keyword = "package";
        for (String line : lines) {
            if (line.startsWith(keyword)) {
                // only one package statement per file
                return getPackages(line, keyword).toArray(new String[0]);
        return new String[0];

    private static List<String> splitPackageName(String packageName) {
        // e.g. given com.example.appname.library.widgetname
        // returns com
        // com.example
        // com.example.appname
        // etc.
        List<String> result = new ArrayList<>();
        int end = packageName.indexOf('.');
        while (end > 0) {
            result.add(packageName.substring(0, end));
            end = packageName.indexOf('.', end + 1);
        return result;

이 코드를 실행하면, 다음과 같이 Dataflow 페이지에 워크노드 뷰가 그려진다.

# 코드 없이 작업하고 싶다면?
Dataflow 페이지에서 "Create Job From Template" 클릭

이후, 등장하는 각종 설정을 입력하고 Job을 생성


