crossorigin="anonymous"> $(function(){ $('.article_view').find('table').each(function (idx, el) { $(el).wrap('
') }); $('img[alt="N"]').each(function(){ $(this).replaceWith('

N

') }); });

새소식

고수만/서버

Redis를 이용한 Pub/Sub 메시징 시스템 구현 - 쉬움

  • -

 

웹소켓에 대한 설정은 여기에 자세히 나와 있으니 이 글을 참고하자.

 

 

Java에서 WebSocket 사용하기!

이번에 이야기 할 내용은 Java에서 WebSocket을 사용하는 법에 대하여 이야기 해보려고 한다.우선 WebSocket이 무엇인지 부터 간단히 이야기 해보겠다. WebSocket은 웹 애플리케이션에서 서버와 클라이

dog-foot-sleep.tistory.com

 

1. 구현하고자 하는 페이지

 

오늘 이야기 해볼 것은 Redis Pub/Sub을 사용한 간단한 예제를 만들어 볼까 한다. 어려운 내용은 아니지만 

기본적으로 Web Socket의 개념과 Pub/Sub의 개념을 모르신다면 미리 알아보고 이 글을 읽으시길 바란다.

 

우선 상상해보자. 어떠한 뉴스 리스트를 보여주는 게시판을 웹으로 만든다고 생각하자.

해당 뉴스 리스트는 다른 크롤링 서버에서 크롤링 마무리 지어야 데이터가 업데이트 된다고 가정하자. 

 

API 서버와 크롤링 서버는 같은 DB의 테이블을 보고 있다고 가정하자.

 

 

2. 기존 구조의 문제점 

 

 

도식화 한다면 위와 같을 것이다. 그러나 여기에 한 가지 문제가 있다. 

 

크롤링 서버가 크롤링을 시작해서 DB에 뉴스 데이터를 업데이트 해주려면 최소 5분 이상이 걸린다고 가정하자.

그럼 API 서버는 그걸 내내 기다릴 것인가?

 

내내 API로 기다리는 것 보다 크롤링이 끝나는 시점에 API 서버 측에서 화면을 갱신해주는 것이 더 효율적일 것이다.

 

그러나 API 서버는 크롤링 요청을 보내는건 쉬워도 크롤링 서버가 언제 끝나는지 알 수가 없다.

 

또한 API가 알았다고 하더라도 일반적인 방법으로는 웹 클라이언트에서 이벤트를 발생시키는건 어렵지 않지만 역으로 요청하지 않았는데 데이터를 수신한다는 것은 쉽지 않다.

 

그렇다고 언제 업데이트 되는지 모를 DB를 위해 매번 새로고침을 누르고 있거나 , 크롤링 서버의 응답을 5분동안 기다리는 것은 리소스 낭비이다!

 

5분간 매번 새로고침을 하고 있다면 업데이트를 하루에 4번만 해도 20분 낭비이고

한 달의 근무시간인 20일간이면 1일의 리소스가 낭비다!!

 

따라서 우리는 아래와 같이 Redis Pub/Sub 메시지를 사용해 Crawling 서버와 통신하고

Web Socket을 사용해 웹 클라이언트를 업데이트 시켜주도록 만들 것이다.

 

 

3-1. Publish 업데이트 메시지 설정 및 구현

 

업데이트 시스템 구축을 위하여 Pub/Sub 메시지 구조를 만들어야 했다.

 

뉴스 업데이트 버튼 클릭 시 Crawling server에 업데이트 메시지를 보내고 Crawling  Server의 업데이트 종료가 되면

API Server에서 메시지를 받아 웹 클라이언트에 전달을 해주는 시스템을 구축해야 했다.

 

때문에 웹과 서버의 Web Socket 통신과 API 서버와 크롤링  서버의 통신 2가지가 필요했다.

 

API 서버와 Crawling 서버는 Redis Chennel을 통한 메시징 통신을 사용하기로 했고

Web과 서버는 Stomp Webscoket 라이브러리를 사용하여 웹 소켓 통신을 구축하기로 했다.

 

제일 먼저 API Server는 업데이트 이벤트가 발생하면 Redis Chennel에 메시지를 발행하여 Crawling Server에 전달한다.

이에 대한 구성은 다음과 같다.

 

더보기
@Service
public class RedisService  {

    @Value("${crawling}")
    private String crawlingChannelName;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    public RedisService(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public void pubMessage(String message) {
        log.info(message);
        stringRedisTemplate.convertAndSend(channelName, message);
    }

    //메시지 송신
    public void sendMessage(String message) {
        stringRedisTemplate.convertAndSend(crawlingChannelName, message);
    }
}

 

StringRedisTemplate 라이브러리를 사용하여 Redis Chennel에 메시지를 보낸다. 사용법은 보낼 채널명과 메시지를 넣으면 되는 간단한 작업이다.

 

이 후 Crawling  Server에서 뉴스 업데이트가 종료되면 성공적으로 종료되었다는 메시지를 Redis Chennel을 통해 역으로 퍼블리싱 하게 된다.

 

기존 Crawling 채널은 Crawling 업데이트 이미 사용하고 있기 때문에 API 서버가 리스닝 할 채널이 필요했고 또 다른 채널을 만들어 종료 메시지를 리스닝 할 채널을 구성했다.

 

 

3-2. Subscribe Crawling Server메시지 설정 및 구현

 

 

더보기
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,
                                        MessageListenerAdapter listenerAdapter) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(redisConnectionFactory);
    // 구독할 채널을 프로퍼티에서 가져온 값으로 설정
    container.addMessageListener(listenerAdapter, "리스닝 채널명");
    return container;
}

  @Bean
    public MessageListenerAdapter  listenerAdapter(RedisListener.ProbeMessageListener subscriberService){
        return new MessageListenerAdapter(subscriberService, "onMessage");
    }

 

위 코드와 같이 Redis Message Broker를 구성하여 Crawling Server에서 오는 메시지를 구독할 수 있도록 설정한다.

 

먼저 Redis 리스너 컨테이너에 리스너로 Adapter를 채널과 함께 추가해줘야 한다.

그리고 리스너 구현체를 Adapter에 등록시켜 준다.

 

Redis 메시지 컨테이너 + Listener Adapter + Listener 구현체

 

이렇게 하면 해당 채널에 메시지가 들어오면

  1. 리스너 컨테이너가 메시지를 인지하고 Adapter에 전달한다.
  2. Adapter가 메시지를 받아서 연결 된 리스너 구현체에 전달해주면
  3. 리스너 구현체의 onMessage 메서드에 구현된 로직이 실행된다.

 

더보기
@Component
public static class UpdateMessageListener implements MessageListener {

    private final SimpMessagingTemplate simpMessagingTemplate;

    public UpdateMessageListener(SimpMessagingTemplate simpMessagingTemplate) {
        this.simpMessagingTemplate = simpMessagingTemplate;
    }


    @Override
    public void onMessage(Message message, byte[] pattern) {
        String body = new String(message.getBody());
        if(body.startsWith("crawling_update_end")){
            simpMessagingTemplate.convertAndSend("/topic/updated", body);
        }
    }
}

 

뉴스 업데이트가 완료 되었다는 메시지가 오면 API 서버는 WebSocket을 통하여 클라이언트에 업데이트가 완료되었다는 메시지를 전달해야 한다.

 

위 코드는 adapter에 연결 된 리스너 구현체이다.

Adapter에서 메시지를 캐치하면 onMessage 안의 로직 플로우가 시작된다.

 

onMessage 내부 코드는 업데이트 완료 채널로부터 메시지가 오면 "crawling_update_end” 의 메시지를 필터링 하고

웹소켓에 종료 메시지를 브로드캐스트 하는 동작을 취한다.

 

여기서 WebSocket을 통해 메시지를 브로드캐스트 하는 탬플릿은 simpMessagingTemplate 이다.

해당 메서드를 사용하하여 /topic/updated경로로 완료 메시지를 클라이언트에게 보낸다.

 

3-3. Publish Web Socket 메시지 설정 및 구현

 

더보기
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    //WebSocket 메시지 브로커를 구성
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config){
        //특정 목적지 경로(prefix)로 들어오는 메시지를 구독 중인 클라이언트에게 브로드캐스트합니다.
        config.enableSimpleBroker("/topic");
        //특정 목적지로 메시지를 보낼 수 있도록 합니다.
        config.setApplicationDestinationPrefixes("/app");
    }

    //클라이언트의 엔드포인트를 등록하고 SockJS를 사용하도록 설정합니다.
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry){
        //엔드포인트를 등록하고 SockJS를 사용하도록 설정합니다.
        registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS();
    }
}

 

 

위 코드는 웹 소켓 구성을 담고 있는 코드이다.

 

앞서서 API와 Crawling Server 메시징을 위한 브로커를 구성해줬으니 이제는 WebSocket용 브로커를 구성해야 한다.

 

메시지 브로커는 API의 리스닝 포인트가 되는 /app을 설정해주고

브로드캐스트 경로가 되는 /topic을 설정해준다.

 

간단하게 말하면 API 입장에서 귀는 /app이고 입은 /topic 인 것이다.

 

enableSimpleBroker에서 경로를 설정해주면 설정된 경로로부터 시작 되는 메시지는 WebSocket으로 브로드캐스트 된다.

Redis 리스너 구현체를 보면 Crawling Server로 부터 메시지가 오고 crawling_update_end가 메시지 내용이라면

/topic/updated” 로 브로드캐스트 메시지를 쏘게 되어 있다.

 

여기서 우리는 /topic으로 시작하면 WebSocket으로 브로드캐스팅 하게 만들었기 때문에

 

뉴스 업데이트 메시지가 Crawling Server에서 오게 되면 메시지를 WebSocket으로 종료 메시지를 보내게 되는 것이다.

 

또한 클라이언트 의 엔드포인트 역시 API서버 쪽에서 구성을 해줘야 한다.

registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS(); 로 /websocket 라는 엔드포인트를 구성해준다.

 

 

3-4. Subscribe Web Socket 메시지 설정 및 구현

 

 

더보기
<script>
    $(document).ready(function () {
        //websocket
        var socket = new SockJS('/websocket');
        var stompClient = Stomp.over(socket);

        stompClient.connect({}, function (frame) {
            stompClient.subscribe('/topic/updated', function (message) {
               
                alert("업데이트 되었습니다.");
                
                // 데이터 재조회 로직 추가
          
        });
     });

 

위 코드는 클라이언트의 js코드이다.

 

우선 SockJS으로 클라이언트 엔드포인트를 설정해주고 해당 엔드포인트를 Stomp 라이브러리에 등록해 메시지를 수신하는 브로커를 만든다.

 

설정 된 브로커는 수신할 채널 경로를 설정하고 메시지 수신 시 실행 될 콜백 매서드를 완성해준다.

 

따라서 stompClient는 /websocket 엔드포인트로 '/topic/updated' 경로에 오는 메시지를 리스닝 하고 있다가 메시지가 오면

콜백 메서드 안의 로직을 수행하는 것이다.

 

뉴스 업데이트가 마무리 되었다는 의미의 메시지가 클라이언트에 오면 업데이트 되었다는 메시지를 화면에 보여주고

뉴스  데이터를 재로드하게 된다.

Contents