블로그 이미지
윤영식
Full Stacker, Application Architecter, KnowHow Dispenser and Bike Rider

Publication

Category

Recent Post

2013. 9. 13. 21:19 MongoDB/Prototyping

MongoDB에 구글의 도서검색 내역을 넣고, 여기서 도서의 Description을 하둡으로 분석하여 추천도서를 만들어 보자 



구글도서에서 Description을 MongoDB에 저장하기 

  - 이클립스에서 Maven Project를 하나 생성하고, pom.xml 을 다음과 같이 구성한다 

    자바에서 몽고디비를 사용하기 위한 드라이버와 구글 검색결과(JSON)을 파싱하기위한 JSON라이브러리를 추가한다 

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.mobiconsoft</groupId>

  <artifactId>booksearch</artifactId>

  <version>0.0.1-SNAPSHOT</version>

  <dependencies>

  <dependency>

  <groupId>org.mongodb</groupId>

  <artifactId>mongo-java-driver</artifactId>

  <version>2.11.2</version>

  </dependency>


  <dependency>

  <groupId>junit</groupId>

  <artifactId>junit</artifactId>

  <version>4.10</version>

  </dependency>

 

  <dependency>

  <groupId>org.json</groupId>

  <artifactId>json</artifactId>

  <version>20090211</version>

  </dependency>

  </dependencies>

</project>

  - BookSearcher.java 코딩 (편의상 import 구문 제외)

public class BookSearcher {


  // 도서검색 

  public String searchBooks(String keyword) {

    URL url = null;

    try {

      url = new URL("https://www.googleapis.com/books/v1/volumes?q=" + keyword);

    } catch (MalformedURLException e) {

      e.printStackTrace();

    }

    

    StringBuffer sb = new StringBuffer();

    String line;

    try {

      URLConnection urlConn = url.openConnection();

      BufferedReader br = new BufferedReader(new InputStreamReader(urlConn.getInputStream(), "utf-8"));

      while((line = br.readLine()) != null) sb.append(line);

    } catch(IOException e) {

      e.printStackTrace();

    }

    

    return sb.toString();

  }

  

  // 도서 검색 결과에서  items찾아와 저장한다 

  public void saveBooks(String books) {

    Mongo mongo = null;

    try{

      mongo = new MongoClient("localhost", 27017);

    } catch(Exception e) {

      e.printStackTrace();

      throw new RuntimeException();

    }

    

   // 몽고디비 db는 books-db 이고 컬렉션은 books 로 만들어짐 

    mongo.setWriteConcern(new WriteConcern(1, 2000));

    DB bookDB = mongo.getDB("books-db");

    DBCollection bookColl = bookDB.getCollection("books");

    

    try{

      JSONObject json = new JSONObject(books);

      JSONArray items = json.getJSONArray("items");

      for( int i=0; i<items.length(); i++) {

        DBObject doc = new BasicDBObject();

        // search-book key로 value가 들어간다

        doc.put("search-book", (DBObject)JSON.parse(items.getJSONObject(i).toString()));

        bookColl.save(doc);

      }

    } catch(JSONException e) {

      e.printStackTrace();

    }

  }

}

  - 테스트 해보자 

    JUnit 테스트전에 mongodb를 기동한다 

// 몽고디비 

$ ../bin/mongod -dbpath=/Users/dowon/Documents/mongodb/database


// 테스트 

public class BookSearcherTest {

  

    private BookSearcher bookSearcher;

    

    @Before

    public void setUp() throws Exception {

      this.bookSearcher = new BookSearcher();

    }


    // search 결과 보기 

    @Test

    public void testSearchBooks() throws Exception {

      String result = this.bookSearcher.searchBooks("nosql");

      //assertNotNull(result);

      System.out.println(result);

    }

    

    // 데이터 저장하기 

    @Test

    public void testSaveBooks() throws Exception {

      String result = this.bookSearcher.searchBooks("nosql");

      this.bookSearcher.saveBooks(result);

    }

}


// 테스트 성공후 mongo 쉘을 통하여 확인 

> use books-db

switched to db books-db

> show collections

books

system.indexes

> db.books.find().length();

10

> db.books.find()

