百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Kafka消息可靠传输之幂等、事务机制

liuian 2025-06-15 17:36 44 浏览

一般而言,消息中间件的消息传输保障有3个层级,分别如下。

at most once:至多一次。消息可能会丢失,但绝对不会重复传输。

at least once:最少一次。消息绝不会丢失,但可能会重复传输。

exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次。

Kafka 的消息传输保障机制非常直观。当生产者向 Kafka 发送消息时,一旦消息被成功提交到日志文件,由于多副本机制的存在,这条消息就不会丢失。如果生产者发送消息到 Kafka 之后,遇到了网络问题而造成通信中断,那么生产者就无法判断该消息是否已经提交。虽然 Kafka 无法确定网络故障期间发生了什么,但生产者可以进行多次重试来确保消息已经写入 Kafka,这个重试的过程中有可能会造成消息的重复写入,所以这里 Kafka 提供的消息传输保障为 at least once。

对消费者而言,消费者处理消息和提交消费位移的顺序在很大程度上决定了消费者提供哪一种消息传输保障。如果消费者在拉取完消息之后,应用逻辑先处理消息后提交消费位移,那么在消息处理之后且在位移提交之前消费者宕机了,待它重新上线之后,会从上一次位移提交的位置拉取,这样就出现了重复消费,因为有部分消息已经处理过了只是还没来得及提交消费位移,此时就对应 at least once。

如果消费者在拉完消息之后,应用逻辑先提交消费位移后进行消息处理,那么在位移提交之后且在消息处理完成之前消费者宕机了,待它重新上线之后,会从已经提交的位移处开始重新消费,但之前尚有部分消息未进行消费,如此就会发生消息丢失,此时就对应 at most once。

Kafka 从 0.11.0.0 版本开始引入了幂等和事务这两个特性,以此来实现 EOS(exactly once semantics,精确一次处理语义)。

幂等

所谓的幂等,简单地说就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,而使用 Kafka 的幂等性功能之后就可以避免这种情况。

开启幂等性功能的方式很简单,只需要显式地将生产者客户端参数 enable.idempotence 设置为 true 即可(这个参数的默认值为 false),参考如下:

properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

# 或者

properties.put(“enable.idempotence”, true);

不过如果要确保幂等性功能正常,还需要确保生产者客户端的 retries、acks、
max.in.flight.requests.per.connection 这几个参数不被配置错。实际上在使用幂等性功能的时候,用户完全可以不用配置(也不建议配置)这几个参数。

如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0,否则会报出 ConfigException:

org.apache.kafka.common.config.ConfigException: Must set retries to non-zero when using the idempotent producer.

如果用户没有显式地指定 retries 参数,那么 KafkaProducer 会将它置为 Integer.MAX_VALUE。同时还需要保证
max.in.flight.requests.per.connection 参数的值不能大于5(这个参数的值默认为5,在 2.2.1 节中有相关的介绍),否则也会报出 ConfigException:

org.apache.kafka.common.config.ConfigException: Must set max.in.flight. requests.per.connection to at most 5 to use the idempotent producer.

如果用户还显式地指定了 acks 参数,那么还需要保证这个参数的值为 -1(all),如果不为 -1(这个参数的值默认为1),那么也会报出 ConfigException:

org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.

如果用户没有显式地指定这个参数,那么 KafkaProducer 会将它置为-1。开启幂等性功能之后,生产者就可以如同未开启幂等时一样发送消息了。

为了实现生产者的幂等性,Kafka 为此引入了 producer id(以下简称 PID)和序列号(sequence number)这两个概念,这两个概念其实在第2节中就讲过,分别对应 v2 版的日志格式中 RecordBatch 的 producer id 和 first seqence 这两个字段(参考第2节)。

每个新的生产者实例在初始化的时候都会被分配一个 PID,这个 PID 对用户而言是完全透明的。对于每个 PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将 <PID,分区> 对应的序列号的值加1。

broker 端会在内存中为每一对 <PID,分区> 维护一个序列号。对于收到的每一条消息,只有当它的序列号的值(SN_new)比 broker 端中维护的对应的序列号的值(SN_old)大1(即 SN_new = SN_old + 1)时,broker 才会接收它。如果 SN_new< SN_old + 1,那么说明消息被重复写入,broker 可以直接将其丢弃。如果 SN_new> SN_old + 1,那么说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,对应的生产者会抛出
OutOfOrderSequenceException,这个异常是一个严重的异常,后续的诸如 send()、beginTransaction()、commitTransaction() 等方法的调用都会抛出 IllegalStateException 的异常。

