找回密码
 会员注册
查看: 14|回复: 0

如何快速实现BitSailConnector?_UTF_8

[复制链接]

4

主题

0

回帖

13

积分

新手上路

积分
13
发表于 2024-9-30 23:58:14 | 显示全部楼层 |阅读模式
本文面向BitSail的Connector开发人员,通过开发者的角度全面的阐述开发一个完整Connector的全流程,快速上手Connector开发。文| 浩宇来自字节跳动数据平台BitSail团队目录结构首先开发者需要通过git下载最新代码到本地,并导入到IDE中。同时创建自己的工作分支,使用该分支开发自己的Connector。项目地址:https://github.com/bytedance/bitsail.git。项目结构如下:开发流程BitSail 是一款基于分布式架构的数据集成引擎,Connector会并发执行。并由BitSail 框架来负责任务的调度、并发执行、脏数据处理等,开发者只需要实现对应接口即可,具体开发流程如下:工程配置,开发者需要在bitsail/bitsail-connectors/pom.xml模块中注册自己的Connector,同时在bitsail/bitsail-dist/pom.xml增加自己的Connector模块,同时为你的连接器注册配置文件,来使得框架可以在运行时动态发现它。Connector开发,实现Source、Sink提供的抽象方法,具体细节参考后续介绍。数据输出类型,目前支持的数据类型为BitSail Row类型,无论是Source在Reader中传递给下游的数据类型,还是Sink从上游消费的数据类型,都应该是BitSail Row类型。Architecture当前Source API的设计同时兼容了流批一批的场景,换言之就是同时支持pull & push 的场景。在此之前,我们需要首先再过一遍传统流批场景中各组件的交互模型。Batch Model传统批式场景中,数据的读取一般分为如下几步:createSplits:一般在client端或者中心节点执行,目的是将完整的数据按照指定的规则尽可能拆分为较多的rangeSplits,createSplits在作业生命周期内有且执行一次。runWithSplit: 一般在执行节点节点执行,执行节点启动后会向中心节点请求存在的rangeSplit,然后再本地进行执行;执行完成后会再次向中心节点请求直到所有splits执行完成。commit:全部的split的执行完成后,一般会在中心节点执行commit的操作,用于将数据对外可见。StreamModel传统流式场景中,数据的读取一般分为如下几步:createSplits:一般在client端或者中心节点执行,目的是根据滑动窗口或者滚动窗口的策略将数据流划分为rangeSplits,createSplits在流式作业的生命周期中按照划分窗口的会一直执行。runWithSplit: 一般在执行节点节点执行,中心节点会向可执行节点发送rangeSplit,然后在可执行节点本地进行执行;执行完成后会将处理完的splits数据向下游发送。commit:全部的split的执行完成后,一般会向目标数据源发送retract message,实时动态展现结果。BitSailModelcreateSplits:BitSail通过SplitCoordinator模块划分rangeSplits,在流式作业中的生命周期中createSplits会周期性执行,而在批式作业中仅仅会执行一次。runWithSplit: 在执行节点节点执行,BitSail中执行节点包括Reader和Writer模块,中心节点会向可执行节点发送rangeSplit,然后在可执行节点本地进行执行;执行完成后会将处理完的splits数据向下游发送。commit:writer在完成数据写入后,committer来完成提交。在不开启checkpoint时,commit会在所有writer都结束后执行一次;在开启checkpoint时,commit会在每次checkpoint的时候都会执行一次。Source ConnectorSource: 数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,不参与作业真正的执行SourceSplit: 数据读取分片;大数据处理框架的核心目的就是将大规模的数据拆分成为多个合理的SplitState:作业状态快照,当开启checkpoint之后,会保存当前执行状态。SplitCoordinator: 既然提到了Split,就需要有相应的组件去创建、管理Split;SplitCoordinator承担了这样的角色SourceReader: 真正负责数据读取的组件,在接收到Split后会对其进行数据读取,然后将数据传输给下一个算子Source Connector开发流程如下首先需要创建Source类,需要实现Source和ParallelismComputable接口,主要负责和框架的交互,构架作业,它不参与作业真正的执行BitSail的Source采用流批一体的设计思想,通过getSourceBoundedness方法设置作业的处理方式,通过configure方法定义readerConfiguration的配置,通过createTypeInfoConverter方法来进行数据类型转换,可以通过FileMappingTypeInfoConverter得到用户在yaml文件中自定义的数据源类型和BitSail类型的转换,实现自定义化的类型转换。最后,定义数据源的数据分片格式SourceSplit类和闯将管理Split的角色SourceSplitCoordinator类最后完成SourceReader实现从Split中进行数据的读取。Job TypeBoundednessbatchBoundedness.BOUNDEDNESSstreamBoundedness.UNBOUNDEDNESS每个SourceReader都在独立的线程中执行,并保证SourceSplitCoordinator分配给不同SourceReader的切片没有交集在SourceReader的执行周期中,开发者只需要关注如何从构造好的切片中去读取数据,之后完成数据类型对转换,将外部数据类型转换成BitSail的Row类型传递给下游即可。Reader示例public class FakeSourceReader extends SimpleSourceReaderBase { private final BitSailConfiguration readerConfiguration; private final TypeInfo[] typeInfos; private final transient int totalCount; private final transient RateLimiter fakeGenerateRate; private final transient AtomicLong counter; private final FakeRowGenerator fakeRowGenerator; public FakeSourceReader(BitSailConfiguration readerConfiguration, Context context) { this.readerConfiguration = readerConfiguration; this.typeInfos = context.getTypeInfos(); this.totalCount = readerConfiguration.get(FakeReaderOptions.TOTAL_COUNT); this.fakeGenerateRate = RateLimiter.create(readerConfiguration.get(FakeReaderOptions.RATE)); this.counter = new AtomicLong(); this.fakeRowGenerator = new FakeRowGenerator(readerConfiguration, context.getIndexOfSubtask()); } @Override public void pollNext(SourcePipeline pipeline) throws Exception { fakeGenerateRate.acquire(); pipeline.output(fakeRowGenerator.fakeOneRecord(typeInfos)); } @Override public boolean hasMoreElements() { return counter.incrementAndGet() { private static final Logger LOG = LoggerFactory.getLogger(PrintWriter.class); private final int batchSize; private final List fieldNames; private final List writeBuffer; private final List commitBuffer; private final AtomicInteger printCount; public PrintWriter(int batchSize, List fieldNames) { this(batchSize, fieldNames, 0); } public PrintWriter(int batchSize, List fieldNames, int alreadyPrintCount) { Preconditions.checkState(batchSize > 0, "batch size must be larger than 0"); this.batchSize = batchSize; this.fieldNames = fieldNames; this.writeBuffer = new ArrayList(batchSize); this.commitBuffer = new ArrayList(batchSize); printCount = new AtomicInteger(alreadyPrintCount); } @Override public void write(Row element) { String[] fields = new String[element.getFields().length]; for (int i = 0; i prepareCommit() { return commitBuffer; } @Override public List snapshotState(long checkpointId) { return Collections.singletonList(printCount.get()); }}将连接器注册到配置文件中为你的连接器注册配置文件,来使得框架可以在运行时动态发现它,配置文件的定义如下:以hive为例,开发者需要在resource目录下新增一个json文件,名字示例为bitsail-connector-hive.json,只要不和其他连接器重复即可。{ "name": "bitsail-connector-hive", "classes": [ "com.bytedance.bitsail.connector.hive.source.HiveSource", "com.bytedance.bitsail.connector.hive.sink.HiveSink" ], "libs": [ "bitsail-connector-hive-${version}.jar" ]}测试模块在Source或者Sink连接器所在的模块中,新增ITCase测试用例,然后按照如下流程支持通过testcontainer来启动相应的组件编写相应的配置文件{ "job": { "common": { "job_id": 313, "instance_id": 3123, "job_name": "bitsail_clickhouse_to_print_test", "user_name": "test" }, "reader": { "class": "com.bytedance.bitsail.connector.clickhouse.source.ClickhouseSource", "jdbc_url": "jdbc:clickhouse://localhost:8123", "db_name": "default", "table_name": "test_ch_table", "split_field": "id", "split_config": "{\"name\": \"id\", \"lower_bound\": 0, \"upper_bound\": \"10000\", \"split_num\": 3}", "sql_filter": "( id % 2 == 0 )", "columns": [ { "name": "id", "type": "int64" }, { "name": "int_type", "type": "int32" }, { "name": "double_type", "type": "float64" }, { "name": "string_type", "type": "string" }, { "name": "p_date", "type": "date" } ] }, "writer": { "class": "com.bytedance.bitsail.connector.legacy.print.sink.PrintSink" } }}通过代码EmbeddedFlinkCluster.submit来进行作业提交@Testpublic void testClickhouseToPrint() throws Exception { BitSailConfiguration jobConf = JobConfUtils.fromClasspath("clickhouse_to_print.json"); EmbeddedFlinkCluster.submitJob(jobConf);}欢迎参与BitSail开源社区提交PR当开发者实现自己的Connector后,就可以关联自己的issue,提交PR到github上了,提交之前,开发者记得Connector添加文档,通过review之后,大家贡献的Connector就成为BitSail的一部分了,我们按照贡献程度会选取活跃的Contributor成为我们的Committer,参与BitSail社区的重大决策,希望大家积极参与!快来加入BitSail激励计划,成为Contributor!不仅可以Get到新技术,提升生产效率,还能结识到一群志同道合的小伙伴一起探讨和成长~更有蓝牙耳机、键盘、音箱等激励好礼,送给为BitSail做出积极贡献的你!issue认领链接:https://github.com/bytedance/bitsail/issuesBitSail种子用户招募ing,福利多多!活动流程:(1)填写开源调研问卷,参与抽奖。(2)我们将抽取部分填写问卷用户,参与一对一用户访谈。问卷链接:https://www.wjx.cn/vm/w8po1FA.aspx#填写开源调研问卷,即可参与抽取字节跳动数据平台帆布包、充电宝等精美礼品。产品介绍字节跳动开源数据集成引擎BitSailBitSail是字节跳动自研的数据集成引擎,于2022年10月26日正式开源。BitSail支持20多种异构数据源间的数据同步,并提供离线、实时、全量、增量场景下的全域数据集成解决方案,目前服务于字节内部几乎所有业务线,包括抖音、今日头条等大家耳熟能详的应用,同时也支撑了火山引擎多个客户的数据集成需求。后台回复数字“12”了解更多信息。点击阅读原文进入BitSail 开源代码库活动推荐12月20日19:00,本期分享将聚焦字节跳动数据中台建设经验,在存算分离、湖仓一体、ServerLess等技术发展趋势下,从企业数仓架构选择、数据湖解决方案与应用实践,以及一站式数据治理等角度,为企业构建自身数据中台提供思路和启发。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-13 22:32 , Processed in 2.803200 second(s), 25 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表