https://gist.github.com/rambolee/71286ad4c31aeb62253f4eb5cb6cbd93 安装
https://github.com/arnaud-lb/php-rdkafka 使用
https://kafka.apache.org/documentation/#configuration kafka 连接配置
背景
由于相关项目有 Kafka 的操作需求,因此,需要当前平台连接 Kafka 获取相关数据。
安装与配置
系统环境
- CentOS 6.7
- Kafka 0.10
- PHP 5.6.21
安装 php Kafka 扩展
根据官网指引地址 我们选择对应的 PHP 客户端
Dev 环境准备
$ sudo yum install zlib zlib-devel openssl openssl-devel cyrus-sasl2 cyrus-sasl-devel
安装 php-rdkafka 的依赖 C 扩展库
$ git clone https://github.com/edenhill/librdkafka/
$ cd librdkafka
$ ./configure
$ make
$ sudo make install
PECL 安装
/usr/local/php56-5.6.24/bin/pecl install rdkafka
或者 编译安装 php-rdkafka的 PHP 扩展
参考官网安装说明
$ git clone https://github.com/arnaud-lb/php-rdkafka.git $ cd php-rdkafka $ # For PHP 7, checkout the php7 branch: $ # git checkout php7 $ phpize $ ./configure $ make all -j 5 $ sudo make install $ sh -c 'echo "extension=rdkafka.so" >> /usr/local/php/etc/php.ini'
查看 php-rdkafka 配置
$ php -m | grep kafka Warning: PHP Startup: Unable to load dynamic library '/usr/local/php/lib/php/extensions/no-debug-non-zts-20131226/kafka.so' - librdkafka.so.1: cannot open shared object file: No such file or directory in Unknown on line 0
修复加载.so 问题
$ sudo touch /etc/ld.so.conf.d/librd.conf
$ sudo sh -c 'echo "/usr/local/lib" >> /etc/ld.so.conf.d/librd.conf'
$ ldconfig
$ ldconfig -p
查看 php 安装好的 kafka 配置
$ php -m | grep kafka
rdkafka
初步使用 Demo
目前使用高级消费者模式,参考官网的 demo 如下
$conf = new RdKafka\Conf(); // Set a rebalance callback to log partition assignemts (optional) $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); // Configure the group.id. All consumer with the same group.id will consume // different partitions. $conf->set('group.id', 'myConsumerGroup_1'); // Initial list of Kafka brokers $conf->set('metadata.broker.list', '10.110.92.95,10.110.92.101'); $topicConf = new RdKafka\TopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning $topicConf->set('auto.offset.reset', 'smallest'); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafka\KafkaConsumer($conf); // Subscribe to topic 'test' $consumer->subscribe(['NetBaseInfo']); echo "Waiting for partition assignment... (make take some time when\n"; echo "quickly re-joinging the group after leaving it.)\n"; while (true) { $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } }
kafka topic 使用多个 partition
topic 的删除,和使用标记
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic someTopic
删除 消费者组
https://stackoverflow.com/questions/29243896/removing-a-kafka-consumer-group-in-zookeeper
./bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --delete --group <group-name>
There is no need to delete with the new consumer. Here is what the script outputs when attempting to delete:
Note that there's no need to delete group metadata for the new consumer as it is automatically deleted when the last member leaves
That's the short answer. More details: By "metadata", two things are meant. First, just the information about the consumers and consumer groups that is stored as part of the group membership coordinator. That is automatically removed if all consumers in the group are gone.
Second, the consumer group has stored the committed offsets in a Kafka topic (when the new consumer is used. Previously these used to be stored with Zookeeper). That topic is not immediately deleted when the consumer group disappears. If the consumer group reappears again, it will find the previous offsets automatically in this topic. It can choose to use them or ignore them. If the consumer group never reappears, these stored offsets are eventually garbage collected automatically.