๊ด€๋ฆฌ ๋ฉ”๋‰ด

๐‘†๐‘ข๐‘›๐‘ โ„Ž๐‘–๐‘›๐‘’ ๐‘Ž๐‘“๐‘ก๐‘’๐‘Ÿ ๐‘Ÿ๐‘Ž๐‘–๐‘›โœง

[Spring] SSE & Kafka ๋ฅผ ํ™œ์šฉํ•ด์„œ ์•Œ๋ฆผ ๊ธฐ๋Šฅ ๊ตฌํ˜„ํ•˜๊ธฐ ๋ณธ๋ฌธ

๐—ฃ๐—ฟ๐—ผ๐—ด๐—ฟ๐—ฎ๐—บ๐—บ๐—ถ๐—ป๐—ด๐Ÿ’ป/๐’๐ฉ๐ซ๐ข๐ง๐ 

[Spring] SSE & Kafka ๋ฅผ ํ™œ์šฉํ•ด์„œ ์•Œ๋ฆผ ๊ธฐ๋Šฅ ๊ตฌํ˜„ํ•˜๊ธฐ

๐ŸคRyusun๐Ÿค 2024. 3. 22. 18:12

ํ•„์ž๋Š” ์ฒ˜์Œ์—๋Š” ์•Œ๋ฆผ ํ…Œ์ด๋ธ”์„ ๊ตฌํ˜„ํ•œํ›„ ๋Œ“๊ธ€์ด ๋‹ฌ๋ฆฌ๋ฉด ์•Œ๋ฆผํ…Œ์ด๋ธ”์— ๋ฐ์ดํ„ฐ๋กœ ์ €์žฅํ•˜์˜€๊ณ , API๊ฐ€ ํ˜ธ์ถœ๋˜๋ฉด ํ•ด๋‹น ์‚ฌ์šฉ์ž์˜ ์•Œ๋ฆผ ๋ฐ์ดํ„ฐ๋ฅผ DB์—์„œ ๊ฐ€์ ธ์˜ค๋Š” ๋ฐฉ์‹์œผ๋กœ ๊ตฌํ˜„ํ•˜์˜€๋‹ค. 

ํ•˜์ง€๋งŒ ์ด๋Ÿฐ polling ๋ฐฉ์‹์€ ์ •ํ•ด์ง„ ์ฃผ๊ธฐ๋งˆ๋‹ค ๋ฌด์กฐ๊ฑด ์„œ๋ฒ„์— ์š”์ฒญ์„ ๋ณด๋‚ด๊ธฐ ๋•Œ๋ฌธ์— ๋ฐ์ดํ„ฐ๊ฐ€ ์—†์„ ๋•Œ๋„ ํŠธ๋ž˜ํ”ฝ์„ ๋ฐœ์ƒ์‹œํ‚ค๋ฉฐ, ์‹ค์‹œ๊ฐ„์œผ๋กœ ๋ฐ์ดํ„ฐ๊ฐ€ ์—…๋ฐ์ดํŠธ ๋˜์ง€์•Š๋Š”๋‹ค๋Š” ๋‹จ์ ์ด ์žˆ๋‹ค. 

 

๊ทธ๋ž˜์„œ ํ•„์ž๋Š” Kafka๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์•Œ๋ฆผ ์ด๋ฒคํŠธ๋ฅผ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ณ , Server-Sent Events(SSE)๋ฅผ ํ†ตํ•ด ํด๋ผ์ด์–ธํŠธ์— ์‹ค์‹œ๊ฐ„์œผ๋กœ ์•Œ๋ฆผ์„ ์ „๋‹ฌํ•˜๋Š” ์‹œ์Šคํ…œ์„ ๊ตฌํ˜„ํ•ด๋ณผ๊ฒƒ์ด๋‹ค. ๊ฒŒ์‹œํŒ์˜ ๊ธ€์— ๋Œ“๊ธ€์ด ๋‹ฌ๋ฆฌ๋ฉด ์•Œ๋žŒ์ด ์ƒ๊ธฐ๊ณ , ์„œ๋ฒ„๋Š” Kafka๋ฅผ ํ†ตํ•ด ๋น„๋™๊ธฐ์ ์œผ๋กœ ์•Œ๋žŒ ์ด๋ฒคํŠธ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋ฉฐ, ์ฒ˜๋ฆฌ๋œ ์•Œ๋žŒ์€ SSE๋ฅผ ํ†ตํ•ด ์‹ค์‹œ๊ฐ„์œผ๋กœ ํด๋ผ์ด์–ธํŠธ์— ์ „์†กํ•˜๋Š” ๋กœ์ง์œผ๋กœ ๊ตฌํ˜„ํ• ๊ฒƒ์ด๋‹ค.

 


 

 

