'데이터 엔지니어'로 성장하기

정리하는 걸 좋아하고, 남이 읽으면 더 좋아함

커뮤니티/slipp

Slipp)사이드프로젝트_센서 실시간 저장_1(strimzi,questdb,fastapi)

MightyTedKim 2023. 5. 21. 16:55
728x90
반응형

요약

'토이프로젝트 만들기' 스터디를 신청해서 진행 중입니다.
1차 목표인, 휴대폰 데이터 스트리밍은 성공했고 그 후기를 정리했습니다


예상 독자

결과물 

k8s 에 배포한 스트리밍 UI

 

[주제 구상] - 무엇을 주제로 선택할까

휴대폰의 센서 데이터 수집

업무가 주로 실시간 데이터 파이프라인을 구축임데, 보내는 쪽의 데이터를 선택할 수 없는게 좀 아쉽더라고요.
(공장에서 보내는 것을 그냥 받아야하니까ㅜ)

그래서 제가 선택할 수 있는 데이터는 무엇이 있을까 고민해봤어요

비트코인, 주식 그래프 등이 있지만, 문득 휴대폰의 센서 데이터를 수집해보면 어떨까? 라는 생각을 했죠
 

[자료 조사] - 무엇을 참고할까

Sensor Logger (APP)

'센서 로거'라는 어플리케이션을 이용해서 수집하기로 했어요.
마침 똑같은 내용을 포스팅한 블로그를 찾았거든요

https://towardsdatascience.com/a-real-time-streaming-project-with-smartphone-data-7e838a1b009d

[실행] - K8S + strimzi/ fastapi / questdb/ UI

docker-compose로 되어있는 예시를 k8s로 옮기기로 했어요

계획은 아래와 같습니다. 일단 모두 k8s에 올리기로 했어요.

토이프로젝트 아키텍처

 
 
데이터전송: Sensor Logger 앱에서, Push URL 값을 제가 띄운 Producer API로 입력해줬습니다.

Producer: 휴대폰을 흔들면 센서가 1초마다 api를 호출합니다. 그러면 msg를 kafka로 보내게되요

producer 코드 예시와 로그

Consumer: kafka에서 msg를 consume해서 questdb에 실시간으로 저장합니다

Quesdb: 저장 확인

Ui Server: while True로 계속 fastapi로 데이터 요청해서 최근 50개 데이터를 가져와서 시각화

 

[회고] - 무엇을 배웠나

저는 aws의 k8s로 올려봤어요. 거의 다 처음 적용하는 기술들이라 재미있었습니닿

1. kafka (strimzi)

on-prem에서만 kafka를 실행했었는데, k8s 에서 올리니까 엄청 좋더라고요 ㅎ
대시보드도 엄청 많고, yaml로 topic을 관리한다는 것 자체가 재미있었어요.
Operator를 이용해서 리소스를 관리하는 패턴에 대해서도 알게되었어요

 


 

  • strimzi operator
[ec2-user@k8smaster questdb]$ k get all -n kafka
NAME                                                     READY   STATUS    RESTARTS       AGE
pod/cluster-hgkim1-entity-operator-fd48dd64b-p775s   3/3     Running   13 (53m ago)   40h
pod/cluster-hgkim1-kafka-0                           1/1     Running   6 (55m ago)    40h
pod/cluster-hgkim1-kafka-1                           1/1     Running   4 (56m ago)    40h
pod/cluster-hgkim1-kafka-2                           1/1     Running   5 (55m ago)    40h
pod/cluster-hgkim1-kafka-exporter-c9b88464d-8fgnn    1/1     Running   5 (54m ago)    40h
pod/cluster-hgkim1-zookeeper-0                       1/1     Running   4 (55m ago)    40h
pod/cluster-hgkim1-zookeeper-1                       1/1     Running   2 (56m ago)    40h
pod/cluster-hgkim1-zookeeper-2                       1/1     Running   3 (56m ago)    40h
pod/strimzi-cluster-operator-687b8c77db-r2gbf            1/1     Running   5 (56m ago)    3d20h

2. FastApi

