首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >以有状态方式处理网络数据包

以有状态方式处理网络数据包
EN

Stack Overflow用户
提问于 2019-02-12 16:27:33
回答 2查看 334关注 0票数 1

我想使用Spark解析网络消息,并以有状态的方式将它们分组为逻辑实体。

问题描述

让我们假设每条消息都在输入数据的一行中,如下所示。

代码语言:javascript
复制
| row   | time | raw payload   |
+-------+------+---------------+
|  1    | 10   | TEXT1;        |
|  2    | 20   | TEXT2;TEXT3;  |
|  3    | 30   | LONG-         |
|  4    | 40   | TEXT1;        |
|  5    | 50   | TEXT4;TEXT5;L |
|  6    | 60   | ONG           |
|  7    | 70   | -TEX          |
|  8    | 80   | T2;           | 

任务是解析原始有效负载中的逻辑消息,并在新的输出数据中提供它们。在本例中,有效负载中的每个逻辑消息都以分号(分隔符)结尾。

然后,所需的输出数据可以如下所示:

代码语言:javascript
复制
| row   | time | message       |
+-------+------+---------------+
|  1    | 10   | TEXT1;        |
|  2    | 20   | TEXT2;        |
|  3    | 20   | TEXT3;        |
|  4    | 30   | LONG-TEXT1;   |
|  5    | 50   | TEXT4;        |
|  6    | 50   | TEXT5;        |
|  7    | 50   | LONG-TEXT2;   |

请注意,某些消息行不会在结果中生成新行(例如,第4行、第6行、第7行、第8行),而有些消息甚至会产生多行(例如第2行、第5行)。

我的问题:

  • 这是联非新议程的用例吗?如果是这样的话,我应该如何实现merge函数?我不知道它的目的是什么。
  • 由于消息排序问题(我不能在不尊重消息顺序的情况下正确地处理LONGTEXT-1、LONGTEXT-2 ),我是否可以告诉spark在更高的级别(例如每个消息日历日)并行化,但不能在一天内并行化(例如,时间上的事件50、60、70、80需要按顺序处理)。
  • 后续问题:这个解决方案不仅适用于传统的星火,而且也适用于火花结构化流,这是否可以想象?还是后者需要自己的有状态处理方法?
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-02-22 11:05:25

在此期间,我想出了如何用“联非新议程”来完成这个任务。

代码语言:javascript
复制
class TagParser extends UserDefinedAggregateFunction {

  override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)

  override def bufferSchema: StructType = StructType(
    StructField("parsed", ArrayType(StringType)) ::
      StructField("rest", StringType)
      :: Nil)

  override def dataType: DataType = ArrayType(StringType)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = IndexedSeq[String]()
    buffer(1) = null
  }

  def doParse(str: String, buffer: MutableAggregationBuffer): Unit = {

    buffer(0) = IndexedSeq[String]()

    val prevRest = buffer(1)
    var idx = -1
    val strToParse = if (prevRest != null) prevRest + str else str

    do {
      val oldIdx = idx;
      idx = strToParse.indexOf(';', oldIdx + 1)

      if (idx == -1) {
        buffer(1) = strToParse.substring(oldIdx + 1)
      } else {
        val newlyParsed = strToParse.substring(oldIdx + 1, idx)
        buffer(0) = buffer(0).asInstanceOf[IndexedSeq[String]] :+ newlyParsed
        buffer(1) = null
      }

    } while (idx != -1)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

    if (buffer == null) {
      return
    }

    doParse(input.getAs[String](0), buffer)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = throw new UnsupportedOperationException

  override def evaluate(buffer: Row): Any = buffer(0)
}

这里有一个演示应用程序,它使用上面的联非新议程来解决上面的问题:

代码语言:javascript
复制
case class Packet(time: Int, payload: String)

object TagParserApp extends App {

  val spark, sc = ... // kept out for brevity

  val df = sc.parallelize(List(
    Packet(10, "TEXT1;"),
    Packet(20, "TEXT2;TEXT3;"),
    Packet(30, "LONG-"),
    Packet(40, "TEXT1;"),
    Packet(50, "TEXT4;TEXT5;L"),
    Packet(60, "ONG"),
    Packet(70, "-TEX"),
    Packet(80, "T2;")
  )).toDF()

  val tp = new TagParser
  val window = Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
  val df2 = df.withColumn("msg", tp.apply(df.col("payload")).over(window))
  df2.show()
}

这产生了:

代码语言:javascript
复制
+----+-------------+--------------+
|time|      payload|           msg|
+----+-------------+--------------+
|  10|       TEXT1;|       [TEXT1]|
|  20| TEXT2;TEXT3;|[TEXT2, TEXT3]|
|  30|        LONG-|            []|
|  40|       TEXT1;|  [LONG-TEXT1]|
|  50|TEXT4;TEXT5;L|[TEXT4, TEXT5]|
|  60|          ONG|            []|
|  70|         -TEX|            []|
|  80|          T2;|  [LONG-TEXT2]|
+----+-------------+--------------+

我的主要问题是找出如何实际应用这一联非新议程,即使用以下方法:

代码语言:javascript
复制
df.withColumn("msg", tp.apply(df.col("payload")).over(window))

我现在唯一需要弄清楚的是并行化的各个方面(我只想在不依赖排序的地方实现并行化),但这对我来说是另一个问题。

票数 0
EN

Stack Overflow用户

发布于 2019-02-12 17:34:57

通常,您可以使用mapGroupsWithState of flatMapGroupsWithState在火花流上运行任意有状态聚合。您可以找到一些示例这里。但是,所有这些都不能保证按事件时间对流的处理进行排序。

如果需要强制数据排序,则应尝试使用事件时间上的窗口操作。在这种情况下,您需要运行无状态操作,但是如果每个窗口组中的元素数量足够小,则可以使用collectList,然后在每个列表中应用一个UDF (可以管理每个窗口组的状态)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54654578

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档