引入序列号来实现幂等也只是针对每一对 <PID,分区> 而言之,也就是说,Kafka 的幂等只能保证单个生产者会话(session)中单分区的幂等。

ProducerRecord<String, String> record

= new ProducerRecord<>(topic, "key", "msg");

producer.send(record);

producer.send(record);

注意,上面示例中发送了两条相同的消息,不过这仅仅是指消息内容相同,但对 Kafka 而言是两条不同的消息,因为会为这两条消息分配不同的序列号。Kafka 并不会保证消息内容的幂等。

事务

幂等性并不能跨多个分区运作,而事务可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能。

对流式应用(Stream Processing Applications)而言,一个典型的应用模式为“consume-transform-produce”。在这种模式下消费和生产并存:应用程序从某个主题中消费消息,然后经过一系列转换后写入另一个主题,消费者可能在提交消费位移的过程中出现问题而导致重复消费,也有可能生产者重复生产消息。Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子操作来处理,同时成功或失败,即使该生产或消费会跨多个分区。

为了实现事务,应用程序必须提供唯一的 transactionalId,这个 transactionalId 通过客户端参数 transactional.id 来显式设置,参考如下:

properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionId");

# 或者

properties.put("transactional.id", "transactionId");

事务要求生产者开启幂等特性,因此通过将 transactional.id 参数设置为非空从而开启事务特性的同时需要将 enable.idempotence 设置为 true(如果未显式设置,则 KafkaProducer 默认会将它的值设置为 true),如果用户显式地将 enable.idempotence 设置为 false,则会报出 ConfigException:

org.apache.kafka.common.config.ConfigException: Cannot set a transactional.id without also enabling idempotence.

transactionalId 与 PID 一一对应,两者之间所不同的是 transactionalId 由用户显式设置,而 PID 是由 Kafka 内部分配的。另外,为了保证新的生产者启动后具有相同 transactionalId 的旧生产者能够立即失效,每个生产者通过 transactionalId 获取 PID 的同时,还会获取一个单调递增的 producer epoch(对应下面要讲述的
KafkaProducer.initTransactions() 方法)。如果使用同一个 transactionalId 开启两个生产者,那么前一个开启的生产者会报出如下的错误:

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

producer epoch 同 PID 和序列号一样在第2节中就讲过了,对应v2版的日志格式中 RecordBatch 的 producer epoch 字段。

从生产者的角度分析,通过事务,Kafka 可以保证跨生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。前者表示具有相同 transactionalId 的新生产者实例被创建且工作的时候,旧的且拥有相同 transactionalId 的生产者实例将不再工作。后者指当某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交(Commit),要么被中止(Abort),如此可以使新的生产者实例从一个正常的状态开始工作。

而从消费者的角度分析,事务能保证的语义相对偏弱。出于以下原因,Kafka 并不能保证已提交的事务中的所有消息都能够被消费:

对采用日志压缩策略的主题而言,事务中的某些消息有可能被清理(相同key的消息,后写入的消息会覆盖前面写入的消息)。

事务中消息可能分布在同一个分区的多个日志分段(LogSegment)中,当老的日志分段被删除时,对应的消息可能会丢失。

消费者可以通过 seek() 方法访问任意 offset 的消息,从而可能遗漏事务中的部分消息。

消费者在消费时可能没有分配到事务内的所有分区,如此它也就不能读取事务中的所有消息。

KafkaProducer 提供了5个与事务相关的方法,详细如下:

void initTransactions();

void beginTransaction() throws ProducerFencedException;

void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,

String consumerGroupId)

throws ProducerFencedException;

void commitTransaction() throws ProducerFencedException;

void abortTransaction() throws ProducerFencedException;

initTransactions() 方法用来初始化事务,这个方法能够执行的前提是配置了 transactionalId,如果没有则会报出 IllegalStateException:

java.lang.IllegalStateException: Cannot use transactional methods without enabling transactions by setting the transactional.id configuration property.

beginTransaction() 方法用来开启事务;sendOffsetsToTransaction() 方法为消费者提供在事务内的位移提交的操作;commitTransaction() 方法用来提交事务;abortTransaction() 方法用来中止事务,类似于事务回滚。

一个典型的事务消息发送的操作如代码清单14-1所示。

代码清单14-1 事务消息发送示例

Properties properties = new Properties();

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

StringSerializer.class.getName());

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,

StringSerializer.class.getName());

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);


