Spark 部署文档-如何向 Spark 上提交应用程序-2

Descriptions:
最近正在系统阅读 Spark 官方文档,阅读的同时也试着翻译了其中部分的章节,本篇文档原文链接地址:http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management
个人水平有限,若翻译有不恰当之处欢迎指正,邮箱地址 kylin27@outlook.com


Submitting Applications

如何向 Spark 上提交应用程序

The spark-submit script in Spark’s bin directory is used to launch applications on a cluster.
位于 Spark bin 目录下面的 spark-submit 脚本是用来在集群上启动应用程序的。

It can use all Spark’s supported cluster cluster managers through a uniform interface so you don’t have to configure your application specially for each one.
spark-submit 启动脚本通过统一接口来支持 Spark 中的多种集群(资源)管理器(译者: 比如说 Spark 自带的资源管理器,或是 Mesos ,YARN 这种第三方资源调度框架),所以你无需因为管理器的不同来重新配置你的应用程序。

Building Your Applications’s Dependencies

如何构建依赖于其他库的应用程序

If your code depends on other projects, you will need to package them alongside your application in order to distribute the code to a Spark cluster.
如果你所编写的代码依赖于其他的项目的话,建议你在打包时将代码依赖的项目和你所编写的应用放到一起,这样的话可以保证依赖项目可以随代码一起被分发到 Spark 集群上。

To do this, create an assembly jar(or “uber” jar) containing your code and its dependencies.
可以通过创建一个组装 Jar 包,在该 Jar 中既包含你编写的代码又包含代码所依赖的文件,这样的话便可以实现自定义代码所依赖项目会随应用程序一起被分发了。

Both sbt and Maven have assembly plugins.
sbtMaven 这两种项目管理工具均有可生成上述的组装 Jar 包的插件。

When creating assembly jars, list Spark and Hadoop as provided dependencies; these need not be bundled since they are provided by the cluster manager at runtime.
当打包组装 Jar 包的时候,会将你所编写代码所依赖的 Spark 和 Hadoop 库显示出来; 不过你并不需要将 Spark 和 Hadoop 的库文件加载到 Jar 包中的,因为这些依赖文件将会在应用程序的运行期由 Spark 集群资源管理器来提供。

Once you have an assembled jar you can call the bin/spark-submit script as shown here while passing your jar.
一旦 Jar 文件被成功创建,你便可以通过调用位于 bin 目录下面的也是当前我们正在介绍的这个 spark-submit 脚本来将你的 Jar 文件提交到 Spark 集群上了。

For Python, you can use the –py-files argument of spark-submit to add .py, .zip or .egg files to be distributed with your application.
如果依赖的是 Python 语言编写的库的话,你可以在使用 spark-submit 脚本的时候通过后接 –py-files 这个参数选项来将以 .py,.zip 或是 .egg 为后缀文件和你的应用一起发布到 Spark 集群上面。

If you depend on multiple Python files we recommend packaging them into a .zip or .egg.
如果你的应用程序依赖于不止一个 Python 文件,建议你将多个 Python 文件打包成 .zip 或是 .egg 类型的文件。

Launching Applications with spark-submit

使用 spark-submit 脚本来发布应用程序到 Spark 集群

Once a user application is bundled, it can be launched using the bin/spark-submit script.
一旦用户自定义应用程序和依赖文件被成功绑定,便可以使用 bin 路径下的 spark-submit 脚本将其发布到 Spark 上。

This script takes care of setting up the classpath with Spark and its dependencies, and can support different cluster managers and deploy modes that Sparks supports.
spark-submit 脚本支持为 Spark 和其所依赖的库来设定 classpath,同时 spark-submit 启动脚本后接的多种参数选项可以很好的配合 Spark 支持的多种集群管理器与启动模式:

ps: 由于博客支持符号有限,将下面的 # 替换成左右尖括号

./bin/spark-submit \
  --class  #main-class#     \
  --master #master-url#      \
  --deploy-mode #deploy-mode# \
  --conf #key#=#value#           \
  ... # other options 
  #application-jar#                \
  [application-arguments]