{ "_id" : ObjectId("5232e69cda06561b2e11306c"), "search-book" : { "saleInfo" : { "saleability" : "NOT_FOR_SALE", "isEbook" : false, "country" : "KR" }, "id" : "tv5iO9MnObUC", "searchInfo" : { "textSnippet" : "They provide examples, practical solutions, and expert education in new technologies, all designed to help programmers do a better job. wrox.com Programmer Forums Join our Programmer to Programmer forums to ask and answer programming ..." }, "etag" : "HX8hesQgrJM", "volumeInfo" : { "pageCount" : 408, "averageRating" : 3, "infoLink" : "http://books.google.co.kr/books?id=tv5iO9MnObUC&dq=nosql&hl=&source=gbs_api", "printType" : "BOOK", "publisher" : "John Wiley & Sons", "authors" : [  "Shashank Tiwari" ], "canonicalVolumeLink" : "http://books.google.co.kr/books/about/Professional_NoSQL.html?hl=&id=tv5iO9MnObUC", "title" : "Professional NoSQL", "previewLink ... 중략 ...



MongoDB Hadoop Connector 사용하기  

  - 몽고디비와 하둡을 연결하는 방법을 제공한다 

     https://github.com/mongodb/mongo-hadoop 에서 1.1.x 의 Core 다운로드 한다 (mongo-hadoop-core_1.1.2-1.1.0.jar)

  - Input-Output으로 몽고디비를 사용할 경우   

    

 - 분석을 위하여 Pig, MR을 할 경우

    

  - ETL처럼 처리후 별도의 저장소로 던져질 경우

    ETL from MongoDB

      

    ETL to MongoDB

        


  - Eclipse에 새로운 Book Search Mapper와 Reducer 프로젝트를 만들고 pom.xml 을 만든다 

    mongo-hadoop-core  파일을 maven에 등록되어 있지 않기때문에 수동으로 .m2/repository에 만들어 주어야 한다 

    예)

    > 레파지토리 : /Users/dowon/.m2/repository

    > 파일위치 : mongo-hadoop-core/mongo-hadoop-core_1.1.2/1.1.0/mongo-hadoop-core_1.1.2-1.1.0.jar

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.mobiconsoft</groupId>

  <artifactId>booksearch_mapreduce</artifactId>

  <version>0.0.1-SNAPSHOT</version>

  <dependencies>

    <dependency>

      <groupId>org.apache.hadoop</groupId>

      <artifactId>hadoop-core</artifactId>

      <version>1.1.2</version>

    </dependency>

    

    <dependency>

      <groupId>org.mongodb</groupId>

      <artifactId>mongo-java-driver</artifactId>

      <version>2.11.2</version>

    </dependency>

    

    <!-- 수동 설정 --> 

    <dependency>

      <groupId>mongo-hadoop-core</groupId>

      <artifactId>mongo-hadoop-core_1.1.2</artifactId>

      <version>1.1.0</version>

    </dependency>

  </dependencies>

  

  <build>

    <plugins>

      <plugin>

        <artifactId>maven-antrun-plugin</artifactId>

        <configuration>

          <tasks>

            <copy file="target/${project.artifactId}-${project.version}.jar"

              tofile="/Users/dowon/Documents/input/${project.artifactId}-${project.version}.jar" />

          </tasks>

        </configuration>

        <executions>

          <execution>

            <phase>install</phase>

            <goals>

              <goal>run</goal>

            </goals>

          </execution>

        </executions>

      </plugin>

    </plugins>

  </build>

  

</project>

  - Mapper와 Reducer 클래스를 코딩 

// Mapper

public class BookSearchMapper extends Mapper<Object, BSONObject, Text, IntWritable> {

  

  private final static IntWritable ONE = new IntWritable();

  private Text word = new Text();

  

  protected void map(Object key, BSONObject value, Context context) 

    throws IOException, InterruptedException {

    BasicDBObject anItem = (BasicDBObject)value.get("search-book");

    BasicDBObject volumeInfo = (BasicDBObject)anItem.get("volumeInfo");

    String description = volumeInfo.getString("description");

    if(description == null || description.trim().length() <= 0) return;

    

    StringTokenizer st = new StringTokenizer(description);

    while(st.hasMoreTokens()) {

      word.set(st.nextToken());

      context.write(word, ONE);

    }

  }

}


// Reducer

public class BookSearcherReducer extends

  Reducer<Text, IntWritable, Text, IntWritable> {

  

  protected void reduce(Text key, Iterable<IntWritable> values, Context context) 

    throws IOException, InterruptedException {

    int sum = 0;

    for(final IntWritable value : values) sum += value.get();

    context.write(key, new IntWritable(sum));

  }

}

  - Job을 만든다

public class MongoJob extends MongoTool {

  static {

    Configuration.addDefaultResource("mongo-default.xml");

    Configuration.addDefaultResource("mongo-book.xml");

  }

  

  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    

    JobHelper.addJarForJob(conf, "/Users/dowon/.m2/repository/mongo-hadoop-core/mongo-hadoop-core_1.1.2/1.1.0/mongo-hadoop-core_1.1.2-1.1.0.jar:"

          + "/Users/dowon/.m2/repository/org/mongodb/mongo-java-driver/2.11.2/mongo-java-driver-2.11.2.jar");

    

    System.exit(ToolRunner.run(conf, new MongoJob(), args));

  }

}

  - 리소스 xml을 만든다 

    https://github.com/mongodb/mongo-hadoop/blob/master/examples/treasury_yield/src/main/resources/mongo-defaults.xml 

   에서 xml 정보를 copy 하여 mongo-book.xml 을 만든 후 하기 내용을 수정하여 입력해야 한다

<property>

    <!-- Class for the mapper -->

    <name>mongo.job.mapper</name>

    <value>booksearch_mapreduce.BookSearchMapper</value>

  </property>

  <property>

    <!-- Reducer class -->

    <name>mongo.job.reducer</name>

    <value>booksearch_mapreduce.BookSearcherReducer</value>

  </property>

  <property>

    <!-- InputFormat Class -->

    <name>mongo.job.input.format</name>

    <value>com.mongodb.hadoop.MongoInputFormat</value>

  </property>

  <property>

    <!-- OutputFormat Class -->

    <name>mongo.job.output.format</name>

    <value>com.mongodb.hadoop.MongoOutputFormat</value>

  </property>

  <property>

    <!-- Output key class for the output format -->

    <name>mongo.job.output.key</name>

    <value>org.apache.hadoop.io.Text</value>

  </property>

  <property>

    <!-- Output value class for the output format -->

    <name>mongo.job.output.value</name>

    <value>com.mongodb.hadoop.io.BSONWritable</value>

  </property>

  <property>

    <!-- Output key class for the mapper [optional] -->

    <name>mongo.job.mapper.output.key</name>

    <value>org.apache.hadoop.io.Text</value>

  </property>

  <property>

    <!-- Output value class for the mapper [optional] -->

    <name>mongo.job.mapper.output.value</name>

    <value>org.apache.hadoop.io.IntWritable</value>

  </property>

  <property>

    <!-- Class for the combiner [optional] -->

    <name>mongo.job.combiner</name>

    <value>booksearch_mapreduce.BookSearcherReducer</value>

  </property>

  - "Mave Build..." clean install 하여 .jar  파일을 만든다 (참조에 첨부파일)

 - 다음 하둡 runtime(start-all.sh) 을 수행한다 

  - 하둡 수행 쉘을 만든다 

//////////////////////

// mongodb.sh 내역 

#!/bin/sh


export REPO=/Users/dowon/.m2/repository

export MONGO_DRIVER=$REPO/org/mongodb/mongo-java-driver/2.11.2/mongo-java-driver-2.11.2.jar

export MONGO_HADOOP=$REPO/mongo-hadoop-core/mongo-hadoop-core_1.1.2/1.1.0/mongo-hadoop-core_1.1.2-1.1.0.jar

export HADOOP_CLASSPATH=$MONGO_DRIVER:$MONGO_HADOOP

export HADOOP_USER_CLASSPATH_FIRST=true


hadoop jar booksearch_mapreduce-0.0.1-SNAPSHOT.jar booksearch_mapreduce.MongoJob



/////////////

/// 수행하기 

$ mongodb.sh

2013-09-13 20:53:24.630 java[1431:1203] Unable to load realm info from SCDynamicStore

13/09/13 20:53:24 INFO util.MongoTool: Created a conf: 'Configuration: core-default.xml, core-site.xml, mongo-default.xml, mongo-book.xml, mapred-default.xml, mapred-site.xml' on {class booksearch_mapreduce.MongoJob} as job named '<unnamed MongoTool job>'

13/09/13 20:53:24 INFO util.MongoTool: Mapper Class: class booksearch_mapreduce.BookSearchMapper

13/09/13 20:53:24 INFO util.MongoTool: Setting up and running MapReduce job in foreground, will wait for results.  {Verbose? false}

13/09/13 20:53:25 INFO util.MongoSplitter: MongoSplitter calculating splits

13/09/13 20:53:25 INFO util.MongoSplitter: use range queries: false

.. 중략 ...

13/09/13 20:53:41 INFO mapred.JobClient:     Spilled Records=1428

13/09/13 20:53:41 INFO mapred.JobClient:     Map output bytes=14049

13/09/13 20:53:41 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200

13/09/13 20:53:41 INFO mapred.JobClient:     Combine input records=1299

13/09/13 20:53:41 INFO mapred.JobClient:     SPLIT_RAW_BYTES=195

13/09/13 20:53:41 INFO mapred.JobClient:     Reduce input records=714

13/09/13 20:53:41 INFO mapred.JobClient:     Reduce input groups=714

13/09/13 20:53:41 INFO mapred.JobClient:     Combine output records=714

13/09/13 20:53:41 INFO mapred.JobClient:     Reduce output records=714

13/09/13 20:53:41 INFO mapred.JobClient:     Map output records=1299



MongoDB에서 결과값 확인하기 

  - 브라우져에서 결과값을 확인하고 싶다면 몽고디비에 옵션으로 --rest 를 주면 28017 포트로 RESTful 하게 호출할 수 있다 

 $ ./bin/mongod -dbpath=/Users/dowon/Documents/mongodb/database --rest 

  - 결과 화면 

    결과값은 out 컬렉션에 생성이 된다


<참조>

  - 검색 책 정보 

books.json


  - 이클립스 project workspace

booksearch.tar


  - 하둡기동후 수행하는 쉘 

mongodb.sh


  - 반출한 booksearch mapreducer jar 파일 

booksearch_mapreduce-0.0.1-SNAPSHOT.jar






posted by 윤영식
2013. 9. 12. 20:45 Big Data

몽고디비를 하둡의 Input/Output의 Store를 사용하면 어떨까? 어차피 몽고디비는 Document Store 이며 Scale-Out을 위한 무한한 Sharding(RDB의 Partitioning) 환경을 제공하니 충분히 사용할 수 있을 것이다. Store에 저장된 데이터의 Batch Processing Engine으로 하둡을 사용하면 될 일이다. 




Mongo-Hadoop Connector 소개

  - Hadoop을 통하면 Mongo안에 있는 데이터를 전체 코어를 사용하면서 병렬로 처리할 수 있다 

  - 하둡포멧으로 Mongo를 BSON format을 파일로 저장하거나, MongoDB에 바로 저장할 수 있는 Java API존재

  - Pig + Hive를 사용할 수 있음 

  - AWS의 Amazon Elastic MapReduce 사용



Batch Processing Model 종류
  - 사실 MongoDB에서도 Aggregation Framework을 제공하여 MapReduce프로그래밍을 JavaScript로 개발 적용할 수 있다다
  - 시간단위 Batch Processing은 요렇게도 사용할 수 있겠다 


  - 데이터가 정말 Big 이면 하둡을 이용하여 Batch Processing을 해야겠다. 여기서 몽고디비를 "Raw Data Store" 와 "Result Data Store"로 사용한다 


  - MongoDB & Hadoop : Batch Processing Model 전체 내역을 보자 



<참조>

  - 결국 처리된 데이터는 표현되어야 한다 : Data Visualization Resources

  - MongoDB 넌 뭐니? NoSQL에 대한 이야기 (조대협)

posted by 윤영식
2013. 9. 11. 19:20 Big Data

Mapper & Reducer를 .jar로 배포하고 직접 하둡명령으로 수행하는 방법에 대하여 알아보자 



MapReduce 프로그램 

  - Writable Interface는 Value에서 사용한다

  - Mapper 인터페이스 

    Mapper<K1, V1, K2, V2>의 형태 : key는 WritableComparable를 구현해야 하며, value는 Writable를 구현해야 함.


  - Reducer 인터페이스 

    reducer는 여러가지 매퍼로부터 생성된 결과를 받고, key/value 쌍의 key에 대해 데이터를 정렬하고 동일한 key에 대한 모든 값을 그룹핑 함.


  - 이전의 WordCount에 대한 것을 직접 코딩하였는데, 맵퍼-TokenCountMapper-, 리듀서-LongSumReducer-를 사용해서 동일하게 만들 수 있다



hadoop 명령어로 .jar 직접 수행하기

  - pom.xml 에 MapReduce Jar파일을 만들어 특정위치로 복사하는 플러그인 설정을 넣는다 

<build>

  <plugins>

    <plugin>

      <artifactId>maven-antrun-plugin</artifactId>

      <configuration>

        <tasks>

          <copy file="target/${project.artifactId}-${project.version}.jar"

            tofile="/Users/dowon/Documents/hadoop-jobs/${project.artifactId}-${project.version}.jar" />

        </tasks>

      </configuration>

      <executions>

        <execution>

          <phase>install</phase>

          <goals>

            <goal>run</goal>

          </goals>

        </execution>

      </executions>

    </plugin>

  </plugins>

</build>

  - 기존 WordCount에 대한 WordCount3 복사본을 만들고 TokenCountMapper와 LongSumReducer로 변형한다

    즉 직접 코딩하지 말고 하둡에서 제공하는 클래스를 사용한다 

import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.lib.LongSumReducer;

import org.apache.hadoop.mapred.lib.TokenCountMapper;


public class WordCount3 {


  public static void main(String[] args) throws IOException {

    // 1. configuration Mapper & Reducer of Hadoop

    JobConf conf = new JobConf(WordCount3.class);

    conf.setJobName("wordcount3");

    

    // 2. final output key type & value type

    conf.setOutputKeyClass(Text.class);

    conf.setOutputValueClass(LongWritable.class);

    

    // 3. in/output format 

    conf.setMapperClass(TokenCountMapper.class);

    conf.setCombinerClass(LongSumReducer.class);

    conf.setReducerClass(LongSumReducer.class);

    

    // 4. set the path of file for read files

    //    input path : args[0]

    //    output path : args[1]

    FileInputFormat.setInputPaths(conf, new Path(args[0]));

    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    

    // 5. run job

    JobClient client = new JobClient();

    client.setConf(conf);

    JobClient.runJob(conf);

  }

}

  - eclipse의 프로젝트를 선택하고 "Run As"에서 "Maven build..."를 선택하여 "clean install" 입력하고 "run"버튼을 클릭한다 


  - 결과로 배포가 성공으로 나오면 된다  : /Users/dowon/Documents/hadoop-jobs 디렉토리에 *.jar 파일 생성을 확인한다 


  - 하둡 데몬들을 수행하기 전 NameNode에 대해서 format을 하고 수행한다 

// name node 포멧

hadoop namenode -format


// .bash_profile 에 PATH 설정

set -o vi

export JAVA_HOME=/Library/Java/Home

export H_HOME=~/Documents/hadoop-1.2.1

export PATH=.:$PATH:$JAVA_HOME/bin:$H_HOME/bin:/usr/bin

alias ll='ls -alrt'

alias cdh='cd $H_HOME'


// 하둡 데몬 수행

// 50030 : job-tracker 접속 포트

// 50070 : NameNode 접속 포트

$ start-all.sh 

  - input 의 위치를 지정하여 준다 (만일, NameNode를 포멧하였다면)

// 위치가 하기와 같다면 

$ pwd

/Users/dowon/Documents/input

$ ls

total 16

-rw-r--r--   1 dowon  staff   22  9 11 19:26 file01

-rw-r--r--   1 dowon  staff   21  9 11 19:26 file02


// input 을 HDFS에 만든다 

$ hadoop fs -put . input


1) http://localhost:50070/으로 접속하여 "Browser the filesystem"을 클릭하면 볼 수 있다 