FasaApi는 Hello World만해봤는데, core/schemas/model/db 등과 같은 구조로 구성하는걸 배울 수 있어서 좋았어요.
다만 demo용 코드라서 실제 운영에서 사용하기에는 문제가 있다고 생각됩니다.

  • producer, consumer, ui
[ec2-user@k8smaster questdb]$ k get all -n stream-demo
NAME                    READY   STATUS    RESTARTS      AGE
pod/stream-consumer-0   1/1     Running   4 (61m ago)   19h
pod/stream-producer-0   1/1     Running   4 (61m ago)   22h
pod/stream-ui-0         1/1     Running   2 (62m ago)   16h

NAME                      TYPE       CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE
service/stream-producer   NodePort   10.106.50.186    <none>        8000:30016/TCP   44h
service/stream-ui         NodePort   10.101.145.111   <none>        5000:30017/TCP   17h

NAME                               READY   AGE
statefulset.apps/stream-consumer   1/1     19h
statefulset.apps/stream-producer   1/1     23h
statefulset.apps/stream-ui         1/1     16h

 

3. Questdb

이 부분에서 애를 좀 먹었는데요. 
QuestDb는 postgre를 지원하지만, curl과 http를 사용 할수도 있더라고요.
helm으로 실행하니 port가 4개나 나와서, 각각의 역할을 배울 수 있었어요

k get all -n questdb
NAME               READY   STATUS    RESTARTS      AGE
pod/my-questdb-0   1/1     Running   2 (61m ago)   41h

NAME                          TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE
service/my-questdb            NodePort    10.103.137.17   <none>        9000:32726/TCP,8812:32682/TCP,9003:32145/TCP   41h
service/my-questdb-headless   ClusterIP   None            <none>        8812/TCP                                       41h

NAME                          READY   AGE
statefulset.apps/my-questdb   1/1     41h

 

[고도화] 2차 목표는?

일단 1차 목표인 스트리밍 서비스, k8s에 올리기는 완료했어요

하지만 아직은 데모라 부족한 부분을 좀 더 보완하기로 했습니다.

1. Questdb column timestamp로 변경

- timestamp가 string으로 저장되더라고요. grafana애서 cast하니까 너무 느려서 Data type을 명시하고 싶어요

2. DB Connection close 문제 해결

demo 코드여서 그런지, connection close가 없는 부분이 없어서 로직을 추가하려고요

  File "/usr/local/lib/python3.10/site-packages/pandas/io/sql.py", line 2027, in execute
    raise ex from inner_exc
pandas.errors.DatabaseError: Execution failed on sql: with tmp as 
(select device_id,recorded_timestamp,x,y,z,
   row_number() over(partition by device_id order by
                    recorded_timestamp desc) as rn
    from device_offload
    where sensor_name = 'acc'
    and recorded_timestamp::timestamp >= dateadd('s', -60, now())
)
select * from tmp where rn <= 50

server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.

unable to rollback

3. UI 고도화

스마트폰에서 데이터를 보내지 않을때 ui에서 아무것도 안나오더라고요. 이것도 수정하고 싶어요

4. 가속도 센서 외의 데이터 수집하기

9dof imu sensor를 이용하고 싶어서, gyroscope와 magnetic 데이터도 수집해서 db에 저장하고 싶어요


다음 글은 2차 목표 완료로 설정하고 싶네요 ㅎ

+ https://mightytedkim.tistory.com/199
 

 

Slipp)사이드프로젝트_스마트폰 센서 실시간 저장_2(기능 추가,버그 수정)

요약 '스마트폰 센서 실시간 저장/시각화' 토이 프로젝트를 하고 있어요. 지난 포스팅에서는 외국 블로그를 참고해서, k8s에 서비스를 올리는 것까지 진행했어요' 이번에는 몇가지 기능을 추가하

mightytedkim.tistory.com

참고

https://towardsdatascience.com/a-real-time-streaming-project-with-smartphone-data-7e838a1b009d

 

A Real-Time Streaming Project with Smartphone Data

Consume and Process Smartphone Sensor Data with FastAPI, Kafka, QuestDb, and Docker

towardsdatascience.com

 

728x90
반응형