Some of the commonly used options are:
spark-submit 脚本中常用的参数选项描述如下:

  • –class: The entry point for your application(e.g. org.apache.spark.examples.SparkPi)
  • –class: 这个参数选项是用来指定整个应用程序的入口点的(例如, Spark 源码包中的 SparkPi 这个类就可以看做是整个应用的入口点)。

  • –master: The master URL for the cluster(e.g. spark://23.195.26.187:7077)

  • –master: 这个参数选项是用来指定集群主节点的 URL 地址的(例如你可以将主结点的 IP 设定为 23.195.26.187:7077 然后通过 spark://23.195.26.187:7077 来访问主结点)。

  • –deploy-mode: Whether to deploy your driver on the worker nodes(cluster) or locally as an external client(client)(default: client)

  • –deploy-mode: 这个选项是用来设定你在哪里启动驱动程序的,是在工作结点上(这个是集群模式)还是作为外部客户端在本地启动的(这个是客户端模式),默认的缺省部署模式是本地启动的客户模式。

  • –conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes(as shown).

  • –conf: Spark 中用来以键值对方式强制改写配置信息的参数选项。如果键值对的数值中有空格,可以使用引号来包装”键=值”。(像这样 “键=值”)

  • application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs://path of a file:// path that is present on all nodes.

  • application-jar: 这个参数选项用来指定包含你的应用程序和其所依赖的文件在内绑定好的 jar 文件的路径。 如果使用 URL 来定位的话,这个 URL 对于整个集群来说一定要是全局可见的,例如,若是在 HDFS 上面以 hdfs:// 开头, 或是如果文件路径在所有结点上都存在则可以以 file:// 来开头。

  • application-arguments: Arguments passed to the main method of your main class,if any.

  • application-arguments: 如果你所编写的应用程序的主函数入口在运行的时候需要传参的话,使用这个参数选项并后接需要传递给主函数的参数,参数便可以正确地传递给应用程序的主函数。

A common deployment strategy is to submit your application from a gateway machine that is physically co-located with your worker machines(e.g. Master node in a standalone EC2 cluster).
一种比较常用的在 Spark 集群上部署应用程序的方式是在网关主机上提交你的应用,这个网关主机指的就是在集群中和其他工作主机通过物理网络是互相可达的。(例如在独立模式的 EC2 集群中的主结点就扮演着网关主机的角色)。

In this setup, client mode is appropriate. In client mode, the driver is launched directly within the spark-submit process which acts as a client to the cluster.
启动 Spark 时, 客户/本地模式是很推荐的启动模式。 在客户模式下启动,驱动程序会以进程成员的方式随 spark-submit 一起直接被启动,该进程扮演着访问集群的客户端的角色。

The input and output of the application is attached to the console.
应用程序的输入输出信息可通过控制台上来直接访问。

Thus, this mode is especially suitable for applications that involve the REPL(e.g. Spark shell).
正因如此这种模式尤其适合使用到 REPL 表达式(例如 Spark shell 脚本)的应用程序。

Alternatively, if your application is submitted from a machine far from the worker machines(e.g. locally on your laptop), it is common to use cluster mode to minimize network latency between the drivers and the executors.
相应的,如果你提交应用程序的主机距离其所支配的工作主机很远的话(比如说,你使用笔记本来执行本地任务提交),推荐你使用集群这种部署方式来启动 Spark 以减少驱动程序和执行器二者之间网络通信所带来的延迟。

Note that cluster mode is currently not supported for Mesos clusters.
值得注意的是,目前 Spark 的集群并不适用于由 Mesos 管理的集群中的。

Currently only YARN supports cluster mode for Python applications.
目前只有基于 YARN 管理器的 Spark 集群支持 Python 应用程序的运行。

For Python applications, simply pass a .py file in the place of instead of a JAR, and Python .zip, .egg or .py files to the search path with –py-files.
关于如何向 Spark 集群提交 Python 应用程序,只需要简单地将在参数选项 后面原本追加 JAR 文件的地方使用你所要上传的 .py 文件即可,如果是多个 .py 文件的话,可以将其打包成 .zip ,.egg 文件包或者使用 –py-files 参数来制定该多个 .py 文件的搜索路径名称都可以。

There are a few options available that are specific to the cluster manager that is being used.
专门用于处于运行中状态的集群管理器的命令参数选项并不多。

For example, with a Spark standalone cluster with cluster deploy mode, you can also specify –supervise to make sure that the driver is automatically restarted if it fails with non-zero exit code.
例如以独立模式启动的 Spark你可以通过 –supervise 这个命令参数是来确保当驱动程序以返回值非零的状态退出之后(译者:也就是错误状态退出的时候),该驱动程序可以实现自动重启。

To enumerate all such options avaialable to spark-submit, run it with –help. Here are a few examples of common options:
如果想一一列举 spark-submit 脚本可用的参数选项信息的话,可以在启动 spark-submit 脚本的时候输入 –help 选项。 下面是使用 spark-submit 脚本常用的参数选项举例:

# Run application locally on 8 cores
# 以本地 8 核的方式来向 Spark 提交应用程序

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8]  \
  /path/to/examples.jar \
  100

译者注: 上述的 spark-submit 提交命令解释入下
--class org.apache.spark.examples.SparkPi 
这条命令所对应的是用于指定提交给 Spark 的 Jar 文件的主入口函数的
(也就是包含 main() 入口函数的类,并且在其中成功创建 SparkContext 对象实例的所在类)所在类

--master 
这个参数选项,应该指定(暂时还不太清楚这个)

/path/to/examples.jar 
这个参数选项用来指定提交给 Spark 的 Jar 文件在本地的路径信息

100 
这个参数是用来传递给 Jar 中的主入口函数在启动的时候需要向 
main(String [] args ){...} 传入的参数,
当然如果主入口函数不需要传入参数的话,这个参数选项可以不加。 
# Run on a Spark standalone cluster in client deploy mode
# 以客户部署模式来启动独立 Spark 集群

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G  \
  --total-executor-cores 100 \
  /path/to/examples.jar  \
  1000

译者注: 上述的 spark-submit 提交命令注释如下
--class org.apache.spark.examples.SparkPi
主函数入口所在类

--master spark://207.184.161.138:7077
用 URL 来定位 Spark 集群中的主结点

--executor-memory 20G 
用来为每个执行器进程分配内存空间

--total-executor-cores 100 
用来指定 Spark 开启多少个执行者进程

/path/to/examples.jar 
用来指定用户提交 Jar 文件所在目录信息

1000
用户提交的 Jar 文件中主函数启动所需要的参数

# Run on a Spark standalone cluster in cluster deploy mode with supervise
# 使用监控着来以集群部署模式来启动独立 Spark 集群

./bin/spark-submit  \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster
  --supervise
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000 

# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
 --class org.apache.spark.examples.SparkPi \
 --master yarn \
 --deploy-mode cluster \  # can be client for client mod
 --executor-memory 20G \
 --num-executors 50  \
 /path/to/examples.jar \
 1000

# Run a Python application on a Spark standalone cluster
# 在独立的 Spark 集群上运行 Python 应用程序

./bin/spark-submit   \
  --master spark://207.184.161,138:7077 \
  examples/src/main/python/pi.py  \
  1000

# Run on a Mesos cluster in cluster deploy mode with supervise 
# 使用 supervise 监控在集群部署模式下启动运行在 Mesos 集群上的 Spark 

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master mesos://207.184.161.138:7077 \
  --deploy-mode cluster
  --supervise
  --executor-memory 20G \
  --total-executor-cores 100 \
  http://path/to/examples.jar \
  1000

Master URLs

关于主结点 URL 地址描述格式

The master URL passed to Spark can be in one of the following formats:
主节点的 URL 地址以参数选项 –master 的方式传递给 Spark ,不过该主结点的 URL 地址需要遵循如下描述的格式:

Master URL / Meaning
主结点 URL / 该 URL 地址所适用的场合

  • local
  • Runing Spark locally with one worker thread (i.e. no parallelism at all).
  • 此种主结点 URL 地址描述适用于仅开启一个工作者线程的本地 Spark 运行模式(也就是说,运行于此模式下的 Spark 并不支持并行)

  • local[K]
  • Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
  • 此证 URL 地址描述适用于开启 K 个工作者线程且本地模式启动 Spark 的场景,(通常情况下,理想的方式是运行 Spark 主机 CPU 中有多少个核便相应地开启多少个线程)

  • local[*]
  • Run Spark locally with as many worker threads as logical cores on your machine.
  • 本地模式启动 Spark 且让当前主机的 CPU 中的内核 AT 力场全开的运行尽量多的线程。

  • spark://HOST:PORT
  • Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
  • 这种 URL 地址适用于用于连接以独立集群的方式启动的 Spark 上的主结点。 端口号要先对其进行配置然后才可以使用,默认的端口号是 7077 .

  • mesos://HOST:PORT
  • Connect to the given Mesos cluster. The port must be whichever one your is configured to use, which 5050 by default. Or, for a Mesos cluster using Zookeeper, use mesos://zk://…. To submit with –deploy-mode cluster, the HOST:PORT should be configured to connect to the MesosClusterDispatcher.
  • 当想要连接使用 Mesos 管理的 Spark 集群中的主结点的时候,使用这个 URL 地址就对了。 端口号在使用之前必须要通过相关配置文件加以设定,缺省端口号是 5050. 或者是, 如果这个 Mesos 使用 Zookeeper 框架的话,那么 URL 地址就相应地变成 mesos://zk://….。 可以使用 –deploy-mode cluster 来向 Mesos 组织的 Spark 集群上提交应用程序,而 HOST:PORT 应该被配置成 Mesos 集群分配器的 URL 访问地址。

  • yarn
  • Connect to a YARN cluster in client or cluster mode depending on the value of –deploy-mode. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable.
  • yarn 常用作连接使用 YARN 管理器的以本地客户端模式或是集群模式启动的 Spark 集群中的主结点, yarn 是 –deploy-mode 这个参数选项的后接值。 集群的地址可通过查找相关配置文件中的 HADOOP_CONF_DIR 或是 YARN_CONF_DIR 环境变量值来定位。

  • yarn-client
  • Equivalent to yarn with –deploy-mode client, which is preferred to ‘yarn-client’.
  • 该 URL 等同于以 yarn 的客户/本地模式启动 Spark 的时候后接 –deploy-mode 这个参数选项,不过前者更加被人所熟知。

  • yarn-cluster
  • Equivalent to yarn with –deploy-mode cluster, which is prefered to ‘yarn-cluster’
  • 这个 URL 地址使用与以 yarn 集群 方式启动的时候使用 –deploy-mode cluster 这个参数选项,也通常人们更喜欢用 ‘yarn-cluster’ 。

Loading Configuration from a File

从文件中来加载配置选项

The spark-submit script can load default Spark configuration values from a properties file and pass them on to your application.
Spark 中用于提交应用的 spark-submit 脚本可以从配置文件中加载默认的 Spark 配置信息,并将这些配置信息应用在你所提交的应用程序中。

By default it will read options from conf/spark-defaults.conf in the Spark directory.
默认情况下, spark-submit 脚本会从 Spark 所在路径下的 conf/spark-default.conf 配置文件中来读取配置选项信息。

For more detail, see the section on loading default configurations.
若想要进一步了解关于配置信息加载的问题,可以查阅如何加载默认配置信息这篇文章。

Loading default Spark configurations this way can obviate the need for certain flags to spark-submit. For instance, if the spark.master property is set, you can safely omit the –master flag from spark-submit.
如果 Spark 配置文件书写得当的话,可以减少在运行 spark-submit 脚本的时候后接参数的数目。 例如,如果 spark.master 这个参数的选项已经在配置脚本中设定好了,那么在调用 spark-submit 的时候可以省去不写 –master 这个参数选项。

In general, configuration values explicitly set on a SparkConf take the highest precedence, then flags passed to spark-submit, then values in the defaults file.
通常情况下不同方式的配置数值之间是有着明确的优先级的,其中通过 SparkConf 对象实例所设定的配置参数享有最高的优先级, 接下来是运行 spark-submit 脚本时所传递的参数,最后是写在配置文件中的默认选项信息。

If your are ever unclear where configuration options are coming from, you can print out fine-grained debugging information by running spark-submit with the –verose option.
如果你对 Spark 中的某些配置选项不是很理解,不知道某些参数是用来做什么的话,建议在运行 spark-submit 脚本的时后接 –verose 参数这个选项,这样就可以将每个后接参数的调试信息详细打印出来了。

Advanced Dependency Management

##
When using spark-submit, the application jar along with any jars included with the –jars option will be automatically transferred to the cluster.
当运行 spark-submit 脚本时,应用程序连同任何 –jars 参数选项后接的 jar 文件都会被自动的提交给集群。

Spark uses the following URL scheme to allow different strategies for disseminating jars:
Spark 使用如下所示的多种 URL 模式来实现不同策略的 jar 文件的分发:

  • file: - Absolute path and file:/URIs are servered by the driver’s HTTP file server, and every executor pulls the file from the driver HTTP server.
  • file: - 此种文件描述方式使用的是本地文件所在的绝对路径地址以及使用 file:/URL 的文件路径描述方式是由文件 HTTP 服务器所提供的文件定位服务。位于每个结点上的执行器也均是从 HTTP 文件服务器上来抽取其所需要的文件。

  • hdfs:, http:, https:, ftp: - these pull down files and JARs from the URI as expected

  • hdfs: 此种文件路径描述是以 http, https, ftp 文件传输协议根据文件的 URI 描述地址来抓取普通和 JAR 类型的文件的。
  • local: - a URI starting with local:/ is expected to exist as a local file on each worker node. This means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker, or shared via NFS, ClusterFS, etc.
  • 使用 local 来作为文件路径描述的话,通常是按照 local:/ + 文件的 URI 地址这种格式,不过若以次种文件描述方式,需要每个工作结点的本地路径上都需要有该文件的备份。 这便意味着在整个过程中不会涉及到文件在网络进行传输,且由于大块的普通 JAR 文件都已经被推送到每个工作结点本地或是以 NFS, ClusterFS 文件系统的方式进行共享,所以在这种情景下其工作效率十分的高效。
    (译者注: 不过将大文件冗余地存放到如此多的结点上所带来的开销也是很大的,类似于算法中的空间与时间二者之间的权衡)

Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes.
需要知道的是 JAR 和 普通文件会被拷贝到位于每个执行结点上所创建的的 SparkContext 的工作路径下。

This can use up a significant amount of space over time and will need to be cleaned up.
这是一种会随着时间推移十分吃内存的处理方法,所以会涉及到空间的清理与回收操作。

With YARN, cleanup is handled automatically, and with Spark standalone, automatic cleanup can be configured with the spark.worker.cleanup.appDataTtl property.
如果使用 YARN 这种资源管理框架的话,空间资源的清理与回收会被自动的执行;如果使用 Spark 自带的资源管理器的话,可以通过配置 spark.worker.cleanup.appDataTlt 这个选项来决定让 Spark 是否能够自动清理与回收空间。

Users may also included any other dependencies by supplying a comma-delimited list of maven coordinates with –packages.
若是用户希望包含其他 maven 相关的依赖文件,还在命令后配合使用 –packages 这个参数选项;若是不止一个依赖文件可使用逗号作为分隔符。

All transitive dependencies will be handled when using this command.
使用上述的这个参数选项的话,依赖文件的所有隐式依赖文件也一并会被处理。

Additional repositories( or resolvers in SBT) can be added in a comma-delimited fashion with the flag –repositories.
另外,项目代码资源库(或是使用 SBT 作为版本控制项目)也可以以逗号分隔符的方式配合 –repositories 这个参数选项来使用。

These commands can be used with pyspark, spark-shell, and spark-submit to include Spark packages.
上述的这些命令可以配合 Spark 安装包中的 pyspark,spark-shell 和 spark-submit 这些脚本中的任意一个脚本使用。

For Python, the equivalent –py-files option can be used to distribute .egg, .zip and .py libraries to executors.
如果是上传以 Python 编写的项目代码的话,可以相应地使用 –py-files 这个参数选项来将打包成 .egg,.zip 的 python 文件,或是 python 库文件本身分发到位于每个结点的执行器上。

More Information

关于进一步学习

Once you have deployed your application, the cluster mode overview describes the components involved in distributed execution, and how to monitor and debug applications.
一旦你将你的应用部署到集群之上,在关于集群模式概览一文中介绍了 Spark 在分布式环境下运行时所需要的组件,以及如何监控与调试你所部署的应用程序。

end

Spark 部署文档-Spark 集群模式概览-1

Descriptions:

最近正在系统学习 Spark ,本篇文章翻译自 Spark 官方指导文档 如有不当之处敬请指正,联系邮箱: kylin27@outlook.com


Cluster Mode Overview

Spark集群模式概览

This document gives a short overview of how Spark runs on clusters, to make it easier to understand the components involved.

本文档简要介绍了 Spark 的集群运行模式,目的是为了让用户更加容易了解 Spark 中涉及到的组件。

Read through the application submission guide to learn about launching applications on a cluster.

用户可以通读如何向 spark 集群提交应用程序指导手册来了解如何在集群上运行应用程序。

Components

Spark 中的组件

Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program(called the driver program).

Spark 应用程序以独立的进程集合运行与集群之上,并被位于主程序(这个主程序也被称作是驱动程序)中称作 Spark 上下文(SparkContext)的对象实例所协调调度。

Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers(either Spark’s own standalone cluster manager, Mesos or YARN), which allocate resources across applications.

特别值得注意的是,以集群运行的 Spark 中的 SparkContext 可以连接多种类型的用于跨应用程序分配资源的集群调度器(这个调度器既可以是 Spark 自己独立的集群资源管理器,也可以是 Mesos 或是 YARN).

Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.

一旦 Spark 与资源调度器二者相连接,Spark 便会向位于集群中的多个结点索取执行器资源。这些执行器由多个进程构成的专门用来为你的应用程序提供计算和存放数据的功能。

Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors.

接下来你所提交的应用程序代码(被传递给 Spark 上下文对象实例的应用程序代码是以 JAR 或者是 Python 文件所存放的)会被发送给位于集群中各个结点的执行器上。

Finally, SparkContext sends tasks to the exeuctors to run.

最后,Spark 上下文对象便会将任务分发给执行器来运行。

There are several useful things to note about this architecture:
对于上述的这种架构下面是几点有用的建议:


1. Each application gets its own executor processes, which stay up for the duration of the whole application and runs tasks in multiple threads. This has the benefit of isolating applications from each other, on both the scheduling side( each driver schedules its own tasks) and executor side (tasks from different applications run in different JVMs). However, it also means that data cannot be shared across different Spark applications (instances of SparkContext) without writing it to an external storage system.

1. 每个应用程序都有着属于它自己的多个执行器进程,这些执行器进程会在该应用程序的整个生命周期中存活,并且将分配给它们的任务以多线程的方式来运行。 这种机制有利于让应用程序彼此之间保持隔离,无论是资源调度方面的隔离(程序各自的驱动程序调度各自的任务),还是在程序在执行期间的隔离(隶属于不同应用的任务各自运行在不同的 JVM 中)。然而这种隔离同样也意味着位于 多个Spark 应用程序(就是包含 SparkContext 对象实例的程序)中的数据如果不将其写入到外存存储系统的话是无法在多个 Spark 应用程序间被共享的。


2. Spark is a agnostic to the underlying cluster manager. As long as it can acquire executor processes, and these communicate with each other, it is relatively easy to run it even on a cluster manager that also supports other applications (e.g. Mesos/YARN).

2. 在 Spark 集群底层可以使用多种类型的集群调度器来管理(agnostic 不可知,在这里指的是未来的 Spark 集群底层可能不仅仅支持一种资源调度器)。 只要这种资源调度器可以获取 Spark 中的执行进程并且支持进程彼此之间的正常通行就可以,即便将用于管理其他类型集群的像是 Mesos 或是 YARN 这些管理器用于管理 Spark 集群也并不是什么难事。


3. The driver program must listen for and accept incoming connections from its executors throughout its lifetime(e.g, see spark.driver.port and spark.fileserver.port in the network config section). As such, the driver program must be network addressable from the worker nodes.

3. 驱动程序必须在其整个生命周期内来监听与接收来自其执行器的连接请求(请看在网路配置章节中的 spark 驱动端口和 spark 文件服务器端口示例文档)。 如此一来,该驱动程序对工作结点来说必须要通过网络地址来定位查找到的才可以。


4. Because the driver schedules tasks on the cluster, it should be run close to the worker nodes, preferably on the same local area network. If you’d like to send requests to the cluster remotely, its’s better to open an RPC to the driver and have it submit operations from nearby than to run a driver far away from the worker nodes.

4. 由于驱动程序负责调度集群中的任务, 所以对于驱动程序来说它里其所调度的工作结点物理位置越近越好,最好通过本地地址可以直接访问到其所控制的工作节点。 如果你打算向远程集群发送请求信息的话,最好的处理方式便是通过创建一个到驱动程序 RPC 连接,通过 RPC 来决定向哪一个驱动程序提交请求操作信息,而不是调用一个物理距离其工作结点很远的驱动程序。

Cluster Manager Types

The system currently supports three cluster managers:

  • Standalone - a simple cluster manager included with Spark that makes it easy to set up a cluster.
  • 单机模式 - 该模式是基于 Spark 自身包含的集群资源管理器来启动 Spark 的,该资源管理器的作用便是是为了让集群易于启动。

  • Apache Mesos - a general cluster manager that can also run Hadoop MapReduce and service applications.

  • 基于 Apache Mesos 的集群模式 - Apache Mesos 是一种普遍被用于集群资源管理的调度器,也可以用作 Hadoop MapReduce 和其他服务应用的资源调度器。
  • Hadoop YARN - the resource manager in Hadoop 2.
  • 基于 Hadoop YARN 的集群模式 - Hadoop YARN 是应用于 Hadoop 2版本中的资源管理调度器。

In addition, Spark’s EC2 launch scripts make it easy to launch a standalone cluster on Amazon EC2.

除了上述的资源调度器之外, 通过运行 Spark 中的 EC2 启动脚本也可以很容易地在 Amazon 的 EC2 服务器上以单机模式来启动 Spark 。

Submitting Applications

关于如何向 Spark 上提交应用

Applications can be submitted to a cluster of any type using the spark-submit script.

可以使用Spark 自带的 spark-submit 脚本来将应用程序提交到上述 Spark 的任意一类集群的上。

The application submission guide describes how to do this.

这篇 [如何向 Spark 集群提交应用程序教程] 上详细记录了如何提交应用到 Spark 集群。

Monitoring

关于 Spark 集群监控

Each driver program has a web UI, typically on port 4040, that displays information about running tasks, executors, and storage usage.

每个驱动程序都有其用于展示任务运行,执行器状态以及存储详细信息的 web 图形展示界面,该图形界面的访问端口号通常是 4040。

Simply go to http://:4040 in a web browser to access this UI. The monitoring guide also describes other monitoring options.

(你只要)简单地在浏览器中输入这个网址 http://:4040 Spark 的图形用户界面就展示在你眼前了。

The monitoring guide also describes other monitoring options.

Spark 集群监控教程这一文档中同样也介绍了其他用于 Spark 集群监控的选项。

Job Scheduling

作业调度

Spark gives control over resource allocation both across applications(at the level of the cluster manager) and within applications(if multiple computations are happening on the same SparkContext).

Spark 同时在跨应用程序和在应用程序中充当着掌控资源分配的角色,在跨程序资源分配中 Spark 是作为集群管理者来进行资源分配,而当 SparkContext 中同时执行多个计算时 Spark 会在应用程序内进行资源分配。

The job scheduling overview describes this in more detail.

作业调度概览一文中对此进行了详细的介绍。

Glossary

Spark 集群术语列表

The following table summarizes terms you’ll see used to refer to cluster concepts:

下面表格总结了 Spark 集群中常用到的概念术语:

术语/解释

  • Application
  • 应用程序
  • Using program built on Spark. Consists of a driver program and executors on the cluster.
  • 运行在 Spark 平台上的程序。(这种程序通常是)由集群上的一个驱动(程序)和多个执行器(程序)所组成的。

  • Application jar
  • jar 文件格式的应用程序
  • A jar containing the user’s Spark application. In some cases users will want to create an “uber jar” containing their application along with its dependencies. The user’s jar should never include Hadoop or Spark libraries, however, these will be added at runtime.
  • 所谓的 jar 应用程序指的是将用户自己编写的 Spark 代码打包成 jar 应用程序。在某些场合用户会将应用程序和其所依赖的文件打包成一个 “uber jar” 类型文件。 但是值得注意的是,用户是万万不可在生成应用 jar 文件时将应用程序中所依赖的 Hadoop 或是Spark 库文件一并加载到其中的。应用程序中所使用的Hadoop 和 Spark 库会在应用程序的运行期由 Spark 集群平台来提供。

  • Driver program
  • 驱动程序
  • The process running the main() function of the application and creating the SparkContext
  • 驱动程序指的是代码中包含程序入口函数 main() 方法同时在代码中创建 SparkContext 对象的程序。

  • Cluster manager
  • 集群调度器/其实叫做集群管理器也行啦~
  • An external service for acquiring resources on the cluster( e.g. standalone manager, Mesos, YARN)
  • 用于在集群中获取资源的(不属于集群的)外部服务程序(就像是 Spark 单机结点启动的资源管理器,或是第三方像 Mesos , YARN 这样的资源调度框架)

  • Deploy mode
  • (Spark的)部署模式
  • Distinguishes where the driver process runs. In “cluster” mode, the framework launches the driver inside of the cluster. In “client” mode, the submitter lanuches the driver of the cluster.
  • 集群的部署模式这一术语是用来区分驱动程序在何处运行的。如果是以”集群”模式运行的,指的是 Spark 这个计算框架在集群中来调用驱动程序的。 如果是以”客户端”模式运行,那便是 Spark 中的 submitter/任务提交者在集群之外调用驱动程序来启动的。

  • Worker node
  • 工作结点
  • Any node that can run application code in the cluster.
  • 在集群中只要能跑应用程序代码的结点都叫做工作结点

  • Executor
  • 执行器
  • A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors.
  • 在用于运行任务并在内存或是磁盘上保存数据的工作结点上为执行应用程序而启动进程便可以称它是执行器。每个应用都有其自己的一组执行器。

  • Task
  • 任务
  • A unit of work that will be sent to one executor.
  • 任务指的是将会被发送给一个执行器的一系列工作。

  • Job
  • 作业
  • A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action(e.g. save, collect); you’ll see this term used in the driver’s logs.
  • 作业指的是由多个任务所构成的并行计算操作。 这些任务从 Spark 所响应的操作(例如存储,收集数据)中获取执行结果;作业在驱动程序所打印的日志文件中是比较常见的术语。

  • Stage
  • 阶段
  • Each job gets divided into smaller sets of tasks called stages that depend on each other(similar to the map and reduce stages in MapReduce); you’ll see this term used in the driver’s logs.
  • 每份作业在被执行的时候都会被分割成由更小粒度的任务构成的集合。而该任务又被称作是’阶段’,而这些多个阶段彼此间是相互依赖的(就和 MapReduce 中的 map阶段和reduce阶段类似);’阶段’这一术语在驱动程序打印的日志中比较常见。

end

Multithreading Benefits [多线程带来的福利]

Descriptions:
java 并发文档翻译,文章链接地址http://tutorials.jenkov.com/java-concurrency/benefits.html


The reason multithreading is still used in spite of its challenges is that multithreading can have several benifits.
虽然使用多线程编程有着诸多挑战,但是它却仍被广泛使用的原因是因为多线程本身具有着许多优势。

Some of these benefits are:
优势如下:

  • Better resource utilization.
  • (多线程)对资源的利用率更高。

  • Simpler program design in some situations.

  • 在某些特定场合可以简化程序的设计。

  • More responsive programs.

  • 可以写出响应性更好的程序。

Better resource utilization

更好的资源利用率

Imagine an applicaton that reads and processes files from the local file system.
设想一下有从本地文件系统读入并执行文件这样一段程序。

Lets say that reading a file from disk takes 5 seconds and processing it takes 2 seconds. Processing two files then takes
假设如果从磁盘上面将文件加载到当前程序中需要 5s ,然后执行该文件需要 2s.
那么代码处理 2 份文件的所需要的时间统计如下所示:


  5 seconds reading file A
  2 seconds processing file A
  5 seconds reading file B
  2 seconds processing file B
  ************************
  14 seconds total

When reading the file from disk most of the CPU time is spent waiting for the disk to read the data.
当程序在执行从磁盘上读入文件的这段期间内 CPU 将大部分的时间花费在等待上。

The CPU is pretty much idle druing that time.
在等待的那段期间内,CPU 很有可能处于一种空闲状态。

It could be doing something else.
而在那段时间内 CPU 完全是可以做点其他的事情的。

By changing the order of the operations, the CPU could be better utilized. Look at this ordering:
通过修改执行操作的顺序,能够提升 CPU 的利用率。
可以参照如下的顺序:


  5 seconds reading file A
  5 seconds reading file B + 2 seconds processing file A
  2 seconds processing file B
  ********************
  12 seconds total

The CPU waits for the first file to be read.
CPU 在首次从磁盘读入文件的时候处于等待状态。

Then it starts the read of the second file.
然后,开始第二份文件的加载操作。

While the second file is being read, the CPU processes the first file.
当等待第二份文件从硬盘加载的期间内, CPU 会运行程序来处理读入第一份文件。

Remember, while waiting for the file to be read from disk, the CPU is mostly idel.
记住这一点,当 CPU 处于等待文件从磁盘加载文件的这段期间时, CPU 十有八九是处于空闲状态的。

In general, the CPU can be doing other things while waiting for IO.
就通常情况来说,当 CPU 等待 IO 操作的这段期间内完全可以做一些其他事情。

It doesn’t have to be disk IO.
也不一定非要是基于磁盘的 IO 操作。

It can be network IO as well, or input from a user at the machine.
还可以是基于网络的 IO 操作,或是来自于另外一台主机用户的输入数据。

Network and disk IO is often a lot slower than CPU’s and memory IO.
网络和磁盘上数据的读入写出速度通常要远远比 CPU 直接读写内存的速度要慢。

Simpler program desgin in some situations

在特定场合更简洁的程序设计

If you were to program the above ordering of reading and processing by hand in a singlethread applicaton, you would have to keep track of both the read and procesisng state of each file.
如果你曾试图将上述的代码按照读、执行的顺序来用单线程编写实现的话,你将不得不在每个线程中保存为每个文件的读取和执行的相关状态信息。

Instead you can start two threads that each just reads and processes a single file.
同样你也可以创建两个线程,让每个线程仅做读文件或是处理文件中的一种。

Each of these threads will be blocked while waiting for the disk to read its file.
每当线程等待从文件从磁盘中读取的时间段内都会陷入阻塞状态。

While waiting, other threads can use the CPU to process the parts of the file they have already read.
在该线程处于等待的过程中,其他的线程便会使用 CPU 资源来操作它已经从磁盘加载到内存的文件。

The result is, that the disk is kept busy at all times, reading from various files into memory. This results in a better utilization of both the disk and the CPU.
这样操作的结果便是可以保证磁盘一直处于忙碌状态,将不同的文件加载到内存中。 这种处理方法也可以提高使磁盘和 CPU 的资源利用率。

It is also easier to program, since each thread only has to keep track of a single file.
另一个好处便是是程序的代码编写起来更加的容易,因为每个线程仅仅需要保存单个文件的信息即可。

More responsive programs

写出响应性更好的程序

Another common goal for turning a singlethreaded application into a multithreaded application is to achieve a more responsive application.
将单线程应用程序改写成多线程应用程序的另一个目的便是获得应用程序更好的响应能力。

Imagine a server application that listens on some port for incoming requests, when a request is received, it handles the request and then goes back to listening.
假设在某个端口上监听是否有请求到来的服务器应用程序, 如果接到了一个请求的话,这个服务器应用程序首先会处理这个请求,然后等到处理请求之后才会返回去继续监听操作。

The server loop is sketched below.
关于上述服务器的程序概况描述如下.


while (server is active){
    listen for request
    process request
}

If the request takes a long time to process, no new client can send requests to the server for that duration.
如果服务器端执行的请求需要很长的一端时间,那么在那段时间内没有任何客户端能够成功的将自己的请求发送至服务器端。

An alternate design would be for the listening thread to pass the request to a worker thread, and return to listening immediately.
可以在设计上进行这样的变动: 创建一个用来将请求信息转发给处理请求的工作者线程这样的请求转发线程,每当转发执行结束便会立即恢复到监听的状态。

The worker thread will process the request and send a reply to the client.
而上述设计变动中所描述的工作者线程将会执行具体的请求,然后将执行结果回复给客户端。

This design is sketched below:
上述的设计概况可以描述如下:


while ( server is active){
    listen for request
    hand request to worker thread
}

This way the server thread will be back at listening sooner.
在这种设计模式下,服务器线程能够在响应请求连接之后很短的时间内回复到监听状态。

Thus more clients can send requests to the server.
因此能够满足让更多的客户端将请求发送给服务器端。

The server has become more responsive.
这样的服务器的响应性更好。

The same is true for desktop applications.
上述的道理对于桌面应用程序来说也说得通。

If you click a button that starts a long task, and the thread executing the task is the thread updating the windows, button etc., then the application will appear unresponsive while the task executes.
如果你通过点击一个按钮来启动一个长任务的话,并且该线程所执行任务是更新 windows 系统,不过在更新任务执行的时间段内该程序对于所有的操作都不会有任何响应操作。

Instead the task can be handed off to a worker thread.
如果我们将’更新’任务交给一个工作者线程来完成的话。

While the worker thread is busy with the task, the window thread is free to respond to other user requests.
那么即便是工作者线程处于繁忙的执行任务状态,窗口线程仍是处于空闲状态且能够立即响应来自其他用户的请求。

When the worker thread is done it signals the window thread.
当工作者线程完成了它的工作的话,它便会向窗口线程发送信号。

The window thread can then update the application windows with the result of the task.
(接收到信号的)窗口线程随后便可以根据工作线程执行的结果来更新窗口应用程序。

The program with the worker thread design will appear more responsive to the user.
如果在设计程序的时候为其增设工作者线程的话将会让程序对于用户而言更具有响应性。

下一篇文章

end

Multithreading Costs [多线程所花费的代价]

Descriptions:
java 并发文档翻译,文章链接地址 http://tutorials.jenkov.com/java-concurrency/costs.html


Going from a singlethreaded to a multithreaded application doesn’t just provide benifits.
将单线程程序修改成多线程好处是有的,但同时也会带来其他方面的损失。

It also has some costs. Don’t just multithread-enable an application just because you can.
使用多线程需要为之付出相应的代价。 不要仅仅因为你会多线程便去在编程中使用这种技术。

You should have a good idea that the benefits gained by doing so, are larger than the cost.
在使用之前你应该明确的知道相比于应用多线程所花费的代价,你能够获得更大的收益(的时候再来决定将多线程加入到你的程序中)。

When in doubt, try measuring the performance or responsiveness of the application, instead of just guessing.
如果你不能很好的做出决断,那么试着权衡一下程序的性能或是响应能力在使用多线程技术前后有何变化,而不是仅仅通过猜测的方式来决定是否使用(多线程)。

More complex design

设计更加复杂

Though some parts of a multithreaded applications is simpler than a singlethreaded application, other parts are more complex.
虽然多线程应用程序在某些地方的实现要比单线程应用程序要简单的多,但是其他地方的设计要远比单线程设计复杂得多。

Code executed by multiple threads accessing shared data need special attention.
被多线程执行的访问共享数据的代码端需要给予高度重视。

Thread interaction is far from always simple.
线程之间的交互要远比想象的复杂。

Errors arising from incorrect thread synchronization can be very hard to detect, reproduce and fix.
线程同步操作所引发的错误很难被探知,重现或是修复。

Context Switching Overhead

过渡上下文切换

When a CPU switches from executing one thread to executing another, the CPU needs to save the local data, program pointer etc. of the current thread, and load the local data, program pointer etc. of the next thread to execute.
当 CPU 从一个正在执行的线程切换去执行另一个线程的时候,这个 CPU 需要存放当前线程的本地数据,程序指针等信息; 然后加载另一个将要执行的线程中的本地数据,程序指针等信息。

This switch is called a “context switch”.
上面所描述的切换就叫做 “上下文切换”。

The CPU switches from executing in the context of one thread to executing in the context of another.
对于 CPU 来说,它从正在运行线程的上下文中切换到另一个即将运行的线程的上下文中。

Context switching isn’t cheap.
上下文切换的花费高昂。

You don’t want to switch between threads more than necessary.
所以不到万不得已,我认为你是不会在两个线程之间进行上下文切换的。

You can read more about context switching on Wikipedia:
你可以通过查阅维基百科来获取更多关于上下文切换相关的知识:

http://en.wikipedia.org/wiki/Context_switch

Increased Resource Consumption

增加资源消耗

A thread needs some resources from computer in order to run.
一个线程需要从计算机中获取相应资源以便于能够运行起来。

Besides CPU time a thread needs some memory to keep its local stack.
除此之外,线程在获取到 CPU 资源之后需要额外的内存空间来存放它的本地栈。

It may also take up some resources inside the operating system needed to manage the thread.
线程或许也会在用来管理线程的操作系统内部占据一些资源。

Try creating a program that creates 100 thread that does nothing but wait, and see how much memory the application takes when running.
(你可以)试着创建一个包含100个不做任何事情且仅处于等待状态的线程的程序,然后观察一下该程序运行起来的时候它总共消耗多少内存。

Java Concurrency / Multithreading Tutorial [Java 并发/多线程教程]

Description:
java 并发文档翻译,文章链接地址http://tutorials.jenkov.com/java-concurrency/index.html



Back in the old days a computer had a single CPU, and was only capable of executing a single program at a time.

回退到旧时光,那时的计算机只有单核并且每次只能够运行一段程序。


Later came multitasking which meant that computers could execute multiple programs(AKA tasks or processes) at the same time.

不久之后多任务计算机登场,多任务计算机指的是那些可以同时运行多段程序(也可以叫做多任务或多进程)的计算机。


It wasn’t really “at the same time” though.

事实上这些任务并非是在同一时间段”并发的”。


The single CPU was shared between the programs.

计算机单核的处理器资源是以共享的方式在多个程序之间被调用。


The operating system would switch between the programs running, executing each of them for a little while before switching.

操作系统也是在多个”同时”执行的程序间来回进行切换,在从一个程序切换到另一个程序的过程中依次执行每段程序中的一小段。


Along with multitasking came new challenges for software developers.

随着多任务计算机时代的来临,软件开发人员也面临着新的跳帧。


Programers can no longer assume to have all the CPU time available, nor all memory or any other computer resources.

程序员再也不能确保随时都能够访问计算机的 CPU ,内存乃至其他计算机的资源了。


A “good citizen” program should release all resources it is no longer using, so other programs can use them.

对于一段”模范”程序来说,应该将对它无用的资源全部释放掉,以确保这些资源可被其他程序所调用。


Later yet came multithread which mean that you could have multiple thread of execution inside the same program.
接下来到来的便是多线程的时代,就是在同一段程序中允许你能同时运行多个线程。


A thread of execution can be thought of as a CPU executing the program.

由一个线程所运行的程序可被等价地看做是由一个 CPU 所执行的程序。


When you have multiple threads executing the same program, it is like having multiple CPUs execute within the same program.

多个线程运行同一段程序,便可以将其比作是多个 CPU 在同一段程序中运行。


Multithreading can be a great way to increase the performance of some types of programs.

多线程这种技术可以极大程度上来提高某种类型程序的效率。


However, multithreading is even more challenging than multitasking.

然而,多线程编程要远比多任务编程面临更多的挑战。


The threads are executing within the same program and are hence reading and writing the same memory simultanously.

由于程序允许多个线程运行在其中,因此这些线程便有可能在同一时间点上的读写相同的内存空间。


This can result in errors not seen in a singlethread program.

上述的这种操作可能引发在单线程运行的程序中遇不到的错误的发生。


Some of these errors may not be seen on single CPU machines, because two threads never really execute “simultanously”.

在引发的错误中有些错误在单核计算机上可能不会遇到,因为运行于单核计算机上的两个线程并不是真正意义上的并发。


Modern computers, though, come with multicore CPUs, and even with multiple CPUs too. This means that separate threads can be executed by separate cores or CPUs simultaneously.

如今的现代计算机都是多核甚至多 CPU 的。 这就意味着可以将线程以 CPU 的核或是 CPU 为单位来实现并发计算。



If a thread reads a memory location while another thread writes to it, what value will the first thread end up reading ?

(设想一下这个场景)如果一个线程在读取一个内存单元空间的时候恰好另一个线程在对这块内存空间执行写入操作的话,第一个线程在执行读操作之后它将获得的数值是什么?


The old value ?

是内存空间中原先的那个数值?


The value written by the second thread ?

还是另一个线程刚刚写入的数值?


Or a value that is mix between the two ?

还是新旧混合的数值?


Or, if two threads are writing to be the same memory location simultanously, what value will be left when they are done ?

又或是,如果两个线程同时向同一块内存空间中执行写入操作的话,当这两个线程执行写操作结束的时候,所写入的内存空间中将会是那个数值?


The value written by the first thread ?

是第一个线程写入的数值?


The value written by the second thread ?

还是第二个线程刚刚写入的数值?


Or a mix of the two values written?

亦或是两个线程写入数值的混合值?


Without proper precautions any of these outcomes are possible.

如果没有适当的提前声明(约定)的话上述的任何一种情况都是有可能发生的。


The behavior would not even be predicable.

执行操作的方式甚至都有可能是不可预知的。


The outcome could change from time to time.

也有可能每次运行都会得到不同的结果。


Therefore it is important as a developer to know how to take the right precautions - meaning learning to control how threads access shared resources like memory, files, databases etc.

因此对于一个开发人员来说认识到如何(根据不同的情景/场合)来做出正确的预防措施 - 也就是说他应该清楚如何控制线程来(安全地)访问一些像是内存,文件,数据库等等诸如此类的共享资源 是十分重要的。


That is one of the topics this java concurrency tutorial addresses.

而(如何在多线程并发的情况下安全地访问共享资源)这便是这篇’java 并发教程’所要向您重点讲述的内容。

Multithreading and Concurrency in Java

Java 中的多线程与并发


Java was one of the first languages to make multithreading easily available to developers.

Java 是第一批实现将多线程功能对开发者而言简单易用的开发语言中的一种。


Java had multithreading capabilities from the very beginging.

Java 这种开发语言在被创造之处就支持多线程编程这种技术。


Therefore, Java developers often face the problems described above.

这也是为何 Java 的开发人员经常会在开发中遇到上描述的众多问题。


That is the reason I am writing this trail on Java concurrency.

而这也正是为何我要编写以 Java 并行技术为主题的这一系列文档。


As notes to myself, and any fellow Java developer whom may benifit from it.

而这一系列文章也将作为我自己的学习笔记,同时希望可以让其他的 Java 开发人员从中受益。


The trail will primarily be concerned with multithreading in Java, but some of the problems occuring in multithreading are similar to problems occuring in multitasking and in distributed systems.


这篇专栏主要关注 Java 开发中的多线程技术,除此之外其实在多任务系统和分布式系统中也会遇到与多线程相似的问题这也是我们讨论的重点。


References to multitasking and distributed systems may therefore occur in this trail too.

考虑到多任务和分布式系统中的相关知识也会在本文中有所提及。


Hence the word “concurrency” rather “multithreading”.

所以在这里我们将这一系列的文章以’并发’而不是’多线程’来进行命名。

2015 年之后和 2015 年之前的 Java 并发技术


A lot has happened in the world of concurrent architecture and design since the first Java concurrency books were written, and even since the Java 5 concurrency utilities were released.

自从第一版关于 Java 并发的书籍问世以及 Java 5 中的并发包被发布之后,并发架构的世界中已经发生了很大的变动。


New, asynchronous “shared-nothing” platforms and APIs like Vert.x and Play/Akka and Qbit have emerged.

新的一种,异步的 “无需共享任何资源” 的平台和API 像是 Vert.x , Play/Akka 和 Qbit 这样的框架相继涌现。


These platforms use a different concurrency model than the standard Java/JEE concurrency model of threading, shared memory and locking.

这种平台相比于标准的 Java/JEE 使用共享内存和所的线程并发模式,采用了完全不同的并发模式。


New non-blocking concurrency algorithms have been published, and new non-blocking tools like the LMax Disrupter have been added to our toolkits.

新式的无阻塞并发算法被推出,同时新型的无阻塞开发工具显示 LMax Disrupter 被键入到我们的开发平台工具中。


New functional programming parallelism has been introduced with the Fork and Join framework in Java7, and the collection streams API in Java8.

基于函数式并发新的编程方法被引入到 Java7 中,同时在 Java8 中对集合流的相关 API 予以支持。


With all these new developments it is about time that I updated this Java Concurrency tutorial.

Java 开发中发生了如此之多的变化,所以这也是我该更新 Java 并发教程手册的时候了。


Therefore, this tutorial is once again work in progress.

这篇 Java 并发教程会一如既往地以不断更新的方式来发布。


New tutorials will be published whenever time is available to write them.

只要我一有时间便会向文章集中添加新的教程文章。

下一篇文章

end

基于单节点的 Spark & IDEA 远程调试

Description:
本篇博文主要介绍一下,如何使用 intellij IDEA 这款可视化集成编程工具来远程调试运行在 Spark 集群上面的 jar 文件。
在这里采用伪分布式单节点来部署 Spark 的运行环境,目的是为了防止分布式集群环境中(创建多个虚拟机在每台虚拟机上面部署 Spark 环境)一些不可控错误的发生。


环境搭建

实验环境准备

  • 虚拟机 Oracle Virtual Box 版本:5.0.10
  • 虚拟机中的 Linux 版本: 15.0.4
  • Linux 中安装的 spark 版本: spark-1.5.2-bin-hadoop2.6.tgz
  • Linux 中安装的 hadoop 版本: hadoop-2.6.3.tar.gz
  • Linux 中安装的 JDK 版本: 1.7.0
  • Linux 中安装的 scala 版本: 2.11.7
  • 主机参数:
    • CPU: i5-4460
    • RAM: 8.00 GB
  • 主机 Intellij 版本: Intellij-14.14 (正版破解版… =。=)
  • 主机 JDK 版本: 1.8.0
  • 主机 spark 软件发布包:spark-1.5.2-bin-hadoop2.6.tgz
  • 在虚拟机上部署环境

  • 首先更新 Ubuntu 的源,然后安装 Oracle 版本的 java7
  • 安装过 spark 的同学告诉我,说是用 java8 来安装 spark 会抛出莫名异常,所以在 Linux 的上用的是就是 java7 版本,而在主机上 Intellij 的编译器使用的是 java8

单点部署安装 hadoop

  • 将 hadoop 压缩包解压到指定目录下面

$ tar -xzf hadoop-2.6.3.tar.gz 
$ mkdir /hadoop
$ mv hadoop-2.6.3 /hadoop/
  • 为当前结点配置 ssh 的密钥对(即便是单点部署也需要实现 ssh 无密码登陆)

$ apt-get install openssh-server            
* 如果系统中没有安装 ssh 软件的话使用 apt-get install 命令安装

$ apt-get install openssh-client        
* 安装 ssh 的客户端

$ ssh-keygen -t rsa -P ""                 
* 生成无密码的公私密钥对    

$ cd ~/.ssh/                            
* 生成的密钥对文件存放在 ~/.ssh/ 文件夹的下面

$ cat id_rsa.pub >> authorized_keys     
* 将刚刚生成的公钥文件中的内容以追加的方式写入到 authorized_keys 文件中

$ chmod 600 authorized_keys             
* 修改存放公钥文件的权限

$ ssh localhost                            
* 使用 ssh 来远程登录自身所在的主机

$ hostname                               
* 查看主机名称,我的是 aimer-v,存放主机名的配置文件(Ubuntu)是 /etc/hostname

$ ssh aimer-v                             
* 使用 ssh 登录名为 aimer-v 的主机,如果能够成功登录说明 ssh 正常工作

* 如果出现无法 ssh 正常访问登录的情况的话,首先使用 ping 命令来检查数据包是否可以正常收发
* 如果 ping 没有问题的话同时修改 ssh 对应路径下面的配置文件,并通过重启 ssh 服务来是配置文件生效
* 我的 ssh 配置文件所在路径是

$ /etc/ssh/ssh_config

* 修改过(注销注释)的字段是
   PasswordAuthentication yes
   CheckHostIP yes
   IdentityFile ~/.ssh/id_rsa
   Port 22
   SendEnv LANG LC_*
   HashKnownHosts yes
   GSSAPIAuthentication yes
   GSSAPIDelegateCredentials no

  • 修改系统环境配置文件将 Hadoop 相关路径写入存放到其中,也就是将 Hadoop 安装包所在的文件夹路径添加到系统的搜索路径中,以便于用户无论在那个路径下面输入 Hadoop 相关命令都可以启动运行 Hadoop 文件夹下面的可执行脚本

$ vi /etc/profile            
 在文件的末尾追加

#set for hadoop
export HADOOP_HOME=/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
export HADOOP_MAPRED_HOME=$HADOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib:$HADOOP_HOME/lib/native"

$ source /etc/profile       # 让修改过的系统变量立即生效
  • 修改 Hadoop 的相关配置文件
    Hadoop 的一系列配置文件所在路径为 ${HADOOP_HOME}/etc/hadoop/ 的下面
  • 首先修改的是名为 hadoop-env.sh 的配置文件

  # hadoop-env.sh 文件中记录的是 hadoop 在启动的时候,到哪里去找 java 的编译器
  # 和启动时内存空间大小的分配等,我只修改了下面这一个选项 
  export JAVA_HOME=/usr/lib/jvm/java-7-oracle    
  • 然后修改的是名为 core-site.xml 的配置文件

  • 接下来修改的是 hdfs-site.xml

  • 最后修改的是 yarn-site.xml
    yarn 是 apache 的资源管理调度框架,和 yarn 等价的还有 mesos ,我没有在这里做过深入研究,感兴趣的同学可以进一步查阅相关资料。

  • 根据配置文件中设置的文件夹和文件,并在当前系统中的对应路径下创建对应的文件夹和文件

$ mkdir /hadoop/tmp
$ mkdir /hadoop/dfs
$ mkdir /hadoop/dfs/name
$ mkdir /hadoop/dfs/data
  • 格式化 hdfs

// 首先将路径切换到 ${HADOOP_HOME}/bin 的下面,然后执行下面的命令
$ ./hdfs namenode -format                  # 将 namenode 进行格式化操作

//  如果配置信息无误且正确运行的话,将会显示如下的输出信息 


STARTUP_MSG:   java = 1.7.0_80
************************************************************/
16/02/26 14:33:49 INFO namenode.NameNode: registered UNIX signal handlers for [TERM, HUP, INT]
16/02/26 14:33:49 INFO namenode.NameNode: createNameNode [-format]
Formatting using clusterid: CID-88b14724-7d23-46fd-a623-83029ad20c44
16/02/26 14:33:51 INFO namenode.FSNamesystem: No KeyProvider found.
16/02/26 14:33:51 INFO namenode.FSNamesystem: fsLock is fair:true
16/02/26 14:33:51 INFO blockmanagement.DatanodeManager: dfs.block.invalidate.limit=1000
16/02/26 14:33:51 INFO blockmanagement.DatanodeManager: dfs.namenode.datanode.registration.ip-hostname-check=true
16/02/26 14:33:51 INFO blockmanagement.BlockManager: dfs.namenode.startup.delay.block.deletion.sec is set to 000:00:00:00.000
16/02/26 14:33:51 INFO blockmanagement.BlockManager: The block deletion will start around 2016 Feb 26 14:33:51
16/02/26 14:33:51 INFO util.GSet: Computing capacity for map BlocksMap
16/02/26 14:33:51 INFO util.GSet: VM type       = 64-bit
16/02/26 14:33:51 INFO util.GSet: 2.0% max memory 966.7 MB = 19.3 MB
16/02/26 14:33:51 INFO util.GSet: capacity      = 2^21 = 2097152 entries
16/02/26 14:33:51 INFO blockmanagement.BlockManager: dfs.block.access.token.enable=false
16/02/26 14:33:51 INFO blockmanagement.BlockManager: defaultReplication         = 1
16/02/26 14:33:51 INFO blockmanagement.BlockManager: maxReplication             = 512
16/02/26 14:33:51 INFO blockmanagement.BlockManager: minReplication             = 1
16/02/26 14:33:51 INFO blockmanagement.BlockManager: maxReplicationStreams      = 2
16/02/26 14:33:51 INFO blockmanagement.BlockManager: replicationRecheckInterval = 3000
16/02/26 14:33:51 INFO blockmanagement.BlockManager: encryptDataTransfer        = false
16/02/26 14:33:51 INFO blockmanagement.BlockManager: maxNumBlocksToLog          = 1000
16/02/26 14:33:51 INFO namenode.FSNamesystem: fsOwner             = root (auth:SIMPLE)
16/02/26 14:33:51 INFO namenode.FSNamesystem: supergroup          = supergroup
16/02/26 14:33:51 INFO namenode.FSNamesystem: isPermissionEnabled = false
16/02/26 14:33:51 INFO namenode.FSNamesystem: HA Enabled: false
16/02/26 14:33:51 INFO namenode.FSNamesystem: Append Enabled: true
16/02/26 14:33:52 INFO util.GSet: Computing capacity for map INodeMap
16/02/26 14:33:52 INFO util.GSet: VM type       = 64-bit
16/02/26 14:33:52 INFO util.GSet: 1.0% max memory 966.7 MB = 9.7 MB
16/02/26 14:33:52 INFO util.GSet: capacity      = 2^20 = 1048576 entries
16/02/26 14:33:52 INFO namenode.NameNode: Caching file names occuring more than 10 times
16/02/26 14:33:52 INFO util.GSet: Computing capacity for map cachedBlocks
16/02/26 14:33:52 INFO util.GSet: VM type       = 64-bit
16/02/26 14:33:52 INFO util.GSet: 0.25% max memory 966.7 MB = 2.4 MB
16/02/26 14:33:52 INFO util.GSet: capacity      = 2^18 = 262144 entries
16/02/26 14:33:52 INFO namenode.FSNamesystem: dfs.namenode.safemode.threshold-pct = 0.9990000128746033
16/02/26 14:33:52 INFO namenode.FSNamesystem: dfs.namenode.safemode.min.datanodes = 0
16/02/26 14:33:52 INFO namenode.FSNamesystem: dfs.namenode.safemode.extension     = 30000
16/02/26 14:33:52 INFO namenode.FSNamesystem: Retry cache on namenode is enabled
16/02/26 14:33:52 INFO namenode.FSNamesystem: Retry cache will use 0.03 of total heap and retry cache entry expiry time is 600000 millis
16/02/26 14:33:52 INFO util.GSet: Computing capacity for map NameNodeRetryCache
16/02/26 14:33:52 INFO util.GSet: VM type       = 64-bit
16/02/26 14:33:52 INFO util.GSet: 0.029999999329447746% max memory 966.7 MB = 297.0 KB
16/02/26 14:33:52 INFO util.GSet: capacity      = 2^15 = 32768 entries
16/02/26 14:33:52 INFO namenode.NNConf: ACLs enabled? false
16/02/26 14:33:52 INFO namenode.NNConf: XAttrs enabled? true
16/02/26 14:33:52 INFO namenode.NNConf: Maximum size of an xattr: 16384
Re-format filesystem in Storage Directory /hadoop/dfs/name ? (Y or N) 

//  输入 'Y' 表示同意格式化 namenode 
//  如果成功初始化的话,将会显示如下的信息 

16/02/26 14:34:55 INFO namenode.FSImage: Allocated new BlockPoolId: BP-1525144641-127.0.0.1-1456468495698
16/02/26 14:34:56 INFO common.Storage: Storage directory /hadoop/dfs/name has been successfully formatted.
16/02/26 14:34:56 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0
16/02/26 14:34:56 INFO util.ExitUtil: Exiting with status 0
16/02/26 14:34:56 INFO namenode.NameNode: SHUTDOWN_MSG: 
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at aimer-v/127.0.0.1
************************************************************/

显示如上信息便说明 namenode 格式化成功 
  • 然后再将路径切换到 ${HADOOP_HOME}/sbin 的下面

$ ./start-dfs.sh                           # 启动 hdfs 
  • 如若成功启动显示日志信息如下

root@aimer-v:/hadoop/sbin# ./start-dfs.sh 
Starting namenodes on [localhost]
localhost: starting namenode, logging to /hadoop/logs/hadoop-root-namenode-aimer-v.out
localhost: starting datanode, logging to /hadoop/logs/hadoop-root-datanode-aimer-v.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /hadoop/logs/hadoop-root-secondarynamenode-aimer-v.out
  • 启动 yarn

$ ./start-yarn.sh                          
  • 若 yarn 成功运行显示如下日志信息

root@aimer-v:/hadoop/sbin# ./start-yarn.sh  &
[1] 5993
root@aimer-v:/hadoop/sbin# starting yarn daemons
starting resourcemanager, logging to /hadoop/logs/yarn-root-resourcemanager-aimer-v.out
localhost: starting nodemanager, logging to /hadoop/logs/yarn-root-nodemanager-aimer-v.out

[1]+  Done                    ./start-yarn.sh
  • 通过输入 jps 命令来查看 Hadoop 相关进程是否处于正常工作的状态

$ jps 
5602 NameNode
6137 NodeManager
5866 SecondaryNameNode
6031 ResourceManager
6445 Jps

安装 spark 之前先安装好和安装 spark 版本相匹配的 scala

  • 首先写在系统中默认安装的 scala

$ apt-get remove scala 
  • 将下载到本地的 scala 压缩包进行解压

$ tar -xvf scala-2.11.7.tgz
  • 修改系统配置文件,将 scala 所在路径追加到系统搜索路径中

$ vi /etc/profile
  • 向文件中追加如下的信息

export SCALA_HOME=/scala
export PATH=$SCALA_HOME/bin:$PATH

在 hadoop 的基础上继续安装 spark

  • 解压软件包

tar -xvf spark-1.5.2-bin-hadoop2.6.tgz
  • 修改系统配置文件,将 spark 所在路径添加到系统搜索路径中
1
2
3
4
5
6
7
8
9
10
$ vi /etc/profile 
export HADOOP_CONF_DIR=/hadoop/etc/hadoop
export SPARK_MASTER=localhost
export SPARK_LOCAL_IP=localhost
export SPARK_HOME=/spark
export SPARK_LIBRARY_PATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$HADOOP_HOME/lib/native
export YARN_CONF_DIR=/hadoop/etc/hadoop
export PATH=$PATH:$SPARK_HOME/bin
export SCALA_HOME=/opt/scala
export PATH=$SCALA_HOME/bin:$PATH
  • 最后通过该命令让修改的系统配置信息立即生效
1
$ source /etc/profile
  • 修改 spark 的配置文件

$ cd ${SPARK_HOME}/conf
$ cp spark-env.sh.template spark-env.sh      # 在这里建议将配置文件的文件模板进行保留,通过创建它的备份的方式来在备份文件上面进行修改
  • 打开 spark-env.sh 文件,然后添加如下的信息

$ vi spark-env.sh                    

export JAVA_HOME=/usr/lib/jvm/java-7-oracle
export SCALA_HOME=/opt/scala
export HADOOP_CONF_DIR=/hadoop/etc/hadoop
* 通过脚本来启动 spark 相关的服务

$ cd ${SPARK_HOME}/sbin
$ ./start-all.sh
  • 如果启动成功的话,将会显示出如下的信息

root@aimer-v:/spark/sbin# ./start-all.sh 
rsync from localhost
rsync: change_dir "/spark/sbin//localhost" failed: No such file or directory (2)
rsync error: some files/attrs were not transferred (see previous errors) (code 23) at main.c(1183) [sender=3.1.1]
starting org.apache.spark.deploy.master.Master, logging to /spark/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-aimer-v.out
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /spark/sbin/../logs/spark-root-org.apache.spark.deploy.worker.Worker-1-aimer-v.out

  • 通过输入 jps 命令来查看系统中各个进程的运行状态信息

$ jps

root@aimer-v:/spark/sbin# jps
5470 SecondaryNameNode
5332 DataNode
6911 Worker
6690 Master
5776 NodeManager
5224 NameNode
5669 ResourceManager
6955 Jps
  • 其中的 Master 和 Worker 便是我们刚才启动 Spark 所运行的相关进程
  • 通过脚本来运行 spark-shell 通过脚本的方式来访问 spark

$ cd  ${SPARK_HOME}/bin
$ ./spark-shell
//  如果正常启动的话,将会显示如下日志信息 

自此,虚拟机上面 hadoop & spark 的单节点运行环境部署结束


在主机上部署环境

搭建 WordCounter 的编程环境

  • 在这里我们使用的是 scala 编程语言来进行编写 wordcounter 程序

  • step 1. 在 Intellij 中创建一个新的 scala 项目


  • step 2. 打开 File -> Project Structure -> 点击最左栏中的 Libraries 选项 –> 绿色的 ‘+’ 按钮

  • step 3. 将刚刚下载的 spark-1.5.2-bin-hadoop2.6 文件下 spark-1.5.2-bin-hadoop2.6\spark-1.5.2-bin-hadoop2.6\lib\spark-assembly-1.5.2-hadoop2.6.0.jar 文件加载到当前编程环境中

  • step 4. 设置编译代码生成的 .jar 文件所在的路径, 打开 File -> Project Structure -> 点击最左栏中的 Artifacts 选项 –> 绿色的 ‘+’ 按钮 Jar -> From modules with dependencies ,然后在弹出的 ‘Create JAR from Modules’ 中的 ‘Main Class’ 选中对应的函数入口类文件,在这里我们选的是 SparkWordCount 这个文件

  • step 5. 为 META-INF/MAINFEST.MF 这个将要生成的配置文件设置路径
  • step 6. 在 Name 栏中设置将要生成的 .jar 文件的名称, 在 Output directory 一栏中设置 .jar 文件将会输出的路径
  • step 7. 同时不要忘了将 Build on make 设置有效,最后点击 ok 按钮

  • step 8. 编写代码 SparkWordCount.scala 代码如下所示

import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkWordCount").setMaster("localhost")
    val sc = new SparkContext(conf)
    val count=sc.textFile(args(0)).filter(line => line.contains("Spark")).count()
    println("count="+count)
    sc.stop()
  }
}
  • 在再次运行代码的过程中出了一点问题: IDE 报错了显示缺少 scala (SDK) 相关的 jar 文件
  • 引发报错的原因是: 之前引用到当前系统中的 SDK 索引没有更新,重新导入一次即可,如下图所示, IDE 重新创建一下索引即可征程编译

  • step 9. 编译刚刚编写的代码 Build -> Make Project , 然后到对应的路径下面找 jar 文件