์ „์ฒด์ ์ธ ํ๋ฆ„

์‚ฌ์šฉ์ž์˜ ํ–‰๋™์ด๋‚˜ ํŠน์ • ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ์•Œ๋ฆผ ์ด๋ฒคํŠธ๊ฐ€ ์ƒ์„ฑ๋˜๊ณ  Kafka๋ฅผ ํ†ตํ•ด ์ฒ˜๋ฆฌ๋œ๋‹ค. ์ด๋ฒคํŠธ๋Š” AlarmConsumer์— ์˜ํ•ด ์ˆ˜์‹ ๋˜๊ณ , AlarmService์—์„œ ์ฒ˜๋ฆฌ๋˜์–ด ์‹ค์ œ ์•Œ๋ฆผ์œผ๋กœ ๋ณ€ํ™˜๋œ ํ›„, ํ•ด๋‹น ์‚ฌ์šฉ์ž์—๊ฒŒ SSE๋ฅผ ํ†ตํ•ด ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ „์†กํ•œ๋‹ค.

 

์ฃผ์š” ํด๋ž˜์Šค์˜ ์—ญํ• 

  • AlarmProducer: ์‚ฌ์šฉ์ž๊ฐ€ ๋Œ“๊ธ€์„ ๋‹ฌ๋ฉด ํ˜ธ์ถœ๋˜์–ด, Kafka์˜ "alarm" ํ† ํ”ฝ์œผ๋กœ ์•Œ๋ฆผ ์ด๋ฒคํŠธ๋ฅผ ์ „์†กํ•œ๋‹ค.
  • AlarmConsumer: Kafka์—์„œ "alarm" ํ† ํ”ฝ์˜ ์•Œ๋ฆผ ์ด๋ฒคํŠธ๋ฅผ ์ˆ˜์‹ ํ•˜๊ณ , ํ•ด๋‹น ์ด๋ฒคํŠธ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด AlarmService์˜ send ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•œ๋‹ค.
  • AlarmService: ์•Œ๋ฆผ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์ €์žฅํ•˜๊ณ , ํ•ด๋‹น ์‚ฌ์šฉ์ž์˜ SseEmitter๊ฐ€ ์žˆ์„ ๊ฒฝ์šฐ ์‹ค์‹œ๊ฐ„์œผ๋กœ ์•Œ๋ฆผ์„ ์ „์†กํ•œ๋‹ค. ๋˜ํ•œ, ์‚ฌ์šฉ์ž๊ฐ€ ์•Œ๋ฆผ ๊ตฌ๋…์„ ์š”์ฒญํ•  ๋•Œ (connectNotification ๋ฉ”์„œ๋“œ) ์ƒˆ SseEmitter๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ์ €์žฅํ•œ๋‹ค.
  • EmitterRepository: ์‚ฌ์šฉ์ž๋ณ„ SseEmitter ๊ฐ์ฒด๋ฅผ ๋ฉ”๋ชจ๋ฆฌ ๋‚ด์—์„œ ๊ด€๋ฆฌํ•˜๋ฉฐ, ์•Œ๋ฆผ ์ „์†ก ์‹œ ์‚ฌ์šฉ์ž์˜ SseEmitter๋ฅผ ์กฐํšŒํ•˜์—ฌ ์•Œ๋ฆผ์„ ์ „์†กํ•œ๋‹ค.
  • KafkaProducerConfig: Kafka ํ”„๋กœ๋“€์„œ์™€ ์ปจ์Šˆ๋จธ์˜ ์„ค์ •์„ ์ •์˜ํ•œ๋‹ค.

 


 

