关于ZAKER 融媒体解决方案 合作 加入

python – 将数据框列和外部列表传递给 with.

CocoaChina 09-19

我有一个具有以下结构的 Spark 数据帧 . bodyText_token 具有标记 ( 处理 / 单词集 ) . 我有一个已定义关键字的嵌套列表

root |-- id: string ( nullable = true ) |-- body: string ( nullable = true ) |-- bodyText_token: array ( nullable = true ) keyword_list= [ 'union','workers','strike','pay','rally','free','immigration', ] , [ 'farmer','plants','fruits','workers' ] , [ 'outside','field','party','clothes','fashions' ] ]

我需要检查每个关键字列表下有多少令牌 , 并将结果添加为现有数据帧的新列 .

例如:如果代币 = [ " 成为 "," 农民 "," 集会 "," 工人 "," 学生 " ]

结果将是 – > [ 1,2,0 ]

以下功能按预期工作 .

def label_maker_topic ( tokens,topic_words ) : twt_list = [ ] for i in range ( 0, len ( topic_words ) ) : count = 0 #print ( topic_words [ i ] ) for tkn in tokens: if tkn in topic_words [ i ] : count += 1 twt_list.append ( count ) return twt_list

我在 withColumn 下使用了 udf 来访问该函数 , 但是我收到了一个错误 . 我认为这是关于将外部列表传递给 udf. 有没有办法可以将外部列表和 datafram 列传递给 udf 并向我的数据帧添加新列?

topicWord = udf ( label_maker_topic,StringType ( ) ) myDF=myDF.withColumn ( "topic_word_count",topicWord ( myDF.bodyText_token,keyword_list ) )

最佳答案

最干净的解决方案是使用闭包传递其他参数:

def make_topic_word ( topic_words ) : return udf ( lambda c: label_maker_topic ( c, topic_words ) ) df = sc.parallelize ( [ ( [ "union" ] , ) ] ) .toDF ( [ "tokens" ] ) ( df.withColumn ( "topics", make_topic_word ( keyword_list ) ( col ( "tokens" ) ) ) .show ( ) )

这不需要对 keyword_list 或使用 UDF 包装的函数进行任何更改 . 您还可以使用此方法传递任意对象 . 这可以用于传递例如用于有效查找的集合列表 .

如果您想使用当前的 UDF 并直接传递 topic_words, 则必须先将其转换为列文字:

from pyspark.sql.functions import array, litks_lit = array ( * [ array ( * [ lit ( k ) for k in ks ] ) for ks in keyword_list ] ) df.withColumn ( "ad", topicWord ( col ( "tokens" ) , ks_lit ) ) .show ( )

根据您的数据和要求 , 可以使用替代的 , 更有效的解决方案 , 这些解决方案不需要 UDF ( 爆炸聚合崩溃 ) 或查找 ( 散列矢量操作 ) .

以上内容由"CocoaChina"上传发布 查看原文