将生成的 jar 文件上传至安装有 spark 的虚拟机上

  • 在对应的路径下创建文件夹,然后将生成的 .jar 文件上传到该文件夹下面

开始远程调试

首先在虚拟机(Linux)的命令行中输入命令让 spark 来以调试的方式执行上传 jar 文件


$ cd ${SPARK_HOME}/bin
  • 然后查看虚拟机的 IP 地址信息,我的是

root@aimer-v:/home/aimer/spark_remote# ifconfig
eth0      Link encap:Ethernet  HWaddr 08:00:27:f5:3e:29  
          inet addr:10.0.2.15  Bcast:10.0.2.255  Mask:255.255.255.0
          inet6 addr: fe80::a00:27ff:fef5:3e29/64 Scope:Link
          UP BROADCAST RUNNING MULTICAST  MTU:1500  Metric:1
          RX packets:169490 errors:0 dropped:0 overruns:0 frame:0
          TX packets:50036 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:1000 
          RX bytes:208071948 (208.0 MB)  TX bytes:3995229 (3.9 MB)

eth1      Link encap:Ethernet  HWaddr 08:00:27:e5:6b:20  
          inet addr:192.168.56.113  Bcast:192.168.56.255  Mask:255.255.255.0
          inet6 addr: fe80::a00:27ff:fee5:6b20/64 Scope:Link
          UP BROADCAST RUNNING MULTICAST  MTU:1500  Metric:1
          RX packets:934783 errors:0 dropped:0 overruns:0 frame:0
          TX packets:458868 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:1000 
          RX bytes:987944856 (987.9 MB)  TX bytes:42138914 (42.1 MB)