Controller.java

    @GetMapping( "alarm/subscribe")
    public SseEmitter subscribe(@RequestParam Long userId) {
        log.info("subscribe");
        return alarmService.connectNotification(userId);
    }

    @GetMapping("alarm")
    public ResponseEntity<Page<AlarmReponse>> alarm(@RequestParam Long userId, Pageable pageable) {
        return ResponseEntity.ok().body(alarmService.getAlarmList(userId, pageable));
    }

 

@GetMapping( "alarm/subscribe")

  • ์‚ฌ์šฉ์ž ID์— ๋Œ€ํ•œ SSE(Server-Sent Events) ์—ฐ๊ฒฐ์„ ์ƒ์„ฑํ•˜๊ณ , ์ด ์—ฐ๊ฒฐ์„ ํ†ตํ•ด ์‹ค์‹œ๊ฐ„์œผ๋กœ ์„œ๋ฒ„์™€ ํด๋ผ์ด์–ธํŠธ ๊ฐ„์— ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์ด ์„ค์ •๋œ๋‹ค.

@GetMapping("alarm")

  • ํ•ด๋‹น ์‚ฌ์šฉ์ž์˜ ์•Œ๋ฆผ ๋ชฉ๋ก์„ ํŽ˜์ด์ง€๋„ค์ด์…˜๋œ ํ˜•ํƒœ๋กœ ๊ฐ€์ ธ์˜จ๋‹ค.

 

KafkaProducerConfig.java

  • Kafka ํ”„๋กœ๋“€์„œ ์„ค์ •์„ ์œ„ํ•œ ํด๋ž˜์Šค
  • Kafka์™€ ํ†ต์‹ ํ•˜๊ธฐ ์œ„ํ•œ ์—ฌ๋Ÿฌ Bean๋“ค์„ ์ •์˜
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, AlarmEvent> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, AlarmEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

 

producerFactory() ๋ฉ”์†Œ๋“œ

  • ProducerFactory<String, AlarmEvent> : Kafka๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ผ ๋•Œ ์‚ฌ์šฉ๋œ๋‹ค.
  • ProducerConfig๋ฅผ ์„ค์ •ํ•˜๋Š”๋ฐ, ์—ฌ๊ธฐ์—๋Š” Kafka ์„œ๋ฒ„ ์ฃผ์†Œ(BOOTSTRAP_SERVERS_CONFIG), ๋ฉ”์‹œ์ง€ ํ‚ค์˜ ์ง๋ ฌํ™” ๋ฐฉ์‹(KEY_SERIALIZER_CLASS_CONFIG), ๋ฉ”์‹œ์ง€ ๊ฐ’์˜ ์ง๋ ฌํ™” ๋ฐฉ์‹(VALUE_SERIALIZER_CLASS_CONFIG) ๋“ฑ์ด ํฌํ•จ๋œ๋‹ค.
  • ์—ฌ๊ธฐ์„œ AlarmEvent๋Š” ๋ฉ”์‹œ์ง€์˜ ๋ณธ๋ฌธ์„ ๋‚˜ํƒ€๋‚ด๋Š” ํด๋ž˜์Šค์ด๋ฉฐ, JsonSerializer๋Š” ์ด AlarmEvent ์ธ์Šคํ„ด์Šค๋ฅผ JSON ํ˜•์‹์œผ๋กœ ์ง๋ ฌํ™”ํ•œ๋‹ค.

kafkaTemplate() ๋ฉ”์†Œ๋“œ

  • KafkaTemplate<String, AlarmEvent> ํƒ€์ž…์˜ Bean์„ ๋ฐ˜ํ™˜ํ•˜๋ฉฐ, ์ด๋Š” Kafka๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๊ธฐ ์œ„ํ•œ ํ…œํ”Œ๋ฆฟ์œผ๋กœ ์‚ฌ์šฉํ•œ๋‹ค.
  • ์ด ํ…œํ”Œ๋ฆฟ์€ producerFactory()์—์„œ ์ƒ์„ฑํ•œ ProducerFactory๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

