找回密码
 会员注册
查看: 12|回复: 0

如何做好分布式任务调度——Scheduler的一些探索_UTF_8

[复制链接]

2万

主题

0

回帖

7万

积分

超级版主

积分
73956
发表于 2024-9-30 23:27:07 | 显示全部楼层 |阅读模式
动手点关注干货不迷路作者:张宇轩,章逸,曾丹初识 Scheduler找准定位:分布式任务调度平台无论是互联网应用或者企业级应用,都充斥着大量的任务。我们常常需要一些任务调度系统帮助我们解决问题。随着微服务化架构的逐步演进,单体架构逐渐演变为分布式、微服务架构。在此的背景下,很多原先的单点式任务调度平台已经不能满足业务系统的需求。于是出现了一些基于分布式的任务调度平台。Scheduler 是飞书内的分布式任务调度平台。分布式任务调度能力主要包括:分布式:平台是分布式部署的,各个节点之间可以无状态和无限的水平扩展(保证可扩展);任务调度:涉及到任务状态管理、任务调度请求的发送与接收、具体任务的分配、任务的具体执行;(集群中哪些机器什么时候执行什么任务,所以又需要一个可以感知整个集群运行状态的配置中心);配置中心:可以感知整个集群的状态、任务信息的注册。摸清脉络:Scheduler的结构和核心模块名词解释:Processor: 编程处理器, 拥有一定的编程规范, 用户自定义实现。Executor: 一个 SDK,运行 Processor 的进行容器,与 Scheduler 通信的载体。Job:用户创建的任务,其中包含任务的调度规则、调度模型、执行器名称等信息。Instance:运行态的Job,每当Job触发后会生成一个Instance,记录本次执行的调度信息。Task:最小执行单元,不同调度模型的任务产生的Task数量不同。通过架构图可以发现,Scheduler主要有以下三个部分:调度器 (Scheduler):任务调度中心,负责管理任务的生命周期。接受任务注册,准时准确找出待触发的任务,进行任务拆分下发。找出与之关联的执行器并下发对应任务;执行器 (Executor):接收调度任务,并将自身状态上报给调度器;控制台 (Web 前端):负责配置执行器的信息以及调度任务的配置、任务状态、信息展示。因此,我们可以用一句话解释清楚 Scheduler 所做的事情,即:在「指定时间」「通知执行器」以「指定方式」执行任务这句话中包含了三个关键点,也分别代表着 Scheduler 的三个核心模块:指定时间:任务的触发规则,如:每天早上8点、每周二、每月15号等。触发器模块(Launcher Cron)负责任务触发;指定方式:任务的执行形式,如:单播任务-指定一个机器执行;广播任务-指定所有机器执行;分片任务-任务分阶段分批的执行。分派器模块(Assignment Cron)负责任务的执行方式;通知执行器:将任务发送到指定执行器,执行任务。派遣器模块(Dispatcher Cron)负责任务的发送,采用流式通信,调度器以推送的方式将任务发送给执行器。在一个Job的调度周期中,各个模块各司其职,整个流程如下:拥有这三个核心模块后,Scheduler 已具备了成熟的任务调度功能。另外,为了增加 Scheduler 的稳定性,有额外两个模块为其保驾护航:健康管理模块 (Service Health Cron): 负责管理 Job 的生命周期,检测未正常派发执行的 Job、Instance 和 Task,并将结果上报给运维人员。任务进度刷新模块 (Task Cron): 异步更新 Task 状态,流量较高时进行削峰,保证依赖的 mysql 及 redis 不因为流量过高而出现问题。本篇文章不对 Scheduler 所支持的定时任务能力作赘述,而是从三个方面(易用性、多功能性、稳定性)介绍 Scheduler 对于分布式任务调度的思考和探索:「易用性」: 决定了用户是否选择使用该框架的意愿,一个好的框架必须是易用且快速接入的;「多功能性」: 接入方需求多种多样,要站在用户角度想问题,不能闭门造车;「稳定性」: 对于分布式任务调度平台来说,不仅仅局限于自身的稳定性,接入方的稳定性也十分重要。换位思考-快速接入背景:效率至上,时间是金以字节跳动内部为例,当前团队想要实现一个定时任务有多种方式:接入字节云的 cronjob 平台、自己实现一套定时任务框架或者接入第三方定时任务框架。对于第一种接入 cronjob 平台,每一种定时任务都需要注册各自的 psm 和运行时环境(镜像),当任务需要访问依赖资源如 redis/db 等时,需要各自添加授权。任务代码逻辑有变化时也需要各自升级,导致开发、管理起来较为复杂。对于第二种自己实现一套定时任务框架,不仅整体开发时间较长,且需要大量时间进行测试回归来保证框架的稳定性。如果项目内使用到的定时任务较多,那么自身研发一套框架用途也较广泛;若项目中使用到的定时任务较少,则 ROI 较低,很多时候也只是为了造轮子而造轮子。因此,大多数项目面对增加定时任务的需求时,都会寻求直接接入第三方成熟的定时任务框架。对于他们来说,是否易于接入、与现有代码联系是否紧密、调试是否方便是很重要的选取指标。基于这种背景,Scheduler 在设计时就站在了接入方的角度,思考了如何让接入方能够在最短时间内以最低成本接入 Scheduler,实现自己的定时任务。分析:站在用户角度想问题站在接入方角度,对定时任务框架进行选型时最关注的几个点无非是定时任务执行准确性、最高支持 qps、定时设置多样性、接入成本这几个。对于前两个指标,Scheduler 目前接入业务方 50+,日均调度任务 20w+ 次,与公司内其他第三方定时任务框架相比也较有竞争性,同时对于后两个关注点,Scheduler 也有自己的风格。丰富的调度设置一般的定时任务框架只支持 crontab 表达式,例如0 1 * * * ,代表每天凌晨一点执行一次。cronTab 功能强大,但是若配置复杂的定时策略,有一定学习成本,且可读性不高。因此,鉴于这种情况,Scheduler 在 crontab 之上设计了更易读更强大的定时策略,做到所见即所得。{ "startTime":1648029600000, "timeZone":"Asia/Shanghai", "repeatLevel":"month", "repeatInterval":2, "repeatDays":[3,5,23]}start_time: 开始时间戳,在此之前定时任务不会执行,到达该时间后会执行第一次timeZone: 时区设置,根据当前设置的时区准确派发repeatLevel: 重复级别,目前可以设置按「小时」、「天」、「月」、「周」、「年」以及「工作日」进行重复repeatInterval: 重复间隔,代表每隔 $repertInterval $repeatLevel 执行一次repeatDays: 重复天数,重复级别是周或月时生效因此,该设置所代表的定时间隔为:每两月的3、5、23号触发一次触发时间(北京时间)2022-03-23 18:00:002022-05-03 18:00:002022-05-05 18:00:002022-05-23 18:00:00为什么要做工作日调度可能有同学注意到,Scheduler 对于重复级别的支持十分丰富,不仅可以按照普通的年、月、日等级别进行设置,还可以按照工作日进行重复调度(例如每两个工作日执行一次),这归因在 Scheduler 孵化于字节跳动内部企业服务系统,为诸如人事系统、权限系统等 ToB 服务提供定时任务能力。往往 ToB 客户的需求复杂多变,因此,需要提前具备更多能力,才能更好地服务好 ToB 客户。Scheduler 在调研接入方需求时,得到了有些客户对于定时提醒这类任务的需求是尽量不要在「非工作日」打扰。于是,Scheduler 决定增加工作日调度选项来适配客户潜在需求,也侧面说明了 Scheduler 为了让接入方更快更小成本接入做出的努力。轻松的使用方式相信「开箱即用」对于人们在采买诸如家电、数码产品时,是十分重要的一个考核指标。而对于对外提供的服务 or 框架,亦是如此。Scheduler 的目标就是让接入方能够在短时间熟悉 Scheduler、编写测试代码以及上线定时任务。专注业务如果想实现一个定时任务,接入方只需要三步:引入 Scheduler sdk,绑定相应 processor,在 process 接口中实现具体业务逻辑。同时,由于定时任务的实现位于原代码中,启动配置无需更改,本地测试也较为便捷。同时,在字节跳动环境下,无需新增 psm、授权配置等,尽可能做到了「开箱即用」。import ( "context" "code.byted.org/apaas/scheduler_sdk/executor" // 引入 sdk)func main() { executorSvc, err := executor.NewExecutor(executor.NewDefaultExecutorConfig(), &HelloWorld{}) // 绑定 processor if err != nil { panic(err) } if err = executorSvc.Run(); err != nil { panic(err) }}type HelloWorld struct { ProcessorApiName string}func (h *HelloWorld) GetApiName() string { return h.ProcessorApiName}// process 中实现具体业务逻辑func (h *HelloWorld) SimpleProcess(ctx context.Context, tc *executor.TaskContext) (err error) { tc.LogInfo(ctx, "hello world") return}快速运维没有程序员想主动写出 bug,但问题总是会突然出现。如何在出现问题时快速运维、快速止损,是所有工程师都追求的目标。Scheduler 在这方面做了几种尝试:报警更直观用户可以在创建 job 时,可以选择配置报警机器人,并把 Scheduler 机器人拉入对应报警群组。当检测到对应 job 出现问题时,Scheduler 机器人会把相应报警推送到对应群组,做到实时响应。状态更清晰目前 task 的相关状态如下,当一个 task 长期没有到终态时,根据状态码即可知 task 目前处于什么状态,从而推断是哪一步骤出了问题。状态码状态100等待触发101Ready 就绪态,等待推送201推送到 Executor,还未实际执行(任务太多排队)202执行中203执行超时,逻辑复杂导致301执行成功302执行失败401Ready 超时,没有 Executor 拉取402推送到 Executor 后长期未执行403执行超时,Executor 宕机导致并且一些 Scheduler 常见的报错也做了封装,帮助快速定位问题,例如错误码错误原因k_sc_ec_000004找不到任务{{.jobApiName}}k_sc_ec_100004找不到任务实例{{.instanceID}}k_sc_ec_300001Processor Name 未注册{{.content}}k_sc_ec_300006processor({{.content}}) 找不到对应 executork_sc_ec_400002找不到Executor {{.content}}k_sc_ec_400005无权限操作并肩作战-分片任务背景:任务越多,挑战越大一个成熟的项目中避免不了大型批量任务,比如通过 Excel、csv 或其他数据源批量创建或更新数据,批量任务一般数据量很大,如果按照单实例串行执行,那么不能充分利用计算机资源且一次运行会消费大量时间,用户体验不友好。以 Kunlun 举例, 旧阶段的批量任务依赖于消息队列、Redis 实现,总体分为三大部分:解析并校验Excel,将数据解析成一条条数据,将每条消息封装成一条消息发送至消息队列;消费消息队列,进行创建、更新等操作,并在Redis记录总体进度并推送给用户,如果任务失败,会将行数和错误原因同时记录进redis;待所有数据处理结束后,如果redis中没有错误数据,则提示用户成功,否则根据错误信息生成Excel返回给用户。使用消息队列、redis的定时任务可以提速和优化用户体验,但有以下不足:维护起来不方便,例如当有新服务需要此类功能时,需要自己再实现一套差不多的框架,所以需要将分片功能托管到第三方服务,而业务方只用专注于具体业务;依赖于消息队列和Redis两个外部组件,对两个组件的稳定性要求极高,当其中一个出现问题,都会带来不小的麻烦。基于这种背景,Scheduler 丰富了原本的任务调度能力,补充了分片能力,以满足复杂繁琐的任务分片处理的需求。分析:旧问题,新解法若打算做出一套贴合业务需求的分片任务框架,需要先了解现阶段的分片任务的实现步骤。现阶段的分片任务大致可以抽象成3个步骤:总。获取数据,可以从上传的文件解析数据、从 DB 查询出大批数据或其他数据源。分。处理数据,聚焦于具体业务,如:创建、查询、更新。总。处理结果,将此次任务运行结果处理成结果报告返回给用户,报告可以为 Excel、一条消息、一封邮件等。Scheduler 要做的事情则是替换其中分片、消息队列、Redis 的功能,做出以下抽象:type ShardingProcessor interface { PreProcess(ctx context.Context, tc *TaskContext) error ShardingProcess(ctx context.Context, tc *TaskContext) error Notify(ctx context.Context, tc *TaskContext)error PostProcess(ctx context.Context, tc *TaskContext) error}PreProcess(总):数据准备阶段。可在此方法中对数据进行额外处理,如计算拓扑关系,定义数据优先级。单机运行。ShardingProcess(分):分片处理阶段。实际处理函数,多机运行。分片处理函数,执行批量导入、更新等处理。(入参:Scheduler 对 PreProcess 返回结果的分片子参数)Notify(阶段式-总):进度更新处理,每当进度变更的大小大于设定阈值,则生成一次 Notify 的Task。Executor 向 Scheduler 汇报子任务进度,Scheduler 计算出总体进度,当总进度发生变更后生成 NotifyTask 通知 Executor 进行处理。PostProcess(总):结果处理阶段。任务执行完毕后可在此函数进行后续处理,单机运行。当所有分片子任务都执行完后,Scheduler 会将子任务的执行结果发送到此函数处理。(入参:每一个子任务 ShardingProcess 后的结果数组)执行器需要实现ShardingProcessor接口以供调度器进行调度。调度过程如下:Scheduler 支持分片任务重点在于丰富调度模型,提升调度器调度能力,完善执行器执行能力来达到支持分片任务的目的调度侧能力分批调度的能力调度侧需要根据任务进度依次生成 PreTask、ProcessTask、NotifyTask、PostTask 来调度执行器 ShardingProcessor 中的 PreProcess - Process - Notify - PostProcess 四个方法。单机调度 PreProcess,PostProcess,Notify,并行调度 PostProcess,总体调度呈现总-分-总的形式。调度过程如下:数据拆分的能力数据拆分即任务分片,指的是将单一任务按照特定的逻辑切分为多个独立的子任务,将其分派到不同的节点执行,以提高任务的执行效率。而 Scheduler 要处理的任务内部可能存在依赖关系(比如 kunlun 业务中 metadata 批量创建的需求,由于存在 lookup 和 reference 字段等,记录创建之间存在拓扑关系),所以在执行时需要优先级的概念,而不能被简单拆分为独立的子任务。为了支持带优先级的任务分片,Scheduler 接收的分片任务的数据特点如下:是业务方自定义任务的二维数组;第一个维度是任务执行的优先级,位于同一优先级下的任务并发执行,位于不同优先级的任务按优先级串行执行;第二个维度是同一优先级下的自定义任务列表。关于自定义任务的结构,Scheduler 不感知。业务方可以选择存储任务详情或是主键信息,并自定义处理逻辑,而 Scheduler 只做分片和调度工作。二维数组可以是下面这样:[ [ // 第一优先级的任务1,可以是主键 101, // 第一优先级的任务2,可以是SQL语句 "Insert into tablename xxx", // 第一优先级的任务3,可以是结构体等等... { "ID": 999, "Name": "zhangsan" } ], [ // 第二优先级的任务1、2、... 102, 103, ... ]]了解了待分片任务的结构,我们来讨论如何对任务进行分片。比如,分片的数量由什么决定,单个分片上的信息是如何分配的,不同分片又是不同分派到不同的处理器上的...分片数的确定分片数的确定基于以下参数的值:数据量、任务创建时用户指定的单片最大数量、单片最小数量,以及实际可用的执行器数量。分片算法分片特征值(sharding key)的选择要遵循的原则应该是基于最常用的访问方式。由于 Scheduler 分片时并不关心业务数据的结构,所以选用数据数组的下标来作为分片特征值。由于分片数量确定后,不涉及到由于分片的增加或减少对数据进行 Rehash 的情况,所以无需考虑虚拟节点、一致性哈希等方式进行分片。这里选用哈希分片的分片算法,原因是既可以均匀分布数据,实现起来也很简单。分片的存储和派发分片完成后,需要给每个分片创建一个 Task,并把分片的数据存储下来。关于 Task 的派发,根据上面关于分片数的讨论,可以得到分片数和 Executor 数的关系:在数据量合适的情况下(单片最大数量和单片最小数量设置合理时,是最普遍的情况),分片数和Executor 的数目是一致的;当数据量很小时,会出现分片数小于 Executor 数;当数据量很大时,会出现分片数大于 Executor 数,甚至可能是后者的几倍。为了让各个 Task / 各个分片 能够均匀派发给各个 Executor,也为了避免某个executor挂掉时,其他Executor 不能均匀分摊挂掉的节点原先承担的分片,需要采用合理的分片策略。在分片时,我们保证了各分片的数据是尽量均匀分布的,所以从分片到 Executor 的分派方式可以尽可能地简单,采用平均分配的策略即可。对于挂掉的节点所承担的分片,也采用同样的策略派发到存活的 Executors 上即可。例如:有3个 Executor,分成9片,则每个 Executor 分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8]如果Executor 1挂掉,则将1的分片平均分配* 到Executor 2、3: 2=[0,1,3,4,5], 3=[2,6,7,8]平均分配*:对于不能均分的情况,为了避免靠前的 Executor 总是承担更多的压力,可以根据待分配分片数量的奇偶来决定是升序分派还是降序分派。进度通知的能力Scheduler 支持通知 Executor 任务执行的整体进度Executor 上报子任务进度至 Scheduler;Scheduler 计算总任务进度;总任务进度发生变化,则生成 NotifyTask 发送至 Executor。执行侧分批执行的能力执行侧需要实现并注册 SDK 提供的ShardingProcessor接口,来处理由调度侧发来的多种类型的Task。PreProcess预处理方法,可以进行但不限于以下的操作:启动参数不符合 Scheduler 规定格式,可以通过 PreProcess 方法进行一次转换;数据存在导入优先级,可以在 PreProcess 中编写计算拓扑关系的方法。如果不需要预处理,可直接在方法内 return,分片时数据使用启动时的 Datafunc (s *ShardingProcessor) PreProcess(ctx context.Context, tc taskContext) error{ oldData := tc.GetData() // 用户业务, 数据处理 newData := Transform(oldData) // 返回带拓扑排序的数据 tc.SetResult(newData) return nil}ShardingProcess分片处理函数,主要是进行数据更新、创建操作。ShardingProcess 的入参是切分后的数组([]interface{})。Executor 需要对参数进行两部分额外处理:将[]interface{}中的interface{}断言成具体struct{};处理后。计算当前子任务执行进度,进行上报;如果省略上报,服务端以分片粒度生成 NotifyTask通知执行器。func (s *ShardingProcessor) ShardingProcess(ctx context.Context, tc taskContext) error{ taskData := tc.GetData() for _, data := range taskData{ // 用户业务, 数据处理 } tc.SetResult(...) return}PostProcess接受所有分片处理结果,进行后续处理,如生成错误文件。func (s *ShardingProcessor) PostProcess(ctx context.Context, tc taskContext) error{ taskData := tc.GetData() for _, result := range taskData{ // do something } // 用户业务 tc.SetResult(...)}Notify提供给子任务上报的能力,Scheduler 会根据所有子任务上报结果计算进度,通知 Executor,通知粒度为数据条数。如果接入方不主动上报子任务进度,Scheduler 会根据子任务完成度进行通知,通知粒度为分片粒度。分片任务流程削峰填谷-流量控制背景:提供能力,而非施加压力在 Scheduler 设计初期时,更多的是把注意力放在了如何能够快速、准确、低延迟的触发任务,为此还多次优化了触发器、分派器、派遣器三大模块的轮询逻辑,但是忽略了任务量过大时下游能否抗住流量的问题。如果 Scheduler 在调度时无法准确感知下游压力,那么很容易将下游打挂,如:在定时任务首次上线时,因为 kunlun 的装包机制导致数千个应用下配置了同样的定时任务,虽然一个包内的数十个定时任务触发时间分散,但是应用包之间的同一个任务触发时间相同,导致下游需要在同一时刻处理数千个任务,再加上任务的处理流程还会通过消息中间件进行扩散,导致数据库在任务执行阶段一直处理低IDLE 阶段。分析:流量追踪,剥茧抽丝目前大部分后端服务,通过分析任务的流量走向,可以大致确认每一条任务在执行过程中不论扩散还是非扩散流量都会走向DB,流量图大致如图。任务的流量最终打到了 DB,所以流量控制的目标就更加清晰:对 DB 的流量控制。需要对 DB 进行流量控制,那么就要设定合理的指标,理论上,只要指标采纳的足够合理,就能严格、准确的控制流量,指标则需要具备以下条件:实时性。能够准时反应数据库健康状况;权威性。能够准确反应数据库健康状况。优先级:实时性 > 权威性。当一个指标的实时性不够高,那么它的权威性就不再有价值。只需要实时监听着 DB 的指标,来判断任务是立刻执行,还是延迟执行就能有效的保护 DB。指标选择消费 metrics 的监控点,关于数据库的打点信息非常全面,能够非常轻易的获取到数据库宿主机的CPU、内存或数据库本身的连接数、查询数等指标,这些指标的权威性毋庸置疑,但是 metrics 通过将指标收集到本地代理,代理每 30s 做一次聚合发送至服务端,其时效性太差。数据库不可用因素为:大量任务触发 -> DB 访问流量增高 -> CPU idle 降低 -> 数据库不可用。造成 CPU idle 降低的因素为 DB 流量增高,可以将 DB 的流量作为指标进行流量控制,缺点是需要自己采集指标。指标收集指标范围反映 DB 压力较为直接的指标是 cpu idle,但考虑到服务部署往往多实例以及 cpu idle 采集难度大的情况,以近似指标来代替。另一方面,通过历史数据分析,DB 流量与 cpu idle 有一定的关联,因此以 DB 流量作为 DB 压力指标。数据存储参考限流的实现方案,采用单独的 Redis 存储流量数据,以 1s 为时间窗口作为 Redis key,每个时间窗口的流量作为 Redis value,每次发生 DB 操作时更新流量数据。系统中存在多个 DB,每个 DB 单独统计,在 Redis key 中加入db信息。Redis key 设置10s过期时间,查询时根据过去3个窗口的加权平均(80%/15%/5%)作为当前流量,以处理窗口交界处的突发流量。收集方式目前 DB 流量已有 metrics 监控数据,但由于 metrics 会在本地聚合 30s 数据后上报,至少会有 30s的延迟。而造成 DB 压力大的定时任务多为短期集中触发,使用 metrics 数据会有感知不及时的问题,因此需要额外收集数据。参考 DB metrics 数据采集的方式,通过 Gorm 的 callback 机制插入具体的采集逻辑,减少对业务代码的侵入。func SetMonitorCallBack(db *gorm.DB) { db.Callback().Create().Before("gorm:before_create").Register("metric:before_create", beforeCreateCallback) db.Callback().Delete().Before("gorm:before_delete").Register("metric:before_delete", beforeDeleteCallback) ...}func beforeCreateCallback(scope *gorm.Scope) { beforeCallback(scope, "create")}func beforeCallback(scope *gorm.Scope, method string) { ...}在采集逻辑上,需要考虑以下几个问题:性能:callback 中需要尽量减少延迟,优先使用异步的方式上报数据。使用 channel 充当队列,callback 中将数据写入 channel,当 channel 容量满时丢弃数据,防止阻塞。另有异步协程从channel 中取数据上报。兼顾时效性和网络开销,在上报前预先在本地以100ms窗口做聚合。type MetricType int8const ( QueryCount MetricType = 1)type DBMetric struct { DBName string DBMethod string Type MetricType Timestamp int64 Value interface{}}// callback中将metric数据写入channelfunc beforeCallback(scope *gorm.Scope, method string) { dbName := getStringValueFromCtx(scope.DB().Ctx, CtxVariableDBName) curMs := time.Now().UnixNano()/int64(time.Millisecond) metric := DBMetric{dbName, method, QueryCount, curMs, 1} select { case ch (metric key(dbName|dbMethod|type) -> metric) aggrMetrics := map[int64]map[string]DBMetric timer := time.NewTicker(windowSize) defer func() { timer.Stop() }() for { select { case msg :=
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-13 22:31 , Processed in 1.814281 second(s), 25 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表