对于同样面临使用Flink(v1.16/Scala 2.12)连接多个(>;2)流(可能不同的数据类型)问题的开发人员来说,这个线程应该是一个小的“入门”或起点。对于这个任务,我选择了级联方法(总是一个接一个地连接两个流)或专门开发的“联合类型”。使用MultipleInputStreamOperator Flink提供了一个涵盖此用例的工具。不幸的是,我没有找到任何示例或大量文档。因此,我决定写这篇文章,并提供一小段我的实现。这个例子并不完美,但它应该能让人理解这个想法。
首先,您需要一个扩展MultipleInputStreamOperator和AbstractStreamOperatorV2的类以及一个工厂。
class MultiInputJoinOperator[OUT](parameters: StreamOperatorParameters[OUT], nInputs: Int)
(implicit outInfo: TypeInformation[OUT])
extends AbstractStreamOperatorV2[OUT](parameters, nInputs) with MultipleInputStreamOperator[OUT {
override def getInputs = util.Arrays.asList(new ExampleInput1(this, 1))
class ExampleInput1
(owner: AbstractStreamOperatorV2[OUT], inputId: Int)
extends AbstractInput[InputEvent, OUT](owner: AbstractStreamOperatorV2[OUT], inputId: Int) {
override def processElement(element: StreamRecord[InputEvent]): Unit = println("InputEvent1: " + element.getValue.toString)
}
}
class MultiInputJoinOperatorFactory[OUT]
(inputCount: Int)
(implicit outInfo: TypeInformation[OUT])
extends AbstractStreamOperatorFactory[OUT] {
def createStreamOperator[T <: StreamOperator[OUT]]
(parameters: StreamOperatorParameters[OUT]): T =
new MultiInputJoinOperator(parameters, inputCount).asInstanceOf[T]
def getStreamOperatorClass(classLoader: ClassLoader): Class[_ <: StreamOperator[OUT]] = classOf[MultiInputJoinOperator[OUT]]
}
然后您可以按以下方式组合使用它:
val transformOp = new KeyedMultipleInputTransformation[OutputEvent](
"MultipleInputOperator"
, new MultiInputJoinOperatorFactory[OutputEvent](1)
, Types.of[OutputEvent] //TypeOfOutputEvent
, env.getParallelism
, Types.of[String] //TypeOfKey
)
transformOp.addInput(inputStream.javaStream.getTransformation(),new KeySelector[InputEvent,String] {
override def getKey(value: InputEvent): String = value.keyAttribute
})
transformOp.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES)
env.getJavaEnv.addOperator(transformOp)
val connectedStreams = new MultipleConnectedStreams(env.getJavaEnv).transform(transformOp)
connectedStreams
.name("MultiInputStreamOperation")
.map(x => doSomethingWithIt(x))
.print()
我很高兴从您与操作员的经验中获得任何反馈或输入。
诚挚的问候Dominik