테스트 환경
- Kafka broker 3대(Linux CentOS 7)
- Python == 3.6.5
- confluent-kakfa == 1.9.2
1. confluent-kafka 설치
- Kafka에 Python이 레코드를 전송을 위해 필요한 라이브러리
- cmd나 Pycharm 터미널에서 설치 진행
pip install confluent-kafka
2. Topic 생성
- Topic 이름: test
- Partition: 3개
- Replication Factor: 3개
- Kafka에서 Topic 생성
- "Created topic test." 출력되면 Topic 생성 완료
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
3. Producer 구현
1. Python 스크립트
- bootstrap.servers: Kafka broker들의 ip 주소:포트번호, 필수값
- client.id: Producer host명
- poll():
- flush(): Database의 commit과 유사한 기능
from confluent_kafka import Producer
import socket
conf = {'bootstrap.servers': '{broker1 ip 주소}:9092,{broker2 ip 주소}:9092,{broker3 ip 주소}:9092',
'client.id':socket.gethostname()}
p = Producer(conf)
def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
for i in range(100):
data = 'message : {}'.format(i)
print(data)
p.poll(3)
p.produce('test', value=data, callback=delivery_report)
p.flush()
p.flush()
2. 실행 결과
3. 참고사항
- Kafka의 레코드가 저장되는 디렉터리(실제 Topic)가 root 권한으로 생성된 경우, 저장 디렉터리 권한 777로 수정
- Producer(Client)가 Kafka에 접근하기 위해서는 server.properties에서 advertised.listeners 옵션 수정 필요
https://deep-dive-dev.tistory.com/42?category=1069496
Kafka 외부 접속 허용하기
$ vi conf/server.properties conf/server.properties 가 Kafka의 설정을 담당하는 파일이다. # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for..
deep-dive-dev.tistory.com
https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html
confluent_kafka API — confluent-kafka 1.9.0 documentation
Docs » confluent_kafka API View page source Kafka Clients AdminClient Kafka admin client: create, view, alter, and delete topics and resources. class confluent_kafka.admin.AdminClient(conf)[source] AdminClient provides admin operations for Kafka brokers,
docs.confluent.io
'language > Python' 카테고리의 다른 글
[Module] 파이썬 파일(.py)을 import 하는 방법 (0) | 2023.11.16 |
---|---|
[Function] 가변 인자(*args)와 키워드 가변 인자(**kwargs) (0) | 2023.11.16 |
[자료형] List Comprehension (0) | 2023.11.15 |
[자료형] List - 자주 사용하는 메소드 함수 (0) | 2023.08.08 |
[Package/Library] Numpy 이해 및 사용 예시 (0) | 2022.11.20 |