2) /user/dowon/input 경로로 만들어 졌음을 알 수 있다 

  - hadoop 명령어로 생성된 jar 파일을 수행해 보자 

// 경로가 다음과 같고, WordCount3.class가 들어있는 .jar 파일이 존재한다 

$ pwd

/Users/dowon/Documents/hadoop-jobs

$ ls

-rw-r--r--   1 dowon  staff  5391  9 11 19:45 MapReduce-1.0.0-SNAPSHOT.jar


// 명령어 수행 

$ hadoop jar *.jar WordCount3 /user/dowon/input /user/dowon/output3


1) 네임노드에 접속해서 /user/dowon에 들어가 보면 "output3"이 생성된 것을 볼 수있다

2) output3으로 들어가면 결과값을 지니 파일이 존재한다 

3) 만일 명령을 재수행하고 싶다면 output3 디렉토리리 삭제해야 한다 

    $ hadoop fs -rmr /user/dowon/output3


eclipse에서 수행하지 않고 반출된 .jar 파일을 가지고 hadoop명령으로 수행하는 방법을 알아보았다. 


<참조>

  없음 


posted by 윤영식
2013. 9. 9. 21:52 Big Data

Eclipse하에서 하둡코딩시 Maven을 기본으로 하여 외부 라이브러리 의존성을 관리하자.



