Flink Tutorial (1) - Cluster Installation and Deployment



Recommended [Flink Tutorial] https://blog.csdn.net/hellozpc/category_10645207.html
Recommended [Kafka Tutorial] https://bigbird.blog.csdn.net/article/details/108770504
Recommended [JVM Interview and Tuning Tutorial] https://blog.csdn.net/hellozpc/category_10959501.html
Recommended [SpringBoot Tutorial (full set)] https://blog.csdn.net/hellozpc/category_10159292.html
Recommended [SpringCloud Tutorial] https://blog.csdn.net/hellozpc/article/details/83692496
Recommended [Mybatis Tutorial] https://blog.csdn.net/hellozpc/article/details/80878563
Recommended [SnowFlake Tutorial] https://blog.csdn.net/hellozpc/article/details/108248227
Recommended [Concurrency Rate-Limiting Tutorial] http://www.ifindbug.com/doc/id-56860/name-guava-ratelimiter-current-limiter-2.html
Recommended [Redis Tutorial] https://bigbird.blog.csdn.net/article/details/81267030
Recommended [Netty Tutorial] https://blog.csdn.net/hellozpc/category_10945233.html


Overview of Flink

According to Apache's official introduction, Flink is a framework and distributed processing engine for stateful computation over bounded and unbounded data streams. In plain terms, Flink is a stream-computing framework, used mainly to process streaming data. It originated in 2010 as "Stratosphere", a research project funded by the German Research Foundation; it became an Apache incubator project in March 2014 and an Apache top-level project that December. "Flink" means swift or agile in German. Its code is implemented mainly in Java, with some parts in Scala. In Flink's world, all data is a stream: offline (batch) data is a bounded stream, and real-time (streaming) data is an unbounded stream. Flink can process both bounded batch data sets and unbounded real-time streams, providing a unified programming model for batch and stream processing. If Storm is regarded as the first generation of stream-processing frameworks and Spark Streaming (micro-batching) as the second, then Flink can be regarded as the third generation, integrating the strengths of its predecessors.

Flink installation and deployment

Like almost all big-data processing frameworks, Flink must be installed and deployed on servers before use. Flink can run in a variety of modes: on a single machine or in a cluster, and in a cluster either standalone or on YARN. The various installation and deployment modes are described in detail below.

local mode

In local mode you simply extract the Flink binary package on the Linux server and run it without modifying any parameters; it is suitable for simple test scenarios.

Download the installation package

Download the installation package directly from the Flink official website. For example, the latest version at the time of writing is flink-1.11.1-bin-scala_2.11.tgz
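If you prefer to script the download, the URL can be assembled from the version numbers. A sketch (the archive.apache.org path layout is an assumption; check the official download page for current mirrors):

```shell
# Sketch: assemble the download URL from version variables so that
# upgrading later only means changing two values.
FLINK_VERSION=1.11.1
SCALA_VERSION=2.11
PKG="flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz"
URL="https://archive.apache.org/dist/flink/flink-${FLINK_VERSION}/${PKG}"
echo "$URL"   # pass this to wget or curl to download
```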

Upload and extract to Linux

[root@vm1 myapp]# pwd
/usr/local/myapp

[root@vm1 myapp]# ll
total 435772
-rw-r--r--. 1 root root 255546057 Feb  8 02:29 flink-1.11.1-bin-scala_2.11.tgz

Unzip to the specified directory

[root@vm1 myapp]# mkdir -p /usr/local/myapp/flink
[root@vm1 myapp]# tar -zxvf flink-1.11.1-bin-scala_2.11.tgz -C /usr/local/myapp/flink/

Start Flink

Note: before starting, make sure JDK 1.8 or above is installed on the machine and the JAVA_HOME environment variable is configured.

[root@vm1 ~]# java -version
java version "1.8.0_261"
Java(TM) SE Runtime Environment (build 1.8.0_261-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.261-b12, mixed mode)
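The JDK check above can be scripted. A small sketch: the hypothetical helper below extracts the major version from a Java version string (in practice you would feed it the quoted version parsed out of `java -version 2>&1`):

```shell
# Hypothetical helper: extract the Java major version from a version
# string, handling both the legacy "1.8.0_261" and the modern "11.0.2"
# numbering schemes.
java_major_version() {
  case "$1" in
    1.*) echo "$1" | cut -d. -f2 ;;  # legacy: 1.8.0_261 -> 8
    *)   echo "$1" | cut -d. -f1 ;;  # modern: 11.0.2 -> 11
  esac
}

java_major_version "1.8.0_261"   # prints 8
```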

Enter the flink directory and execute the startup command

[root@vm1 ~]# cd /usr/local/myapp/flink/flink-1.11.1/
[root@vm1 flink-1.11.1]# bin/start-cluster.sh
[root@vm1 flink-1.11.1]# jps
3577 Jps
3242 StandaloneSessionClusterEntrypoint
3549 TaskManagerRunner

Run jps to view the Java processes; you can see that the Flink processes have started. Flink's web UI can now be accessed in a browser at http://vm1:8081


The prerequisite for opening the page above in a local browser is that the Windows hosts file maps the server vm1's hostname to its IP, and that the Linux server's firewall is turned off.

Turn off the firewall

Check Linux firewall status

[root@vm1 ~]# systemctl status firewalld

Temporarily turn off the firewall

[root@vm1 ~]# systemctl stop firewalld

Permanently turn off the firewall

[root@vm1 ~]# systemctl disable firewalld

Stop Flink

Execute bin/stop-cluster.sh

Flink cluster mode

A cluster environment is what you use in production, and it requires modifying the corresponding configuration parameters. Flink provides several cluster modes; here we mainly introduce two: standalone and Flink on YARN.

Standalone mode

Standalone is Flink's self-contained cluster deployment mode: it does not depend on any third-party software or components, so it is the way to build an independent Flink cluster. To build a standard Flink cluster for this example, prepare three Linux machines.

Linux machine planning

Node type    Hostname    IP
Master       vm1         192.168.174.136
Slave        vm2         192.168.174.137
Slave        vm3         192.168.174.138

In a Flink cluster, the JobManager (StandaloneSessionClusterEntrypoint) process will run on the Master node, and the TaskManager (TaskManagerRunner) process will run on the Slave node.

Every Linux node in the cluster must have JAVA_HOME configured, password-free SSH login must be set up between the nodes (at a minimum, the Master must be able to log in to the two Slave nodes without a password), and the Linux firewall must be turned off.

Set password-free login

1) First set the local password-free login on each machine

[root@vm1 ~]# ssh-keygen -t rsa
[root@vm1 ~]# cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Run ssh to the machine itself; if no password prompt appears, the configuration succeeded.

[root@vm1 ~]# ssh vm1
Last login: Tue Sep 29 22:23:39 2020 from vm1

Perform the same operation on other machines vm2 and vm3:

ssh-keygen -t rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

ssh vm2

ssh vm3

2) Set vm1 password-free login to other machines

Copy the public key file of vm1 to other machines vm2 and vm3

[root@vm1 ~]# scp ~/.ssh/id_rsa.pub root@vm2:~/
[root@vm1 ~]# scp ~/.ssh/id_rsa.pub root@vm3:~/

Log in to vm2 and vm3, and append vm1's public key file to their own authorization files

[root@vm2 ~]# cat ~/id_rsa.pub >> ~/.ssh/authorized_keys
[root@vm3 ~]# cat ~/id_rsa.pub >> ~/.ssh/authorized_keys

If it complains that ~/.ssh/authorized_keys does not exist, run ssh-keygen -t rsa on that machine first; creating the .ssh directory by hand is not recommended, because sshd is strict about its permissions.

Verify on vm1 that ssh to vm2 and vm3 needs no password; if so, the configuration succeeded!

[root@vm1 ~]# ssh vm2
Last login: Mon Sep 28 22:31:22 2020 from 192.168.174.133

[root@vm1 ~]# ssh vm3
Last login: Tue Sep 29 22:35:25 2020 from vm1

Execute exit to return to the local machine

[root@vm3 ~]# exit
logout
Connection to vm3 closed.
[root@vm1 ~]#

3) Set password-free login between other machines in the same way

Do the same steps on vm2, vm3

Copy the public key file of vm2 to vm1 and vm3

[root@vm2 ~]# scp ~/.ssh/id_rsa.pub root@vm1:~/
[root@vm2 ~]# scp ~/.ssh/id_rsa.pub root@vm3:~/
[root@vm1 ~]# cat ~/id_rsa.pub >> ~/.ssh/authorized_keys
[root@vm3 ~]# cat ~/id_rsa.pub >> ~/.ssh/authorized_keys

Copy the public key file of vm3 to vm1 and vm2

[root@vm3 ~]# scp ~/.ssh/id_rsa.pub root@vm1:~/
[root@vm3 ~]# scp ~/.ssh/id_rsa.pub root@vm2:~/
[root@vm1 ~]# cat ~/id_rsa.pub >> ~/.ssh/authorized_keys
[root@vm2 ~]# cat ~/id_rsa.pub >> ~/.ssh/authorized_keys

4) Verify ssh password-free login

[root@vm2 ~]# ssh vm1
[root@vm2 ~]# ssh vm3
[root@vm3 ~]# ssh vm1
[root@vm3 ~]# ssh vm2
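The scp-and-append steps above can also be done with the standard ssh-copy-id tool, which appends the key to the remote authorized_keys and fixes permissions in one step. A dry-run sketch that only prints the command to run for each peer (hostnames follow the planning table; drop the echo to execute):

```shell
# Dry run: print the ssh-copy-id command for every node other than the
# one this script runs on. ssh-copy-id prompts for the remote password
# once; afterwards password-free login works.
HOSTS="vm1 vm2 vm3"
SELF="vm1"   # set to the hostname of the machine you run this on
for h in $HOSTS; do
  [ "$h" = "$SELF" ] && continue
  echo "ssh-copy-id root@$h"
done
```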
Set up host time synchronization

If the clocks of the nodes in the cluster drift too far apart, cluster services will misbehave, so make sure the time on every node stays consistent.

Execute yum install -y ntpdate to install ntpdate.

Execute ntpdate -u ntp.sjtu.edu.cn to synchronize the time.
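ntpdate performs a one-off sync, so clocks will drift again over time. One common approach is a cron entry that reruns it periodically; a sketch of an /etc/crontab line (the NTP server follows the command above):

```
# /etc/crontab entry (sketch): resync the clock every 30 minutes
*/30 * * * * root /usr/sbin/ntpdate -u ntp.sjtu.edu.cn > /dev/null 2>&1
```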

Flink installation steps

The following steps are performed on the Master machine first and then copied to the other machines (make sure the JDK is installed on every machine)

1) Unzip the Flink installation package

[root@vm1 myapp]# tar -zxvf flink-1.11.1-bin-scala_2.11.tgz -C /usr/local/myapp/flink/

2) Modify Flink's configuration file flink-1.11.1/conf/flink-conf.yaml

Set the jobmanager.rpc.address parameter to vm1

jobmanager.rpc.address: vm1
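For reference, the most commonly adjusted keys in conf/flink-conf.yaml for a small standalone cluster are sketched below. Apart from jobmanager.rpc.address, the values shown are the Flink 1.11 defaults; tune them to your machines:

```yaml
# conf/flink-conf.yaml (excerpt)
jobmanager.rpc.address: vm1             # host that runs the JobManager
jobmanager.memory.process.size: 1600m   # total JobManager process memory
taskmanager.memory.process.size: 1728m  # total memory per TaskManager
taskmanager.numberOfTaskSlots: 1        # slots per TaskManager (~ CPU cores)
parallelism.default: 1                  # default parallelism for jobs
```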

3) Modify Flink's configuration file flink-1.11.1/conf/workers

[root@vm1 conf]# vim workers
vm2
vm3
4) Copy the modified flink-1.11.1 directory on vm1 to the other two Slave nodes

scp -rq /usr/local/myapp/flink vm2:/usr/local/myapp/
scp -rq /usr/local/myapp/flink vm3:/usr/local/myapp/
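Since conf/workers already lists the slave nodes, the copy commands can be derived from it instead of typed by hand. A dry-run sketch (it writes a stand-in workers file so it is self-contained; point workers_file at the real conf/workers and pipe the output to sh to execute):

```shell
# Print one scp command per worker listed in the workers file.
workers_file=workers.example
printf 'vm2\nvm3\n' > "$workers_file"    # stand-in for conf/workers
while read -r host; do
  [ -n "$host" ] || continue             # skip blank lines
  echo "scp -rq /usr/local/myapp/flink $host:/usr/local/myapp/"
done < "$workers_file"
```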
5) Start the Flink cluster service on the machine vm1

Make sure the firewall on every server is turned off before performing this step

Enter the flink-1.11.1 directory and execute bin/start-cluster.sh

[root@vm1 ~]# cd /usr/local/myapp/flink/flink-1.11.1/
[root@vm1 flink-1.11.1]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host vm1.
Starting taskexecutor daemon on host vm2.
Starting taskexecutor daemon on host vm3.
6) View process information on the three nodes vm1, vm2, and vm3

[root@vm1 flink-1.11.1]# jps
4983 StandaloneSessionClusterEntrypoint
5048 Jps

[root@vm2 ~]# jps
4122 TaskManagerRunner
4175 Jps

[root@vm3 ~]# jps
4101 Jps
4059 TaskManagerRunner
7) View the Flink Web UI by visiting http://vm1:8081

8) Submit a task for execution

[root@vm1 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar

Tasks can be submitted from any Flink client server; in this example, vm1, vm2, or vm3 all work.

9) Stop the Flink cluster

bin/stop-cluster.sh

10) Start and stop processes individually

Manually start and stop the main process StandaloneSessionClusterEntrypoint

[root@vm1 flink-1.11.1]# bin/jobmanager.sh start
[root@vm1 flink-1.11.1]# bin/jobmanager.sh stop

Manually start and stop TaskManagerRunner (often used to add new slave nodes to the cluster)

[root@vm1 flink-1.11.1]# bin/taskmanager.sh start
[root@vm1 flink-1.11.1]# bin/taskmanager.sh stop

Flink on YARN mode

The Flink on YARN mode uses YARN as the resource-scheduling system: Flink is started and run on top of YARN. The advantage is that cluster resources can be used fully, improving server utilization. The prerequisite is an existing Hadoop cluster; a single shared Hadoop cluster can then execute MapReduce, Spark, and Flink tasks alike, which is very convenient. So first build a Hadoop cluster.

Hadoop cluster construction

1) Download and extract to the specified directory

Download the Hadoop binary package from the official website, upload it to the Linux server, and extract it to the specified directory.

[root@vm1 ~]# tar -zxvf hadoop-2.9.2.tar.gz -C /usr/local/myapp/hadoop/

2) Configure environment variables

vim /etc/profile

export HADOOP_HOME=/usr/local/myapp/hadoop/hadoop-2.9.2/
export PATH=$PATH:$HADOOP_HOME/bin

Execute hadoop version to view the version number

[root@vm1 hadoop]# source /etc/profile
[root@vm1 hadoop]# hadoop version
Hadoop 2.9.2

3) Modify the hadoop-env.sh file

Change the line export JAVA_HOME=${JAVA_HOME} to an explicit JAVA_HOME path:

export JAVA_HOME=/usr/local/myapp/jdk/jdk1.8.0_261/

At the same time specify the Hadoop log path, first create a directory:

[root@vm1 ~]# mkdir -p /data/hadoop_repo/logs/hadoop

Then configure HADOOP_LOG_DIR:

export HADOOP_LOG_DIR=/data/hadoop_repo/logs/hadoop

4) Modify the yarn-env.sh file

Specify the JAVA_HOME path

export JAVA_HOME=/usr/local/myapp/jdk/jdk1.8.0_261/

Specify the YARN log directory:

[root@vm1 ~]# mkdir -p /data/hadoop_repo/logs/yarn

export YARN_LOG_DIR=/data/hadoop_repo/logs/yarn

5) Modify core-site.xml

Configure the address fs.defaultFS of the NameNode and the Hadoop temporary directory hadoop.tmp.dir

The data files of the NameNode and DataNode will be stored in the corresponding subdirectories of the temporary directory.

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://vm1:9000</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/data/hadoop_repo</value>
  </property>
</configuration>

6) Modify hdfs-site.xml

dfs.namenode.secondary.http-address specifies the HTTP address of the SecondaryNameNode; in this example, vm2 is set as the SecondaryNameNode

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>vm2:50090</value>
  </property>
</configuration>

7) Modify yarn-site.xml

yarn.resourcemanager.hostname specifies the server running the ResourceManager; in this example, vm1 is set as the Hadoop master node

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>vm1</value>
  </property>
</configuration>

8) Modify mapred-site.xml

[root@vm1 hadoop]# mv mapred-site.xml.template mapred-site.xml

<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

Setting mapreduce.framework.name to yarn makes MapReduce programs run on YARN

9) Configure slaves

Set vm2 and vm3 as Hadoop slave nodes

[root@vm1 hadoop]# vim slaves

vm2
vm3

10) Set up password-free login

For password-free configuration, refer to the previous section to set up mutual password-free login between servers.

11) Copy hadoop to other machines

Copy the Hadoop directory configured on vm1 to other servers

[root@vm1 hadoop]# scp -r /usr/local/myapp/hadoop/ vm2:/usr/local/myapp/
[root@vm1 hadoop]# scp -r /usr/local/myapp/hadoop/ vm3:/usr/local/myapp/

12) Format HDFS

Execute the format command on the master node vm1 of the Hadoop cluster

[root@vm1 bin]# pwd
/usr/local/myapp/hadoop/hadoop-2.9.2/bin
[root@vm1 bin]# hdfs namenode -format

If you want to reformat the NameNode, first delete all files under the original NameNode and DataNode directories, otherwise an error will be reported. These directories are configured by the hadoop.tmp.dir property in core-site.xml and the dfs.namenode.name.dir and dfs.datanode.data.dir properties.

13) Start the cluster

Start all processes directly

[root@vm1 hadoop-2.9.2]# sbin/start-all.sh

It is also possible to start HDFS separately

sbin/start-dfs.sh

It is also possible to start YARN separately

sbin/start-yarn.sh

14) View web pages

To access the virtual machines over HTTP from the local machine, first turn off the Linux firewall; refer to the earlier section on turning off the firewall.

Check out the HDFS web page:

http://vm1:50070/

Check out the YARN web page:

http://vm1:8088/cluster

15) View each node process

[root@vm1 ~]# jps
5026 ResourceManager
5918 Jps
5503 NameNode

[root@vm2 ~]# jps
52512 NodeManager
52824 Jps
52377 DataNode
52441 SecondaryNameNode

[root@vm3 ~]# jps
52307 DataNode
52380 NodeManager
52655 Jps

16) Stop the Hadoop cluster

[root@vm1 hadoop-2.9.2]# sbin/stop-all.sh

After the Hadoop cluster is built, you can run Flink on Yarn!

Two ways of Flink on Yarn

Mode 1: pre-initialize a Flink cluster in YARN that occupies a fixed amount of YARN resources. The Flink cluster resides in YARN, and all Flink tasks are submitted to it. The drawback is that the cluster monopolizes its resources whether or not any Flink task is running, unless it is stopped manually. If the resources allocated to the Flink cluster in YARN are exhausted, the next Flink job can only be submitted after a running job finishes and releases its resources. This mode suits small-scale, short-running computing tasks.

Mode 2: request resources from YARN separately for each submitted Flink task, i.e., create a new Flink cluster on YARN per job. When the task finishes, its Flink cluster is torn down and stops occupying machine resources, so different Flink tasks are independent and do not affect each other. This mode maximizes resource utilization and suits long-running, large-scale computing tasks.

The specific steps for the two modes are described below.

Mode 1

In either mode, run the Hadoop cluster first

1) Start the Hadoop cluster

[root@vm1 hadoop-2.9.2]# sbin/start-all.sh

2) Copy the Hadoop-related jar packages that Flink depends on into Flink's lib directory

[root@vm1 ~]# cp /usr/local/myapp/hadoop/hadoop-2.9.2/share/hadoop/yarn/hadoop-yarn-api-2.9.2.jar /usr/local/myapp/flink/flink-1.11.1/lib
[root@vm1 ~]# cp /usr/local/myapp/hadoop/hadoop-2.9.2/share/hadoop/yarn/sources/hadoop-yarn-api-2.9.2-sources.jar /usr/local/myapp/flink/flink-1.11.1/lib

You also need flink-shaded-hadoop-2-uber-2.8.3-10.0.jar, which can be downloaded from the Maven repository and placed in Flink's lib directory.

3) Create and start the flink cluster

Execute in the installation directory of flink

bin/yarn-session.sh -n 2 -jm 512 -tm 512 -d

This command creates a long-running Flink cluster, also known as a Flink yarn-session

Since in YARN mode the Flink cluster is started by YARN, you can check whether the Flink application is running successfully on the YARN console, i.e., the Hadoop cluster management page: http://vm1:8088/cluster

After creation succeeds, the Flink console prints the address of a web page where the execution of Flink tasks can be viewed.

The console outputs http://vm2:43243, meaning Flink's JobManager process is running on vm2 and listening on port 43243. This host and port can be used when submitting Flink tasks with an explicitly specified JobManager.

4) Attach to the flink cluster

Once a Flink cluster is created, it has a corresponding applicationId, so when executing a Flink task you can also attach to an existing, running Flink cluster

# Attach to the specified flink cluster
[root@vm1 flink-1.11.1]# bin/yarn-session.sh -id application_1602852161124_0001

The -id argument is the applicationId generated when the Flink cluster was created in the previous step

5) Submit the flink task

You can run the wordcount example that comes with flink:

[root@vm1 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar

You can see the run record on the Flink web page http://vm2:43243/.

The input and output data directories can be specified manually with -input and -output:

-input hdfs://vm1:9000/words
-output hdfs://vm1:9000/wordcount-result.txt

Mode 2

This mode is very simple: a Flink cluster is created at the same time the Flink task is submitted

[root@vm1 flink-1.11.1]# bin/flink run -m yarn-cluster -yjm 1024 ./examples/batch/WordCount.jar

The machine executing the command above (the Flink client) must have the YARN_CONF_DIR, HADOOP_CONF_DIR, or HADOOP_HOME environment variable configured; Flink reads the YARN and HDFS configuration through it.
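A sketch of the corresponding exports, using the installation paths from this tutorial (adjust to your own layout, and add them to /etc/profile to make them permanent):

```shell
# Tell the Flink client where to find the Hadoop/YARN configuration.
export HADOOP_HOME=/usr/local/myapp/hadoop/hadoop-2.9.2
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
```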

If you get the following error, you need to disable Hadoop's virtual-memory check:

Diagnostics from YARN: Application application_1602852161124_0004 failed 1 times (global limit =2; local limit is =1) due to AM Container for appattempt_1602852161124_0004_000001 exited with exitCode: -103
Failing this attempt.Diagnostics: [2020-10-16 23:35:56.735]Container [pid=6890,containerID=container_1602852161124_0004_01_000001] is running beyond virtual memory limits. Current usage: 105.8 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container.

Modify $HADOOP_HOME/etc/hadoop/yarn-site.xml on all Hadoop machines (all NodeManagers)

<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>

Restart the Hadoop cluster and run it again

[root@vm1 hadoop-2.9.2]# sbin/stop-all.sh
[root@vm1 hadoop-2.9.2]# sbin/start-all.sh
[root@vm1 flink-1.11.1]# bin/flink run -m yarn-cluster -yjm 1024 ./examples/batch/WordCount.jar

When the task succeeds, the console output includes a web address (vm3:44429 here) where the tasks can be viewed. In this mode, however, the Flink cluster is torn down once the task finishes, so by the time you open vm3:44429 the task may already have completed, the Flink cluster terminated, and the page unreachable.

The two modes of Flink on YARN above use two commands: yarn-session.sh and flink run

yarn-session.sh creates and starts a Flink cluster on YARN. You can view its common parameters with:

[root@vm1 flink-1.11.1]# bin/yarn-session.sh -h

-n: the number of containers to allocate, i.e., the number of TaskManagers

-s: the number of slots per TaskManager, usually set according to the number of CPU cores

-jm: jobManagerMemory, the JobManager's memory, in MB

-tm: taskManagerMemory, the TaskManager's memory, in MB

-d: detached mode, i.e., run independently in the background

-nm: the name of the application on YARN

-id: the applicationId of the task on the YARN cluster; attaches to a yarn-session running independently in the background

The flink run command can either submit tasks to an existing Flink cluster or create a new Flink cluster while submitting. You can view its common parameters with:

[root@vm1 flink-1.11.1]# bin/flink run -h

-m: specifies the address of the master node (JobManager); an address given on the command line takes precedence over the one in the configuration file

-c: specifies the entry class of the jar package; this parameter goes before the jar file name

-p: specifies the parallelism of the task; this also overrides the value in the configuration file

Example of using flink run:

1) Submit and execute a Flink task; by default it finds the JobManager of the existing yarn-session on the current YARN cluster

[root@vm1 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar -input hdfs://vm1:9000/hello.txt -output hdfs://vm1:9000/result_hello

2) Submit the Flink task with the JobManager's host and port explicitly specified; the host and port are output to the console when the Flink cluster is created

[root@vm1 flink-1.11.1]# bin/flink run -m vm3:39921 ./examples/batch/WordCount.jar -input hdfs://vm1:9000/hello.txt -output hdfs://vm1:9000/result_hello

3) Start a new Flink cluster in YARN and submit the task

[root@vm1 flink-1.11.1]# bin/flink run -m yarn-cluster -yjm 1024 ./examples/batch/WordCount.jar -input hdfs://vm1:9000/hello.txt -output hdfs://vm1:9000/result_hello

-m yarn-cluster is a fixed form of the command; it tells Flink not to use the standalone cluster or the yarn-session cluster, but to start a dedicated cluster for the job being submitted.

For the complete tutorial, please subscribe to the "Author's Column Collection", thank you for your support!
