基于SEDA的异步框架设计与实现
四、异步框架总体设计与实现
1、框架中的stage理想结构
前文提到,基于SEDA的异步框架,一个stage的理想结构描述如下:
在这个框架的设想中,一个stage一般需要有如下几个组件:
1、D-MQ:分布式消息中间件。用做事件队列,以进行消息的传递。
2、Local-Queue:本地队列。一般是blockingQueue,用以辅助实现stage内的动态线程池。采用Local-Queue的目的在于避免数据在mq中的堆积导致mq性能下降。
3、Thread Pool:动态线程池。进行事件的并发处理。
4、Worker:事件的具体处理器
5、Stage Controller:stage的性能控制器。用以对stage的队列、资源、调度策略进行控制。
引此为框架的设计理念,于是有了如下基于SEDA的异步框架的架构设计。
2、SEDA异步框架的使用场景
该异步框架可以用来处理如下几个场景的问题:
1、系统资源监控(CPU、内存、线程池、队列)
2、外围服务交互情况(API被调用、上游服务交互、请求方等)监控
3、系统报警(服务异常、接口压力过大等)
4、基于日志和事件的数据挖掘(规则挖掘等)
5、重要业务数据切片转储(里程碑消息、核心服务交互数据等)
6、异步触发的操作(表A写完后异步写表B等)
其使用场景大致可如下图所示:
3、SEDA异步框架系统总体架构
因而,基于以上所述适用范围的框架实现之后的系统架构,一般可如下所示:
当然,以上结构并非绝对的,如有需要,你完全可以通过自己定制bundle和bundle之间的拓扑关系,来实现各种复杂的事件处理过程。你只需要简单通 过声明bundle相关配置,即可实现任何按照你所希望的有向图去关联的bundle。框架提供给了你一个经过轻量级封装后的平台,后面的业务逻辑,就靠 开发者自己了。
4、异步框架原生态架构(Virtual Bundle)
基于上述的设计理念,最终实现的异步框架的原生态架构如下所示:
异步框架在无任何扩展的时候,其主要组件如下:
1、bundle:消息中心的核心组件。由读、处理、写三部分功能组成。同时整合开关、定时器、动态线程池等元素来支持多样化的输入和需求。bundle可以从多种数据源获取数据,并进行数据的处理。
1)开关:用以决定该bundle是否被激活。如未被激活,则该bundle将停止读取数据,同时不会在其他服务上产生该bundle对应的数据(比如在mq上生成该bundle的队列、连接、交换机等。)
2)定时器:用以指定该bundle是否定时运行。如未指定,则实时运行。
3)动态线程池:用以支持bundle以同步/异步方式调用。如未指定,则同步运行。
2、bundle decider:用以对bundle的关键指标进行决策(是否激活、时效性、同步类型等)。并同时提供健康检查。
3、Work carrier:处理数据的最小单元。
5、异步框架的AMQP实现(AMQP Bundle)
异步框架扩展的AMQP实现,其架构图如下所示:
其主要组件说明如下:
1、amqp bundle:消息中心的核心组件。由读、处理、写三部分功能组成。在整合开关、定时器、动态线程池之余,提供了配置化的订阅订阅管理以及关键行为的声明。
要声明一个bundle仅需声明对应的bean,示例如下:
<!-- 报警消息收集器 -->
<bean id="alarmCollector" class="com.cc.deimos.satellite.core.collector.CommonAmqpCollectBundle">
</bean>
1)开关:用以决定该bundle是否被激活。如未被激活,则该bundle将停止读取数据,同时不会在其他服务上产生该bundle对应的数据(比如在 mq上生成该bundle的队列、连接、交换机等)。加上开关之后,最大的优势在于其可以更方便的支持分布式部署。对不同的部署实例设置不一样的 active设置可以完成不同stage在不同机器的启停。默认激活。我们为bundle加上开关,示例如下:
<!-- 报警消息收集器 -->
<bean id="alarmCollector" class="com.cc.deimos.satellite.core.collector.CommonAmqpCollectBundle">
<property name="active" value="true" />
</bean>
2)定时器:用以指定该bundle是否定时运行。默认实时运行。我们为bundle加上定时器,示例如下:
<!-- 报警消息收集器 -->
<bean id="alarmCollector" class="com.cc.deimos.satellite.core.collector.CommonAmqpCollectBundle">
<property name="timer" value="0/30 * * * * ?"/>
</bean>
3)动态线程池:用以支持bundle内部的实际数据流处理过程以同步/异步方式调用。默认同步运行。我们为bundle加上动态线程池,示例如下:
<!-- 报警消息收集器 -->
<bean id="alarmCollector" class="com.cc.deimos.satellite.core.collector.CommonAmqpCollectBundle">
<property name="taskExecutor" ref="alarmCollectBundleExecutor"/>
</bean>
<!-- 报警信息收集器对应的动态线程池 -->
<bean id="alarmCollectBundleExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="2" />
<property name="maxPoolSize" value="30" />
<property name="queueCapacity" value="200" />
</bean>
4)订阅发布:用以声明收集和推送信息时所需的交换机和密钥。通过支持逗号分隔的多key组合来支持多对多的上下游bundle关系。每个key的配置语 法符合rabbitmq中topic类型的exchange使用规范即可。默认采用“deimos-common”交换机。以下给出几种声明的配置:
其一:最简易配置。配置要订阅和发布的消息key即可。交换机采用默认配置。
<!-- 报警消息收集器 -->
<bean id="alarmCollector" class="com.cc.deimos.satellite.core.collector.CommonAmqpCollectBundle">
<property name="pubKeys" value="process.alarm.*" />
<property name="subKeys" value="collect.alarm.*, collect.log.error" />
</bean>
其二:声明特殊的来源和目的地的交换机:
<!-- 报警消息收集器 -->
<bean id="alarmCollector" class="com.cc.deimos.satellite.core.collector.CommonAmqpCollectBundle">
<property name="pubKeys" value="process.alarm.*" />
<property name="subKeys" value="collect.alarm.*, collect.log.error" />
<property name="pubDest" value="spec1" />
<property name="subDest" value="spec2" />
</bean>
5)事件队列:每个bundle默认实现一个固定格式的独立队列。可通过配置另外指定。可支持bundle监听多队列的需求。如需要特别指定一个或多个事件队列,则示例如下:
<!-- 报警消息收集器 -->
<bean id="alarmCollector" class="com.cc.deimos.satellite.core.collector.CommonAmqpCollectBundle">
<property name="pubKeys" value="process.alarm.*" />
<property name="subKeys" value="collect.alarm.*, collect.log.error" />
<property name="subQueues">
<list>
<ref bean="queueForLogError" />
</list>
</property>
</bean>
<!-- error日志消息的订阅 -->
<!--
如队列的声明不采用默认配置,完整声明如下:
<property name="exchangeName" value="deimos-common" />
<property name="queue" value="logQueue" />
<property name="bindingKey" value="collect.log.*" />
-->
<bean id="queueForLogError" class="com.cc.deimos.satellite.bo.AmqpQueueConfig">
<property name="bindingKey" value="collect.log.error" />
</bean>
6)监听容器:按照默认配置实现,并发数可通过配置指定。bundle如需额外设定channel数量,则示例如下:
<!-- 报警消息收集器 -->
<bean id="alarmCollector" class="com.cc.deimos.satellite.core.collector.CommonAmqpCollectBundle">
<property name="pubKeys" value="process.alarm.*" />
<property name="subKeys" value="collect.alarm.*, collect.log.error" />
<property name="concurrency" value="20" />
</bean>
7)关键行为。用以给发布的消息打上bundle的标签。以辅助其他bundle进行数据筛选和处理。默认以发布的key为关键行为。如需额外声明,则示例如下:
<!-- 报警消息收集器 -->
<bean id="alarmCollector" class="com.cc.deimos.satellite.core.collector.CommonAmqpCollectBundle">
<property name="pubKeys" value="process.alarm.*" />
<property name="subKeys" value="collect.alarm.*, collect.log.error" />
<property name="keyAction" value="ALARM_KEY_INFO" />
</bean>
2、bundle decider:用以对bundle的关键指标进行决策(是否激活、时效性、同步类型等)。并同时提供健康检查。默认采用fix strategy decider(定参策略决策器)。可进行配置来指定所需决策器类型,示例如下:
<!-- 报警消息收集器 -->
<bean id="alarmCollector" class="com.cc.deimos.satellite.core.collector.CommonAmqpCollectBundle">
<property name="pubKeys" value="process.alarm.*" />
<property name="subKeys" value="collect.alarm.*, collect.log.error" />
<property name="strategyDecider" value="FIXED_STRATEGY"/>
</bean>
3、work carrier:处理数据的最小单元。Bundle依据决策器指示的状态同步/异步、实时/定时调用work carrier进行处理。完全对开发者透明。用户者无需关心该组件。bundle将结合decider进行调度。同时work carrier处理后的数据推送过程也对开发者透明。开发者所需要做的就是实现bundle的doWork方法,并将处理之后的数据直接return即 可。doWork方法如下所示:
@Override
public Object doWork(List<DeimosSatelliteRequest> message) throws Exception{
// do somethoing with message then retrun the result;
}
4、exchange:rabbitmq交换机。默认所有bundle都请求“deimos-common”。可集群化。配置见上。
5、amq:采用支持amqp协议的rabbitmq。默认单机内存节点。可采用镜像队列或其他方案来进行broker、queue的集群化。
6、channel:amq信道。可启动多信道并发监听amq队列消息。支持配置化设定。配置见上。
在web/servlet容器启动之后,框架中的各个组件将被依次加载,以下给出了bundle的大致启动流程,也正是因为这个启动流程,将上述的各个组件进行串联,并开始执行各自负责的工作:
以上详细介绍了SEDA框架的AMQP实现中主要组件的作用、声明方式以及实现原理。总结一下,异步框架的AMQP实现中,bundle与bundle之间通过分布式 队列rabbitmq进行数据传递,bundle内部提供包含阻塞队列的动态线程池taskExecutor来进行数据处理,同时提供了定时器timer 来控制bundle的定时/实时调用。workcarrier作为消息处理的最小单元,其调用机制完全对用户透明。消息在bundle中的接收、处理和推 送由bundle decider组件进行管理。用户只需要简单实现doWork方法和声明bundle配置即可实现消息的处理和传递。
6、一个简单的bundle安装示例
你完全可以只按照如下几步,就可以轻松实现你每个stage:
1、继承AmqpBundle类,实现doWork方法,完成你的业务逻辑。示例如下(通用收集器demo):
public class CommonAmqpCollectBundle extends SatAmqpBundle {
/**
* 采用并发队列。性能比阻塞队列高
*/
public final ConcurrentLinkedQueue<DeimosSatelliteRequest> cacheQueue = new ConcurrentLinkedQueue<DeimosSatelliteRequest>();
@Override
public Object doWork(List<DeimosSatelliteRequest> message) throws Exception {
logger.info("i am now in LogWorker:" + Thread.currentThread().getName() + "message is : " + message
+ ". now begin to collect!");
// 此处不采用锁,因为其带来的影响很有限
cacheQueue.addAll(message);
if (cacheQueue.size() < SatConstant.LOG_BATCH_SIZE) {
return null;
}
// 进行遍历导数据
List<DeimosSatelliteRequest> list = new ArrayList<DeimosSatelliteRequest>();
for (int i = 0; i < SatConstant.LOG_BATCH_SIZE && !cacheQueue.isEmpty(); i++) {
DeimosSatelliteRequest meta = cacheQueue.poll();
// 此时队列也已经为空了
if(meta == null){
break;
}
// 校验
if(meta.getTimestamp() == null || meta.getRealData() == null || meta.getData() == null){
logger.error("[deimos-satellite]meta param is error! meta request: " + meta);
continue;
}
// 记录关键行为
if(meta.getData().get(SatApiConstant.KEY_ACTION) == null){
meta.getData().put(SatApiConstant.KEY_ACTION, keyAction);
}
list.add(meta);
}
// 排序 ,以时间戳为key。考虑到可能出现时间戳一致的情况,所以不能用map。考虑到如果log要push到其他平台或者服务上,
// 该切片应该先保证自身有序而不能完全依赖于下一个bundle来处理
Collections.sort(list, new Comparator<DeimosSatelliteRequest>() {
@Override
public int compare(DeimosSatelliteRequest o1, DeimosSatelliteRequest o2) {
if (o1.getTimestamp().longValue() >= o2.getTimestamp()) {
return -1;
} else {
return 1;
}
}
});
logger.info("[deimos-satellite]common amqp collctor prepare to push to next bundle............ key action: " + keyAction);
return list;
}
}
2、对新写的bundle类加上配置声明。以下为最轻便的写法。如果需要额外定制其他bundle参数,参照上面的相关说明,进行定制即可。
<!-- 报警消息收集器,最简易参数声明。在切分成不同stage之后没有什么需要特别关注的潜在性能瓶颈时使用 -->
<bean id="alarmCollector" class="com.cc.deimos.satellite.core.collector.CommonAmqpCollectBundle">
<property name="pubKeys" value="process.alarm.*" />
<property name="subKeys" value="collect.alarm.*, collect.log.error" />
</bean>
3、启动服务。至此,你的bundle也就随着服务的启动就自动启动并开始工作了。
以上描述的一个bundle启动的过程,当所需要处理的业务被合理拆解成数个bundle(也就是所谓的stage)之后,就可以形成一个完整的基于工作流的系统实现。以下为基于对来源A进行事件收集的简易报警系统,在被拆解为三个bundle之后的数据流:
6、框架改进空间
目前框架仅仅提供了一个非常轻量的解决方案,并仅对AMQP进行了实现。后续可有如下几个改进升级的空间:
1)构成bundle strategy center。各个bundle的decider(决策器)在决策过程中,可以依赖strategy center进行bundle的相关决策。
2)支持bundle集群化、broker集群化,并引入相关策略(比如一致性哈希)来保证基于该框架的系统的高可用。可纳入bundle strategy center。
3)框架进一步基于IOC思想,进行面向接口编程的改造和升级。
(暂时整理到这,具体的uml图以及代码后续提供。)
- 大小: 37.9 KB
- 大小: 34.5 KB
分享到:
相关推荐
Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。
Google Cloud CLI安装包GoogleCloudSDKInstaller.exe
Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。
罗兰贝格_xx业务计划与控制体系最终报告gltp.pptx
这个是一个JSP医院在线挂号管理系统,管理员角色包含以下功能:管理员登录,医生用户管理,修改密码,科室类别信息管理,医生信息管理,添加医生信息,查询医生信息,注册用户管理,黑名单管理,预约信息管理,查询预约信息,医生预约查询,预约信息统计,科室汇总统计等功能。患者角色包含以下功能:查看首页,患者登录,修改密码,修改个人资料,查看预约信息,查看医生信息,查看科室分类,医生查询,预约医生等功能。… 本项目实现的最终作用是基于JSP医院在线挂号管理系统 分为4个角色 第1个角色为管理员角色,实现了如下功能: - 修改密码 - 医生信息管理 - 医生用户管理 - 医生预约查询 - 查询医生信息 - 查询预约信息 - 注册用户管理 - 添加医生信息 - 科室汇总统计 - 科室类别信息管理 - 管理员登录 - 预约信息管理 - 预约信息统计 - 黑名单管理
麦肯锡-年月―中国xx集团战略咨询项目建议书gltp.pptx
详细介绍及样例数据:https://blog.csdn.net/li514006030/article/details/138872006
Screenshot_2024-05-14-22-47-39-925_com.alibaba.android.rimet.hznu.jpg
【资源说明】 基于Python+Flask弹幕情感分析的直播高光时刻判断模型设计与系统实现+系统功能说明+全部资料齐全+部署文档.zip基于Python+Flask弹幕情感分析的直播高光时刻判断模型设计与系统实现+系统功能说明+全部资料齐全+部署文档.zip 【备注】 1、该项目是个人高分项目源码,已获导师指导认可通过,答辩评审分达到95分 2、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 3、本项目适合计算机相关专业(人工智能、通信工程、自动化、电子信息、物联网等)的在校学生、老师或者企业员工下载使用,也可作为毕业设计、课程设计、作业、项目初期立项演示等,当然也适合小白学习进阶。 4、如果基础还行,可以在此代码基础上进行修改,以实现其他功能,也可直接用于毕设、课设、作业等。 欢迎下载,沟通交流,互相学习,共同进步!
内容概要:这个项目是基于Vue.js和Node.js的外卖系统的设计与实现,旨在帮助开发者通过实践掌握Vue.js和Node.js的应用,并了解外卖系统的开发流程和技术选型。该系统旨在提供一个完整的在线订餐平台,包括用户注册登录、浏览商家和菜单、下单支付、配送跟踪等功能,以提升用户的订餐体验。源码包含了前端Vue.js和后端Node.js的代码,部署文档详细介绍了系统的部署步骤和环境配置要求,讲解内容涵盖了系统的功能模块、技术选型理由、设计思路以及使用方法。 适合人群:对Vue.js和Node.js有一定了解,并希望通过实践项目提升技能的开发者。 能学到什么:①掌握Vue.js和Node.js的开发流程;②了解外卖系统的业务逻辑和实现方法;③学习前后端分离的开发模式;④了解部署和维护一个完整项目的流程。 阅读建议:本资源旨在帮助开发者通过实践项目掌握Vue.js和Node.js的应用,并了解外卖系统的开发流程。建议在学习过程中结合部署文档进行实践,并深入理解讲解内容中的技术原理和设计思路。同时,鼓励开发者根据实际需求对项目进行拓展和优化,以提升自己的技术水平。
大学生毕业设计、大学生课程设计作业
具体的操作步骤如下: (1)生成6位数字防伪编码。当用户在主程序界面中输入数字“1”菜单项时,将进入“生成6位数字防伪编码 (213563型)”的功能执行任务。此时要求输入生成防伪码的数量,可以根据需要输入生成防伪码的数量,如图2所示。按下<Enter>键,开始批量生成防伪码,生成后系统将提示用户生成了多少个注册码和生成文件的位置信息等,如图3所示。单击“确定”按钮,关闭提示信息。在屏幕上可以看到生成的防伪码信息,如图4所示。生成的文件在程序所在目录下的“codepath”文件夹下,名称为“scode1.txt”,如图5所示。
Centos7-离线安装redis
这个是一个JSP鲜花商城网站系统,用户角色包含以下功能:用户登录与注册,修改个人信息,查看首页,查看分类,查看热销,查看订单,查看鲜花详情,加入购物车,提交订单等功能。管理员角色包含以下功能:订单管理,客户信息管理,鲜花管理,鲜花类目管理,管理员登录等功能。
Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。
备自投动作不成功事故处理tp.pptx
这个是一个JSP户籍管理系统,管理员角色包含以下功能:管理员登陆,户籍管理,迁入管理,迁出管理,反馈投诉管理等功能。用户角色包含以下功能:用户登陆,户籍信息查看,迁入查询,反馈建议等功能。 本项目实现的最终作用是基于JSP户籍管理系统 分为2个角色 第1个角色为管理员角色,实现了如下功能: - 反馈投诉管理 - 户籍管理 - 管理员登陆 - 迁入管理 - 迁出管理 第2个角色为用户角色,实现了如下功能: - 反馈建议 - 户籍信息查看 - 用户登陆 - 迁入查询
Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。
如何使用容器服务进行开发和部署
java班级管理系统