Hadoop 역할

  - 분산된 파일을 처리하는 순서

   > input HDFS으로 들어오기

   > Job 수행 : 읽어서 로직처리

   > 결과를 파일 또는 DB에 넣는다 

  - Tera 단위의 데이터가 이미 HDFS에 있을 경우 해당 데이터를 처리하는데 하둡의 쓰임새가 있다

  - HDFS와 MapReduce의 이해 



Maven Project 만들기

  - Maven Project 선택하고 "Create a simple project" 선택한다  


  - 메이븐의 GroupID와 ArtifactID 설정한다 


  - 최종 생성 내역 

    MapReduce 프로그래밍을 여기서 하게 되고, 단위 테스트 프로그래밍도 할 수 있다


  - pom.xml  에 hadoop 관련 라이브러리 의존관계를 넣는다.  (파란색이 추가부분)

    추가하고 저장을 하면 자동으로 의존관계 라이브러리를 다운로드 받는다

    이클립트 좌측 "Project Explorer"의 "Maven Dependencies"에서 관련 파일들이 추가된 것을 확인할 수 있다 

// pom.xml 내역

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>kr.mobiconsoft.hadoop</groupId>

  <artifactId>MapReduce</artifactId>

  <version>1.0.0-SNAPSHOT</version>

  

  <dependencies>

  <dependency>

  <groupId>org.apache.hadoop</groupId>

  <artifactId>hadoop-core</artifactId>

  <version>1.1.2</version>

         </dependency>

  </dependencies>

  