KafkaProducer<String, String> producer = new KafkaProducer<>(properties);


producer.initTransactions();

producer.beginTransaction();


try {

//处理业务逻辑并创建ProducerRecord

ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");

producer.send(record1);

ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");

producer.send(record2);

ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3");

producer.send(record3);

//处理一些其他逻辑

producer.commitTransaction();

} catch (ProducerFencedException e) {

producer.abortTransaction();

}

producer.close();

在消费端有一个参数 isolation.level,与事务有着莫大的关联,这个参数的默认值为“read_uncommitted”,意思是说消费端应用可以看到(消费到)未提交的事务,当然对于已提交的事务也是可见的。这个参数还可以设置为“read_committed”,表示消费端应用不可以看到尚未提交的事务内的消息。

举个例子,如果生产者开启事务并向某个分区值发送3条消息 msg1、msg2 和 msg3,在执行 commitTransaction() 或 abortTransaction() 方法前,设置为“read_committed”的消费端应用是消费不到这些消息的,不过在 KafkaConsumer 内部会缓存这些消息,直到生产者执行 commitTransaction() 方法之后它才能将这些消息推送给消费端应用。反之,如果生产者执行了 abortTransaction() 方法,那么 KafkaConsumer 会将这些缓存的消息丢弃而不推送给消费端应用。

日志文件中除了普通的消息,还有一种消息专门用来标志一个事务的结束,它就是控制消息(ControlBatch)。控制消息一共有两种类型:COMMIT 和 ABORT,分别用来表征事务已经成功提交或已经被成功中止。KafkaConsumer 可以通过这个控制消息来判断对应的事务是被提交了还是被中止了,然后结合参数 isolation.level 配置的隔离级别来决定是否将相应的消息返回给消费端应用,如下图所示。注意 ControlBatch 对消费端应用不可见,后面还会对它有更加详细的介绍。




本节开头就提及了 consume-transform-produce 这种应用模式,这里还涉及在代码清单14-1中尚未使用的 sendOffsetsToTransaction() 方法。该模式的具体结构如下图所示。与此对应的应用示例如代码清单14-2所示。




代码清单14-2 消费—转换—生产模式示例

public class TransactionConsumeTransformProduce {

public static final String brokerList = "localhost:9092";


public static Properties getConsumerProperties(){

Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

StringDeserializer.class.getName());

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

StringDeserializer.class.getName());

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");

return props;

}


public static Properties getProducerProperties(){

Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

StringSerializer.class.getName());

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,

StringSerializer.class.getName());

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");

return props;

}


public static void main(String[] args) {

//初始化生产者和消费者

KafkaConsumer<String, String> consumer =

new KafkaConsumer<>(getConsumerProperties());

consumer.subscribe(Collections.singletonList("topic-source"));

KafkaProducer<String, String> producer =

new KafkaProducer<>(getProducerProperties());

//初始化事务

producer.initTransactions();

while (true) {

ConsumerRecords<String, String> records =

consumer.poll(Duration.ofMillis(1000));

if (!records.isEmpty()) {

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

//开启事务

producer.beginTransaction();

try {

for (TopicPartition partition : records.partitions()) {

List<ConsumerRecord<String, String>> partitionRecords

= records.records(partition);

for (ConsumerRecord<String, String> record :

partitionRecords) {

//do some logical processing.

ProducerRecord<String, String> producerRecord =

new ProducerRecord<>("topic-sink", record.key(),

record.value());

//消费—生产模型

producer.send(producerRecord);

}

long lastConsumedOffset = partitionRecords.

get(partitionRecords.size() - 1).offset();

offsets.put(partition,

new OffsetAndMetadata(lastConsumedOffset + 1));

}

//提交消费位移

producer.sendOffsetsToTransaction(offsets,"groupId");

//提交事务

producer.commitTransaction();

} catch (ProducerFencedException e) {

//log the exception

//中止事务

producer.abortTransaction();

}

}

}

}

}

注意:在使用 KafkaConsumer 的时候要将 enable.auto.commit 参数设置为 false,代码里也不能手动提交消费位移。

为了实现事务的功能,Kafka 还引入了事务协调器(TransactionCoordinator)来负责处理事务,这一点可以类比一下组协调器(GroupCoordinator)。每一个生产者都会被指派一个特定的 TransactionCoordinator,所有的事务逻辑包括分派 PID 等都是由 TransactionCoordinator 来负责实施的。TransactionCoordinator 会将事务状态持久化到内部主题 __transaction_state 中。下面就以最复杂的 consume-transform-produce 的流程(参考下图,后面就以“事务流程图”称呼)为例来分析 Kafka 事务的实现原理。




