์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- ํ์ดํผ๋ฐ์ด์
- ํ๋ก๊ทธ๋๋จธ์ค
- JPA
- nGrinder
- private subnet ec2 ๋ก์ปฌ ์ ์
- AWS Certified Solutions Architect - Associate
- redis ์กฐํ
- @RestControllerAdvice
- ํ๋ก๊ทธ๋๋จธ์ค ํฉ์นํ์์๊ธ
- ๋ค์ค ์ปจํ ์ด๋
- Codedeploy ์ค๋ฅ
- ์ ํจ์ค ๋น๋ ์ค๋ฅ
- s3 log ์ ์ฅ
- Entity
- ์๋ฐ
- prod docker-compose
- docker
- ์๋ฒ ํฐ์ง๋ ๋์ปค ์ฌ์คํ
- aws saa ํฉ๊ฒฉ
- redis ํ ์คํธ์ฝ๋
- docker compose
- docker-compose kafka
- ์ ํจ์ค ์ค์ผ์ค๋ฌ
- ์คํํ๋ ๋ฏธ์ค
- Kafka
- ํ๋ก๊ทธ๋๋จธ์ค ์ปฌ๋ฌ๋ง๋ถ
- s3 ์ด๋ฏธ์ง ์ ์ฅ
- s3 ์ด๋ฏธ์ง ๋ค์ด๋ก๋
- aws ์ฟ ํฐ
- docker ps -a
- Today
- Total
๐๐ข๐๐ โ๐๐๐ ๐๐๐ก๐๐ ๐๐๐๐โง
[Spring] SSE & Kafka ๋ฅผ ํ์ฉํด์ ์๋ฆผ ๊ธฐ๋ฅ ๊ตฌํํ๊ธฐ ๋ณธ๋ฌธ
[Spring] SSE & Kafka ๋ฅผ ํ์ฉํด์ ์๋ฆผ ๊ธฐ๋ฅ ๊ตฌํํ๊ธฐ
๐คRyusun๐ค 2024. 3. 22. 18:12ํ์๋ ์ฒ์์๋ ์๋ฆผ ํ ์ด๋ธ์ ๊ตฌํํํ ๋๊ธ์ด ๋ฌ๋ฆฌ๋ฉด ์๋ฆผํ ์ด๋ธ์ ๋ฐ์ดํฐ๋ก ์ ์ฅํ์๊ณ , API๊ฐ ํธ์ถ๋๋ฉด ํด๋น ์ฌ์ฉ์์ ์๋ฆผ ๋ฐ์ดํฐ๋ฅผ DB์์ ๊ฐ์ ธ์ค๋ ๋ฐฉ์์ผ๋ก ๊ตฌํํ์๋ค.
ํ์ง๋ง ์ด๋ฐ polling ๋ฐฉ์์ ์ ํด์ง ์ฃผ๊ธฐ๋ง๋ค ๋ฌด์กฐ๊ฑด ์๋ฒ์ ์์ฒญ์ ๋ณด๋ด๊ธฐ ๋๋ฌธ์ ๋ฐ์ดํฐ๊ฐ ์์ ๋๋ ํธ๋ํฝ์ ๋ฐ์์ํค๋ฉฐ, ์ค์๊ฐ์ผ๋ก ๋ฐ์ดํฐ๊ฐ ์ ๋ฐ์ดํธ ๋์ง์๋๋ค๋ ๋จ์ ์ด ์๋ค.
๊ทธ๋์ ํ์๋ Kafka๋ฅผ ์ฌ์ฉํ์ฌ ์๋ฆผ ์ด๋ฒคํธ๋ฅผ ๋น๋๊ธฐ์ ์ผ๋ก ์ฒ๋ฆฌํ๊ณ , Server-Sent Events(SSE)๋ฅผ ํตํด ํด๋ผ์ด์ธํธ์ ์ค์๊ฐ์ผ๋ก ์๋ฆผ์ ์ ๋ฌํ๋ ์์คํ ์ ๊ตฌํํด๋ณผ๊ฒ์ด๋ค. ๊ฒ์ํ์ ๊ธ์ ๋๊ธ์ด ๋ฌ๋ฆฌ๋ฉด ์๋์ด ์๊ธฐ๊ณ , ์๋ฒ๋ Kafka๋ฅผ ํตํด ๋น๋๊ธฐ์ ์ผ๋ก ์๋ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ๋ฉฐ, ์ฒ๋ฆฌ๋ ์๋์ SSE๋ฅผ ํตํด ์ค์๊ฐ์ผ๋ก ํด๋ผ์ด์ธํธ์ ์ ์กํ๋ ๋ก์ง์ผ๋ก ๊ตฌํํ ๊ฒ์ด๋ค.
์ ์ฒด์ ์ธ ํ๋ฆ
์ฌ์ฉ์์ ํ๋์ด๋ ํน์ ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ๋ฉด ์๋ฆผ ์ด๋ฒคํธ๊ฐ ์์ฑ๋๊ณ Kafka๋ฅผ ํตํด ์ฒ๋ฆฌ๋๋ค. ์ด๋ฒคํธ๋ AlarmConsumer์ ์ํด ์์ ๋๊ณ , AlarmService์์ ์ฒ๋ฆฌ๋์ด ์ค์ ์๋ฆผ์ผ๋ก ๋ณํ๋ ํ, ํด๋น ์ฌ์ฉ์์๊ฒ SSE๋ฅผ ํตํด ์ค์๊ฐ์ผ๋ก ์ ์กํ๋ค.
์ฃผ์ ํด๋์ค์ ์ญํ
- AlarmProducer: ์ฌ์ฉ์๊ฐ ๋๊ธ์ ๋ฌ๋ฉด ํธ์ถ๋์ด, Kafka์ "alarm" ํ ํฝ์ผ๋ก ์๋ฆผ ์ด๋ฒคํธ๋ฅผ ์ ์กํ๋ค.
- AlarmConsumer: Kafka์์ "alarm" ํ ํฝ์ ์๋ฆผ ์ด๋ฒคํธ๋ฅผ ์์ ํ๊ณ , ํด๋น ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ๊ธฐ ์ํด AlarmService์ send ๋ฉ์๋๋ฅผ ํธ์ถํ๋ค.
- AlarmService: ์๋ฆผ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ์ฌ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ ์ฅํ๊ณ , ํด๋น ์ฌ์ฉ์์ SseEmitter๊ฐ ์์ ๊ฒฝ์ฐ ์ค์๊ฐ์ผ๋ก ์๋ฆผ์ ์ ์กํ๋ค. ๋ํ, ์ฌ์ฉ์๊ฐ ์๋ฆผ ๊ตฌ๋ ์ ์์ฒญํ ๋ (connectNotification ๋ฉ์๋) ์ SseEmitter๋ฅผ ์์ฑํ๊ณ ์ ์ฅํ๋ค.
- EmitterRepository: ์ฌ์ฉ์๋ณ SseEmitter ๊ฐ์ฒด๋ฅผ ๋ฉ๋ชจ๋ฆฌ ๋ด์์ ๊ด๋ฆฌํ๋ฉฐ, ์๋ฆผ ์ ์ก ์ ์ฌ์ฉ์์ SseEmitter๋ฅผ ์กฐํํ์ฌ ์๋ฆผ์ ์ ์กํ๋ค.
- KafkaProducerConfig: Kafka ํ๋ก๋์์ ์ปจ์๋จธ์ ์ค์ ์ ์ ์ํ๋ค.
Controller.java
@GetMapping( "alarm/subscribe")
public SseEmitter subscribe(@RequestParam Long userId) {
log.info("subscribe");
return alarmService.connectNotification(userId);
}
@GetMapping("alarm")
public ResponseEntity<Page<AlarmReponse>> alarm(@RequestParam Long userId, Pageable pageable) {
return ResponseEntity.ok().body(alarmService.getAlarmList(userId, pageable));
}
@GetMapping( "alarm/subscribe")
- ์ฌ์ฉ์ ID์ ๋ํ SSE(Server-Sent Events) ์ฐ๊ฒฐ์ ์์ฑํ๊ณ , ์ด ์ฐ๊ฒฐ์ ํตํด ์ค์๊ฐ์ผ๋ก ์๋ฒ์ ํด๋ผ์ด์ธํธ ๊ฐ์ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ด ์ค์ ๋๋ค.
@GetMapping("alarm")
- ํด๋น ์ฌ์ฉ์์ ์๋ฆผ ๋ชฉ๋ก์ ํ์ด์ง๋ค์ด์ ๋ ํํ๋ก ๊ฐ์ ธ์จ๋ค.
KafkaProducerConfig.java
- Kafka ํ๋ก๋์ ์ค์ ์ ์ํ ํด๋์ค
- Kafka์ ํต์ ํ๊ธฐ ์ํ ์ฌ๋ฌ Bean๋ค์ ์ ์
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, AlarmEvent> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, AlarmEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}
producerFactory() ๋ฉ์๋
- ProducerFactory<String, AlarmEvent> : Kafka๋ก ๋ฉ์์ง๋ฅผ ๋ณด๋ผ ๋ ์ฌ์ฉ๋๋ค.
- ProducerConfig๋ฅผ ์ค์ ํ๋๋ฐ, ์ฌ๊ธฐ์๋ Kafka ์๋ฒ ์ฃผ์(BOOTSTRAP_SERVERS_CONFIG), ๋ฉ์์ง ํค์ ์ง๋ ฌํ ๋ฐฉ์(KEY_SERIALIZER_CLASS_CONFIG), ๋ฉ์์ง ๊ฐ์ ์ง๋ ฌํ ๋ฐฉ์(VALUE_SERIALIZER_CLASS_CONFIG) ๋ฑ์ด ํฌํจ๋๋ค.
- ์ฌ๊ธฐ์ AlarmEvent๋ ๋ฉ์์ง์ ๋ณธ๋ฌธ์ ๋ํ๋ด๋ ํด๋์ค์ด๋ฉฐ, JsonSerializer๋ ์ด AlarmEvent ์ธ์คํด์ค๋ฅผ JSON ํ์์ผ๋ก ์ง๋ ฌํํ๋ค.
kafkaTemplate() ๋ฉ์๋
- KafkaTemplate<String, AlarmEvent> ํ์ ์ Bean์ ๋ฐํํ๋ฉฐ, ์ด๋ Kafka๋ก ๋ฉ์์ง๋ฅผ ๋ณด๋ด๊ธฐ ์ํ ํ ํ๋ฆฟ์ผ๋ก ์ฌ์ฉํ๋ค.
- ์ด ํ ํ๋ฆฟ์ producerFactory()์์ ์์ฑํ ProducerFactory๋ฅผ ์ฌ์ฉํ๋ค.
kafkaListenerContainerFactory() ๋ฉ์๋
- Kafka ๋ฉ์์ง ๋ฆฌ์ค๋ ์ปจํ ์ด๋๋ฅผ ์ค์ ํ๊ธฐ ์ํ ๋ฉ์๋
- ConcurrentKafkaListenerContainerFactory<?, ?> ๋ Kafka ๋ฉ์์ง ๋ฆฌ์ค๋๋ค์ ์ํ ์ปจํ ์ด๋๋ฅผ ์์ฑํ๋๋ฐ ์ฌ์ฉ๋๋ค.
- ConcurrentKafkaListenerContainerFactoryConfigurer์ ConsumerFactory<Object, Object>๋ฅผ ์ด์ฉํด ๋ฆฌ์ค๋ ์ปจํ ์ด๋ ํฉํ ๋ฆฌ๋ฅผ ๊ตฌ์ฑํ๋ค.
- setAckMode(ContainerProperties.AckMode.MANUAL)๋ ๋ฉ์์ง๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์ฒ๋ฆฌ๋์์์ ์ ํ๋ฆฌ์ผ์ด์ ์์ Kafka์ ์๋ฆฌ๋ ์ญํ ์ ํ๋ค.
AlarmService.java
- ์ฌ์ฉ์์๊ฒ ์๋์ ์ ์กํ๊ณ , ์๋ ์ฐ๊ฒฐ์ ๊ด๋ฆฌํ๋ฉฐ, ์๋ ๋ชฉ๋ก์ ์กฐํํ๋ ํด๋์ค
@Slf4j
@RequiredArgsConstructor
@Service
public class AlarmService {
private final AlarmRepository alarmRepository;
private final EmitterRepository emitterRepository;
private final UserRepository userRepository;
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final static String ALARM_NAME = "alarm";
@Transactional
public void send(AlarmType type, AlarmArgs args, Long receiverId) {
EntityUser user = userRepository.findByUserId(receiverId).orElseThrow(
() -> new AppException(ErrorCode.USER_NOT_FOUND, "ํ์์ด ์กด์ฌํ์ง ์์ต๋๋ค."));
HashMap<Long, Long> newArgs = new HashMap<>();
newArgs.put(args.getFromUserId(), args.getTargetId());
EntityAlarm alarm = EntityAlarm.of(type, newArgs, user);
try {
alarmRepository.save(alarm);
} catch (Exception e){
throw new AppException(ErrorCode.INTERNAL_SERVER_ERROR, "์ผ์์ ์ค๋ฅ์
๋๋ค");
}
emitterRepository.get(receiverId).ifPresentOrElse(it -> { //emitterRepo์์ ์ธ์คํด์ค๋ฅผ ๊ฐ์ง๊ณ ์จ๋ค.
try {
it.send(SseEmitter.event() // ์๋ก์ด ์๋ ์ ์ก
.id(alarm.getAlarmId().toString())
.name(ALARM_NAME)
.data(new AlarmNoti()));
} catch (IOException exception) {
emitterRepository.delete(receiverId);
throw new AppException(ErrorCode.NOTIFICATION_CONNECT_ERROR, "์๋ฆผ ์ ์ก์ ์คํจํ์ต๋๋ค.");
}
},
() -> log.info("No emitter founded")
);
}
@Transactional
public SseEmitter connectNotification(Long userId) {
log.info("{userId = }", userId);
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
emitterRepository.save(userId, emitter);
emitter.onCompletion(() -> emitterRepository.delete(userId));
emitter.onTimeout(() -> emitterRepository.delete(userId));
try {
log.info("send");
emitter.send(SseEmitter.event()
.id("id")
.name(ALARM_NAME)
.data("connect completed"));
} catch (IOException exception) {
throw new AppException(ErrorCode.NOTIFICATION_CONNECT_ERROR, "ํต์ ์ ์คํจํ์ต๋๋ค.");
}
return emitter;
}
@Transactional(readOnly = true)
public Page<AlarmReponse> getAlarmList(Long userId, Pageable pageable) {
return alarmRepository.findAllByUserUserId(userId, pageable).map(AlarmReponse::fromEntity);
}
send ๋ฉ์๋
- ์๋์ ์ ์ก
- alarmRepository ์๋ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ค.
- emitterRepository์์ SSE (Server-Sent Events) ์ด๋ฒคํธ ์คํธ๋ฆผ์ ๊ด๋ฆฌํ๊ณ , ์๋ก์ด ์๋ ์ด๋ฒคํธ๋ฅผ ์คํธ๋ฆผ์ ๋ณด๋ธ๋ค.
connectNotification ๋ฉ์๋
- ํด๋ผ์ด์ธํธ๊ฐ ์๋ฆผ ์์ ์ ์ํด ์ฐ๊ฒฐ
- SseEmitter ์ธ์คํด์ค๋ฅผ ์์ฑํ๊ณ , ์ด๋ฅผ emitterRepository์ ์ ์ฅํ๋ค. ์ฐ๊ฒฐ ์๋ฃ๋๋ฉด, ์ฐ๊ฒฐ ์๋ฃ ์ด๋ฒคํธ๋ฅผ ํด๋ผ์ด์ธํธ์ ๋ณด๋ธ๋ค.
- ์ฐ๊ฒฐ ์๋ฃ๋ ํ์์์ ์ emitterRepository์์ ํด๋น SseEmitter ์ธ์คํด์ค๋ฅผ ์ ๊ฑฐํ๋ค.
getAlarmList ๋ฉ์๋
- ์ฌ์ฉ์ ID๋ก ์๋ ๋ชฉ๋ก์ ํ์ด์ง ๋จ์๋ก ์กฐํ
EmitterRepository.java
์๋ฒ์์ ํด๋ผ์ด์ธํธ๋ก ์๋ฒ ์ ์ก ์ด๋ฒคํธ(Server-Sent Events, SSE)๋ฅผ ์ ์กํ๋ ๋ฐ ์ฌ์ฉ๋๋ SseEmitter ์ธ์คํด์ค๋ฅผ ๋ก์ปฌ ์บ์๋ก ๊ด๋ฆฌํ๋ ํด๋์ค
@Slf4j
@Repository
@RequiredArgsConstructor
public class EmitterRepository { // ๋ก์ปฌ์บ์๋ก emitter ์ ์ฅ
private Map<String, SseEmitter> emitterMap = new HashMap<>();
public SseEmitter save(Long userId, SseEmitter emitter) {
final String key = getKey(userId);
log.info("Set Emitter to Redis {}({})", key, emitter);
emitterMap.put(key, emitter);
return emitter;
}
public void delete(Long userId) {
emitterMap.remove(getKey(userId));
}
public Optional<SseEmitter> get(Long userId) { //emitter๊ฐ null์ผ์๋ ์์ผ๋๊น optional ์ฒ๋ฆฌ
SseEmitter result = emitterMap.get(getKey(userId));
log.info("Get Emitter from Redis {}", result);
return Optional.ofNullable(result);
}
private String getKey(Long userId) {
return "emitter:UID:" + userId;
}
}
Map<String, SseEmitter> emitterMap = new HashMap<>()
- SSE ์ด๋ฒคํธ ์คํธ๋ฆผ์ ๊ด๋ฆฌํ๋ SseEmitter ์ธ์คํด์ค๋ฅผ ์ ์ฅ. ํค๋ ์ฌ์ฉ์ ID๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์์ฑ๋ ๋ฌธ์์ด์ด๋ค.
save ๋ฉ์๋
- ํน์ ์ฌ์ฉ์ ID์ ๋ํ SseEmitter ์ธ์คํด์ค๋ฅผ ์ ์ฅ
- ์ฌ์ฉ์ ID๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํค๋ฅผ ์์ฑํ๊ณ , ์ด ํค์ ํจ๊ป emitterMap์ SseEmitter ์ธ์คํด์ค๋ฅผ ์ ์ฅํ๋ค.
- ์ ์ฅ๋ SseEmitter ์ธ์คํด์ค๋ฅผ ๋ฐํํ๋ค.
delete ๋ฉ์๋
- ์ฃผ์ด์ง ์ฌ์ฉ์ ID์ ํด๋นํ๋ SseEmitter ์ธ์คํด์ค๋ฅผ emitterMap์์ ์ ๊ฑฐํ๋ค.
- ์ฌ์ฉ์ ID๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํค๋ฅผ ์์ฑํ๊ณ , ํด๋น ํค์ ๋์ํ๋ ์ธ์คํด์ค๋ฅผ ๋งต์์ ์ ๊ฑฐํ๋ค.
get ๋ฉ์๋
- ์ฃผ์ด์ง ์ฌ์ฉ์ ID์ ํด๋นํ๋ SseEmitter ์ธ์คํด์ค๋ฅผ ์กฐํํ๋ค.
- ์ฌ์ฉ์ ID๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํค๋ฅผ ์์ฑํ๊ณ , ํด๋น ํค๋ฅผ ์ฌ์ฉํ์ฌ emitterMap์์ SseEmitter ์ธ์คํด์ค๋ฅผ ์กฐํํ๋ค.
getKey ๋ฉ์๋
- "emitter:UID:" ์ ๋์ฌ์ ์ฌ์ฉ์ ID๋ฅผ ๋ถ์ฌ ๋ฌธ์์ด์ ์์ฑํ๋ค. ์ด๋ emitterMap์์ ์ฌ์ฉ์๋ณ SseEmitter ์ธ์คํด์ค๋ฅผ ๊ตฌ๋ถํ๋ ๋ฐ ์ฌ์ฉ๋๋ค.
AlarmRepository.java
์๋์ ์ ์ฅํ๋ ํด๋์ค
@Repository
public interface AlarmRepository extends JpaRepository<EntityAlarm, Long>, JpaSpecificationExecutor<EntityAlarm> {
Page<EntityAlarm> findAllByUserUserId(Long userId, Pageable pageable);
}
AlarmArgs.java, AlarmEvent.java, AlarmNoti.java, AlarmType.java
์๋ ๋ด์ฉ, ์๋ ์ ๋ณด ํด๋์ค
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AlarmArgs {
private Long fromUserId; //์๋ ๋ฐ์์๋ฒํธ
private Long targetId; //์๋์ด ๋ฐ์ํ ๊ฐ์ฒด๋ฒํธ
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AlarmEvent {
private AlarmType type;//์๋ ํ์
private AlarmArgs args; // ์๋ ์ ๋ณด
private Long receiverUserId; //์๋๋ฐ๋ ์ฌ๋
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AlarmNoti {
private String text;
}
@Getter
@AllArgsConstructor
public enum AlarmType {
NEW_PRODUCT("์๋ก์ด ๊ธ์ต์ํ!"),
NEW_RECOMMEND("์๋ก์ด ์ถ์ฒ์ํ!"),
NEW_COMMENT("์๋ก์ด ๋๊ธ!");
private final String alarmText;
}
AlarmProducer.java
Kafka๋ก ์๋ ์ด๋ฒคํธ๋ฅผ ์ ์กํ๋ ํด๋์ค
@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmProducer {
private final KafkaTemplate<String, AlarmEvent> alarmEventKafkaTemplate;
public void send(AlarmEvent event) {
alarmEventKafkaTemplate.send("alarm", String.valueOf(event.getReceiverUserId()), event);
log.info("send fin");
}
}
alarmEventKafkaTemplate: KafkaTemplate<String, AlarmEvent>
- Kafka๋ก ๋ฉ์์ง๋ฅผ ์ ์กํ๋ ๋ฐ ์ฌ์ฉ
- String์ ๋ฉ์์ง์ ํค ํ์ , AlarmEvent๋ ๋ฉ์์ง์ ๊ฐ ํ์ ์ด๋ค.
send ๋ฉ์๋
- ์๋ ์ด๋ฒคํธ๋ฅผ Kafka๋ก ์ ์กํ๋ ๋ฉ์๋
- alarmEventKafkaTemplate.send() ๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ์ค์ ๋ก ๋ฉ์์ง๋ฅผ Kafka์ "alarm" ํ ํฝ์ผ๋ก ์ ์กํ๋ค. ์ด๋ ๋ฉ์์ง์ ํค๋ก๋ ์๋ ์์ ์๋ฅผ ๋ช ์ํ๋ค. ํค๋ ๋ฉ์์ง๋ฅผ ๊ตฌ๋ถํ๊ณ ํํฐ์ ๋ํ ๋ ์ฌ์ฉ๋๋ค. ๋ฉ์์ง์ ๊ฐ์ผ๋ก๋ event ๊ฐ์ฒด๊ฐ ์ ๋ฌ๋๋ค.
AlarmConsumer.java
Kafka๋ก๋ถํฐ ์๋ ๊ด๋ จ ๋ฉ์์ง๋ฅผ ์์ ํ๊ณ ์ฒ๋ฆฌํ๋ ํด๋์ค
@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmConsumer {
private final AlarmService alarmService;
private final ObjectMapper objectMapper; // JSON ๋ณํ์ ์ํ ObjectMapper
@KafkaListener(topics = "alarm")
public void consumeNotification(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
AlarmEvent event = objectMapper.readValue(record.value(), AlarmEvent.class); // ๋ฉ์์ง๋ฅผ AlarmEvent ๊ฐ์ฒด๋ก ๋ณํ
log.info("Consume the event {}", event);
alarmService.send(event.getType(), event.getArgs(), event.getReceiverUserId());
ack.acknowledge(); // ๋ฉ์์ง ์ฒ๋ฆฌ ํ ์๋์ผ๋ก ack
} catch (IOException e) {
log.error("Error deserializing message", e);
}
}
}
@KafkaListener(topics = "alarm")
- Kafka์ "alarm" ํ ํฝ์ผ๋ก๋ถํฐ ๋ฉ์์ง๋ฅผ ์์ ํ๋๋ก ์ง์ .
ConsumerRecord<String, String>
- ๋ฉ์๋ ๋ด์์ objectMapper๋ฅผ ์ฌ์ฉํ์ฌ ๋ฉ์์ง์ ๊ฐ์ AlarmEvent ๊ฐ์ฒด๋ก ๋ณํ.
alarmService.send()
- ๋ณํ๋ AlarmEvent ๊ฐ์ฒด๋ฅผ ์ฌ์ฉํ์ฌ alarmService์ send ๋ฉ์๋๋ฅผ ํตํด ์๋์ ๋ฐ์ก.
ack.acknowledge()
- Kafka์์ ๋ฉ์์ง ์๋น์๊ฐ ํน์ ๋ฉ์์ง๋ฅผ ์ฑ๊ณต์ ์ผ๋ก ์ฒ๋ฆฌํ์์ ์๋ฆผ.
์ด์ ์ค์ตํด๋ณด์!
ํ์๋ kafka๋ฅผ ๋์ปค์ ๋์์ ๋์ปค๋ฅผ ์คํ์์ผ์ฃผ๊ฒ ๋ค.
"alarm/subscribe" API ํธ์ถํ์ฌ ์ฌ์ฉ์ 1๋ฒ์ ๋ํ SSE(Server-Sent Events) ์ฐ๊ฒฐ์ ์์ฑํ๋ค.
๊ทธ๋ฆฌ๊ณ kafka์ ์๋์ด ์ค๋์ง ํ์ธํ๊ธฐ์ํด kafka ์ปจํ ์ด๋์ ์ ์ํ์
// ๋์ปค ์ปจํ
์ด๋ ์ ์
docker exec -it [ContainerID] /bin/bash
// ์ปจ์๋จธ ๊ฒฝ๋ก๋ก ์ด๋
cd /opt/kafka_2.13-2.8.1/bin/
./kafka-console-consumer.sh --topic [ํ ํฝ์ด๋ฆ] --bootstrap-server kafka:9092
์ฌ์ฉ์ 1๋ฒ์ ๊ฒ์๊ธ์ ๋ง๋ค์ด์ค๋ค.
๊ทธ๋ฆฌ๊ณ ์ฌ์ฉ์ 2๋ฒ์ ์ฌ์ฉ์ 1๋ฒ์ ๊ฒ์๊ธ์ ๋๊ธ์ ๋จ๋ค.
๊ทธ๋ฆฌ๊ณ ์ฌ์ฉ์ 1๋ฒ์ ์๋ฆผ์ ํ์ธํด๋ณด์.
์๋ฆผ๋ฆฌ์คํธ์๋ ์๊ธฐ๊ณ kafka ์ปจํ ์ด๋์๋ ์๋์ด ์ ์์ ์ผ๋ก ์๊ธด๊ฒ์ ํ์ธํ ์์๋ค!!
์ฑ๊ณต!!๐ฅณ
'๐ฃ๐ฟ๐ผ๐ด๐ฟ๐ฎ๐บ๐บ๐ถ๐ป๐ด๐ป > ๐๐ฉ๐ซ๐ข๐ง๐ ' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spring] ์ฑ๋ฅ ์ต์ ํ 3ํธ - ์ธ๋ฑ์ค ํ์ฉํ๊ธฐ (2) | 2024.03.15 |
---|---|
[Spring] ์ฑ๋ฅ ์ต์ ํ 2ํธ - ์ค์นด์ฐํธ ๋ชจ๋ํฐ๋ง TOOL ์ค์น (1) | 2024.03.14 |
[Spring] ์ฑ๋ฅ ์ต์ ํ 1ํธ - ๋ถํํ ์คํธ ngrinder ์ค์นํ๊ธฐ (0) | 2024.03.09 |
[Spring] DTO ๊ตฌ์กฐ๋ฅผ ์ด๋ป๊ฒ ๊ตฌ์ฑํด์ผํ ๊น (0) | 2024.03.08 |
[JPA] @Enumerated, @Converter (0) | 2023.08.26 |