</project>


// 결과 



Word Counting MapReduce 구현하기 

  - file 2개 생성하고 유사한 word를 넣는다 

// file01

hello world bye world


// file02

hi world hello dowon

  - Mapper Class를 생성 


import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reporter;


/**

 * K1 : read key type

 * V2 : read value type

 * K2 : write key type

 * V2 : write value type

 */

//public class WordCountMapper implements Mapper<K1, V1, K2, V2> {

public class WordCountMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {


// map 결과는 reducer로 자동으로 던져진다 

public void map(LongWritable key, Text value,

OutputCollector<Text, IntWritable> output, Reporter reporter)

throws IOException {

// TODO Auto-generated method stub

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while(tokenizer.hasMoreTokens()) {

Text outputKey = new Text(tokenizer.nextToken());

// Hadoop 에서 wrapping한 Integer 타입의 객체를 넣어줌 

// param1: outputKey, param2: outputValue

output.collect(outputKey, new IntWritable(1));

}

}

}


  - Reducer Class 생성


import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;


/**

 * K1 : Mapper의 K2 와 동일

 * V1 : Mapper의 V2 와 동일 

 */

public class WordCountReducer extends MapReduceBase 

 implements Reducer<Text, IntWritable, Text, IntWritable> {


/**

* V1 에서 values는 Iterator이다. 실제 같은 단어가 여러개 일 경우 

*/

public void reduce(Text key, Iterator<IntWritable> values,

OutputCollector<Text, IntWritable> output, Reporter reporter)

throws IOException {

// TODO Auto-generated method stub

int sum = 0;

while(values.hasNext()) {

sum += values.next().get(); // get Integer value

}

output.collect(key, new IntWritable(sum));

}

}


  - Job Tracker를 생성 : 하단 main 선택한다 