1. 查找TransactionCoordinator

TransactionCoordinator 负责分配 PID 和管理事务,因此生产者要做的第一件事情就是找出对应的 TransactionCoordinator 所在的 broker 节点。与查找 GroupCoordinator 节点一样,也是通过 FindCoordinatorRequest 请求来实现的,只不过 FindCoordinatorRequest 中的 coordinator_type 就由原来的0变成了1,由此来表示与事务相关联。

Kafka 在收到 FindCoorinatorRequest 请求之后,会根据 coordinator_key(也就是 transactionalId)查找对应的 TransactionCoordinator 节点。如果找到,则会返回其相对应的 node_id、host 和 port 信息。具体查找 TransactionCoordinator 的方式是根据 transactionalId 的哈希值计算主题 __transaction_state 中的分区编号,具体算法如代码清单14-3所示。

代码清单14-3 计算分区编号

Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount

其中
transactionTopicPartitionCount 为主题 __transaction_state 中的分区个数,这个可以通过 broker 端参数
transaction.state.log.num.partitions 来配置,默认值为50。

找到对应的分区之后,再寻找此分区 leader 副本所在的 broker 节点,该 broker 节点即为这个 transactionalId 对应的 TransactionCoordinator 节点。细心的读者可以发现,这一整套的逻辑和查找 GroupCoordinator 的逻辑如出一辙。

2. 获取PID

在找到 TransactionCoordinator 节点之后,就需要为当前生产者分配一个 PID 了。凡是开启了幂等性功能的生产者都必须执行这个操作,不需要考虑该生产者是否还开启了事务。生产者获取 PID 的操作是通过 InitProducerIdRequest 请求来实现的,InitProducerIdRequest 请求体结构如下图所示,其中 transactional_id 表示事务的 transactionalId,transaction_timeout_ms 表示 TransactionCoordinaor 等待事务状态更新的超时时间,通过生产者客户端参数 transaction.timeout.ms 配置,默认值为60000。




保存PID

生产者的 InitProducerIdRequest 请求会被发送给 TransactionCoordinator。注意,如果未开启事务特性而只开启幂等特性,那么 InitProducerIdRequest 请求可以发送给任意的 broker。当 TransactionCoordinator 第一次收到包含该 transactionalId 的 InitProducerIdRequest 请求时,它会把 transactionalId 和对应的 PID 以消息(我们习惯性地把这类消息称为“事务日志消息”)的形式保存到主题 __transaction_state 中,如事务流程图步骤2.1所示。这样可以保证 <transaction_Id, PID> 的对应关系被持久化,从而保证即使 TransactionCoordinator 宕机该对应关系也不会丢失。存储到主题 __transaction_state 中的具体内容格式如下图所示。




其中 transaction_status 包含 Empty(0)、Ongoing(1)、PrepareCommit(2)、PrepareAbort(3)、CompleteCommit(4)、CompleteAbort(5)、Dead(6) 这几种状态。在存入主题 __transaction_state 之前,事务日志消息同样会根据单独的 transactionalId 来计算要发送的分区,算法同代码清单14-3一样。




与 InitProducerIdRequest 对应的 InitProducerIdResponse 响应体结构如上图所示,除了返回 PID,InitProducerIdRequest 还会触发执行以下任务:

增加该 PID 对应的 producer_epoch。具有相同 PID 但 producer_epoch 小于该 producer_epoch 的其他生产者新开启的事务将被拒绝。

恢复(Commit)或中止(Abort)之前的生产者未完成的事务。

3. 开启事务

通过 KafkaProducer 的 beginTransaction() 方法可以开启一个事务,调用该方法后,生产者本地会标记已经开启了一个新的事务,只有在生产者发送第一条消息之后 TransactionCoordinator 才会认为该事务已经开启。

4. Consume-Transform-Produce

这个阶段囊括了整个事务的数据处理过程,其中还涉及多种请求。注:如果没有给出具体的请求体或响应体结构,则说明其并不影响读者对内容的理解,笔者为了缩减篇幅而将其省略。

1)AddPartitionsToTxnRequest