eth2      Link encap:Ethernet  HWaddr 08:00:27:15:4d:1b  
          inet addr:192.168.56.112  Bcast:192.168.56.255  Mask:255.255.255.0
          inet6 addr: fe80::a00:27ff:fe15:4d1b/64 Scope:Link
          UP BROADCAST RUNNING MULTICAST  MTU:1500  Metric:1
          RX packets:390 errors:0 dropped:0 overruns:0 frame:0
          TX packets:840 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:1000 
          RX bytes:45543 (45.5 KB)  TX bytes:103389 (103.3 KB)

lo        Link encap:Local Loopback  
          inet addr:127.0.0.1  Mask:255.0.0.0
          inet6 addr: ::1/128 Scope:Host
          UP LOOPBACK RUNNING  MTU:65536  Metric:1
          RX packets:965869 errors:0 dropped:0 overruns:0 frame:0
          TX packets:965869 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:0 
          RX bytes:497637820 (497.6 MB)  TX bytes:497637820 (497.6 MB)

在上述的 IP 地址中

  • 第一个是我用来在虚拟机上面以 NAT 的方式登录互联网的 IP
  • 第二个 IP 地址是多结点分布式部署 spark 集群设定的 IP
  • 第三个 IP 地址是用来进行主机到虚拟机二者之间进行 ssh 远程连接的 IP 地址
  • 第四个 主机自循环 IP 地址

  • 因为我们在进行远程调试的时候,是想把虚拟机中的调试信息数据通过端口号传到主机(windows)的上面,

  • 所以选用的输出调试信息的 IP 地址与 ssh 所使用的相同,只不过端口号不同一个是 8888 另一个是 22 罢了

  • 接下来,不要着急运行 jar ,在这里由于 word-counter 这个程序是从 hdfs 上面来读取文本文件的,

  • 所以还需要将输入文本文件上传到 hdfs 的上

  • 首先将本地的文本文件 README.md(我用的是 spark 的 README 文件,随便什么 ASCII 编码的文本文件都可以) 上传到 hdfs 的上面