kafkaListenerContainerFactory() ๋ฉ”์†Œ๋“œ

  • Kafka ๋ฉ”์‹œ์ง€ ๋ฆฌ์Šค๋„ˆ ์ปจํ…Œ์ด๋„ˆ๋ฅผ ์„ค์ •ํ•˜๊ธฐ ์œ„ํ•œ ๋ฉ”์†Œ๋“œ
  • ConcurrentKafkaListenerContainerFactory<?, ?> ๋Š” Kafka ๋ฉ”์‹œ์ง€ ๋ฆฌ์Šค๋„ˆ๋“ค์„ ์œ„ํ•œ ์ปจํ…Œ์ด๋„ˆ๋ฅผ ์ƒ์„ฑํ•˜๋Š”๋ฐ ์‚ฌ์šฉ๋œ๋‹ค.
  • ConcurrentKafkaListenerContainerFactoryConfigurer์™€ ConsumerFactory<Object, Object>๋ฅผ ์ด์šฉํ•ด ๋ฆฌ์Šค๋„ˆ ์ปจํ…Œ์ด๋„ˆ ํŒฉํ† ๋ฆฌ๋ฅผ ๊ตฌ์„ฑํ•œ๋‹ค.
  • setAckMode(ContainerProperties.AckMode.MANUAL)๋Š” ๋ฉ”์‹œ์ง€๊ฐ€ ์„ฑ๊ณต์ ์œผ๋กœ ์ฒ˜๋ฆฌ๋˜์—ˆ์Œ์„ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ Kafka์— ์•Œ๋ฆฌ๋Š” ์—ญํ• ์„ ํ•œ๋‹ค.

 

AlarmService.java

  • ์‚ฌ์šฉ์ž์—๊ฒŒ ์•Œ๋žŒ์„ ์ „์†กํ•˜๊ณ , ์•Œ๋žŒ ์—ฐ๊ฒฐ์„ ๊ด€๋ฆฌํ•˜๋ฉฐ, ์•Œ๋žŒ ๋ชฉ๋ก์„ ์กฐํšŒํ•˜๋Š” ํด๋ž˜์Šค
@Slf4j
@RequiredArgsConstructor
@Service
public class AlarmService {
    private final AlarmRepository alarmRepository;
    private final EmitterRepository emitterRepository;
    private final UserRepository userRepository;
    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
    
    private final static String ALARM_NAME = "alarm";

    @Transactional
    public void send(AlarmType type, AlarmArgs args, Long receiverId) {
        EntityUser user = userRepository.findByUserId(receiverId).orElseThrow(
                () -> new AppException(ErrorCode.USER_NOT_FOUND, "ํšŒ์›์ด ์กด์žฌํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค."));
        HashMap<Long, Long> newArgs = new HashMap<>();
        newArgs.put(args.getFromUserId(), args.getTargetId());
        EntityAlarm alarm = EntityAlarm.of(type, newArgs, user);

        try {
            alarmRepository.save(alarm);
        } catch (Exception e){
            throw new AppException(ErrorCode.INTERNAL_SERVER_ERROR, "์ผ์‹œ์  ์˜ค๋ฅ˜์ž…๋‹ˆ๋‹ค");
        }

        emitterRepository.get(receiverId).ifPresentOrElse(it -> { //emitterRepo์—์„œ ์ธ์Šคํ„ด์Šค๋ฅผ ๊ฐ€์ง€๊ณ  ์˜จ๋‹ค.
                    try {
                        it.send(SseEmitter.event() // ์ƒˆ๋กœ์šด ์•Œ๋žŒ ์ „์†ก
                                .id(alarm.getAlarmId().toString())
                                .name(ALARM_NAME)
                                .data(new AlarmNoti()));

                    } catch (IOException exception) {
                        emitterRepository.delete(receiverId);
                        throw new AppException(ErrorCode.NOTIFICATION_CONNECT_ERROR, "์•Œ๋ฆผ ์ „์†ก์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค.");
                    }
                },
                () -> log.info("No emitter founded")
        );
    }

    @Transactional
    public SseEmitter connectNotification(Long userId) {
        log.info("{userId = }", userId);
        SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
        emitterRepository.save(userId, emitter);
        emitter.onCompletion(() -> emitterRepository.delete(userId));
        emitter.onTimeout(() -> emitterRepository.delete(userId));

        try {
            log.info("send");
            emitter.send(SseEmitter.event()
                    .id("id")
                    .name(ALARM_NAME)
                    .data("connect completed"));
        } catch (IOException exception) {
            throw new AppException(ErrorCode.NOTIFICATION_CONNECT_ERROR, "ํ†ต์‹ ์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค.");
        }
        return emitter;
    }

