首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用星火查看日志

用星火查看日志
EN

Stack Overflow用户
提问于 2016-12-29 19:57:04
回答 1查看 860关注 0票数 2

我是Spark的新手,我正在开发一个python脚本,它可以读取带有一些日志的csv文件:

代码语言:javascript
复制
userId,timestamp,ip,event
13,2016-12-29 16:53:44,86.20.90.121,login
43,2016-12-29 16:53:44,106.9.38.79,login
66,2016-12-29 16:53:44,204.102.78.108,logoff
101,2016-12-29 16:53:44,14.139.102.226,login
91,2016-12-29 16:53:44,23.195.2.174,logoff

并检查用户是否有一些奇怪的行为,例如,他是否连续进行了两次“登录”而没有进行“注销”。我已经将csv作为dataFrame加载,我希望比较单个用户的日志行,按时间戳排序,并检查两个连续事件是否属于同一类型(登录-登录、注销-注销)。我正在以“map- reduce”的方式搜索它,但目前我还不知道如何使用一个用于比较连续行的reduce函数。我写的代码很好,但是性能很差。

代码语言:javascript
复制
sc = SparkContext("local","Data Check")
sqlContext = SQLContext(sc)

LOG_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events/*"
RESULTS_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/spark/script_results/prova/bad_users.csv"
N_USERS = 10*1000

dataFrame = sqlContext.read.format("com.databricks.spark.csv").load(LOG_FILE_PATH)
dataFrame = dataFrame.selectExpr("C0 as userID","C1 as timestamp","C2 as ip","C3 as event")

wrongUsers = []

for i in range(0,N_USERS):

    userDataFrame = dataFrame.where(dataFrame['userId'] == i)
    userDataFrame = userDataFrame.sort('timestamp')

    prevEvent = ''

    for row in userDataFrame.rdd.collect():

        currEvent = row[3]
        if(prevEvent == currEvent):
            wrongUsers.append(row[0])

        prevEvent = currEvent

badUsers = sqlContext.createDataFrame(wrongUsers)
badUsers.write.format("com.databricks.spark.csv").save(RESULTS_FILE_PATH)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-12-29 22:35:43

首先(不相关但仍然相关),确保每个用户的条目数量没有那么大,因为collectfor row in userDataFrame.rdd.collect():中是危险的。

第二,您不需要离开DataFrame区域来使用经典的Python,只需坚持使用Spark即可。

现在你的问题。它基本上是“我想从前一行了解的每一行”:这属于Window函数的概念,准确地说是lag函数。这里有两篇关于Spark中的窗口函数的有趣文章:一篇来自数据库中的代码,另一篇来自新赫中的Scala示例(我认为更容易理解)。

我在Scala中有一个解决方案,但我认为您可以用Python实现它的翻译:

代码语言:javascript
复制
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

import sqlContext.implicits._

val LOG_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events/*"
val RESULTS_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/spark/script_results/prova/bad_users.csv"

val data = sqlContext
  .read
  .format("com.databricks.spark.csv")
  .option("inferSchema", "true")
  .option("header", "true") // use the header from your csv
  .load(LOG_FILE_PATH)

val wSpec = Window.partitionBy("userId").orderBy("timestamp")

val badUsers = data
  .withColumn("previousEvent", lag($"event", 1).over(wSpec))
  .filter($"previousEvent" === $"event")
  .select("userId")
  .distinct

badUsers.write.format("com.databricks.spark.csv").save(RESULTS_FILE_PATH)

基本上,您只需从前一行中检索值,并将其与当前行上的值进行比较,如果匹配是错误的行为,并保留userId。对于每个userId“块”中的第一行,前面的值将是null:与当前值相比,布尔表达式将是false,因此这里没有问题。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41386387

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档