language/Python

Python으로 Kafka Producer 구현

브라우니란 2022. 10. 27. 16:24

테스트 환경

  • Kafka broker 3대(Linux CentOS 7)
  • Python == 3.6.5
  • confluent-kakfa == 1.9.2

1. confluent-kafka 설치

  • Kafka에 Python이 레코드를 전송을 위해 필요한 라이브러리
  • cmd나 Pycharm 터미널에서 설치 진행
pip install confluent-kafka

 

2. Topic 생성

  • Topic 이름: test
  • Partition: 3개
  • Replication Factor: 3개
  • Kafka에서 Topic 생성
  • "Created topic test." 출력되면 Topic 생성 완료
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test

 

 

3. Producer 구현

 1. Python 스크립트

  • bootstrap.servers: Kafka broker들의 ip 주소:포트번호, 필수값
  • client.id: Producer host명 
  • poll():
  • flush(): Database의 commit과 유사한 기능
from confluent_kafka import Producer
import socket

conf = {'bootstrap.servers': '{broker1 ip 주소}:9092,{broker2 ip 주소}:9092,{broker3 ip 주소}:9092',
        'client.id':socket.gethostname()}

p = Producer(conf)

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

for i in range(100):
    data = 'message : {}'.format(i)
    print(data)
    p.poll(3)
    p.produce('test', value=data, callback=delivery_report)
    p.flush()

p.flush()

 

 2. 실행 결과

레코드가 저장된 partition 확인

3. 참고사항

  • Kafka의 레코드가 저장되는 디렉터리(실제 Topic)가 root 권한으로 생성된 경우, 저장 디렉터리 권한 777로 수정
  • Producer(Client)가 Kafka에 접근하기 위해서는 server.properties에서 advertised.listeners 옵션 수정 필요

 

 

https://deep-dive-dev.tistory.com/42?category=1069496

 

Kafka 외부 접속 허용하기

$ vi conf/server.properties conf/server.properties 가 Kafka의 설정을 담당하는 파일이다. # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for..

deep-dive-dev.tistory.com

https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html

 

confluent_kafka API — confluent-kafka 1.9.0 documentation

Docs » confluent_kafka API View page source Kafka Clients AdminClient Kafka admin client: create, view, alter, and delete topics and resources. class confluent_kafka.admin.AdminClient(conf)[source] AdminClient provides admin operations for Kafka brokers,

docs.confluent.io