    @Transactional(readOnly = true)
    public Page<AlarmReponse> getAlarmList(Long userId, Pageable pageable) {
        return alarmRepository.findAllByUserUserId(userId, pageable).map(AlarmReponse::fromEntity);
    }

 

send ๋ฉ”์†Œ๋“œ

  • ์•Œ๋žŒ์„ ์ „์†ก
  • alarmRepository ์•Œ๋žŒ ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•œ๋‹ค.
  • emitterRepository์—์„œ SSE (Server-Sent Events) ์ด๋ฒคํŠธ ์ŠคํŠธ๋ฆผ์„ ๊ด€๋ฆฌํ•˜๊ณ , ์ƒˆ๋กœ์šด ์•Œ๋žŒ ์ด๋ฒคํŠธ๋ฅผ ์ŠคํŠธ๋ฆผ์— ๋ณด๋‚ธ๋‹ค.

connectNotification ๋ฉ”์†Œ๋“œ

  • ํด๋ผ์ด์–ธํŠธ๊ฐ€ ์•Œ๋ฆผ ์ˆ˜์‹ ์„ ์œ„ํ•ด ์—ฐ๊ฒฐ
  • SseEmitter ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑํ•˜๊ณ , ์ด๋ฅผ emitterRepository์— ์ €์žฅํ•œ๋‹ค.  ์—ฐ๊ฒฐ ์™„๋ฃŒ๋˜๋ฉด, ์—ฐ๊ฒฐ ์™„๋ฃŒ ์ด๋ฒคํŠธ๋ฅผ ํด๋ผ์ด์–ธํŠธ์— ๋ณด๋‚ธ๋‹ค.
  • ์—ฐ๊ฒฐ ์™„๋ฃŒ๋‚˜ ํƒ€์ž„์•„์›ƒ ์‹œ emitterRepository์—์„œ ํ•ด๋‹น SseEmitter ์ธ์Šคํ„ด์Šค๋ฅผ ์ œ๊ฑฐํ•œ๋‹ค.

getAlarmList ๋ฉ”์†Œ๋“œ

  • ์‚ฌ์šฉ์ž ID๋กœ ์•Œ๋žŒ ๋ชฉ๋ก์„ ํŽ˜์ด์ง€ ๋‹จ์œ„๋กœ ์กฐํšŒ

 

EmitterRepository.java

์„œ๋ฒ„์—์„œ ํด๋ผ์ด์–ธํŠธ๋กœ ์„œ๋ฒ„ ์ „์†ก ์ด๋ฒคํŠธ(Server-Sent Events, SSE)๋ฅผ ์ „์†กํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋˜๋Š” SseEmitter ์ธ์Šคํ„ด์Šค๋ฅผ ๋กœ์ปฌ ์บ์‹œ๋กœ ๊ด€๋ฆฌํ•˜๋Š” ํด๋ž˜์Šค

@Slf4j
@Repository
@RequiredArgsConstructor
public class EmitterRepository { // ๋กœ์ปฌ์บ์‹œ๋กœ emitter ์ €์žฅ

    private Map<String, SseEmitter> emitterMap = new HashMap<>();

    public SseEmitter save(Long userId, SseEmitter emitter) {
        final String key = getKey(userId);
        log.info("Set Emitter to Redis {}({})", key, emitter);
        emitterMap.put(key, emitter);
        return emitter;
    }

    public void delete(Long userId) {
        emitterMap.remove(getKey(userId));
    }

    public Optional<SseEmitter> get(Long userId) { //emitter๊ฐ€ null์ผ์ˆ˜๋„ ์ž‡์œผ๋‹ˆ๊น optional ์ฒ˜๋ฆฌ
        SseEmitter result = emitterMap.get(getKey(userId));
        log.info("Get Emitter from Redis {}", result);
        return Optional.ofNullable(result);
    }

