mydeer
就像我们从来没有来过这里,但这种感觉似曾相识。
PHP 与 Kafka 连接与搭建

https://gist.github.com/rambolee/71286ad4c31aeb62253f4eb5cb6cbd93 安装

https://github.com/arnaud-lb/php-rdkafka 使用

https://kafka.apache.org/documentation/#configuration kafka 连接配置

背景

由于相关项目有 Kafka 的操作需求,因此,需要当前平台连接 Kafka 获取相关数据。

安装与配置

系统环境

  • CentOS 6.7
  • Kafka 0.10
  • PHP 5.6.21

安装 php Kafka 扩展

根据官网指引地址 我们选择对应的 PHP 客户端

Dev 环境准备

$ sudo yum install zlib zlib-devel openssl openssl-devel cyrus-sasl2 cyrus-sasl-devel

安装 php-rdkafka 的依赖 C 扩展库

$ git clone https://github.com/edenhill/librdkafka/
$ cd librdkafka
$ ./configure
$ make 
$ sudo make install

PECL  安装

/usr/local/php56-5.6.24/bin/pecl install rdkafka

或者 编译安装   php-rdkafka的 PHP 扩展

参考官网安装说明

$ git clone https://github.com/arnaud-lb/php-rdkafka.git
$ cd php-rdkafka
$ # For PHP 7, checkout the php7 branch:
$ # git checkout php7
$ phpize
$ ./configure
$ make all -j 5
$ sudo make install
$ sh -c 'echo "extension=rdkafka.so" >> /usr/local/php/etc/php.ini'

查看 php-rdkafka 配置

$ php -m | grep kafka
Warning: PHP Startup: Unable to load dynamic library '/usr/local/php/lib/php/extensions/no-debug-non-zts-20131226/kafka.so' - librdkafka.so.1: cannot open shared object file: No such file or directory in Unknown on line 0

修复加载.so 问题

$ sudo touch /etc/ld.so.conf.d/librd.conf
$ sudo sh -c 'echo "/usr/local/lib" >> /etc/ld.so.conf.d/librd.conf'
$ ldconfig
$ ldconfig -p

查看 php 安装好的 kafka 配置

$ php -m | grep kafka
rdkafka

初步使用 Demo

目前使用高级消费者模式,参考官网的 demo 如下

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignemts (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
             echo "Revoke: ";
             var_dump($partitions);
             $kafka->assign(NULL);
             break;

         default:
            throw new \Exception($err);
    }
});

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup_1');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '10.110.92.95,10.110.92.101');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'test'
$consumer->subscribe(['NetBaseInfo']);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joinging the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

kafka  topic 使用多个 partition

topic 的删除,和使用标记

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic someTopic

删除 消费者组

https://stackoverflow.com/questions/29243896/removing-a-kafka-consumer-group-in-zookeeper

./bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --delete --group <group-name>

There is no need to delete with the new consumer. Here is what the script outputs when attempting to delete:

Note that there's no need to delete group metadata for the new consumer as it is automatically deleted when the last member leaves

That's the short answer. More details: By "metadata", two things are meant. First, just the information about the consumers and consumer groups that is stored as part of the group membership coordinator. That is automatically removed if all consumers in the group are gone.

Second, the consumer group has stored the committed offsets in a Kafka topic (when the new consumer is used. Previously these used to be stored with Zookeeper). That topic is not immediately deleted when the consumer group disappears. If the consumer group reappears again, it will find the previous offsets automatically in this topic. It can choose to use them or ignore them. If the consumer group never reappears, these stored offsets are eventually garbage collected automatically.

https://stackoverflow.com/questions/37741936/how-to-delete-kafka-consumer-created-via-new-consumer-api

<< 上一篇 Kafka 搭建 kafka可视化管理工具 Kafka Manager 下一篇 >>
文章标签
随意 | Created At 2014 By William Clinton | 蜀ICP备14002619号-4 |