网站管理人员,个人怎么开网站,广州有名的广告公司,php网站开发范例如何使流程更快#xff1f; 如果您已经与Corda合作了一段时间#xff0c;那么您很有可能已经考虑过这一点。 您可以通过以下几方面进行合理的调整来提高性能#xff1a;事务大小#xff0c;优化查询并减少整个Flow执行过程中所需的网络跃点数。 在某种程度上#xff0c;还… 如何使流程更快 如果您已经与Corda合作了一段时间那么您很有可能已经考虑过这一点。 您可以通过以下几方面进行合理的调整来提高性能事务大小优化查询并减少整个Flow执行过程中所需的网络跃点数。 在某种程度上还有另一种可能也让您无所适从。 多线程。 更具体地说从已经执行的流程异步启动流程/子流程。 这样做有可能极大地改善您的CorDapps性能。 如果您尝试此操作则可能会遇到与我得到的类似的例外。 此外到目前为止Corda还不支持子流的线程化。 但是仍然可以做到。 我们只需要对此保持聪明。 那就是Corda Services中多线程进入的地方。它们可以在Flow中调用但不会妨碍Flow对其施加的严格规则因为正在执行的Flow不会在服务中挂起或检查点。 在本文中我将重点介绍从服务内部以多线程方式启动流程。 在Corda中还可以使用其他线程但这是我想更深入研究的有趣领域。 另一方面从服务启动流程也充满了一些陷阱。 这些需要考虑并遍历。 否则您将有一天醒来想知道为什么一切都没有明显的原因停止了。 幸运的是我在这里为您提供帮助。 对我来说我必须直面这个问题。 对我来说幸运的是R3能够提供帮助。 作为参考我将在本文中使用Corda Enterprise 3.1 。 要真正从本文的内容中受益您将需要使用Enterprise。 这是由于Enterprise支持多个异步执行的流。 开源目前不允许这样做。 我还建议您查看我以前的文章Corda Services 101因为我们将在此基础上建立基础。 情境 让我们从概述本帖子将要使用的场景开始。 随着时间的推移甲方向甲方发送一些消息。 每个消息来自一个流。 甲方回应所有发送给他们的消息。 每个消息都来自单个Flow但是它们希望在单个位置执行该过程。 可以快速组合一系列流程来满足此要求。 按顺序进行此操作应该证明绝对是零问题在解决了我们所有犯下的愚蠢错误之后。 尽管这种情况对于需要性能的情况很不利但是它很容易理解因此我们可以专注于异步运行。 慢速同步解决方案 在研究异步解决方案之前快速浏览一下将要使用的代码将是有益的。 下面是ReplyToMessagesFlow的代码。 我不想遍历所有底层代码而只想专注于与此帖子相关的代码 InitiatingFlow
StartableByRPC
class ReplyToMessagesFlow : FlowLogicList() {Suspendableoverride fun call(): List {return messages().map { reply(it) }}private fun messages() repository().findAll(PageSpecification(1, 100)).states.filter { it.state.data.recipient ourIdentity }private fun repository() serviceHub.cordaService(MessageRepository::class.java)Suspendableprivate fun reply(message: StateAndRef) subFlow(SendMessageFlow(response(message), message))private fun response(message: StateAndRef): MessageState {val state message.state.datareturn state.copy(contents Thanks for your message: ${state.contents},recipient state.sender,sender state.recipient)}
} 如果您确实阅读过Corda Services 101那么您可能已经认识到此类。 正如我之前提到的为提出的问题组合解决方案非常容易。 从Vault检索MessageState 然后启动子subFlow以subFlow进行回复。 这段代码将愉快地逐个传递消息。 那么我们可以采用此代码并使其更快吗 异步尝试失败 让我们尝试通过引入线程来使当前代码更快 我们将使用CompletableFutures来做到这一点 InitiatingFlow
StartableByRPC
class ReplyToMessagesBrokenAsyncFlow : FlowLogicList() {Suspendableoverride fun call(): List {return messages().map { CompletableFuture.supplyAsync { reply(it) }.join() }}// everything else is the same as before
} 大多数代码与以前相同因此已从示例中排除。 对代码的唯一更改是添加了CompletableFuture及其supplyAsync方法来自Java。 它尝试在单独的线程上开始为每个消息执行reply功能。 那么为什么将本节命名为“一次失败的尝试” 我指的是执行以上代码时获得的堆栈跟踪 java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: Required value was null.at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_172]
Caused by: java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201) ~[corda-node-3.1.jar:?]at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192) ~[corda-node-3.1.jar:?]at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271) ~[corda-node-3.1.jar:?]at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312) ~[corda-core-3.1.jar:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:57) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:46) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?] 您将获得它以及Corda正在打印的一长串检查点日志行。 此外只是为了掩盖我的屁股并向您证明这不是由于CompletableFuture的问题引起的这是使用Executor线程池时出现的另一个错误 Exception in thread pool-29-thread-1 Exception in thread pool-29-thread-2 java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748) 希望您在这一点上相信我。 如果不是请参考我一开始所说的内容。 Corda当前不支持从正在执行的流程异步启动新流程。 我相信他们正在努力。 但是截至目前。 不要使用此解决方案。 可行的异步解决方案 我们已经看到在Flow内部执行线程是行不通的。 为了继续追求性能我们现在来看一下Corda服务中的线程。 这并不奇怪因为标题和开头的段落已经讨论了这一点…… 抛开讽刺的评论。 委派服务将需要对原始解决方案进行一些重做但是大部分代码将保持不变。 大部分内容将被复制并粘贴到另一个类中。 从流中获取代码并将其放入服务中。 以下是新的MessageService 其中包含原始ReplyToMessagesFlow的代码但进行了一些更改和添加了线程代码 CordaService
class MessageService(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {private companion object {val executor: Executor Executors.newFixedThreadPool(8)!!}fun replyAll() {messages().map {executor.execute {reply(it)}}}private fun messages() repository().findAll(PageSpecification(1, 100)).states.filter { it.state.data.recipient serviceHub.myInfo.legalIdentities.first() }private fun repository() serviceHub.cordaService(MessageRepository::class.java)private fun reply(message: StateAndRef) serviceHub.startFlow(SendMessageFlow(response(message), message))private fun response(message: StateAndRef): MessageState {val state message.state.datareturn state.copy(contents Thanks for your message: ${state.contents},recipient state.sender,sender state.recipient)}
} 如您所见大多数代码与ReplyToMessagesFlow中的代码相同。 我要强调的第一点是使用Executor线程池。 我这里没有使用CompletableFutures 原因是我们稍后将要讨论的原因。 那么这一切如何运作 replyAll函数在新的系统线程上对从Vault检索到的每条消息执行reply 。 该新线程又调用startFlow 。 触发将新的流程放入“流程工作器”队列中。 这是所有乐趣发生的地方一切开始变得混乱。 Flow Worker队列负责执行Flow执行的顺序并将在Flow添加和完成时填充并为空。 该队列对于协调节点内流的执行至关重要。 当涉及到多线程Flows本身时它也是痛苦的根源。 下图显示了队列的简化视图 流进入队列并在处理后离开 我为什么要谈论这个队列 好吧我们需要格外小心不要将无法完成的流程填满队列。 怎么会这样 通过在正在执行的流程中启动流程然后流程等待其完成。 直到队列的线程池中的所有线程都遇到这种情况这才不会引起问题。 一旦发生它将使队列陷入僵局。 没有流程可以完成因为它们都依赖于许多排队的流程来完成。 流留在队列中等待它们调用的流完成 这很可能在多次触发相同流量的高吞吐量系统上发生。 现在队列中充满了等待其他流完成的流的机会增加了。 这不是很好使事情变得有些困难。 但是只要我们意识到这一点我们就可以适应它。 这也是Executor线程池而不是CompletableFuture的原因。 通过启动新流程而不等待其完成可以避免死锁。 这也是该解决方案的缺点。 没有新Flow的结果其功能将极为有限。 话虽如此如果您的用例适合上面显示的结构那么我绝对建议您使用此解决方案。 在下一节中我将讨论如何使用CompletableFuture 。 CompletableFutures的危险解决方案 这很危险的原因很简单。 僵局。 我建议不要使用此解决方案。 除非您的节点有权访问足够的线程否则要减少用无法完成的线程填充队列的机会。 另一方面这是一个更为理想的解决方案因为您可以等待启动的流程的结果并对其进行处理。 这使解决方案更加有用。 以下是带有CompletableFutures的MessageService外观 CordaService
class MessageService(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {fun replyAll(): List messages().map { reply(it).returnValue.toCompletableFuture().join() }// everything else is the same as before
} 除了replyAll函数外代码replyAll 。 返回的CordaFuture提供的toCompletableFuture函数调用join以等待所有期货的结果并返回总体结果。 如前所述此解决方案可能导致死锁。 但是对于您的情况也许并非如此。 由您决定发生这种情况的可能性。 如果不利于您最好走开。 选择坚持使用类似于上一节中详述的同步或异步解决方案。 我真的需要这样做吗 现在是的我相信你会的。 展望未来我怀疑您是否需要依靠我在本文中提出的解决方案。 我相信Corda正在努力消除从Flow内部启动Flow时甚至不必考虑线程的需求。 取而代之的是您可以简单地调用带有选项的subFlow来异步运行它。 这本可以使我们保留原始的同步解决方案但可以选择使每个subFlow在单独的线程上运行。 将各部分结合在一起 总之在Corda Enterprise 3中可以在正在执行的流程中异步启动新流程。 根据您的用例这可以提供良好的性能优势。 有缺点。 您不能等待异步流的结果而不会用死锁的威胁来威胁您的节点。 节点的基础队列无法处理它所处的情况。因此在将线程引入到Flow调用中时需要格外小心。 值得庆幸的是随着Corda的发展您甚至不必担心自己这样做。 它甚至可能像添加布尔函数参数一样简单。 那是梦想 这篇文章中使用的代码可以在我的GitHub上找到 。 如果您认为这篇文章有帮助可以在Twitter上LankyDanDev关注我以跟上我的新文章。 翻译自: https://www.javacodegeeks.com/2018/09/asynchronous-flow-invocations-corda-services.html