炫意html5
最早CSS3和HTML5移动技术网站之一

Python PySpark RDD

现在我们已经在我们的系统上安装并配置了PySpark,我们可以在Apache Spark上用Python编程。但在此之前,让我们了解Spark – RDD中的一个基本概念。

RDD代表 Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素。RDD是不可变元素,这意味着一旦创建了RDD,就无法对其进行更改。RDD也具有容错能力,因此在发生任何故障时,它们会自动恢复。您可以在这些RDD上应用多个操作来完成某项任务。

要对这些RDD进行操作,有两种方法

  • Transformation
  • Action

让我们详细了解这两种方式。

转换 – 这些操作应用于RDD以创建新的RDD。 Filter,groupBy和map是转换的例子。

操作 – 这些是应用于RDD的操作,它指示Spark执行计算并将结果发送回驱动程序。

要在PySpark中应用任何操作,我们首先需要创建一个 PySpark RDD 。以下代码块具有PySpark RDD类的详细信息

class pyspark.RDD (
jrdd,
ctx,
jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

让我们看看如何使用PySpark运行一些基本操作。Python文件中的以下代码创建RDD单词,其中存储了一组提到的单词。

words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)

我们现在将对单词进行一些操作。

count()

返回RDD中的元素数。

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

命令 – count()的命令是

$SPARK_HOME/bin/spark-submit count.py

输出 – 上述命令的输出是

Number of elements in RDD  8

搜集()

返回RDD中的所有元素。

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

命令 – collect()的命令是

$SPARK_HOME/bin/spark-submit collect.py

输出 – 上述命令的输出是

Elements in RDD -> [
'scala',
'java',
'hadoop',
'spark',
'akka',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]

foreach(F)

仅返回满足foreach内函数条件的元素。在下面的示例中,我们在foreach中调用print函数,它打印RDD中的所有元素。

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------

命令 – foreach(f)的命令是

$SPARK_HOME/bin/spark-submit foreach.py

输出 – 上述命令的输出是

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filter(f)

返回一个包含元素的新RDD,它满足过滤器内部的功能。在下面的示例中,我们过滤掉包含”spark’的字符串。

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

命令 – 过滤器(f)的命令是

$SPARK_HOME/bin/spark-submit filter.py

输出 – 上述命令的输出是

Fitered RDD -> [
'spark',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]

map(f,preservesPartitioning = False)

通过将函数应用于RDD中的每个元素来返回新的RDD。在下面的示例中,我们形成一个键值对,并将每个字符串映射为值1。

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

命令 – map命令(f,preservesPartitioning = False)是

$SPARK_HOME/bin/spark-submit map.py

输出 – 上述命令的输出是

Key value pair -> [
('scala', 1),
('java', 1),
('hadoop', 1),
('spark', 1),
('akka', 1),
('spark vs hadoop', 1),
('pyspark', 1),
('pyspark and spark', 1)
]

reduce(F)

执行指定的可交换和关联二进制操作后,将返回RDD中的元素。在下面的示例中,我们从运算符导入add包并将其应用于’num’以执行简单的加法运算。

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

命令 – reduce(f)的命令是

$SPARK_HOME/bin/spark-submit reduce.py

输出 – 上述命令的输出是

Adding all the elements -> 15

join(other,numPartitions = None)

它返回RDD,其中包含一对带有匹配键的元素以及该特定键的所有值。在以下示例中,两个不同的RDD中有两对元素。在连接这两个RDD之后,我们得到一个RDD,其元素具有匹配的键及其值。

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

命令 – 连接命令(其他,numPartitions =无)是

$SPARK_HOME/bin/spark-submit join.py

输出 – 上述命令的输出是

Join RDD -> [
('spark', (1, 2)),
('hadoop', (4, 5))
]

cache()

使用默认存储级别(MEMORY_ONLY)保留此RDD。您还可以检查RDD是否被缓存。

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words.cache()
caching = words.persist().is_cached
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

命令 – cache()的命令是

$SPARK_HOME/bin/spark-submit cache.py

输出 – 上述程序的输出是

Words got cached -> True

这些是在PySpark RDD上完成的一些最重要的操作。

炫意HTML5 » Python PySpark RDD

Java基础教程Android基础教程