当生产者给一个新的分区(TopicPartition)发送数据前,它需要先向 TransactionCoordinator 发送 AddPartitionsToTxnRequest 请求(AddPartitionsToTxnRequest 请求体结构如下图所示),这个请求会让 TransactionCoordinator 将 <transactionId, TopicPartition> 的对应关系存储在主题 __transaction_state 中,如图事务流程图步骤4.1所示。有了这个对照关系之后,我们就可以在后续的步骤中为每个分区设置 COMMIT 或 ABORT 标记,如事务流程图步骤5.2所示。




如果该分区是对应事务中的第一个分区,那么此时TransactionCoordinator还会启动对该事务的计时。

2)ProduceRequest

这一步骤很容易理解,生产者通过 ProduceRequest 请求发送消息(ProducerBatch)到用户自定义主题中,这一点和发送普通消息时相同,如事务流程图步骤4.2所示。和普通的消息不同的是,ProducerBatch 中会包含实质的 PID、producer_epoch 和 sequence number。

3)AddOffsetsToTxnRequest

通过 KafkaProducer 的 sendOffsetsToTransaction() 方法可以在一个事务批次里处理消息的消费和发送,方法中包含2个参数:Map<TopicPartition, OffsetAndMetadata> offsets 和 groupId。这个方法会向 TransactionCoordinator 节点发送 AddOffsetsToTxnRequest 请求(AddOffsetsToTxnRequest 请求体结构如下图所示),TransactionCoordinator 收到这个请求之后会通过 groupId 来推导出在 __consumer_offsets 中的分区,之后 TransactionCoordinator 会将这个分区保存在 __transaction_state 中,如事务流程图步骤4.3所示。




4)TxnOffsetCommitRequest

这个请求也是 sendOffsetsToTransaction() 方法中的一部分,在处理完 AddOffsetsToTxnRequest 之后,生产者还会发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中包含的消费位移信息 offsets 存储到主题 __consumer_offsets 中,如事务流程图步骤4.4所示。

5. 提交或者中止事务

一旦数据被写入成功,我们就可以调用 KafkaProducer 的 commitTransaction() 方法或 abortTransaction() 方法来结束当前的事务。

1)EndTxnRequest 无论调用 commitTransaction() 方法还是 abortTransaction() 方法,生产者都会向 TransactionCoordinator 发送 EndTxnRequest 请求(对应的 EndTxnRequest 请求体结构如下图所示),以此来通知它提交(Commit)事务还是中止(Abort)事务。




TransactionCoordinator 在收到 EndTxnRequest 请求后会执行如下操作:

将 PREPARE_COMMIT 或 PREPARE_ABORT 消息写入主题 __transaction_state,如事务流程图步骤5.1所示。

通过 WriteTxnMarkersRequest 请求将 COMMIT 或 ABORT 信息写入用户所使用的普通主题和 __consumer_offsets,如事务流程图步骤5.2所示。

将 COMPLETE_COMMIT 或 COMPLETE_ABORT 信息写入内部主题 __transaction_state,如事务流程图步骤5.3所示。

2)WriteTxnMarkersRequest

WriteTxnMarkersRequest 请求是由 TransactionCoordinator 发向事务中各个分区的 leader 节点的,当节点收到这个请求之后,会在相应的分区中写入控制消息(ControlBatch)。控制消息用来标识事务的终结,它和普通的消息一样存储在日志文件中,前面章节中提及了控制消息,RecordBatch 中 attributes 字段的第6位用来标识当前消息是否是控制消息。如果是控制消息,那么这一位会置为1,否则会置为0,如下图所示。




attributes 字段中的第5位用来标识当前消息是否处于事务中,如果是事务中的消息,那么这一位置为1,否则置为0。由于控制消息也处于事务中,所以attributes字段的第5位和第6位都被置为1。ControlBatch 中只有一个 Record,Record 中的 timestamp delta 字段和 offset delta 字段的值都为0,而控制消息的 key 和 value 的内容如下图所示。




就目前的 Kafka 版本而言,key 和 value 内部的 version 值都为0,key 中的 type 表示控制类型:0表示 ABORT,1表示 COMMIT;value 中的 coordinator_epoch 表示 TransactionCoordinator 的纪元(版本),TransactionCoordinator 切换的时候会更新其值。

3)写入最终的COMPLETE_COMMIT或COMPLETE_ABORT

TransactionCoordinator 将最终的 COMPLETE_COMMIT 或 COMPLETE_ABORT 信息写入主题 __transaction_state 以表明当前事务已经结束,此时可以删除主题 __transaction_state 中所有关于该事务的消息。由于主题 __transaction_state 采用的日志清理策略为日志压缩,所以这里的删除只需将相应的消息设置为墓碑消息即可

