Golang Source Code Analysis: sarama kafka client (Part I: the Producer) - 墨天轮
https://github.com/Shopify/sarama is a Kafka client written in pure Go, and a great resource for gophers who want to learn Kafka. Frankly, sarama's code organization is a mess: a dense pile of source files all live in a single directory, leaving newcomers with no obvious entry point. Here is a partial listing:
```
examples
mocks
tools/                              // Kafka command-line tools built on the client
  kafka-producer-performance
  kafka-console-producer
  kafka-console-partitionconsumer
  kafka-console-consumer
vagrant/                            // configuration for starting test VMs
acl_xx.go                           // ACL (permission) logic
add_partitions_to_txn_request.go
add_partitions_to_txn_response.go
add_offsets_to_txn_request.go
add_offsets_to_txn_response.go
admin.go
alter_xx.go                         // alter-related logic
async_producer.go
balance_strategy.go
broker.go
client.go
config.go
consumer_group.go
consumer_group_members.go
consumer.go
create_xx.go
delete_xx.go
describe_xx.go
fetch_request.go
list_xx.go
offset_xx.go
partitioner.go
produce_request.go
produce_response.go
sarama.go
sync_producer.go
utils.go
```
In practice we only need to focus on a handful of files:
admin.go async_producer.go broker.go client.go consumer_group.go consumer.go sync_producer.go
As usual, let's start with an example.
Producer
```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Build the producer configuration
	config := sarama.NewConfig()
	// Required-acks level: 0 (NoResponse), 1 (WaitForLocal), or all (WaitForAll)
	config.Producer.RequiredAcks = sarama.NoResponse // sarama.WaitForAll may fail with: kafka server: Replication-factor is invalid.
	// Report successfully delivered messages on the Successes channel
	config.Producer.Return.Successes = true
	// Choose the target partition randomly
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	// Build the message
	msg := &sarama.ProducerMessage{}
	msg.Topic = "test"
	msg.Value = sarama.StringEncoder("123")

	// Connect to Kafka
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Print(err)
		return
	}
	defer producer.Close()

	// Send the message
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Println(err)
		return
	}
	fmt.Println(partition, " ", offset)
}
```
1. Creating a producer: sarama.NewSyncProducer
The code lives in sync_producer.go:
```go
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
	if err := verifyProducerConfig(config); err != nil {
		return nil, err
	}
	p, err := NewAsyncProducer(addrs, config)
	if err != nil {
		return nil, err
	}
	return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
}
```
It first validates the config, then calls NewAsyncProducer to create a producer and wraps it into a syncProducer:
```go
type syncProducer struct {
	producer *asyncProducer
	wg       sync.WaitGroup
}
```
As you can see, a syncProducer is essentially an asyncProducer; synchronous behavior is implemented on top of it with a sync.WaitGroup.
The implementation of NewAsyncProducer is in async_producer.go:
```go
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
	return newAsyncProducer(client)
}
```
It first creates a client. A Client wraps the connections to the Kafka brokers; both producers and consumers talk to the brokers through it. The code is in client.go:
```go
// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
// You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
// automatically when it passes out of scope. It is safe to share a client amongst many
// users, however Kafka will process requests from a single client strictly in serial,
// so it is generally more efficient to use the default one client per producer/consumer.
type Client interface {
	// Config returns the Config struct of the client. This struct should not be
	// altered after it has been created.
	Config() *Config

	// Controller returns the cluster controller broker. It will return a
	// locally cached value if it's available. You can call RefreshController
	// to update the cached value. Requires Kafka 0.10 or higher.
	Controller() (*Broker, error)

	// RefreshController retrieves the cluster controller from fresh metadata
	// and stores it in the local cache. Requires Kafka 0.10 or higher.
	RefreshController() (*Broker, error)

	// Brokers returns the current set of active brokers as retrieved from cluster metadata.
	Brokers() []*Broker

	// Broker returns the active Broker if available for the broker ID.
	Broker(brokerID int32) (*Broker, error)

	// Topics returns the set of available topics as retrieved from cluster metadata.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	Partitions(topic string) ([]int32, error)

	// WritablePartitions returns the sorted list of all writable partition IDs for
	// the given topic, where "writable" means "having a valid leader accepting
	// writes".
	WritablePartitions(topic string) ([]int32, error)

	// Leader returns the broker object that is the leader of the current
	// topic/partition, as determined by querying the cluster metadata.
	Leader(topic string, partitionID int32) (*Broker, error)

	// Replicas returns the set of all replica IDs for the given partition.
	Replicas(topic string, partitionID int32) ([]int32, error)

	// InSyncReplicas returns the set of all in-sync replica IDs for the given
	// partition. In-sync replicas are replicas which are fully caught up with
	// the partition leader.
	InSyncReplicas(topic string, partitionID int32) ([]int32, error)

	// OfflineReplicas returns the set of all offline replica IDs for the given
	// partition. Offline replicas are replicas which are offline
	OfflineReplicas(topic string, partitionID int32) ([]int32, error)

	// RefreshBrokers takes a list of addresses to be used as seed brokers.
	// Existing broker connections are closed and the updated list of seed brokers
	// will be used for the next metadata fetch.
	RefreshBrokers(addrs []string) error

	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
	// available metadata for those topics. If no topics are provided, it will refresh
	// metadata for all topics.
	RefreshMetadata(topics ...string) error

	// GetOffset queries the cluster to get the most recent available offset at the
	// given time (in milliseconds) on the topic/partition combination.
	// Time should be OffsetOldest for the earliest available offset,
	// OffsetNewest for the offset of the message that will be produced next, or a time.
	GetOffset(topic string, partitionID int32, time int64) (int64, error)

	// Coordinator returns the coordinating broker for a consumer group. It will
	// return a locally cached value if it's available. You can call
	// RefreshCoordinator to update the cached value. This function only works on
	// Kafka 0.8.2 and higher.
	Coordinator(consumerGroup string) (*Broker, error)

	// RefreshCoordinator retrieves the coordinator for a consumer group and stores it
	// in local cache. This function only works on Kafka 0.8.2 and higher.
	RefreshCoordinator(consumerGroup string) error

	// InitProducerID retrieves information required for Idempotent Producer
	InitProducerID() (*InitProducerIDResponse, error)

	// Close shuts down all broker connections managed by this client. It is required
	// to call this function before a client object passes out of scope, as it will
	// otherwise leak memory. You must close any Producers or Consumers using a client
	// before you close the client.
	Close() error

	// Closed returns true if the client has already had Close called on it
	Closed() bool
}
```
It then creates an asyncProducer object:
```go
type asyncProducer struct {
	client Client
	conf   *Config

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup

	brokers    map[*Broker]*brokerProducer
	brokerRefs map[*brokerProducer]int
	brokerLock sync.Mutex

	txnmgr *transactionManager
}
```
One of its members is a transactionManager:
```go
type transactionManager struct {
	producerID      int64
	producerEpoch   int16
	sequenceNumbers map[string]int32
	mutex           sync.Mutex
}
```
After constructing the producer object, it starts two goroutines:
```go
func newAsyncProducer(client Client) (AsyncProducer, error) {
	// ....
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)
}
```
Pay particular attention to the producer's input member:

```go
input, successes, retries chan *ProducerMessage
```
The dispatcher goroutine continuously consumes messages from input and forwards each one to the matching topicProducer's input channel. So to send a message, we only need to keep writing to the producer's input channel.
```go
func (p *asyncProducer) dispatcher() {
	for msg := range p.input {
		handler := p.newTopicProducer(msg.Topic)
		handler <- msg
	}
}
```
This happens in two steps:

1. obtain the topicProducer, which hands back the topicProducer's input channel;
2. send the message into that channel.
```go
// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}
```
Each topicProducer likewise starts a goroutine:
```go
func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	// ...
	go withRecover(tp.dispatch)
	return input
}
```
Its dispatch method looks very similar: it forwards each received message to the right partitionProducer:
```go
func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		handler := tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
		handler <- msg
	}
}
```
Next, let's look at what the partitionProducer does:
```go
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader         *Broker
	breaker        *breaker.Breaker
	brokerProducer *brokerProducer

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}
```
```go
func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	// ...
	go withRecover(pp.dispatch)
	return input
}
```
Sure enough, it also starts a goroutine and returns an input channel for receiving messages. Here is the implementation of dispatch:
```go
func (pp *partitionProducer) dispatch() {
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	for msg := range pp.input {
		pp.brokerProducer.input <- msg
	}
}
```
That's right: it in turn obtains a brokerProducer and pushes the msg into the brokerProducer's input channel. getBrokerProducer is keyed on the partition's leader, because messages are always sent to the partition leader; the leader is looked up from the cluster metadata stored in Kafka, which will be covered in detail later.
```go
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	// ...
	bp := p.newBrokerProducer(broker)
	// ...
	return bp
}
```
```go
// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse
	abandoned chan struct{}
	stopchan  chan struct{}

	buffer     *produceSet
	timer      <-chan time.Time
	timerFired bool

	closing        error
	currentRetries map[string]map[int32]error
}
```
The brokerProducer again starts two goroutines:
```go
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	// ...
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()
			response, err := broker.Produce(request)
			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})
}
```
run is a loop that keeps consuming messages from input, buffering them in preparation for the request to Kafka:
```go
func (bp *brokerProducer) run() {
	// ...
	for {
		select {
		case msg, ok := <-bp.input:
			// ...
			bp.buffer.add(msg)
		case output <- bp.buffer:
			bp.rollOver()
		case response, ok := <-bp.responses:
			if ok {
				bp.handleResponse(response)
			}
		}
	}
}
```
The second goroutine does three things: build the request, send it, and pass the response back.
```go
func (ps *produceSet) buildRequest() *ProduceRequest {
	// ...
	for topic, partitionSets := range ps.msgs {
		for partition, set := range partitionSets {
			if req.Version >= 3 {
				// newer protocol versions send a record batch
				req.AddBatch(topic, partition, rb)
				continue
			}
			// older versions send a (possibly compressed) message set
			req.AddMessage(topic, partition, compMsg)
		}
	}
	// ...
}
```
The Message struct is defined in message.go:
```go
type Message struct {
	Codec            CompressionCodec // codec used to compress the message contents
	CompressionLevel int              // compression level
	LogAppendTime    bool             // the used timestamp is LogAppendTime
	Key              []byte           // the message key, may be nil
	Value            []byte           // the message contents
	Set              *MessageSet      // the message set a message might wrap
	Version          int8             // v1 requires Kafka 0.10
	Timestamp        time.Time        // the timestamp of the message (version 1+ only)

	compressedCache []byte
	compressedSize  int // used for computing the compression ratio metrics
}
```
Next comes actually sending the message:
```go
func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
	// ...
	response = new(ProduceResponse)
	err = b.sendAndReceive(request, response)
	// ...
}

func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
	// ...
	promise, err := b.send(req, res != nil, responseHeaderVersion)
	// ...
}

func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
	// ...
	bytes, err := b.write(buf)
	// ...
}
```
Finally the data is written to the TCP connection:
```go
func (b *Broker) write(buf []byte) (n int, err error) {
	return b.conn.Write(buf)
}
```
That is the entire path the data travels.
2. Sending a message: producer.SendMessage
```go
func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
	// ...
	sp.producer.Input() <- msg
	// ...
}
```
SendMessage is simple: it just pushes the message into the producer's Input channel.
That is sarama's produce path. To summarize, the core flow is:
syncProducer -> asyncProducer -> topicProducer -> partitionProducer -> brokerProducer
Messages flow down through the input channels of these objects and are finally sent to Kafka over a TCP connection.