$ hdfs dfs -put REAEME.md /
  • 查看文件是否被正确的上传,以及对应的结果路径是否正确的被创建 (在此期间,发现 datanode 没有启动,所以 README.md 这个本地文件并没有正确上传,所以先停掉了 spark ,hadoop , 然后重启 hadoop ,在 hadoop 启动之后又将 spark 进行启动)

$ hdfs dfs -ls /
root@aimer-v:/home/aimer/spark_remote# hdfs dfs -ls /
Found 5 items
-rw-r--r--   1 root supergroup       3593 2016-02-26 16:34 /README.md
drwx-wx-wx   - root supergroup          0 2016-01-01 23:31 /tmp
drwxr-xr-x   - root supergroup          0 2016-01-01 23:53 /user
  • 在输入文件上传,输出数据文件路径分别在 hdfs 上创建好之后便可以输入如下命令

$ ./spark-submit --master spark://aimer-v:7077 --name SparkWordCount --class SparkWordCount --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8888" --executor-memory 1G /home/aimer/spark_remote/spark_learning.jar hdfs://aimer-v:9000/README.md
  • 上述的 –driver-java-options 后面所跟随的命令参数是是这样的:

    • -Xdebug
    • 这个参数是通知 JVM 工作在 DEBUG 的模式下面

    • -Xrunjdwp 这个参数是用来通知 JVM 使用 (java debug wire protocol) 来运行调试环境

    • -Xrunjdwp:transport=dt_socket 这个参数用来指定的是调试期间生成的数据传输的方式,

    • 如果后跟 dt_shmem 这个参数的话表示的是以共享内存的方式来传递调试产生的数据,不过 dt_shmem 仅在 Windows 平台下适用。

    • server 该参数指的是是否支持在 server 模式的 VM 中

    • suspend 参数是用来设定是否等到用于调试的客户端成功创建连接之后,再来执行 JVM

    • address 参数用来指定的是调试信息发送的端口号

  • 如果正确运行的话,命令行中会显示如下的信息

