문제
서비스를 개발하다보면, 선착순 이벤트 시스템을 개발할 경우가 생길 수 있습니다. 예를들어, 커머스 도메인에서 블랙프라이데이와 같은 이벤트와 같습니다. 이러한 시스템을 개발할 때 어떤 문제들을 마주하고, 어떻게 해결해야될지에 대해 고민하여 해결해보겠습니다.
발생할 수 있는 문제점
- 정해진 수량보다 많이 발급되는 경우
- 이벤트 페이지 접속이 안되는 경우 (서버 다운?)
- 이벤트랑 전혀 상관없는 페이지도 느려짐 (서버 혹은 DB가 모두 연결되어 있는 경우)
요구사항 정의
선착순 100명에게 할인쿠폰을 제공하는 이벤트를 진행하고자 한다.
이 이벤트는 아래와 같은 조건을 만족하여야 한다.
- 선착순 100명에게만 지급되어야한다.
- 101개 이상이 지급되면 안된다.
- 순간적으로 몰리는 트래픽을 버틸 수 있어야한다.
문제해결 과정
Entity
package com.example.couponsystem.domain;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
@Entity
public class Coupon {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long userId;
public Coupon() {}
public Coupon(Long userId) {
this.userId = userId;
}
public Long getId() {
return id;
}
}
repository
package com.example.couponsystem.repository;
import com.example.couponsystem.domain.Coupon;
import org.springframework.data.jpa.repository.JpaRepository;
public interface CouponRepository extends JpaRepository<Coupon, Long> {
}
service
package com.example.couponsystem.service;
import com.example.couponsystem.domain.Coupon;
import com.example.couponsystem.repository.CouponRepository;
import org.springframework.stereotype.Service;
@Service
public class ApplyService {
private final CouponRepository couponRepository;
public ApplyService(CouponRepository couponRepository) {
this.couponRepository = couponRepository;
}
public void apply(Long userId) {
long count = couponRepository.count();
if (count > 100) {
return;
}
couponRepository.save(new Coupon(userId));
}
}
test
package com.example.couponsystem.service;
import com.example.couponsystem.repository.CouponRepository;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
class ApplyServiceTest {
@Autowired
private ApplyService applyService;
@Autowired
private CouponRepository couponRepository;
@Test
public void 한번만응모() {
applyService.apply(1L);
long count = couponRepository.count();
assertThat(count).isEqualTo(1L);
}
@Test
public void 여러명응모_정합성_문제_발생() throws InterruptedException {
int threadCount = 1000;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for(int i = 0 ; i < threadCount ; i++) {
long userId = i;
executorService.submit(() -> {
try {
applyService.apply(userId);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
long count = couponRepository.count();
assertThat(count).isEqualTo(100);
}
}
문제
Service 단에서 count가 100개 초과되지 않도록 로직을 넣어놨지만, 실제 test를 해보면 121개가 들어가는 것을 볼 수 있다.
원인 (race condition)
service단 로직을 보면 count를 가져가고, count가 100개가 초과되지 않았다면 save작업을 하도록 구현이 되어있다. 하지만, 테스트는 단일 스레드가 아닌 멀티스레드로 구현이 되어있기 때문에 count가 89개 일 때, 32개 스레드가 로직을 실행한다면(save 되기 전) 위와같이 100개 보다 더 많은 쿠폰이 발급되는 경우가 생긴다.
해결 (Redis INCR)
문제는 쿠폰 개수를 가져올 때, 발생했다. 문제 해결 방법은 여러개가 존재한다.
- 단일 스레드
- 단일 스레드로 돌린다면, 이전 작업이 완료된 후에 count 개수를 가져온다음 작업을 진행하기 때문에 정합성 문제가 발생하지 않는다.
- 하지만, 단일 스레드로 돌린다면 시간이 너무 오래걸린다는 단점이 있다.
- Synchronized
- 서버가 1대라면 문제를 해결할 수 있지만, 서버가 여러대라면 race condition 문제가 다시 발생한다.
- MySQL + Redis Lock
- Lock 활용하여 구현하면 발급된 쿠폰 개수를 가져오는 것부터 쿠폰을 생성할 때까지 Lock을 걸어 문제를 해결할 수 있다.
- 하지만, Lock을 거는 구간이 길어진다면 시간이 너무 오래 걸린다는 단점이 있다. 결국, 단일 스레드와 차이점이 없다.
- Redis INCR
- Redis의 INCR은 key에 대한 value를 1씩 증가시키는 명령어이다.
- Redis는 싱글 스레드 기반으로 동작하여 race condition 문제를 해결할 수 있을 뿐만 아니라, incr 명령어는 성능도 굉장히 빠른 명령어이다.
- incr 명령어로 발급된 쿠폰 개수를 제어한다면, 성능과 정합성 모두 지킬 수 있다.
docker 환경에서 redis를 돌리는 명령어는 아래와 같다. (docker pull redis가 되어있다는 가정하에)
docker exec -it 컨테이너ID redis-cli
incr 명령어로 coupon_count에 대한 키를 증가시키는 것을 위에서 확인할 수 있다.
repository 추가
package com.example.couponsystem.repository;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Repository;
@Repository
public class CouponCountRepository {
private final RedisTemplate<String, String> redisTemplate;
public CouponCountRepository(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public Long increment() {
return redisTemplate.opsForValue().increment("coupon_count");
}
}
service 로직 변경
package com.example.couponsystem.service;
import com.example.couponsystem.domain.Coupon;
import com.example.couponsystem.repository.CouponCountRepository;
import com.example.couponsystem.repository.CouponRepository;
import org.springframework.stereotype.Service;
@Service
public class ApplyService {
private final CouponRepository couponRepository;
private final CouponCountRepository couponCountRepository;
public ApplyService(CouponRepository couponRepository, CouponCountRepository couponCountRepository) {
this.couponRepository = couponRepository;
this.couponCountRepository = couponCountRepository;
}
public void apply(Long userId) {
long count = couponCountRepository.increment();
if (count > 100) {
return;
}
couponRepository.save(new Coupon(userId));
}
}
save하기 전, redis incr를 해주고 반환된 값이 100 이하인 경우만 save를 해주어 race condition을 해결할 수 있었다.
테스트를 돌리기 전 flushall을 해주고
test
@Test
public void 여러명응모_정합성_문제_해결_with_Redis() throws InterruptedException {
int threadCount = 1000;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
long userId = i; // 각 스레드에 고유한 userId 할당
executorService.submit(() -> {
try {
applyService.apply(userId);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
long count = couponRepository.count();
assertThat(count).isEqualTo(100);
executorService.shutdown();
}
테스트가 성공적으로 되는 것을 확인할 수 있다.
실제로 확인해보면, 멀티스레드를 통해 1000번까지 count가 증가하는 것을 확인할 수 있고
실제 db에 들어간 요청이 완료된 count를 확인해보면 정확히 100개가 들어가는 것을 확인할 수 있다.
문제해결!!? (서버 다운..?)
동시성 문제는 해결이 되었지만, nGrinder로 부하 테스트를 해보면 서비스가 요청을 처리하지 못하는 것을 확인할 수 있다.
부하를 분산시키기 위해 kafka를 적용했다. kafka는 특정한 토픽에 대하여 컨슈머가 병렬적으로 처리할 수 있기 때문에 효율적이기 때문이다. 이를통해, 효율성을 높여주었다.
Producer 추가 코드
package com.example.couponsystem.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.*;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Long> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Long> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
package com.example.couponsystem.producer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class CouponCreateProducer {
private final KafkaTemplate<String, Long> kafkaTemplate;
public CouponCreateProducer(KafkaTemplate<String, Long> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void create(Long userId) {
kafkaTemplate.send("coupon_create", userId);
}
}
consumer 코드
package com.example.consumer.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.*;
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Long> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
package com.example.consumer.consumer;
import com.example.consumer.domain.Coupon;
import com.example.consumer.repository.CouponRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class CouponCreatedConsumer {
private final CouponRepository couponRepository;
private final Logger logger = LoggerFactory.getLogger(CouponCreatedConsumer.class);
public CouponCreatedConsumer(CouponRepository couponRepository) {
this.couponRepository = couponRepository;
}
@KafkaListener(topics = "coupon_create", groupId = "group_1")
public void listener(Long userId) {
couponRepository.save(new Coupon(userId));
}
}
producer는 기존의 spring boot 파일에 추가하였고, consumer의 경우 모듈을 추가하여 분리해주었다.
발급 가능한 쿠폰 개수를 1인 1개 제한
대부분의 선착순 쿠폰발급 이벤트는 1인당 1개로 쿠폰개수를 제한한다. 쿠폰 개수를 ID당 1개만 발급할 수 있도록 하자.
- DB 유니크키
- 쿠폰 테이블에 user id와 coupon type이라는 컬럼을 추가하고, 유니크키를 설정하여 1개만 생성되도록 구현
- 하지만, 보통 서비스는 한 유저가 같은 쿠폰 타입을 여러개 가질 수 있기때문에 좋은 방법은 아닌 것 같다.
- Lock
- 범위로 lock을 잡고 쿠폰 발급 여부를 확인한뒤 발급하는 방식
- 하지만, 현재 kafka를 통해 consumer에서 쿠폰을 발급하는 방식이라 lock을 걸어도 쿠폰이 2개이상 발급되는 상황이 발생할 수 있다.
- 이를 막고자 consumer를 제외하고 바로 쿠폰을 발급한다면, lock 구간이 너무 길어지기 때문에 성능 저하가 발생할 수 있다.
- 자료구조 Set
- Set은 중복을 허용하지 않으며, O(1)의 시간복잡도로 존재여부와 추가를 할 수 있는 자료구조이다.
- Redis는 Set자료구조를 지원하기 때문에, Redis를 사용할 것이다.
redis는 `sadd key value` 명령어로 set에 추가할 수 있다.
실제 명령어를 실행해보면, test라는 key에 대한 데이터가 없는 경우 value가 return되는 것을 볼 수 있고 value가 존재하는 경우 0을 return하는 것을 확인할 수 있다.
명령어를 수행할 repository 추가
package com.example.couponsystem.repository;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Repository;
@Repository
public class AppliedUserRepository {
private final RedisTemplate<String, String> redisTemplate;
public AppliedUserRepository(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public Long add(Long userId) {
return redisTemplate
.opsForSet()
.add("applied_user", userId.toString());
}
}
service 로직 추가
public void apply(Long userId) {
Long apply = appliedUserRepository.add(userId);
if(apply != 1) {
return;
}
long count = couponCountRepository.increment();
if (count > 100) {
return;
}
couponCreateProducer.create(userId);
}
테스트 코드 추가
@Test
public void 한명당_한개의_쿠폰발급() throws InterruptedException {
int threadCount = 1000;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
applyService.apply(1L);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
long count = couponRepository.count();
assertThat(count).isEqualTo(1);
executorService.shutdown();
}
1L이라는 user_id를 가진 유저가 1000번의 시도를 한 결과 1번만 쿠폰이 발급되는 것을 확인할 수 있다.
끝?? (topic에서 consumer가 데이터를 가져간 후에 consumer에서 에러가 발생하여 발급되지 않은 경우??)
이를 막고자 FailedEvent domain, repository를 생성해주어 실패하는 경우 FailedEvent에 추가하고 이후에 배치 시스템을 돌려 쿠폰을 발급해주는 방식으로 설계했다.
추가된 도메인과 로직은 아래와 같다.
package com.example.consumer.domain;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
@Entity
public class FailedEvent {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long userId;
public FailedEvent() {
}
public FailedEvent(Long userId) {
this.userId = userId;
}
}
@KafkaListener(topics = "coupon_create", groupId = "group_1")
public void listener(Long userId) {
try {
couponRepository.save(new Coupon(userId));
} catch (Exception e) {
logger.error("failed to save coupon: "+ userId);
failedEventRepository.save(new FailedEvent(userId));
}
}
consumer에서 이벤트를 가져왔지만, 발급이 되지 않은 경우 log를 출력하고 failed event repository를 통해 userId를 추가한다.
'PROGRAM_SOLVING' 카테고리의 다른 글
[PROBLEM_SOLVING] 싱글톤 클래스정의와 구현 (0) | 2022.01.13 |
---|