import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.TextInputFormat;

import org.apache.hadoop.mapred.TextOutputFormat;



public class WordCount {


public static void main(String[] args) throws IOException {

// 1. configuration Mapper & Reducer of Hadoop

JobConf conf = new JobConf();

conf.setJobName("wordcount");

conf.setMapperClass(WordCountMapper.class);

conf.setReducerClass(WordCountReducer.class);

// 2. final output key type & value type

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

// 3. in/output format 

conf.setInputFormat(TextInputFormat.class);

conf.setOutputFormat(TextOutputFormat.class);

// 4. set the path of file for read files

//    input path : args[0]

//    output path : args[1]

FileInputFormat.setInputPaths(conf, new Path(args[0]));

FileOutputFormat.setOutputPath(conf, new Path(args[1]));

// 5. run job

JobClient.runJob(conf);

}


}

  

  - 최종 모습 


  - eclipse 설정하기 

    main펑션이 있는 WordCount를 수행할 때 input path와 output path를 지정하여 준다 

    이때 output path의 디렉토리는 생성되어 있지 않아야 한다 (target/hadoop-result)

    하단 우측 "run" 클릭 


  - 결과값 

 

  - 결국 이런 처리과정을 수행하게 된다 


  - Mapper와 Reducer 역할 

    Mapper : 소스를 쪼개어 key:value 맵을 여러개 만들고

    Reducer : 여러 Map 값을 하나의 결과값으로 만들어 준다 



단위 테스트 해보기 

  - pom.xml에 mrunit 추가 

 <dependencies>

  <dependency>

  <groupId>org.apache.hadoop</groupId>

  <artifactId>hadoop-core</artifactId>

  <version>1.1.2</version>

  </dependency>

 

  <dependency>

  <groupId>org.apache.mrunit</groupId>

  <artifactId>mrunit</artifactId>

  <version>0.8.0-incubating</version>

  <scope>test</scope>

  </dependency>

  </dependencies>


  - Mapper Test 클래스 생성

    Run As... 에서 JUnit으로 테스트 하여 초록색-성공인지 체크한다 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mrunit.MapDriver;

import org.junit.Test;


/**

 * 테스트를 통하여 Mapper와 Reducer를 테스트에서 수행하여 검증 할 수 있다 

 * @author dowon

 *

 */

public class WordCountMapperTest {


  @Test

  public void testMap() {

    // 1. 설

    Text value = new Text("Hello World Bye World");

    

    MapDriver<LongWritable, Text, Text, IntWritable> mapDriver = new MapDriver();

    mapDriver.withMapper(new WordCountMapper());

    mapDriver.withInputValue(value);

    

    // 2. 검정 및 실행 

    // 순서를 정확히 해야 에러없이 수행된다. 빼먹어도 에러가 난다 

    mapDriver.withOutput(new Text("Hello"), new IntWritable(1));

    mapDriver.withOutput(new Text("World"), new IntWritable(1));

    mapDriver.withOutput(new Text("Bye"), new IntWritable(1));

    mapDriver.withOutput(new Text("World"), new IntWritable(1));

    mapDriver.runTest();

  }

}


  - Reducer Test 클래스 생성

    Run As... 에서 JUnit으로 테스트 하여 초록색-성공인지 체크한다 

import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mrunit.ReduceDriver;

import org.junit.Test;


public class WordCountReducerTest {

  

  @Test