Listening for transport dt_socket at address: 8888

然后在本地的 Intellij 中通过如下的配置来接收远程发来的调试信息,实现远程调试功能

  • step 1 Run -> Edit Configurations 配置如下图所示的远程调试配置信息,为这个创建的远程调试设定一个名称 “remote-spark”

  • step 2 Run -> Debug ‘remote-spark’
    如果成功连接的话,将会在下面显示如下的信息(不要忘了在本地的代码上打上端点)

  • 同时如果将窗口切换到远程访问界面的话,也会看到对应输出的日志信息(直接截图)

  • step 3 继续调试,直到程序结束,最终结果既不会显示在 IDE 的控制台输出信息中,而是会显示在 linux 的命令提示行中


关于结束收尾工作

首先结束 spark

  • 将路径切换到 ${SPARK_HOME}/sbin
  • 然后运行如下命令停止 spark 相关进程

root@aimer-v:/spark/sbin# ./stop-all.sh 
localhost: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master

然后结束 hadoop

  • 将路径切换到 ${HADOOP_HOME}/sbin 下面
  • 然后运行如下的命令来停止 hadoop 相关的进程

root@aimer-v:/hadoop/sbin# ./stop-all.sh 
This script is Deprecated. Instead use stop-dfs.sh and stop-yarn.sh
Stopping namenodes on [localhost]
localhost: stopping namenode
localhost: stopping datanode
Stopping secondary namenodes [0.0.0.0]
0.0.0.0: stopping secondarynamenode
stopping yarn daemons
stopping resourcemanager
localhost: stopping nodemanager
no proxyserver to stop

