flink 简单入门

安装

./start-cluster.sh

5cbd8c62e3fd2381f92695825eaf0c98.png

demo

  • 新建maven 项目
  • pom 文件配置如下:
<?xml version="1.0" encoding="UTF-8"?>

<!--
  Maven build for the flink-test demo: a Flink 1.10.0 streaming job
  (Scala 2.11 builds of the streaming/client artifacts) plus the
  Stateful Functions 2.0.0 SDK and test harness.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.thinker</groupId>
  <artifactId>flink-test</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>flink-test</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Flink 1.10 requires Java 8 or newer, so compile for 1.8 rather than 1.7. -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>

    <!-- Flink core, streaming API and client (job submission) libraries. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.10.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.10.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>1.10.0</version>
    </dependency>

    <!-- Stateful Functions SDK and local test harness. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>statefun-sdk</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>statefun-flink-harness</artifactId>
      <version>2.0.0</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <!--
      Lock down plugin versions to avoid using Maven defaults (may be moved to parent pom).
      NOTE: plugins declared only under pluginManagement are configured but NOT bound to
      the build; they take effect only once also referenced under build/plugins.
    -->
    <pluginManagement>
      <plugins>

        <!-- This plugin compiles Scala sources into class files. -->
        <plugin>
          <groupId>net.alchim31.maven</groupId>
          <artifactId>scala-maven-plugin</artifactId>
          <version>3.4.6</version>
          <executions>
            <execution>
              <!-- Goals bound to Maven's compile / test-compile phases. -->
              <goals>
                <goal>compile</goal>
                <goal>testCompile</goal>
              </goals>
            </execution>
          </executions>
        </plugin>

        <!-- Builds a "jar-with-dependencies" fat jar during the package phase. -->
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>3.0.0</version>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>

      </plugins>
    </pluginManagement>
  </build>
</project>

创建下面的文件 src/main/java/com/thinker/App.java:

package com.thinker;

import java.util.Locale;
import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Streaming word count: reads lines of text from a socket, splits them into
 * lower-cased words and prints a running count for every word.
 *
 * <p>Usage: {@code App <hostname> <port>}
 */
public class App {

    /**
     * Splits each incoming line into lower-cased words and emits one
     * {@code (word, 1)} tuple per non-empty word.
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /** Runs of non-word characters separate tokens; compiled once instead of once per record. */
        private static final Pattern NON_WORD = Pattern.compile("\\W+");

        /**
         * Tokenizes one line and forwards a {@code (token, 1)} tuple for each word.
         *
         * @param s         the input line
         * @param collector receives one tuple per non-empty token
         */
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // Locale.ROOT keeps lower-casing independent of the JVM's default locale.
            for (String token : NON_WORD.split(s.toLowerCase(Locale.ROOT))) {
                // A line that starts with a separator yields a leading empty token; skip it.
                if (!token.isEmpty()) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

    /**
     * Entry point: connects to {@code <hostname>:<port>}, counts words per key and
     * prints the running totals.
     *
     * @param args exactly two values: the hostname and the numeric port to read text from
     * @throws Exception if building or executing the streaming job fails
     */
    public static void main(String[] args) throws Exception {
        System.out.println("Hello World!");
        if (args.length != 2) {
            printUsage();
            return;
        }

        final String hostname = args[0];
        final int port;
        try {
            port = Integer.parseInt(args[1]);
        } catch (NumberFormatException e) {
            // Report a bad port as a usage problem instead of crashing with a stack trace.
            System.err.println("Invalid port: " + args[1]);
            printUsage();
            return;
        }

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> streamSource = env.socketTextStream(hostname, port);

        // Key by tuple field 0 (the word) and keep a running sum of field 1 (the count).
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = streamSource
                .flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);
        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    /** Prints the expected command-line arguments to standard error. */
    private static void printUsage() {
        System.err.println("USAGE:\nApp <hostname> <port>");
    }
}

打包:

mvn install -Dmaven.test.skip=true

监听9000端口:

ncat -l 9000

在 flink 安装目录的 bin 目录下执行:

./flink run -c com.thinker.App /home/zeek/project/flink-test/target/flink-test-1.0-SNAPSHOT.jar 127.0.0.1 9000

demo 测试

flink界面上可以显示出当前的任务:

8ba2b7057d07c42ff9e7545560c628de.png

在运行 ncat 监听 9000 端口的终端中输入文字:
12de5867b16652f2fd4c12607eb32f00.png

则可以在输出的位置看到结果:

0e88a484676a12748e965e99713d900f.png

# flink  转载 


标 题:《flink 简单入门》
作 者:zeekling
提 示:转载请注明文章转载自个人博客:浪浪山旁那个村

评论

取消