    private String getKey(Long userId) {
        return "emitter:UID:" + userId;
    }
}

 

Map<String, SseEmitter> emitterMap = new HashMap<>()

  • SSE ์ด๋ฒคํŠธ ์ŠคํŠธ๋ฆผ์„ ๊ด€๋ฆฌํ•˜๋Š” SseEmitter ์ธ์Šคํ„ด์Šค๋ฅผ ์ €์žฅ. ํ‚ค๋Š” ์‚ฌ์šฉ์ž ID๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ์ƒ์„ฑ๋œ ๋ฌธ์ž์—ด์ด๋‹ค.

save ๋ฉ”์†Œ๋“œ

  • ํŠน์ • ์‚ฌ์šฉ์ž ID์— ๋Œ€ํ•œ SseEmitter ์ธ์Šคํ„ด์Šค๋ฅผ ์ €์žฅ
  • ์‚ฌ์šฉ์ž ID๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ํ‚ค๋ฅผ ์ƒ์„ฑํ•˜๊ณ , ์ด ํ‚ค์™€ ํ•จ๊ป˜ emitterMap์— SseEmitter ์ธ์Šคํ„ด์Šค๋ฅผ ์ €์žฅํ•œ๋‹ค.
  • ์ €์žฅ๋œ SseEmitter ์ธ์Šคํ„ด์Šค๋ฅผ ๋ฐ˜ํ™˜ํ™˜๋‹ค.

delete ๋ฉ”์†Œ๋“œ

  • ์ฃผ์–ด์ง„ ์‚ฌ์šฉ์ž ID์— ํ•ด๋‹นํ•˜๋Š” SseEmitter ์ธ์Šคํ„ด์Šค๋ฅผ emitterMap์—์„œ ์ œ๊ฑฐํ•œ๋‹ค.
  • ์‚ฌ์šฉ์ž ID๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ํ‚ค๋ฅผ ์ƒ์„ฑํ•˜๊ณ , ํ•ด๋‹น ํ‚ค์— ๋Œ€์‘ํ•˜๋Š” ์ธ์Šคํ„ด์Šค๋ฅผ ๋งต์—์„œ ์ œ๊ฑฐํ•œ๋‹ค.

get ๋ฉ”์†Œ๋“œ

  • ์ฃผ์–ด์ง„ ์‚ฌ์šฉ์ž ID์— ํ•ด๋‹นํ•˜๋Š” SseEmitter ์ธ์Šคํ„ด์Šค๋ฅผ ์กฐํšŒํ•œ๋‹ค.
  • ์‚ฌ์šฉ์ž ID๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ํ‚ค๋ฅผ ์ƒ์„ฑํ•˜๊ณ , ํ•ด๋‹น ํ‚ค๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ emitterMap์—์„œ SseEmitter ์ธ์Šคํ„ด์Šค๋ฅผ ์กฐํšŒํ•œ๋‹ค.

getKey ๋ฉ”์†Œ๋“œ

  • "emitter:UID:" ์ ‘๋‘์‚ฌ์— ์‚ฌ์šฉ์ž ID๋ฅผ ๋ถ™์—ฌ ๋ฌธ์ž์—ด์„ ์ƒ์„ฑํ•œ๋‹ค. ์ด๋Š” emitterMap์—์„œ ์‚ฌ์šฉ์ž๋ณ„ SseEmitter ์ธ์Šคํ„ด์Šค๋ฅผ ๊ตฌ๋ถ„ํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋œ๋‹ค.

 

AlarmRepository.java

์•Œ๋žŒ์„ ์ €์žฅํ•˜๋Š” ํด๋ž˜์Šค

@Repository
public interface AlarmRepository  extends JpaRepository<EntityAlarm, Long>, JpaSpecificationExecutor<EntityAlarm> {