最后输入 jps 命令来检查是否相关进程均停止


root@aimer-v:/spark/bin# jps
7305 SparkSubmit
10003 Jps

是的,本篇博客最大的败笔除了截图截得参差不齐之外,就是最后这个运行的 SparkSubmit 进程我不知道如何停止它;
直接 kill 没有生效,就当做是挖个坑好了,解决之后再来写上 (= . =)||

end

Spark 文档翻译1

Description: 最近参加了一个翻译 spark 文档的小组 打算利用业余时间来补习一下英文,以及通过阅读 spark 文档来系统学习一下 spark 的相关知识,翻译难免有不恰当之处,敬请指正。


Spark SQL,DataFrames and Datasets Guide

概述

Spark SQL 是 Spark 中用来处理结构化数据的模块。基本抽象数据类型 RDD 所提供的接口不同的是, Spark SQL 的接口则会向 Spark 提供更多关于数据结构和正在进行的计算结构方面额外的信息。事实上, Spark SQL 就是利用这些附带的信息来执行额外的优化操作的。与 Spark SQL相交互的方式有这几种: 通过 SQL 语句,DataFrames 的接口函数和 Dataset 提供的应用程序接口函数。
计算数据的时所调用的是同一个引擎,该引擎不会因为你使用编程语言或是调用函数接口的不同而有所变动。这种(引擎调用的)统一化意味着开发者可以轻易地在 Spark 为不同语言提供的函数接口之间进行频繁地切换目的是可以用不同语言中最为地道的的方式来执行数据的转换操作。

在当前页面中所呈现的所有示例和简单的测试数据在 Spark 发布的软件包中均能找到,且可以使用 spark-shell,pyspark shell 或是 sparkR shell 来运行。

SQL-结构化查询语言

Spark SQL 的其中一种使用方式便是用来执行执行最基本 SQL 语法格式或是 HiveQL 语法格式的 SQL 语句的查询。 Spark SQL 同样也具有从已经安装部署好的 Hive (数据仓库)中读取数据这样一种功能。如果想要了解更多关于如何配置 Spark SQL 的这种特性,可以参照 Hive Tables 这一段的文档。当使用另外一种编程语言来运行 SQL 的时候最终的执行结果将会以 DataFrame 的数据结构被返回使用者也可以通过命令行或是 JDBC/ODBC 的方式来与 SQL 接口来进行交互。

DataFrames-数据框

DataFrame 所指的是由以’列’组织的这样的数据所构成的分布式集合。从理论角度分析,可以把存放于关系型数据库中的数据表比作是 R/Python 语言中的数据框,但是后者在底层表述方面有着更加丰富的优化策略。DataFrames 有着丰富的生成数据来源,像是: 结构化数据文件, Hive 中的数据表,外部数据库或是已经存在于内存中的 RDDs 结构。

(Spark 也为)DataFrame 提供了由 Scala,Java,Python,和R语言编写好的 API 接口。

Datasets-数据集

Dataset 是在 Spark 1.6 版本之后实验性新增的一个接口,目的是为了让基于 RDDs(支持强写入,并能够使用强大的 lambda 表达式) 可以在执行计算时具备和 Spark SQL 一样的引擎优化的能力。一个 Dataset(对象) 可以从 JVM 的对象生成,(生成之后)便可以调用相关的操作方法(像是,map, flatMap,filter,诸如此类的方法)。

Dataset 有一套统一的可被应用于 Scala 和 Java 开发语言中的API接口函数。Python 语言目前还不支持 Dataset 的 API 接口函数,但是由于 Python 本身具备动态编程语言的特性这一优势使使用 Dataset 的 API变为了可能(比如说,你可以通过 Python 的原声语言特sing row.columnName 来直接访问行对象中的属性字段)。Dataset 将会在未来的发行版本中来实现对 Python 语言的完全支持。

Google 测试框架 g-Test 使用教程

Description: 这篇文章简明介绍了 google 开源的一款 C++ 方向的测试框架 Gtest 的使用方法做简单介绍,为后续整理发布的数据库引擎开发项目(目前还没发布到博客中)做单元,模块测试方面的技术支撑。


G-Test 的安装

  • 通过 github 页面下载 gtest 的源码压缩包,命令如下

   $wget https://github.com/google/googletest/archive/master.zip 
  • 解压之后,在目录下面看到 CMakeList.txt 文件,可知使用的是 cmake 编译工具,保证虚拟机处于联网(NAT模式),输入命令下载 cmake 编译工具

  $apt-get install cmake 
  • 下载 cmake 结束之后,测试其是否能够正常工作

$cmake
如果正常工作会显示如下内容
Usage
  cmake [options] 
  cmake [options] 
  Options -C  = Pre-load a script to populate the cache. 

编译安装 gTest


    $ mkdir cmake_dir
    $ cd cmake_dir
    $ cmake ../
    正确编译,显示信息如下
-- The C compiler identification is GNU 4.9.2
-- The CXX compiler identification is GNU 4.9.2
-- Check for working C compiler: /usr/bin/cc
-- Check for working C compiler: /usr/bin/cc -- works
...
-- Found Threads: TRUE  
-- Configuring done
-- Generating done
-- Build files have been written to: /gTester/googletest-master/cmake_dir
 
在这里我们创建 cmake_dir 文件夹,并在该文件夹的下面执行 cmake 操作的目的是为了让 cmake 编译生成的中间文件与下载的 gTest 的项目文件二者之间分离开,不然会混淆. 在 cmake_dir 文件夹中生成的中间文件如下:

 CMakeCache.txt  CMakeFiles  cmake_install.cmake  CTestTestfile.cmake  googlemock  Makefile

继续输入 make && make install 来安装 gTest,如正确编译安装显示信息如下:


 Scanning dependencies of target gmock
[ 14%] Building CXX object googlemock/CMakeFiles/gmock.dir/__/googletest/src/gtest-all.cc.o
[ 28%] Building CXX object googlemock/CMakeFiles/gmock.dir/src/gmock-all.cc.o
Linking CXX static library libgmock.a
...
Install the project...
-- Install configuration: ""
-- Installing: /usr/local/lib/libgmock.a
-- Installing: /usr/local/lib/libgmock_main.a
-- Installing: /usr/local/include/gmock
-- Installing: /usr/local/include/gmock/gmock-generated-actions.h
-- Installing: /usr/local/include/gmock/gmock-cardinalities.h
-- Installing: /usr/local/include/gmock/gmock-actions.h
... 

gTest 的入门级使用例子

  • 使用 gTest 来编写最简单的测试用例
    • 首先编写 C++ 文件

  // test.hpp
#ifndef TEST_HPP__
#define TEST_HPP__

int getValue(int _value);

#endif

 // test.cpp
#include "test.hpp"
#include 

int getValue( int _value ){
    return _value ;
}
 
  • 编写包含 gTest库函数的 C++ 测试文件

#include "test.hpp"
#include "gtest/gtest.h"

// first we test ASSERT_TRUE

void test_ASSERT_TRUE(){
  ASSERT_TRUE(false) ;
}

void test_EXPECT_TRUE(){
   EXPECT_TRUE(false) ;
}

int main( void ){
  test_EXPECT_TRUE();

  return 0;
}

  • 编写 Makefile 文件

GTEST_DIR=/gTester/googletest-master/googletest

USER_DIR= ./

CPPFLAGS += -I$(GTEST_DIR)/include

CXXFLAGS += -g -Wall -Wextra

TESTS = test_tester