————————————————

版权声明:本文为CSDN博主「車輪の唄」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:
https://blog.csdn.net/asdfsadfasdfsa/article/details/104806981

相关推荐

总结下SpringData JPA 的常用语法

SpringDataJPA常用有两种写法,一个是用Jpa自带方法进行CRUD,适合简单查询场景、例如查询全部数据、根据某个字段查询,根据某字段排序等等。另一种是使用注解方式,@Query、@Modi...

解决JPA在多线程中事务无法生效的问题

在使用SpringBoot2.x和JPA的过程中,如果在多线程环境下发现查询方法(如@Query或findAll)以及事务(如@Transactional)无法生效,通常是由于S...

PostgreSQL系列(一):数据类型和基本类型转换

自从厂子里出来后,数据库的主力就从Oracle变成MySQL了。有一说一哈,贵确实是有贵的道理,不是开源能比的。后面的工作里面基本上就是主MySQL,辅MongoDB、ES等NoSQL。最近想写一点跟...

基于MCP实现text2sql

目的:基于MCP实现text2sql能力参考:https://blog.csdn.net/hacker_Lees/article/details/146426392服务端#选用开源的MySQLMCP...

ORACLE 错误代码及解决办法

ORA-00001:违反唯一约束条件(.)错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。ORA-00017:请求会话以设置跟踪事件ORA-00018:超出最大会话数ORA-00...

从 SQLite 到 DuckDB:查询快 5 倍,存储减少 80%

作者丨Trace译者丨明知山策划丨李冬梅Trace从一开始就使用SQLite将所有数据存储在用户设备上。这是一个非常不错的选择——SQLite高度可靠,并且多种编程语言都提供了广泛支持...

010:通过 MCP PostgreSQL 安全访问数据

项目简介提供对PostgreSQL数据库的只读访问功能。该服务器允许大型语言模型(LLMs)检查数据库的模式结构,并执行只读查询操作。核心功能提供对PostgreSQL数据库的只读访问允许L...

发现了一个好用且免费的SQL数据库工具(DBeaver)

缘起最近Ai不是大火么,想着自己也弄一些开源的框架来捣腾一下。手上用着Mac,但Mac都没有显卡的,对于学习Ai训练模型不方便,所以最近新购入了一台4090的拯救者,打算用来好好学习一下Ai(呸,以上...

微软发布.NET 10首个预览版:JIT编译器再进化、跨平台开发更流畅

IT之家2月26日消息,微软.NET团队昨日(2月25日)发布博文,宣布推出.NET10首个预览版更新,重点改进.NETRuntime、SDK、libraries、C#、AS...

数据库管理工具Navicat Premium最新版发布啦

管理多个数据库要么需要使用多个客户端应用程序,要么找到一个可以容纳你使用的所有数据库的应用程序。其中一个工具是NavicatPremium。它不仅支持大多数主要的数据库管理系统(DBMS),而且它...

50+AI新品齐发,微软Build放大招:拥抱Agent胜算几何?

北京时间5月20日凌晨,如果你打开微软Build2025开发者大会的直播,最先吸引你的可能不是一场原本属于AI和开发者的技术盛会,而是开场不久后的尴尬一幕:一边是几位微软员工在台下大...

揭秘:一条SQL语句的执行过程是怎么样的?

数据库系统能够接受SQL语句,并返回数据查询的结果,或者对数据库中的数据进行修改,可以说几乎每个程序员都使用过它。而MySQL又是目前使用最广泛的数据库。所以,解析一下MySQL编译并执行...

各家sql工具,都闹过哪些乐子?

相信这些sql工具,大家都不陌生吧,它们在业内绝对算得上第一梯队的产品了,但是你知道,他们都闹过什么乐子吗?首先登场的是Navicat,这款强大的数据库管理工具,曾经让一位程序员朋友“火”了一把。Na...

详解PG数据库管理工具--pgadmin工具、安装部署及相关功能

概述今天主要介绍一下PG数据库管理工具--pgadmin,一起来看看吧~一、介绍pgAdmin4是一款为PostgreSQL设计的可靠和全面的数据库设计和管理软件,它允许连接到特定的数据库,创建表和...

Enpass for Mac(跨平台密码管理软件)

还在寻找密码管理软件吗?密码管理软件有很多,但是综合素质相当优秀且完全免费的密码管理软件却并不常见,EnpassMac版是一款免费跨平台密码管理软件,可以通过这款软件高效安全的保护密码文件,而且可以...