资讯 小学 初中 高中 语言 会计职称 学历提升 法考 计算机考试 医护考试 建工考试 教育百科
栏目分类:
子分类:
返回
空麓网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
空麓网 > 计算机考试 > 软件开发 > 后端开发 > Python

基于docker的confluent-kafka搭建及python接口使用

Python 更新时间: 发布时间: 计算机考试归档 最新发布

基于docker的confluent-kafka搭建及python接口使用

基于docker的confluent-kafka搭建及python接口使用

  • 1. 安装docker以及docker-compose
    • 1.1 安装docker
    • 1.2 安装docker-compose
  • 2. 安装confluent-kafka
  • 3. python接口使用
    • 3.1 安装依赖包
    • 3.2 创建、查看topic
    • 3.3 python接口-broker
    • 3.4 python接口-consumer
  • 参考链接

本文介绍基于docker搭建的confluent-kafka及其python接口的使用。

本文只搭建了一个单Broker的confluent-kafka测试环境,考虑到高可用、高吞吐等因素,正式生产环境一般至少要3个节点。

本文采用的系统配置如下:

  • LinuxMint 20.3 (兼容 Ununtu 20.04)
  • docker 20.10.21
  • docker-compose 2.14.2
  • python 3.9.16
  • confluent-kafka(python包) 2.1.1

1. 安装docker以及docker-compose

1.1 安装docker

docker-compose依赖于docker,因此需要先安装docker。

curl -fsSL https://test.docker.com -o test-docker.shsudo sh test-docker.sh

1.2 安装docker-compose

Compose 是用于定义和运行多容器 Docker 应用程序的工具。通过 Compose,您可以使用 YML 文件来配置应用程序需要的所有服务。然后,使用一个命令,就可以从 YML 文件配置中创建并启动所有服务。

curl -L https://get.daocloud.io/docker/compose/releases/download/v2.14.2/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose

要安装其他版本的 Compose,请替换 v2.14.2。

2. 安装confluent-kafka

新建文件并创建docker-compose.yml文件:

version: '3'services:  zookeeper:    image: confluentinc/cp-zookeeper:7.0.1    container_name: zookeeper    environment:      ZOOKEEPER_CLIENT_PORT: 2181      ZOOKEEPER_TICK_TIME: 2000  broker:    image: confluentinc/cp-kafka:7.0.1    container_name: broker    ports:    # To learn about configuring Kafka for access across networks see    # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/      - "9092:9092"    depends_on:      - zookeeper    environment:      KAFKA_BROKER_ID: 1      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

注意 这里搭建的是本地环境,如果需要从网络中的另一位置访问kafka,需要将KAFKA_ADVERTISED_LISTENERS中的localhost换成kafka所在主机的真实地址/域名。

进入该文件夹并运行:

docker-compose -f docker-compose.yml up -d

运行后在docker中看到类似结果说明启动成功:

aa@bb:~/docker_scripts$ docker psCONTAINER ID   IMAGE                             COMMAND                  CREATED         STATUS         PORTS                                       NAMESe6fbc05d61b1   confluentinc/cp-kafka:7.0.1       "/etc/confluent/dock…"   7 minutes ago   Up 7 minutes   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp   broker58b04385f2bf   confluentinc/cp-zookeeper:7.0.1   "/etc/confluent/dock…"   7 minutes ago   Up 7 minutes   2181/tcp, 2888/tcp, 3888/tcp                zookeeper

这里kafka端口为9092。

关闭容器服务:

docker-compose -f docker-compose.yml down

3. python接口使用

3.1 安装依赖包

安装依赖包:

pip3 install confluent-kafka

3.2 创建、查看topic

进入kafka镜像:

docker exec -ti broker bash

查看topic:

[aa@bb ~]$ /bin/kafka-topics --list --bootstrap-server 127.0.0.1:9092

新建名为test001的topic:

[aa@bb ~]$ /bin/kafka-topics --create --bootstrap-server 127.0.0.1:9092 --topic test001 --partitions 2Created topic test001.

查看topic:

[aa@bb ~]$ /bin/kafka-topics --list --bootstrap-server 127.0.0.1:9092test001

通过Ctrl + P + Q回到终端。

3.3 python接口-broker

创建producer代码producer1.py:

import socketfrom confluent_kafka import Producerconf = {    'bootstrap.servers': "localhost:9092",    'client.id': socket.gethostname()}producer = Producer(conf)def __publish_delivery_report(err, msg) -> None:    if err is not None:        print(f"send msg:{msg} fail, err is not None")    else:        print(f"send msg{msg} success")def send_msg(topic: str, data):    producer.produce(topic, data, callback=__publish_delivery_report)    producer.flush()if __name__ == '__main__':    msg = "hello kafka"    topic = "test001"    send_msg(topic, msg)

运行结果:

aa@bb:~/codes/kafka_test$ python3 producer1.pysend msg success

3.4 python接口-consumer

创建consumer代码consumer1.py:

from confluent_kafka import Consumer class KafkaConsumer:    def __init__(self, brokers, group):        config = dict()        config['bootstrap.servers'] = brokers        config['group.id'] = group        config['auto.offset.reset'] = 'earliest'        self.consumer = Consumer(config)     def subscribe(self, topics):        self.consumer.subscribe(topics=topics)     def pull(self):        while True:            msg = self.consumer.poll(1.0)            if msg is None:                continue            if msg.error():                print("Consumer error: {}".format(msg.error()))                continue            print('Received message: {}'.format(msg.value().decode('utf-8')))     def close(self):        self.consumer.close()  if __name__ == "__main__":    consumer = KafkaConsumer("127.0.0.1:9092", "test_group1")    consumer.subscribe(['test001'])    consumer.pull()    consumer.close()

运行结果:

aa@bb:~/codes/kafka_test$ python3 consumer1.pyReceived message: hello kafka

参考链接

  1. Hello Kafka(八)——Confluent Kafka简介
  2. Docker Compose | 菜鸟教程
  3. confluent_kafka生产者 - luckygxf - 博客园
  4. Hello Kafka(十二)——Python客户端_kafka python客户端_天山老妖的博客-CSDN博客
转载请注明:文章转载自 http://www.konglu.com/
本文地址:http://www.konglu.com/it/1097515.html
免责声明:

我们致力于保护作者版权,注重分享,被刊用文章【基于docker的confluent-kafka搭建及python接口使用】因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理,本文部分文字与图片资源来自于网络,转载此文是出于传递更多信息之目的,若有来源标注错误或侵犯了您的合法权益,请立即通知我们,情况属实,我们会第一时间予以删除,并同时向您表示歉意,谢谢!

我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2023 成都空麓科技有限公司

ICP备案号:蜀ICP备2023000828号-2