GTEST_HEADERS = $(GTEST_DIR)/include/gtest/*.h \
                $(GTEST_DIR)/include/gtest/internal/*.h

all : $(TESTS)

clean :
        rm -f $(TESTS) *.a *.o

GTEST_SRCS_= $(GTEST_DIR)/src/*.cc $(GTEST_DIR)/src/*.h $(GTEST_HEADERS)

gtest-all.o : $(GTEST_SRCS_)
        $(CXX) $(CPPFLAGS) -I$(GTEST_DIR) $(CXXFLAGS) -c \
                $(GTEST_DIR)/src/gtest-all.cc


gtest_main.o : $(GTEST_SRCS_)
        $(CXX) $(CPPFLAGS) -I$(GTEST_DIR) $(CXXFLAGS) -c $(GTEST_DIR)/src/gtest_main.cc  

gtest.a : gtest-all.o
        $(AR) $(ARFLAGS) $@ $^  

gtest_main.a : gtest-all.o gtest_main.o
        $(AR) $(ARFLAGS) $@ $^  


test.o : $(USER_DIR)/test.cpp $(USER_DIR)/test.hpp $(GTEST_HEADERS)
        $(CXX) $(CPPFLAGS) $(CXXFLAGS) -c $(USER_DIR)/test.cpp

test_tester.o : $(USER_DIR)/test_tester.cpp $(USER_DIR)/test.hpp $(GTEST_HEADERS)
        $(CXX) $(CPPFLAGS) $(CXXFLAGS) -c $(USER_DIR)/test_tester.cpp

test_tester : test.o test_tester.o gtest_main.a
        $(CXX) $(CPPFLAGS) $(CXXFLAG) -pthread  $^ -o $@
  • 输入 make 命令生成可执行文件 test_tester
  • 运行 gTest 的测试文件,查看输出结果

 .//test_tester.cpp:12: Failure
  Value of: false
  Actual: false
  Expected: true 
 

gTest 中断言介绍

  • ASSERT_* 的断言函数如果判定最终结果不满足判定输出值,将会发出 断言失败 + 终止程序的结果

  • EXPECT_* 的断言函数如果判断最终结果不满足判定输出值,将仅会发出 断言失败 的提示信息

基本断言说明


ASSERT_TRUE (condition); 
ASSERT_FALSE (condition) ;
上述断言函数是这样的: 括号中的 condition 可以是一个返回结果为布尔值的函数,也可以是一个不二变量,同样也可以是一个逻辑表达式,只要最终返回的结果是布尔值就可以。而两个函数 ASSERT_TRUE 要求这个布尔值必须是 TRUE/真值,如果不是真便会输出'致命错误'并退出当前正在执行的函数。 ASSERT_FALSE 方法刚好相反,它期待的是一个 FALSE/假值,如果不满足同样输出'致命错误'信息,并退出当前正在执行的方法(程序)。

   EXPECT_TRUE(condition) ;
   EXPECT_FALSE(condition) ;
上述函数的断言是这样的: 如果括号中的 condition 变量返回的布尔值与断言期待的布尔值不同的话,不会退出当前正在执行的方法/程序。 它会继续允许程序的继续运行,但是会输出执行错误的提示信息(如果错误信息流定向是控制台显示器的话) ### 基于两值比较的断言函数说明 + 在基于而知比较的断言函数中,传入的参数必须要满足下面两种条件中的一种 + 是基本类型,可以直接进行逻辑比较 + 如果是符合类似(class ,struct)是需要重载比较运算符的 + 需要注意的是,如果传入的是指针类型的话,判定的并不是指针指向的数值内容是否相同,而是会判定指针是否指向同一块内存空间 + 如果需要判定指针指向字符串的逻辑关系,不要使用这一系列的断言函数

    ASSERT_EQ(expected, actual) ;  expected==actual
    ASSERT_NE(expected, actual) ;  expected!=actual
    ASSERT_LT(expected, actual) ;  expected 小于 actual, LT(less than)
    ASSERT_LE(expected, actual) ;  expected 小于等于 actual, LE(less equal)
    ASSERT_GT(expected, actual) ;  expected 大于 actual, GT(greater than)
    ASSERT_GE(expected, actual) ;  expected 大于等于 actual, GE(greater equal)

  EXPECT_EQ(expected, actual) ;
  EXPECT_NE(expected, actual) ;
  EXPECT_LT(expected, actual) ;
  EXPECT_LE(expected, actual) ;
  EXPECT_GT(expected, actual) ;
  EXPECT_GE(expected, actual) ;

基于字符串的比较断言说明


  ASSERT_STREQ(str1,str2); str1's content = str2's content
  ASSERT_STRNE(str1,str2); str1's content != str2's content
 ASSERT_STRCASEEQ(str1,str2); str1's content = str2's content ,ignoring characters case
 ASSERT_STRCASENE(str1,str2); ignoring string characters' case, str1's content != str2's content 

DHT Chord 理论学习笔记 1

Description:

本篇博客记录的是P2P 对等计算中基于 DHT 拓扑结构中的 Chord 算法理论信息,目的是为基于Chord算法的 P2P 自感应系统代码做理论支撑


名词概念

DHT 是什么 ?

DHT 是对等计算(Peer-to-Peer,P2P) 技术中拓扑结构中的一种,叫做全分布式结构化结构(decentralized structured topology),简称为 DHT,又称作分布式哈希表(distributed hash table)。

另外几种拓扑结构分别是中心化拓扑结构(centralized topology),全分布式非结构化结构(decentralized unstructured topology),半分布式结构(partially decentralized topology)。

DHT 较比其他拓扑结构有哪些特点?

DHT 结构可以自适应的支持网络中节点动态地加入/退出,并且扩展性、鲁棒性、结点ID 分配的均匀性和自组织能力都很好。同时DHT由于自身结构的特点可以提供精确的发现和定位网络中结点资源的功能。

DHT 都可以用来做什么 ?

可以基于 DHT 来建立复杂的服务,例如分散式档案系统、点对点技术档案分享系统、网页的快速抓取、数据的缓存系统、网络中任意结点数据传输、网域名城系统和即时通讯系统等等。

DHT 由哪些元素构成

操作/方法: Put(key,data), Get(key)

既然你已经知道了 DHT 就是我们常说的分布式哈希表,那么结合哈希表的’存入’,’读出’的操作特性,便可以推知 DHT 的操作方法也不外乎这两个(put,get)。

更加详细的说明在介绍 Chord 算法之后再来补充。

Chord 是什么 ?

Chord 是基于 DHT 数据结构的分布式算法,如果说 DHT 为分布式系统查询提供 Put , Get 接口封装的话,那么 Chord 便为 DHT 中为 Get,Put 两种两种方法提供底层实现的算法。

二者的关系如下图所示

  • 在 Chord 算法将每台主机抽象成结点(Node),同时为了保证结点在网络中的唯一性, 使用 Node-ID 数字序列来作为结点的唯一标识。

  • Chord 将网络中处于’在线’状态的计算机(结点),按照Node-ID数值的大小排列,从逻辑上构成一个首位相连的环形结构。

  • 为了方便在网络中每台’在线’结点上资源的搜索,每个结点上面还存放用来存放其他结点信息的路由表

  • 其中逻辑环中的主机总数(结点数目)和结点 ID 号码的位数都有关系:

1
2
3
4
5
6
7
8
9
Node-ID 二进制位数 = m

逻辑环中最大容纳主机(结点)个数 = 2^m 个

Node-ID 取值范围 = [0,2^m -1]

Node 上面的 finger-table(路由表) 维护的表项个数为 m 个

Finger-table 的表项中记录其他 Node-ID 的ID间隔为 2^i (0 <=i <= m-1)

P2P 网络中 Chord 环状结构图示:下图便是一个 m 为 6 的Chord 拓扑环和环中Node-ID = 8 结点上所维护的 finger table 图示

Chord 构成数据结构介绍

后继结点列表 - sucessor node list

  • 何谓后继结点?

    • 后继结点(successor)指的是 Chord 逻辑环中处于在线状态的 Node-ID 数值 >= 当前结点 Node-ID 的结点所构成的结点集合中 Node-ID 数值最小的结点变叫做当前结点的后继结点-它只有一个。列表中的其他的节点都是 Node-ID > 当前结点的 Node-ID。
  • 这么说有点混乱,举个例子: 就拿上图而言,我们设 N14 为当前节点,那么对于 N14 来说整个 Chord 环中 Node-ID 大于等于它所构成的结点集合是 {N21,N32,N38,N42,N48,N51,N56}。
  • {N21,N32,N38,N42,N48,N51,N56} 这个列表便是当前 N14的后继结点列表 ,而其中 Node-ID 号码最小的是 N21 , 那么 N21 便可以称作是 N14 的后继结点。
  • Chord 逻辑环中所提及的结点必须是处于’上线’/live 状态的节点。 Chord 中将处于在线状态的结点作为环中结点进行添加,如果不处于在线状态是不会将它加入逻辑环结构中的。 如果一个结点存活着进入到环中,然后由于某种原因崩溃/宕机了,那么它会被从环中踢出去的。
  • 后继结点列表是构成 Chord 网络的主要数据结构,当前结点可以借助于后继结点列表中记录的信息来直接’跳转’到它的后继结点上面,就像是 C++ 中通过指针进行地址跳转一样。
  • 后继结点列表越大越好吗?

    • 后接结点列表中存放的表项越多整个环形网络搜索到目标资源的可靠性越高,设想如果通过当前结点可以’跳到’更多的结点的上面,想必在环形网络中搜索到目标资源的命中率越高。

    • 不过,这些是相对于网络数据流量而言的,维护的列表越大,网络负担也就越重。从一个结点发出的查询越多(可跳转到的结点越多),该结点的流量相比也会越大。

前驱结点 - predecessor node

  • 何谓前驱结点?

    • 在了解何为后继结点之后,前驱结点的概念也明朗了很多。它指的是 Node-ID < 当前结点 Node-ID 的所有结点集合中, Node-ID 最小的那个便是当前结点的前驱结点
  • 如果某个结点没有 Node-ID < 它的 Node-ID 的结点的话,那么便选取整个 Chord 网络中 Node-ID 号码最大的结点作为它的前驱结点。N8 便是例子,它的前驱结点是 N56。

  • 这种特例用在后继结点上也是一样:如果在 Chord 网络中找不到 Node-ID > 它的 Node-ID 结点的话,就选取 Chord 网络中 Node-ID 最小的 Node-ID 所标识的结点作为它的后继结点。例如 N56 结点的后继结点便是 N8.

路由表 - finger table

  • 何谓路由表?

    路由表主要用来提高结点和资源信息的查询路由的速度,类似于 linux 中用来记录 IP 和 IP 所映射的域名的路由表,因此得名。 就是为了根据当前结点可以快速的跳转到其他结点上而记录的其他结点的{结点名称:网络地址} 这样映射关系的表项的二维表格。

路由表表项

路由表中表项主要存放了一下几种信息:

  • start : (n+2^(k-1)) mode 2^m ; (1<= k <= m )起始查询结点

  • interval : [finger[i].start,finger[i+1].start) 区间范围

  • node : Chord 网络中第一个 Node-ID >= start 结点的后继结点

  • successor : Chord 网络中结点的直接后继结点

  • predecessor : Chord 环形网络中结点的直接前驱结点

end

如何使用七牛存储来为你的 gihub 博客中添加图片

Description:
作为一个截图狂魔的我,在写博客的时候为了让内容描述更加的简洁,都会上传大量的图片实例。
但是自从换成了 github 上面的博客之后,为了节省空间(主要是技术问题…),一直都没有上传图片(背景图片除外!)
近期写的DHT 这篇博客十分需要配图说明,所以在这篇博客中将如何使用七牛存储来为自己的博客中添加图片的步骤记录一下。


首先注册申请七牛的账号

  • 在注册的时候,推荐将自己的 github 账号和七牛二者进行绑定。
  • 注册成功并登录之后,需要配置空间,也就是为你的空间起域名,空间选择’公开’这样可以确保通过 github 上的链接信息可以直接访问
  • 点击内容管理,然后从本地上传一张图片,会在右边窗口栏中看到这张图片的外链地址信息,我的是这个 点击它便可以看到刚刚上传的截图信息了

然后将外链地址添加到博客中

加载图片的格式为 ![](刚才七牛网站空间上传图片所生成的外链地址)

这样便可将图片展现在博客中(你没有穿越,前几天手贱把七牛里面的图片都删了,然后图片链接就坏掉了,这是重新截的图)

end