    Page<EntityAlarm> findAllByUserUserId(Long userId, Pageable pageable);
}

 

 

AlarmArgs.java, AlarmEvent.java, AlarmNoti.java, AlarmType.java
์•Œ๋žŒ ๋‚ด์šฉ, ์•Œ๋žŒ ์ •๋ณด ํด๋ž˜์Šค

@Data
@AllArgsConstructor
@NoArgsConstructor
public class AlarmArgs {
    private Long fromUserId; //์•Œ๋žŒ ๋ฐœ์ƒ์ž๋ฒˆํ˜ธ
    private Long targetId; //์•Œ๋žŒ์ด ๋ฐœ์ƒํ•œ ๊ฐ์ฒด๋ฒˆํ˜ธ
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AlarmEvent {
    private AlarmType type;//์•Œ๋žŒ ํƒ€์ž…
    private AlarmArgs args; // ์•Œ๋žŒ ์ •๋ณด
    private Long receiverUserId; //์•Œ๋žŒ๋ฐ›๋Š” ์‚ฌ๋žŒ
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AlarmNoti {
    private String text;
}
@Getter
@AllArgsConstructor
public enum AlarmType {
    NEW_PRODUCT("์ƒˆ๋กœ์šด ๊ธˆ์œต์ƒํ’ˆ!"),
    NEW_RECOMMEND("์ƒˆ๋กœ์šด ์ถ”์ฒœ์ƒํ’ˆ!"),
    NEW_COMMENT("์ƒˆ๋กœ์šด ๋Œ“๊ธ€!");

    private final String alarmText;
}

 

 

AlarmProducer.java

Kafka๋กœ ์•Œ๋žŒ ์ด๋ฒคํŠธ๋ฅผ ์ „์†กํ•˜๋Š” ํด๋ž˜์Šค

@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmProducer {

    private final KafkaTemplate<String, AlarmEvent> alarmEventKafkaTemplate;

    public void send(AlarmEvent event) {
        alarmEventKafkaTemplate.send("alarm", String.valueOf(event.getReceiverUserId()), event);
        log.info("send fin");
    }
}

 

alarmEventKafkaTemplate: KafkaTemplate<String, AlarmEvent> 

  • Kafka๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ
  • String์€ ๋ฉ”์‹œ์ง€์˜ ํ‚ค ํƒ€์ž…, AlarmEvent๋Š” ๋ฉ”์‹œ์ง€์˜ ๊ฐ’ ํƒ€์ž…์ด๋‹ค.

send ๋ฉ”์†Œ๋“œ

  • ์•Œ๋žŒ ์ด๋ฒคํŠธ๋ฅผ Kafka๋กœ ์ „์†กํ•˜๋Š” ๋ฉ”์†Œ๋“œ
  • alarmEventKafkaTemplate.send() ๋ฉ”์†Œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ์‹ค์ œ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ Kafka์˜ "alarm" ํ† ํ”ฝ์œผ๋กœ ์ „์†กํ•œ๋‹ค. ์ด๋•Œ ๋ฉ”์‹œ์ง€์˜ ํ‚ค๋กœ๋Š” ์•Œ๋žŒ ์ˆ˜์‹ ์ž๋ฅผ ๋ช…์‹œํ•œ๋‹ค. ํ‚ค๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ๊ตฌ๋ถ„ํ•˜๊ณ  ํŒŒํ‹ฐ์…”๋‹ํ•  ๋•Œ ์‚ฌ์šฉ๋œ๋‹ค. ๋ฉ”์‹œ์ง€์˜ ๊ฐ’์œผ๋กœ๋Š” event ๊ฐ์ฒด๊ฐ€ ์ „๋‹ฌ๋œ๋‹ค.

 

AlarmConsumer.java

Kafka๋กœ๋ถ€ํ„ฐ ์•Œ๋žŒ ๊ด€๋ จ ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ ํ•˜๊ณ  ์ฒ˜๋ฆฌํ•˜๋Š” ํด๋ž˜์Šค

@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmConsumer {

    private final AlarmService alarmService;
    private final ObjectMapper objectMapper; // JSON ๋ณ€ํ™˜์„ ์œ„ํ•œ ObjectMapper

    @KafkaListener(topics = "alarm")
    public void consumeNotification(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            AlarmEvent event = objectMapper.readValue(record.value(), AlarmEvent.class); // ๋ฉ”์‹œ์ง€๋ฅผ AlarmEvent ๊ฐ์ฒด๋กœ ๋ณ€ํ™˜
            log.info("Consume the event {}", event);
            alarmService.send(event.getType(), event.getArgs(), event.getReceiverUserId());
            ack.acknowledge(); // ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ํ›„ ์ˆ˜๋™์œผ๋กœ ack
        } catch (IOException e) {
            log.error("Error deserializing message", e);
        }
    }
}

 

 

@KafkaListener(topics = "alarm")

