找回密码
 会员注册
查看: 15|回复: 0

百度UidGenerator源码解析

[复制链接]

4

主题

0

回帖

13

积分

新手上路

积分
13
发表于 2024-10-8 20:06:59 | 显示全部楼层 |阅读模式
简介先来看一下官方介绍:雪花算法“雪花算法(Snowflake)是一种生成分布式全局唯一 ID 的算法,生成的 ID 称为 Snowflake IDs 或 snowflakes。这种算法由 Twitter 创建,并用于推文的 ID。Discord 和 Instagram 等其他公司采用了修改后的版本。一个 Snowflake ID 有 64 位。前 41 位是时间戳,表示了自选定的时期以来的毫秒数。接下来的 10 位代表计算机 ID,防止冲突。其余 12 位代表每台机器上生成 ID 的序列号,这允许在同一毫秒内创建多个 Snowflake ID。SnowflakeID 基于时间生成,故可以按时间排序。此外,一个 ID 的生成时间可以由其自身推断出来,反之亦然。该特性可以用于按时间筛选 ID,以及与之联系的对象。”第 1 位该位不用主要是为了保持 ID 的自增特性,若使用了最高位,int64 会表示为负数。在 Java 中由于 long 类型的最高位是符号位,正数是 0,负数是 1,一般生成的 ID 为正整数,所以最高位为 041 位时间戳毫秒级的时间,一般实现上不会存储当前的时间戳,而是时间戳的差值(当前时间减去固定的开始时间),这样可以使产生的 ID 从更小值开始;41 bit 可以表示的数字多达 2^41 - 1,也就是可以标识 2 ^ 41 - 1 个毫秒值,换算成年就是表示 69 年的时间。(1L << 41) / (1000L 60 60 24 365) = (2199023255552 / 31536000000) ≈ 69.73 年。10 位工作机器 IDTwitter 实现中使用前 5 位作为数据中心标识,后 5 位作为机器标识,可以部署 1024 (2^10)个节点。意思就是最多代表 2 ^ 5 个机房(32 个机房),每个机房里可以代表 2 ^ 5 个机器(32 台机器)。具体的分区可以根据自己的需要定义。比如拿出 4 位标识业务号,其他 6 位作为机器号。12 位序列号支持同一毫秒内同一个节点可以生成 4096 (2^12)个 ID,也就是同一毫秒内同一台机器所生成的最大 ID 数量为 4096。简单来说,你的某个服务假设要生成一个全局唯一 id,那么就可以发送一个请求给部署了 SnowFlake 算法的系统,由这个 SnowFlake 算法系统来生成唯一 id。这个 SnowFlake 算法系统首先肯定是知道自己所在的机器号,(这里姑且讲 10bit 全部作为工作机器 ID)接着 SnowFlake 算法系统接收到这个请求之后,首先就会用二进制位运算的方式生成一个 64 bit 的 long 型 id,64 个 bit 中的第一个 bit 是无意义的。接着用当前时间戳(单位到毫秒)占用 41 个 bit,然后接着 10 个 bit 设置机器 id。最后再判断一下,当前这台机房的这台机器上这一毫秒内,这是第几个请求,给这次生成 id 的请求累加一个序号,作为最后的 12 个 bit。优点:理论上 Snowflake 方案的 QPS 约为 409.6w/s(1000 * 2^12),这种分配方式可以保证在任何一个 IDC 的任何一台机器在任意毫秒内生成的 ID 都是不同的。缺点强依赖机器时钟,如果机器上时钟回拨,会导致发号重复或者服务会处于不可用状态。UidGenerator“UidGenerator 是 Java 实现的,基于 Snowflake 算法的唯一 ID 生成器。UidGenerator 以组件形式工作在应用项目中,支持自定义 workerId 位数和初始化策略,从而适用于 docker 等虚拟化环境下实例自动重启、漂移等场景。在实现上,UidGenerator 通过借用未来时间来解决 sequence 天然存在的并发限制;采用 RingBuffer 来缓存已生成的 UID, 并行化 UID 的生产和消费,同时对 CacheLine 补齐,避免了由 RingBuffer 带来的硬件级「伪共享」问题。最终单机 QPS 可达 600 万。”UidGenerator 的实现跟 SnowFlake 原始算法不太一样,不过以下参数均可通过 Spring 进行自定义:sign(1bit) 固定 1bit 符号标识,即生成的 UID 为正数。delta seconds (28 bits) 当前时间,相对于时间基点"2016-05-20"的增量值,单位:秒,最多可支持约 8.7 年worker id (22 bits) 机器 id,最多可支持约 420w 次机器启动。内置实现为在启动时由数据库分配,默认分配策略为用后即弃,后续可提供复用策略。sequence (13 bits) 每秒下的并发序列,13 bits 可支持每秒 8192 个并发。RingBuffer 环形数组,数组每个元素成为一个 slot。RingBuffer 容量,默认为 Snowflake 算法中 sequence 最大值,且为 2^N。可通过 boostPower 配置进行扩容,以提高 RingBuffer 读写吞吐量。Tail 指针、Cursor 指针用于环形数组上读写 slot:Tail 指针 表示 Producer 生产的最大序号(此序号从 0 开始,持续递增)。Tail 不能超过 Cursor,即生产者不能覆盖未消费的 slot。当 Tail 已赶上 curosr,此时可通过 rejectedPutBufferHandler 指定 PutRejectPolicyCursor 指针 表示 Consumer 消费到的最小序号(序号序列与 Producer 序列相同)。Cursor 不能超过 Tail,即不能消费未生产的 slot。当 Cursor 已赶上 tail,此时可通过 rejectedTakeBufferHandler 指定 TakeRejectPolicyCachedUidGenerator 采用了双 RingBuffer,Uid-RingBuffer 用于存储 Uid、Flag-RingBuffer 用于存储 Uid 状态 (是否可填充、是否可消费)由于数组元素在内存中是连续分配的,可最大程度利用 CPU cache 以提升性能。但同时会带来「伪共享」FalseSharing 问题,为此在 Tail、Cursor 指针、Flag-RingBuffer 中采用了 CacheLine 补齐方式。关于更多伪共享的知识,可以参考:https://www.cnblogs.com/cyfonly/p/5800758.html,总结来说,伪共享会导致性能问题,解决了能提升性能,就算不解决也不会出现数据不一致等严重的问题。RingBuffer 填充时机初始化预填充 RingBuffer 初始化时,预先填充满整个 RingBuffer.即时填充 Take 消费时,即时检查剩余可用 slot 量 (tail - cursor),如小于设定阈值,则补全空闲 slots。阈值可通过 paddingFactor 来进行配置周期填充 通过 Schedule 线程,定时补全空闲 slots。可通过 scheduleInterval 配置,以应用定时填充功能,并指定 Schedule 时间间隔源码分析概况整个项目共 2386 行 java 代码代码内部 class 的依赖结构是这样的:可见 RingBuffer 是个核心类。目录结构??com????└──?baidu????????└──?fsg????????????└──?uid????????????????├──?BitsAllocator.java??????????????????-?Bit?分配器?(C)????????????????├──?UidGenerator.java???????????????????-?UID?生成的接口?(I)????????????????├──?buffer????????????????│???├──?BufferPaddingExecutor.java??????-?填充?RingBuffer?的执行器?(C)????????????????│???├──?BufferedUidProvider.java????????-?RingBuffer?中?UID?的提供者?(C)????????????????│???├──?RejectedPutBufferHandler.java???-?拒绝?Put?到?RingBuffer?的处理器?(C)????????????????│???├──?RejectedTakeBufferHandler.java??-?拒绝从?RingBuffer?中?Take?的处理器?(C)????????????????│???└──?RingBuffer.java?????????????????-?内含两个环形数组?(C)????????????????├──?exception????????????????│???└──?UidGenerateException.java???????-?运行时异常????????????????├──?impl????????????????│???├──?CachedUidGenerator.java?????????-?RingBuffer?存储的?UID?生成器?(C)????????????????│???└──?DefaultUidGenerator.java????????-?无?RingBuffer?的默认?UID?生成器?(C)????????????????├──?utils????????????????│???├──?DateUtils.java????????????????│???├──?DockerUtils.java????????????????│???├──?EnumUtils.java????????????????│???├──?NamingThreadFactory.java????????????????│???├──?NetUtils.java????????????????│???├──?PaddedAtomicLong.java????????????????│???└──?ValuedEnum.java????????????????└──?worker????????????????????├──?DisposableWorkerIdAssigner.java??-?用完即弃的?WorkerId?分配器?(C)????????????????????├──?WorkerIdAssigner.java????????????-?WorkerId?分配器?(I)????????????????????├──?WorkerNodeType.java??????????????-?工作节点类型?(E)????????????????????├──?dao????????????????????│???└──?WorkerNodeDAO.java???????????-?MyBatis?Mapper????????????????????└──?entity????????????????????????└──?WorkerNodeEntity.java????????-?MyBatis?EntityDefaultUidGeneratorUidGenerator 在应用中是以 Spring 组件的形式提供服务,DefaultUidGenerator 提供了最简单的 Snowflake 式的生成模式,没有使用任何缓存来预存 UID,在需要生成 ID 的时候即时进行计算。所以我们结合源码来串一下最简单的默认生成模式的流程。首先引入DefaultUidGenerator配置???? ???????? ???????? ???????? ???????????? ?配置文件中的配置我都作了注释,这里重点说两个属性:epochStr是给一个过去时间的字符串,作为时间基点,比如"2016-09-20",用于计算时间戳的差值(当前时间减去固定的开始时间),这样可以使产生的 ID 从更小值开始。这点从以下的两段源码可以看出来:public?void?setEpochStr(String?epochStr)?{????if?(StringUtils.isNotBlank(epochStr))?{????????this.epochStr?=?epochStr;????????this.epochSeconds?=?TimeUnit.MILLISECONDS.toSeconds(DateUtils.parseByDayPattern(epochStr).getTime());????}}?/**??*?Get?current?second??*/private?long?getCurrentSecond()?{????long?currentSecond?=?TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());????if?(currentSecond?-?epochSeconds?>?bitsAllocator.getMaxDeltaSeconds())?{????????throw?new?UidGenerateException("Timestamp?bits?is?exhausted.?Refusing?UID?generate.?Now:?"?+?currentSecond);????}????return?currentSecond;}disposableWorkerIdAssignerWorker ID 分配器,用于为每个工作机器分配一个唯一的 ID,目前来说是用完即弃,在初始化 Bean 的时候会自动向 MySQL 中插入一条关于该服务的启动信息,待 MySQL 返回其自增 ID 之后,使用该 ID 作为工作机器 ID 并柔和到 UID 的生成当中。@Transactionalpublic?long?assignWorkerId()?{????//?build?worker?node?entity????WorkerNodeEntity?workerNodeEntity?=?buildWorkerNode();????//?add?worker?node?for?new?(ignore?the?same?IP?+?PORT)????workerNodeDAO.addWorkerNode(workerNodeEntity);????LOGGER.info("Add?worker?node:"?+?workerNodeEntity);????return?workerNodeEntity.getId();}buildWorkerNode() 为获取该启动服务的信息,兼容 Docker 服务。但要注意,无论是 docker 还是用 k8s,需要添加相关的环境变量 env 在配置文件中以便程序能够获取到。?/**?Environment?param?keys?主要是端口和?host?*/????private?static?final?String?ENV_KEY_HOST?=?"JPAAS_HOST";????private?static?final?String?ENV_KEY_PORT?=?"JPAAS_HTTP_PORT";核心方法介绍完上面这些,我们来看下 defaultUidGenerator 生成 ID 的核心方法(注意这个方法是同步方法)?protected?synchronized?long?nextId()?{????long?currentSecond?=?getCurrentSecond();????//?时钟向后移动,拒绝生成?id?(解决时钟回拨问题)????if?(currentSecond??threshold=1000?*?20?/100,?????*????????当?tail-cursor?0L,?"RingBuffer?size?must?be?positive");????????Assert.isTrue(Integer.bitCount(bufferSize)?==?1,?"RingBuffer?size?must?be?a?power?of?2");????????Assert.isTrue(paddingFactor?>?0?&?paddingFactor?? RingBuffer 的填充和获取RingBuffer 的填充和获取操作是线程安全的,但是填充和获取操作的性能会受到 RingBuffer 的大小的影响,先来看下 put 操作:?public?synchronized?boolean?put(long?uid)?{????????long?currentTail?=?tail.get();????????long?currentCursor?=?cursor.get();????????//?当?tail?追上了?cursor?时,表示?RingBuffer?满了,不能再放了????????// Tail 不能超过 Cursor,即生产者不能覆盖未消费的 slot。当 Tail 已赶上 curosr,此时可通过 rejectedPutBufferHandler 指定 PutRejectPolicy????????long?distance?=?currentTail?-?(currentCursor?==?START_POINT???0?:?currentCursor);????????if?(distance?==?bufferSize?-?1)?{????????????rejectedPutHandler.rejectPutBuffer(this,?uid);????????????return?false;????????}????????//?1.?预检查?flag?是否为?CAN_PUT_FLAG,首次?put?时,currentTail?为-1????????int?nextTailIndex?=?calSlotIndex(currentTail?+?1);????????if?(flags[nextTailIndex].get()?!=?CAN_PUT_FLAG)?{????????????rejectedPutHandler.rejectPutBuffer(this,?uid);????????????return?false;????????}????????//?2.?put?UID?in?the?next?slot????????slots[nextTailIndex]?=?uid;????????//?3.?update?next?slot'?flag?to?CAN_TAKE_FLAG????????flags[nextTailIndex].set(CAN_TAKE_FLAG);????????//?4.?publish?tail?with?sequence?increase?by?one?移动?tail????????tail.incrementAndGet();????????????????//?上述操作的原子性由“synchronized”保证。换句话说????????//?take?操作不能消费我们刚刚放的?UID,直到?tail?移动?(tail.incrementAndGet())?才可以????????return?true;????}注意 put 方法是 synchronized。再来看下 take 方法:UID 的读取是一个无锁的操作。在获取 UID 之前,还要检查是否达到了 padding 阈值,在另一个线程中会触发 padding buffer 操作,如果没有更多可用的 UID 可以获取,则应用指定的 RejectedTakeBufferHandler?public?long?take()?{????//?spin?get?next?available?cursor????long?currentCursor?=?cursor.get();????//?cursor?初始化为-1,现在?cursor?等于?tail,所以初始化时?nextCursor?为-1????long?nextCursor?=?cursor.updateAndGet(old?->?old?==?tail.get()???old?:?old?+?1);????//?check?for?safety?consideration,?it?never?occurs????//?初始化或者全部?UID?耗尽时?nextCursor?==?currentCursor????Assert.isTrue(nextCursor?>=?currentCursor,?"Curosr?can't?move?back");????//?如果达到阈值,则以异步模式触发填充????long?currentTail?=?tail.get();????if?(currentTail?-?nextCursor??uidList?=?uidProvider.provide(lastSecond.incrementAndGet());????????for?(Long?uid?:?uidList)?{????????????isFullRingBuffer?=?!ringBuffer.put(uid);????????????if?(isFullRingBuffer)?{????????????????break;????????????}????????}????}????//?not?running?now????running.compareAndSet(true,?false);????LOGGER.info("End?to?padding?buffer?lastSecond:{}.?{}",?lastSecond.get(),?ringBuffer);}当线程池分发多条线程来执行填充任务的时候,成功抢夺运行状态的线程会真正执行对 RingBuffer 填充,直至全部填满,其他抢夺失败的线程将会直接返回。该类还提供定时填充功能,如果有设置开关则会生效,默认不会启用周期性填充RIngBuffer 的填充时机有 3 个:CachedUidGenerator 时对 RIngBuffer 初始化、RIngBuffer#take() 时检测达到阈值和周期性填充(如果有打开)使用 RingBuffer 的 UID 生成器最后我们看一下利用 CachedUidGenerator 生成 UID 的代码,CachedUidGenerator 继承了 DefaultUidGenerator,实现了 UidGenerator 接口。该类在应用中作为 Spring Bean 注入到各个组件中,主要作用是初始化 RingBuffer 和 BufferPaddingExecutor。最重要的方法为 BufferedUidProvider 的提供者,即 lambda 表达式中的 nextIdsForOneSecond(long) 方法。?/**?????*?Get?the?UIDs?in?the?same?specified?second?under?the?max?sequence?????*??????*?@param?currentSecond?????*?@return?UID?list,?size?of?{@link?BitsAllocator#getMaxSequence()}?+?1?????*/????protected?List?nextIdsForOneSecond(long?currentSecond)?{????????//?Initialize?result?list?size?of?(max?sequence?+?1)????????int?listSize?=?(int)?bitsAllocator.getMaxSequence()?+?1;????????List?uidList?=?new?ArrayList(listSize);????????//?Allocate?the?first?sequence?of?the?second,?the?others?can?be?calculated?with?the?offset????????long?firstSeqUid?=?bitsAllocator.allocate(currentSecond?-?epochSeconds,?workerId,?0L);????????for?(int?offset?=?0;?offset?>>?(totalBits?-?sequenceBits);long?workerId?=?(uid?<>>?(totalBits?-?workerIdBits);long?deltaSeconds?=?uid?>>>?(workerIdBits?+?sequenceBits);参考https://www.cnblogs.com/cyfonly/p/5800758.htmlhttp://www.semlinker.com/uuid-snowflake/https://programmer.group/snowflake-algorithm-improved-version-snowflake.htmlhttps://github.com/baidu/uid-generator/blob/master/README.zh_cn.mdhttp://blog.chriscs.com/2017/08/02/baidu-uid-generator/
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-9 05:59 , Processed in 0.463345 second(s), 25 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表