  public void testReducer() {

      // 1. 설정

    ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver = new ReduceDriver();

    reduceDriver.withReducer(new WordCountReducer());

    reduceDriver.withInputKey(new Text("World"));

    reduceDriver.withInputValues(Arrays.asList(new IntWritable(1), new IntWritable(1)));

    

    // 2. 검증 및 실행 

    reduceDriver.withOutput(new Text("World"), new IntWritable(2));

    reduceDriver.runTest();

  }

}



<참조>

  - Maven 기초 사용법

posted by 윤영식
2013. 9. 9. 21:34 Big Data

왜 빅데이터가 이슈가 되고 있을까?  HW와 SW의 가격은 저렴해지고, 표준은 평준화 되고 접근이 수워지고 있다. 그러나 데이터는 복제나 공유가 되지 않고 자사의 데이터가 돈이 되는 시대가 왔다. 그런 의미에서 하둡은 빅데이터를 처리하는 분야의 SW이다.



하둡 개념

  - Input: 분석할 데이터, Output: 결과값 

  - MasterNode: HDFS-분산파일위치 정보지님 (NameNode) 

  - SlaveNode: 분산된 실 데이터를 저장 (DataNode)

  - MapReduce/HDFS Layer 영역으로 나뉨 


  - 역할에 대한 이해하기 


  - JobTracker : Map -> Reduce 할때 Shuffle+Sort의 로직처리가 성능을 좌우한다. 

     즉, Map출력결과 (Mapper) -> Suffle+Sort -> Sorting된 Reduce 입력 (Reducer)

     Mapper/Reducer 프로그래밍도 분산된 것이다 


  - 개념이해하기 



설치하기 

  - http://apache.tt.co.kr/hadoop/common/hadoop-1.2.1/ 에서 hadoop-1.2.1-bin.tar.gz 파일을 다운로드 받는다 

  - 기본 환경은 Mac OS를 사용한다 

  - .bash_profile 안에 JAVA_HOME을 설정한다 : Java버전은 반드시 1.6 이상이어야 한다

$ cat .bash_profile

alias ll='ls -alrt'

set -o vi

export JAVA_HOME=/Library/Java/Home

  - 압축을 푼다. 설치 끝



Standalone 사용하기 

  - Document 메뉴에서 1.2.1로 이동하여 "Single Node Setup"을 클릭 : http://hadoop.apache.org/docs/r1.2.1/single_node_setup.html

  - 간단한 수행

// 폴더를 하나 만들고 xml 환경파일을 복사한다 

$ mkdir input 

$ cp conf/*.xml input 


// 하기 명령을 수행한다 

// input 읽을 꺼리를 주고 결과값을 output에 담아라  

$ bin/hadoop jar hadoop-examples-*.jar grep input output 'dfs[a-z.]+' 


// output  폴더가 자동 생성되고 cat 하였을 때 존재하는 파일의 내역이 하기와 같이 보이면 성공!

$ cat output/*

1 dfsadmin

  - 수행은 어떤 의미일까?

    + NameNode : 하둡전체 관리 =  JobTracker + DataNode

    + JobTracker : 처리역할

    + DataNode : HDFS 위치 (분산파일을 하나의 파일 인것처럼 사용하게 해줄 수 있는것)

  - 하둡은 특정 input이 있고 처리하고 output 결과가 나온다. 현재 예는 로컬 input에 있는 것을 읽고 로컬 output에 생성하였다. 

    그러나 실제에서는 분산으로 처리하므로 local이 아닐 것이다. 


Hadoop  종류

  - 종류

 Local (Standalone) Mode [로컬(독립)모드]

  하둡의 기본모드(아무런 환경설정을 하지 않음): 로컬 머신에서만 실행

  다른 노드와 통신할 필요가 없기 때문에 HDFS를 사용하지 않으며 다른 데몬들도 실행시키지 않음

  독립적으로 MapReduce 프로그램의 로직을 개발하고 디버깅하는데 유용함


 Pseudo-Distributed Mode [가상분산 모드]

  한대의 컴퓨터로 클러스터를 구성하고, 모든 데몬을 실행함.

  독립실행(standalone) 모드 기능 보완

  – 메모리 사용 정도, HDFS 입출력 관련 문제, 다른 데몬 과의 상호작용에서 발생하는 일을 검사


 Fully-Distributed Mode [완전분산 모드]

  분산 저장과 분산 연산의 모든 기능이 갖추어진 클러스터를 구성함

  master - 클러스터의 master 노드로, NameNode와 JobTracker 데몬을 제공

  backup - SNN(Secondary NameNode 데몬)을 제공하는 서버

  slaves - DataNode와 TaskTracker 데몬을 실행하는 slaves 들 



가상의 Standalone Hadoop 실행하기

  - 3가지 기본 환경을 추가한다

  - conf/core-site.xml

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>


<!-- Put site-specific property overrides in this file. -->


<configuration>

   <property>

        <name>fs.default.name</name>

        <value>hdfs://localhost:9000</value>

   </property>

</configuration>

  - conf/hdfs-site.xml

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>


<!-- Put site-specific property overrides in this file. -->


<configuration>

     <property>

         <name>dfs.replication</name>

         <value>1</value>

     </property>

</configuration>

  - conf/mared-site.xml

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>


<!-- Put site-specific property overrides in this file. -->


<configuration>

     <property>

         <name>mapred.job.tracker</name>

         <value>localhost:9001</value>

     </property>

</configuration>


  - NameNode 초기화

    NameNode 가 있는 곳에서 수행한다 

$ bin/hadoop namenode -format

13/09/09 20:51:59 INFO namenode.NameNode: STARTUP_MSG: 

/************************************************************

STARTUP_MSG: Starting NameNode

STARTUP_MSG:   host = KOSTA17ui-iMac.local/192.168.0.15

STARTUP_MSG:   args = [-format]

STARTUP_MSG:   version = 1.2.1

STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.2 -r 1503152; compiled by 'mattf' on Mon Jul 22 15:23:09 PDT 2013

STARTUP_MSG:   java = 1.6.0_45

************************************************************/