  •  Kafka์˜ "alarm" ํ† ํ”ฝ์œผ๋กœ๋ถ€ํ„ฐ ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ ํ•˜๋„๋ก ์ง€์ •.

ConsumerRecord<String, String>

  • ๋ฉ”์†Œ๋“œ ๋‚ด์—์„œ objectMapper๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ฉ”์‹œ์ง€์˜ ๊ฐ’์„ AlarmEvent ๊ฐ์ฒด๋กœ ๋ณ€ํ™˜.

alarmService.send()

  • ๋ณ€ํ™˜๋œ AlarmEvent ๊ฐ์ฒด๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ alarmService์˜ send ๋ฉ”์†Œ๋“œ๋ฅผ ํ†ตํ•ด ์•Œ๋žŒ์„ ๋ฐœ์†ก.

ack.acknowledge()

  • Kafka์—์„œ ๋ฉ”์‹œ์ง€ ์†Œ๋น„์ž๊ฐ€ ํŠน์ • ๋ฉ”์‹œ์ง€๋ฅผ ์„ฑ๊ณต์ ์œผ๋กœ ์ฒ˜๋ฆฌํ–ˆ์Œ์„ ์•Œ๋ฆผ.

 

 

์ด์ œ ์‹ค์Šตํ•ด๋ณด์ž!

ํ•„์ž๋Š” kafka๋ฅผ ๋„์ปค์— ๋„์›Œ์„œ ๋„์ปค๋ฅผ ์‹คํ–‰์‹œ์ผœ์ฃผ๊ฒ ๋‹ค.

 

 

"alarm/subscribe" API ํ˜ธ์ถœํ•˜์—ฌ ์‚ฌ์šฉ์ž 1๋ฒˆ์— ๋Œ€ํ•œ SSE(Server-Sent Events) ์—ฐ๊ฒฐ์„ ์ƒ์„ฑํ•œ๋‹ค.

 

๊ทธ๋ฆฌ๊ณ  kafka์— ์•Œ๋žŒ์ด ์˜ค๋Š”์ง€ ํ™•์ธํ•˜๊ธฐ์œ„ํ•ด kafka ์ปจํ…Œ์ด๋„ˆ์— ์ ‘์†ํ•˜์ž

// ๋„์ปค ์ปจํ…Œ์ด๋„ˆ ์ ‘์†
docker exec -it [ContainerID] /bin/bash

// ์ปจ์Šˆ๋จธ ๊ฒฝ๋กœ๋กœ ์ด๋™
cd /opt/kafka_2.13-2.8.1/bin/
./kafka-console-consumer.sh --topic [ํ† ํ”ฝ์ด๋ฆ„] --bootstrap-server kafka:9092

 

 

์‚ฌ์šฉ์ž 1๋ฒˆ์˜ ๊ฒŒ์‹œ๊ธ€์„ ๋งŒ๋“ค์–ด์ค€๋‹ค.

 

๊ทธ๋ฆฌ๊ณ  ์‚ฌ์šฉ์ž 2๋ฒˆ์€ ์‚ฌ์šฉ์ž 1๋ฒˆ์˜ ๊ฒŒ์‹œ๊ธ€์— ๋Œ“๊ธ€์„ ๋‹จ๋‹ค.

 

๊ทธ๋ฆฌ๊ณ  ์‚ฌ์šฉ์ž 1๋ฒˆ์˜ ์•Œ๋ฆผ์„ ํ™•์ธํ•ด๋ณด์ž.

 

 

์•Œ๋ฆผ๋ฆฌ์ŠคํŠธ์—๋„ ์ƒ๊ธฐ๊ณ  kafka ์ปจํ…Œ์ด๋„ˆ์—๋„ ์•Œ๋žŒ์ด ์ •์ƒ์ ์œผ๋กœ ์ƒ๊ธด๊ฒƒ์„ ํ™•์ธํ• ์ˆ˜์žˆ๋‹ค!!

์„ฑ๊ณต!!๐Ÿฅณ