13/09/09 20:52:00 INFO util.GSet: Computing capacity for map BlocksMap

... 중략 ...

13/09/09 20:52:00 INFO common.Storage: Image file /tmp/hadoop-dowon/dfs/name/current/fsimage of size 111 bytes saved in 0 seconds.

13/09/09 20:52:00 INFO common.Storage: Storage directory /tmp/hadoop-dowon/dfs/name has been successfully formatted.

  - NameNode, JobTracker, DataNode를 한번에 띄우기 

$ bin/start-all.sh

starting namenode, logging to /Users/dowon/Documents/hadoop-1.2.1/libexec/../logs/hadoop-dowon-namenode-KOSTA17ui-iMac.local.out

2013-09-09 20:53:36.430 java[7632:1603] Unable to load realm info from SCDynamicStore


// 수행후 브라우져에서 50030, 50070 포트 호출

http://localhost:50030/dfshealth.jsp : JobTracker


http://localhost:50070/dfshealth.jsp : NameNode

  - 가상 HDFS 방식으로 수행해 보자 

    결국 결과값을 HDFS에 넣어주는 것이다 

// NameNode의 HDFS에 conf/* 모든 파일을 input 디렉토리명으로 생성하여 복사한다  

$ bin/hadoop fs -put conf input

2013-09-09 20:58:13.847 java[7966:1603] Unable to load realm info from SCDynamicStore


// 명령을 수행하면 JobTracker가 수행되고 HDFS에 output 디렉토리에 결과값이 생성된다 

// 결과값은 JobTracker가 처리하여 생성된 것이다 

$ bin/hadoop jar hadoop-examples-*.jar grep input output 'dfs[a-z.]+'

2013-09-09 21:01:57.770 java[8017:1603] Unable to load realm info from SCDynamicStore

13/09/09 21:01:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

13/09/09 21:01:58 WARN snappy.LoadSnappy: Snappy native library not loaded

13/09/09 21:01:58 INFO mapred.FileInputFormat: Total input paths to process : 17

13/09/09 21:01:58 INFO mapred.JobClient: Running job: job_201309092053_0001

13/09/09 21:01:59 INFO mapred.JobClient:  map 0% reduce 0%

13/09/09 21:02:03 INFO mapred.JobClient:  map 11% reduce 0%

13/09/09 21:02:05 INFO mapred.JobClient:  map 23% reduce 0%

.. 중략..


// NameNode 결과 확인

// http://localhost:50070/ 에서 "Browser the filesystem" 을 클릭한다

// http://localhost:50075/browseDirectory.jsp?dir=%2Fuser%2Fdowon&namenodeInfoPort=50070

해당 브라우져 내역에 신규생성된 "user"밑의 "dowon"밑의 "input" 과 "output"이 보인다 (dowon은 계정명)

Name
Type
Size
Replication
Block Size
Modification Time
Permission
Owner
Group
input
dir



2013-09-09 20:58
rwxr-xr-x
dowon
supergroup
output
dir



2013-09-09 21:02
rwxr-xr-x
dowon
supergroup


// output에 결과값 : part-00000 에 결과내역이 write 되어 있다 

Name
Type
Size
Replication
Block Size
Modification Time
Permission
Owner
Group
_SUCCESS
file
0 KB
1
64 MB
2013-09-09 21:02
rw-r--r--
dowon
supergroup
_logs
dir



2013-09-09 21:02
rwxr-xr-x
dowon
supergroup
part-00000
file
0.05 KB
1
64 MB
2013-09-09 21:02
rw-r--r--
dowon
supergrou


// JobTracker 처리현황 확인 

// http://localhost:50030/jobtracker.jsp


다음에는 eclipse에서 하둡 프로그래밍을 해보자 


<참조>

  - Hadoop 튜토리얼

posted by 윤영식
prev 1 next