buildscript {
ext {
springBootVersion = '2.1.0.RC1'
}
repositories {
mavenCentral ()
maven { url "https://repo.spring.io/snapshot"}
maven { url "https://repo.spring.io/milestone"}
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-managerment'
- 스프링 부트 버전은 spring init 커맨드를 실행할 때 지정한 것으로 설정
- 스프링 프로젝트 저장소를 추가한 이유는 정식버전이 아닌 버전 (2.1.0.RC1)의 라이브러리는 중앙 메이븐 저장소에 없기 때문이다.
group = 'arthur.kim.microservices.core'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-actuator'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
test {
useJUnitPlatform()
}
미리 구성해 놓은 task들의 그룹이며, 특정 빌드과정에서 필요한 기본정보를 포함하고, 필요에 따라 정보를 수정하여 목적에 맞게 사용할 수 있다.
- org.springframework.boot : 실행 가능한 jar또는 war아카이브를 패키징하고, Spring Boot 애플리케이션을 실행하고, spring-boot-dependencies에서 제공하는 종속성 관리를 사용할 수 있다.
- io.spring.dependency-management : 스프링버전에 따른 의존성을 자동으로 관리
- java : 실행가능한 jar를 빌딩하는 테스크가 자동적으로 설정
cd microservices/product-service; ./gradlew build; cd -; \
cd microservices/product-composite-service; ./gradlew build; cd -; \
cd microservices/recommendation-service; ./gradlew build; cd-; \
cd microservices/review-service; ./gradlew build; cd-;
include ':microservices:product-service'
include ':microservices:review-service'
include ':microservices:recommendation-service'
include ':microservices:product-composite-service'
# cp: copy
cp -r microservices/product-service/gradle .; \
cp -r microservices/product-service/gradlew .; \
cp -r microservices/product-service/gradlew.bat .; \
cp -r microservices/product-service/.gitignore .; \
# "{}" > find 로 찾은 파일명
# -r : recursive
# -f : force
# -v : verbose
find microservices -depth -name "gradle" -exec rm -rfv "{}" \;
find microservices -depth -name "gradlew" -exec rm -fv "{}" \;
./gradlew build
java -jar microservices/product-service/build/libs/product-service-0.0.1-SNAPSHOT.jar &
java -jar microservices/product-composite-service/build/libs/product-composite-service-0.0.1-SNAPSHOT.jar &
java -jar microservices/review-service/build/libs/review-service-0.0.1-SNAPSHOT.jar &
java -jar microservices/recommendation-service/build/libs/recommendation-service-0.0.1-SNAPSHOT.jar &
kill $(jobs -p )
@ExceptionHandler, @ModelAttribute, @InitBinder 가 적용된 메서드들을 AOP를 적용해 컨트롤러 단에 적용하기 위해 고안된 애너테이션
@RestControllerAdvice
public class GlobalControllerExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(GlobalControllerExceptionHandler.class);
@ResponseStatus(NOT_FOUND)
@ExceptionHandler(NotFoundException.class)
public @ResponseBody HttpErrorInfo handleNotFoundExceptions(ServerHttpRequest request, Exception ex) {
return createHttpErrorInfo(NOT_FOUND, request, ex);
}
@ResponseStatus(UNPROCESSABLE_ENTITY)
@ExceptionHandler(InvalidInputException.class)
public @ResponseBody HttpErrorInfo handelInvalidInputException(ServerHttpRequest request, Exception ex) {
return createHttpErrorInfo(UNPROCESSABLE_ENTITY, request, ex);
}
private HttpErrorInfo createHttpErrorInfo(HttpStatus httpStatus, ServerHttpRequest request, Exception ex) {
final String path = request.getPath().pathWithinApplication().value();
final String message = ex.getMessage();
LOG.debug("Returning HTTP status: {} for path: {}, message: {}", httpStatus, path, message);
return new HttpErrorInfo(httpStatus, path, message);
}
}
@Controller, @RestController가 적용된 Bean내에서 발생하는 예외를 잡아서 하나의 메서드에서 처리
- Controller, RestController에만 적용가능하다. (@Service같은 빈에서는 안됨.)
- 리턴 타입은 자유롭게 해도 된다. (Controller내부에 있는 메서드들은 여러 타입의 response를 할 것이다. 해당 타입과 전혀다른 리턴 타입이어도 상관없다.)
- @ExceptionHandler를 등록한 Controller에만 적용된다. 다른 Controller에서 NullPointerException이 발생하더라도 예외를 처리할 수 없다.
- 메서드의 파라미터로 Exception을 받아왔는데 이것 또한 자유롭게 받아와도 된다.
# 존재하지 않는 productId(13)를 조회하고 404(Not Found)가 반환되는지 검증
curl http://localhost:7000/product-composite/13 -i
> Task :microservices:product-composite-service:test
ProductCompositeServiceApplicationTests > getProductId() FAILED
java.lang.AssertionError: JSON path "$.recommendations.length()" expected:<0> but was:<1>
at org.springframework.test.util.AssertionErrors.fail(AssertionErrors.java:59)
at org.springframework.test.util.AssertionErrors.assertEquals(AssertionErrors.java:122)
at org.springframework.test.util.JsonPathExpectationsHelper.assertValue(JsonPathExpectationsHelper.java:125)
at org.springframework.test.web.reactive.server.JsonPathAssertions.isEqualTo(JsonPathAssertions.java:54)
at arthur.kim.microservices.core.product.composite.ProductCompositeServiceApplicationTests.getProductId(ProductCompositeServiceApplicationTests.java:57)
4 tests completed, 1 failed
Finished generating test XML results (0.025 secs) into: /Users/yskim/Desktop/msa-learn/microservices/product-composite-service/build/test-results/test
Generating HTML test report...
Finished generating test html results (0.021 secs) into: /Users/yskim/Desktop/msa-learn/microservices/product-composite-service/build/reports/tests/test
> Task :microservices:product-composite-service:test FAILED
:microservices:product-composite-service:test (Thread[Execution worker for ':' Thread 7,5,main]) completed. Took 12.034 secs.
FAILURE: Build failed with an exception.
- Test Result Dashboard
file:///Users/yskim/Desktop/msa-learn/microservices/product-composite-service/build/reports/tests/test/index.html
- GlobalControllerExceptionHandler.java
@RestControllerAdvice
public class GlobalControllerExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(GlobalControllerExceptionHandler.class);
@ExceptionHandler(NotFoundException.class)
@ResponseBody
public ResponseEntity<?> handleNotFoundException(Exception ex) {
return new ResponseEntity<>(ex, HttpStatus.NOT_FOUND);
}
@ExceptionHandler(InvalidInputException.class)
@ResponseBody
public ResponseEntity<?> handleInvalidInputException(Exception ex) {
return new ResponseEntity<>(ex, HttpStatus.UNPROCESSABLE_ENTITY);
}
}
- ProductCompositeServiceApplicationTests.java
@Test
public void getProductNotFound() {
client.get()
.uri("/product-composite/" + PRODUCT_ID_NOT_FOUND)
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().isNotFound()
.expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBody()
.jsonPath("$.message").isEqualTo("NOT FOUND: " + PRODUCT_ID_NOT_FOUND);
}
@Test
public void getProductInvalidInput() {
client.get()
.uri("/product-composite/" + PRODUCT_ID_INVALID)
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY)
.expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBody()
.jsonPath("$.message").isEqualTo("INVALID: " + PRODUCT_ID_INVALID);
}
function assertCurl() {
local expectedHttpCode=$1
local curlCmd="$2 -w\"%{http_code}\""
local result=$(eval $curlCmd)
local httpCode="${result:(-3)}"
RESPONSE='' && (( ${#result} > 3 )) && RESPONSE="${result%???}"
if [ "$httpCode" = "$expectedHttpCode" ]
then
if [ "$httpCode" = "200" ]
then
echo "Test OK (HTTP Code: $httpCode)"
else
echo "Test OK (HTTP Code: $httpCode, $RESPONSE)"
fi
else
echo "Test FAILED, EXPECTED HTTP Code: $expectedHttpCode, GOT: $httpCode, WILL ABORT!"
echo "- Failing command: $curlCmd"
echo "- Response Body: $RESPONSE"
exit 1
fi
}
function assertEqual() {
local expected=$1
local actual=$2
if [ "$actual" = "$expected" ]
then
echo "Test OK (actual value: $actual)"
else
echo "Test FAILED, EXPECTED VALUE: $expected, ACTUAL VALUE: $actual, WILL ABORT"
exit 1
fi
}
-w, --write-out FORMAT
- 응답에서 포맷에 맞는 데이터를 출력할 수 있다.
- %{variable_name} 과 같은 식으로 출력할 수 있다.
- content_type, http_code, time_total 등의 정보를 출력할 수 있다.
-s, --silent
- 프로그레스나 에러 정보를 보여주지 않는다.
선언된 변수는 기본적으로 전역 변수(global variable)다. 단 함수 안에서만 지역 변수(local variable)를 사용할 수 있는데 사용할려면 변수 명 앞에 local을 붙여주면 된다.
문자열 길이(예: echo ${#string})
변수의 뒷부분부터 짧게 일치한 단어 삭제(예: echo ${string%b*c})
string="abc-efg-123-abc"
echo ${string%b*c} # abc-efg-123-a
echo ${string%???} # abc-efg-123-
chmod u+x ./bash-test.bash
# u stands for user.
# g stands for group.
# o stands for others.
# a stands for all. ( default )
# chmod +x {filename} | you'll make it executable.
curl http://localhost:7000/product-composite/1 -s -w "%{http_code}"
{
"productId": 1,
"name": "name-1",
"weight": 123,
"recommendations": [
{
"recommendationId": 1,
"author": "Author 1",
"rate": 1
},
{
"recommendationId": 2,
"author": "Author 2",
"rate": 2
},
{
"recommendationId": 3,
"author": "Author 3",
"rate": 3
}
],
"reviews": [
{
"reviewId": 1,
"author": "Author 1",
"subject": "Subject 1"
},
{
"reviewId": 2,
"author": "Author 2",
"subject": "Subject 2"
},
{
"reviewId": 3,
"author": "Author 3",
"subject": "Subject 3"
}
],
"serviceAddresses": {
"cmp": "yskimui-MacBook-Pro.local/218.38.137.27:7000",
"pro": "yskimui-MacBook-Pro.local/218.38.137.27:7001",
"rev": "yskimui-MacBook-Pro.local/218.38.137.27:7003",
"rec": "yskimui-MacBook-Pro.local/218.38.137.27:7002"
}
}
200
docker run -it --rm ubuntu
- 터미널을 통해 컨테이너와 상호작용
- 테미널 세션을 마치면 종료하도록 설정
docker rm -f $(docker ps -aq)
- 실행 중 이거나, 중지된 컨테이너의 ID를 출력
- 커맨드의 출력을 정리하고 컨테이너 ID만 남긴다.
- 9 버전 이하의 자바에서는 리눅스 cgroup으로 지정한 자원 할당량을 무시했었음
./gradlew :microservices/product-service/build
FROM openjdk:12.0.2
EXPOSE 8080
ADD ./build/libs/*.jar app.jar
ENTRYPOINT ["java","-jar","/app.jar"]
docker build -t product-service .
# 확인
docker images | grep product-service
docker run --rm -p 8080:8080 -e "SPRING_PROFILE_ACTIVE=docker" product-service
curl localhost:8080/product/3 | jq
docker run -d -p 8080:8080 -e "SPRING_PROFILES_ACTIVE=docker" --name my-prd-svc product-service
- 로그 보기
docker logs my-prd-srv -f
version: "2.1"
services:
product:
build: microservices/product-service
mem_limit: 350m
environment:
- SPRING_PROFILES_ACTIVE=docker
recommendation:
build: microservices/recommendation-service
mem_limit: 350m
environment:
- SPRING_PROFILES_ACTIVE=docker
reivew:
build: microservices/reivew-service
mem_limit: 350m
environment:
- SPRING_PROFILES_ACTIVE=docker
product-composite:
build: microservices/product-composite-service
mem_limit: 350m
ports:
- "8080:8080"
environment:
- SPRING_PROFILES_ACTIVE=docker
./gradlew build
docker-compose build
docker images
docker-compose up -d
docker-compose logs -f
curl localhost:8080/product-composite/123 -s | jq .
docker-compose down
function testUrl() {
url=$@
if curl $url -ks -f -o /dev/null
then
echo "Ok"
return 0
else
echo -n "not yet"
return 1
fi;
}
function waitForService() {
url=$@
echo -n "Wait for: $url..."
n=0
until testUrl $url
do
n=$((n + 1))
if [[ $n == 100 ]]
then
echo " Give up "
exit 1
else
sleep 6
echo -n ", retry #$n "
fi
done
}
- springfox-swagger2 : 스웨거 2 기반의 문서 생성
- springfox-spring-webflux : 스프링 웹플럭스 기반의 RESTful 오퍼레이션 지원
- springfox-swagger-ui : 마이크로서비스에 스웨거 뷰어를 내장
<!-- product-composite-service -->
implementation "io.springfox:springfox-boot-starter:3.0.0"
<!-- api -->
implementation "io.springfox:springfox-boot-starter:3.0.0"
@Value("${api.common.version}")
String apiVersion;
@Value("${api.common.title}")
String apiTitle;
@Value("${api.common.description}")
String apiDescription;
@Value("${api.common.termsOfServiceUrl}")
String apiTermsOfServiceUrl;
@Value("${api.common.license}")
String apiLicense;
@Value("${api.common.licenseUrl}")
String apiLicenseUrl;
@Value("${api.common.contact.name}")
String apiContactName;
@Value("${api.common.contact.url}")
String apiContactUrl;
@Value("${api.common.contact.email}")
String apiContactEmail;
@Bean
public Docket apiDocumentation() {
return new Docket(DocumentationType.SWAGGER_2)
.select()
.apis(RequestHandlerSelectors.any())
.paths(PathSelectors.any()).build()
.apiInfo(new ApiInfo(apiTitle, apiDescription, apiVersion, apiTermsOfServiceUrl,
new Contact(apiContactName, apiContactUrl, apiContactEmail), apiLicense, apiLicenseUrl,
emptyList()));
}
- 스웨거 V2 문서를 생성하고자 Docket 빈을 초기화한다.
- apis(), paths() 메서드로 스프링 폭스가 API 정보를 찾을 위치를 지정
- API 문서는 버전관리가 되어야 하므로, Docket Bean 단위로 버전관리를 작성
- ApiSelectorBuilder를 생성
- api 스펙이 작성되어 있는 패키지 지정
- apis()로 선택되어진 API중에서 특정 path 조건에 맞는 API들을 다시 필터링 하여 문서화
- 제목, 설명 등 문서에 대한 정보들을 보여주기 위해 호출
public ApiInfo( title, description, version, termsOfServiceUrl, contact, license, licenseUrl, vendorExtensions )
@Api(description = "REST API for composite product information")
public interface ProductCompositeService {
/**
* Sample usage: curl $HOST:$PORT/product-composite/1
*
* @param productId
* @return the composite product info, if found, else null
*/
@ApiOperation(value = "${api.product-composite.get-composite-product.description}", notes = "${api.product-composite.get-composite-product.notes}")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Bad Request, invalid format of the request. See response message for more information."),
@ApiResponse(code = 404, message = "Not found, the specified id does not exist."),
@ApiResponse(code = 422, message = "Unprocessable entity, input parameters caused the processing to fails. See response message for more information.") })
@GetMapping(value = "/product-composite/{productId}", produces = "application/json")
ProductAggregate getProduct(@PathVariable int productId);
}
api:
common:
version: 1.0.0
title: Sample API
description: Description of the API...
termsOfServiceUrl: MINE TERMS OF SERVICE URL
license: License
licenseUrl: MY LICENSE URL
contact:
name: Contact
url: My
email: me@mail.com
product-composite:
get-composite-product:
description: Returns a composite view of the specified product id
notes: |
# Normal response
If the requested product id is found the method will return information regarding:
1. Base product information
2. Reviews
3. Recommendations
4. Service Addresses\n(technical information regarding the addresses of the microservices that created the response)
# Expected partial and error responses
In the following cases, only a partial response be created (used to simplify testing of error conditions)
## Product id 113
200 - Ok, but no recommendations will be returned
## Product id 213
200 - Ok, but no reviews will be returned
## Non numerical product id
400 - A <b>Bad Request</b> error will be returned
## Product id 13
404 - A <b>Not Found</b> error will be returned
## Negative product ids
422 - An <b>Unprocessable Entity</b> error will be returned
스프링 - 2.3.x, swagger - 3.0.0 버전에서는 페이지가 안뜨는 이슈가 있다.
https://stackoverflow.com/questions/48311447/springboot-swagger-url-shows-whitelabel-error-page
a code generator that greatly simplifies the implementation of mappings between Java bean types based on a convention over configuration approach.
ext {
mapstructVersion = "1.3.1.Final"
}
dependencies {
...
// mapstruct
implementation "org.mapstruct:mapstruct:${mapstructVersion}"
annotationProcessor "org.mapstruct:mapstruct-processor:${mapstructVersion}"
testAnnotationProcessor "org.mapstruct:mapstruct-processor:${mapstructVersion}" // if you are using mapstruct in test code
}
역시 책에 있는대로 따라하면 동작하지 않는다. ( gradle 7버전 이후 compile, compileOnly 옵션 deprecated )
dependencies {
...
// mongoDB
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'
testImplementation 'de.flapdoodle.embed:de.flapdoodle.embed.mongo'
}
brew services start mongodb-community
이 클래스가 MongoDB 엔티티 클래스이며, products라는 이름의 MongoDB 컬렉션에 매핑된다는 것을 표시한다.
스프링 데이터의 id 및 version 필드라는 것을 표시한다.
비즈니스 키, productId에 생성된 고유색인을 가져온다.
@CompoundIndex(name = "prod-rec-id", unique = true, def = "{'productId': 1, 'recommendationId': 1}")
productId와 recommendationId 필드로 구성된 복합 비즈니스 키를 위한 고유 복합 인덱스를 생성
이 클래스가 JPA 엔티티 클래스이며, SQL 데이터베이스의 products 테이블에 매핑된다는 것을 표시한다.
@Table(name = "reviews", indexes = { @Index(name = "reviews_unique_idx", unique = true, columnList = "productId,reviewId") })
productId와 reviewId 필드로 구성된 복합 비즈니스 키를 위한 고유 복합 인덱스를 생성하는 역할도 한다.
스프링 데이터의 id 및 version 필드라는 것을 표시한다.
스프링 데이터 JPA가 id 필드에 고유한 id 값을 자동으로 생성하도록 지시한다.
public interface ReviewRepository extends CrudRepository<ReviewEntity, Integer>{
@Transactional(readOnly = true) // 읽기전용이므로 영속성 컨택스트는 스냅샷을 보관하지 않는다. => 메모리 사용량을 최적화 시킬 수 있다.
List<ReviewEntity> findByProductId(int productId);
}
public interface ProductRepository extends PagingAndSortingRepository<ProductEntity, String>{
Optional<ProductEntity> findByProductId(int productId);
}
테스트를 시작할 때 내장형 MongoDB 데이터베이스를 시작한다.
테스트를 시작할 때 내장형 SQL 데이터베이스를 시작한다.
스프링 부트는 다른 테스트에 의한 부작용을 최소화하고자 SQL 데이터베이스에 업데이트한 내용을 롤백하도록 테스트를 구성한다.
이런 이유로 일부 테스트가 실패하기 때문에 클래스 수준 어노테이션 @Transactional(propagation = NOT_SUPPORT)을 사용해 자동 롤백을 비활성화 한다.
./gradlew microservices:product-service:test --tests PersistenceTests
@DataMongoTest
//@TestInstance(Lifecycle.PER_CLASS) // BeforeAll 을 static이 아닌 함수로 사용시
public class PersistenceTests {
@Autowired
private ProductRepository repository;
private ProductEntity savedEntity;
@BeforeEach
public void setupDb() {
repository.deleteAll();
ProductEntity entity = new ProductEntity(1, "n", 1);
savedEntity = repository.save(entity);
assertEqualsProduct(entity, savedEntity);
}
}
@Test
public void create() {
ProductEntity newEntity = new ProductEntity(2, "n", 2);
savedEntity = repository.save(newEntity);
ProductEntity foundEntity = repository.findById(newEntity.getId()).get();
assertEqualsProduct(newEntity, foundEntity);
assertEquals(2, repository.count());
}
@Test
public void update() {
savedEntity.setName("n2");
repository.save(savedEntity);
ProductEntity foundEntity = repository.findById(savedEntity.getId()).get();
assertEquals(1, (long)foundEntity.getVersion());
assertEquals("n2", foundEntity.getName());
}
@Test
public void delete() {
repository.delete(savedEntity);
assertFalse(repository.existsById(savedEntity.getId()));
}
@Test
public void getByProductId() {
Optional<ProductEntity> entity = repository.findByProductId(savedEntity.getProductId());
assertTrue(entity.isPresent());
assertEqualsProduct(savedEntity, entity.get());
}
// import org.springframework.dao.DuplicateKeyException;
@Test
public void duplicateError() {
assertThrows(DuplicateKeyException.class, () -> {
ProductEntity entity2 = new ProductEntity(savedEntity.getProductId(), "n", 1);
repository.save(entity2);
});
}
# application.yml
spring.data.mongodb:
host: localhost
port: 27017
database: product-db
auto-index-creation: true
@Test
public void optimisticLockError() {
// 데이터베이스에서 가져온 엔티티를 변수 2개에 저장한다.
ProductEntity entity1 = repository.findById(savedEntity.getId()).get();
ProductEntity entity2 = repository.findById(savedEntity.getId()).get();
// 첫 번째 엔티티 객체를 업데이트 한다.
entity1.setName("n1");
repository.save(entity1);
// 두 번째 엔티티 객체를 업데이트한다.
// 두 번째 엔티티 객체의 버전이 낮으므로 실패한다.
// 즉 낙관적 잠금 오류가 발생해 실패한다.
try {
entity2.setName("n2");
repository.save(entity2);
fail("Expected an OptimisticLockingFailureException");
} catch (OptimisticLockingFailureException e) {}
// 데이터베이스에서 업데이트된 엔티티를 가져와서 새로운 값을 확인한다.
ProductEntity updatedEntity = repository.findById(savedEntity.getId()).get();
assertEquals(1, (int)updatedEntity.getVersion());
assertEquals("n1", updatedEntity.getName());
}
@Test
public void paging() {
repository.deleteAll();
List<ProductEntity> newProducts = rangeClosed(1001, 1010)
.mapToObj(i -> new ProductEntity(i, "name " + i, i)).collect(Collectors.toList());
repository.saveAll(newProducts);
PageRequest nextPage = PageRequest.of(0, 4, Direction.ASC, "productId");
Page<ProductEntity> productPage = repository.findAll(nextPage);
assertEquals("[1001, 1002, 1003, 1004]", productPage.getContent().stream().map(p -> p.getProductId()).collect(Collectors.toList()).toString());
assertEquals(true, productPage.hasNext());
// nextPage = testNextPage(nextPage, "[1001, 1002, 1003, 1004]", true);
// nextPage = testNextPage(nextPage, "[1005, 1006, 1007, 1008]", true);
// nextPage = testNextPage(nextPage, "[1009, 1010]", false);
}
@PostMapping(
value = "/product",
consumes = "application/json",
produces = "application/json"
)
Product createProduct(@RequestBody Product body);
@DeleteMapping(
value = "/product/{productId}"
)
void deleteProduct(@PathVariable int productId);
@Mapper(componentModel = "spring")
public interface RecommendationMapper {
@Mappings({
@Mapping(target = "rate", source="entity.rating"),
@Mapping(target = "serviceAddress", ignore = true)
})
Recommendation entityToApi(RecommendationEntity entity);
@Mappings({
@Mapping(target = "rating", source="api.rate"),
@Mapping(target = "id", ignore = true),
@Mapping(target = "version", ignore = true)
})
RecommendationEntity apiToEntity(Recommendation api);
List<Recommendation> entityListToApiList(List<RecommendationEntity> entity);
List<RecommendationEntity> apiListToEntityList(List<Recommendation> api);
}
엔티티 객체를 API 모델 객체에 매핑
엔티티 클래스에는 serviceAddress가 없으므로 무시가능하도록 ignore = true
엔티티의 rating 필드와 API 모델의 rate 필드 매핑
API 모델 객체를 엔티티 객체에 매칭
API 모델 클래스에는 id, version이 없으므로 ignore = true
엔티티의 rating 필드와 API 모델의 rate 필드 매핑
- mapstruct 1.3.1.Final 버전에서 알수없는 버그에 헤매다 1.4.0.Beta3 로 바꾸니 해결되었다.
> Task :microservices:review-service:compileJava FAILED
/Users/yskim/Desktop/msa-learn/microservices/review-service/src/main/java/arthur/kim/microservices/core/review/services/ReviewMapper.java:24: error: arthur.kim.microservices.core.review.persistence.ReviewEntity does not have an accessible parameterless constructor.
ReviewEntity apiToEntity(Review api);
^
1 error
cmd + shift + P
Java: Clean Java Language Server Workspace
No qualifying bean of type 'org.springframework.boot.web.reactive.context.ReactiveWebApplicationContext' available
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
# application.yml
spring:
data:
mongodb:
auto-index-creation: true
import com.mongodb.DuplicateKeyException;
대신에
import org.springframework.dao.DuplicateKeyException;
를 사용한다.
private RuntimeException handleHttpClientException(HttpClientErrorException ex) {
switch (ex.getStatusCode()) {
case NOT_FOUND:
return new NotFoundException(getErrorMessage(ex));
case UNPROCESSABLE_ENTITY:
return new InvalidInputException(getErrorMessage(ex));
default:
LOG.warn("Got a unexpected HTTP error: {}, will rethrow it", ex.getStatusCode());
LOG.warn("Error body: {}", ex.getResponseBodyAsString());
return ex;
}
}
// ProductCompositeIntegration.java
@Override
public Product createProduct(Product body) {
try {
return restTemplate.postForObject(productServiceUrl, body, Product.class);
} catch (HttpClientErrorException ex) {
throw handleHttpClientException(ex);
}
}
@Override
public void deleteProduct(int productId) {
try {
restTemplate.delete(productServiceUrl + "/" + productId);
} catch (HttpClientErrorException ex) {
throw handleHttpClientException(ex);
}
}
// api/src/../ProductCompositeService.java
@Api(description = "REST API for composite product information")
public interface ProductCompositeService {
/**
* Sample usage: curl $HOST:$PORT/product-composite/1
*
* @param productId
* @return the composite product info, if found, else null
*/
@ApiOperation(value = "${api.product-composite.get-composite-product.description}", notes = "${api.product-composite.get-composite-product.notes}")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Bad Request, invalid format of the request. See response message for more information."),
@ApiResponse(code = 404, message = "Not found, the specified id does not exist."),
@ApiResponse(code = 422, message = "Unprocessable entity, input parameters caused the processing to fails. See response message for more information.") })
@GetMapping(value = "/product-composite/{productId}", produces = "application/json")
ProductAggregate getCompositeProduct(@PathVariable int productId);
/**
* Sample usage:
*
* curl -X POST $HOST:$PORT/product-composite \
* -H "Content-Type: application/json" --data \
* '{"productId":123,"name":"product 123","weight":123}'
*
* @param body
*/
@ApiOperation(value = "${api.product-composite.create-composite-product.description", notes = "${api.product-composite.create-composite-product.notes}")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Bad Request, invalid format of the request. See response message for more information."),
@ApiResponse(code = 422, message = "Unprocessable entity, input parameters caused the processing to fail. See response message for more information.")
})
@PostMapping(value = "/product-composite", consumes = "application/json")
void createCompositeProduct(@RequestBody ProductAggregate body);
/**
* Sample usage:
*
* curl -X DELETE $HOST:$PORT/product-composite/1
*
* @param productId
*/
@ApiOperation(value = "${api.product-composite.delete-composite-product.description", notes = "${api.product-composite.delete-composite-product.notes}")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Bad Request, invalid format of the request. See response message for more information."),
@ApiResponse(code = 422, message = "Unprocessable entity, input parameters caused the processing to fail. See response message for more information.")
})
@DeleteMapping(value = "/product-composite/{productId}")
void deleteCompositeProduct(@RequestBody ProductAggregate body);
}
@Override
public void createCompositeProduct(ProductAggregate body) {
try {
LOG.debug("createCompositeProduct: creates a new composite entityfor productId: {}", body.getProductId());
Product product = new Product(body.getProductId(), body.getName(),body.getWeight(), null);
integration.createProduct(product);
if (body.getRecommendations() != null) {
body.getRecommendations().forEach(r -> {
Recommendation recommendation = new Recommendation(bodygetProductId(), r.getRecommendationId(), r.getAuthor(), r.getRat(), r.getContent(), null);
integration.createRecommendation(recommendation);
});
}
if (body.getReviews() != null) {
body.getReviews().forEach(r -> {
Review review = new Review(body.getProductId(), r.getReviewId(),r.getAuthor(), r.getSubject(), r.getContent(), null);
});
}
LOG.debug("createCompositeProduct: composite entites created forproductId: {}", body.getProductId());
} catch (RuntimeException re) {
LOG.warn("createCompositeProduct failed", re);
throw re;
}
}
@Override
public void deleteCompositeProduct(int productId) {
LOG.debug("deleteCompositeProduct: Deletes a product aggregate forproductId: {}", productId);
integration.deleteProduct(productId);
integration.deleteRecommendations(productId);
integration.deleteReviews(productId);
LOG.debug("getCompositeProduct: aggregate entities deleted forproductId: {}", productId);
}
$ mongo
mongodb:
image: mongo:3.6.9
mem_limit: 350m
ports:
- "27017:27017"
command: mongod --smallfiles
$ mysql -uroot -h127.0.0.1 -p
mysql:
image: mysql:5.7
mem_limit: 350m
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=rootpwd
- MYSQL_DATABASE=review-db
- MYSQL_USER=user
- MYSQL_PASSWORD=pwd
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-uuser", "-ppwd", "-h", "localhost"]
interval: 10s
timeout: 5s
retries: 10
spring:
config.activate.on-profile: docker
data:
mongodb:
host: mongodb
port: 27017
database: product-db
auto-index-creation: true
server.port: 8080
spring.jpa.hibernate.ddl-auto: update
spring.datasource:
url: jdbc:mysql://localhost/review-db
username: user
password: pwd
spring.datasource.hikari.initializationFailTimeout: 60000
---
spring.config.activate.on-profile: docker
spring.datasource:
url: jdbc:mysql://mysql/review-db
server.port: 8080
docker-compose exec mongodb mongo recommendation-db --quiet --eval "db.recommendations.find()"
result
{ "_id" : ObjectId("61c1f0e8a5ea8e57eebfe495"), "version" : 0, "productId" : 123, "recommendationId" : 0, "author" : "string", "rating" : 0, "content" : "string", "_class" : "arthur.kim.microservices.core.recommendation.persistence.RecommendationEntity" }
docker-compose exec mysql mysql -uuser -p review-db -e "select * from reviews"
result
Enter password:
+----+--------+---------+------------+-----------+---------+---------+
| id | author | content | product_id | review_id | subject | version |
+----+--------+---------+------------+-----------+---------+---------+
| 1 | string | string | 123 | 0 | string | 0 |
+----+--------+---------+------------+-----------+---------+---------+
- 논블로킹 동기 REST API
- 비동기 이벤트 기반 서비스
- 느슨한 결합을 위해서 대체로 동기 API방식 보다는 이벤트 기반 비동기 메시지 전달 방식을 선호한다.
- 엔드유저가 응답을 기다리는 읽기 작업일 때
- 모바일 앱이나 SPA 등 동기 API가 알맞는 클라이언트 플랫폼일 때
- 공통 메시징 시스템을 공유해 사용할 수 없을 때
- product-composite 마이크로 서비스는 다른 조직의 클라이언트를 주로 상대하기 때문에 동기 API가 적합
- core 마이크로 서비스들의 읽기 서비스는 응답을 기다리는 엔드유저가 있기 때문에 논블로킹 동기 API로 개발한다.
- core 마이크로 서비스들의 생성 및 삭제 서비스는 이벤트 기반 비동기 서비스로 개발한다.
- product-composite 마이크로 서비스에서 생성 및 삭제를 위해 제공하는 동기 API는 이벤트를 emit할 때, 바로 200 응답을 반환한다.
- Reactor By Example : https://www.infoq.com/articles/reactor-by-example/
@Test
public void TestFlux() {
List<Integer> list = new ArrayList<>();
Flux<Integer> subs = Flux.just(1, 2, 3, 4)
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.log();
subs.subscribe(n -> list.add(n));
assertThat(list, contains(4, 8));
}
@Test
public void TestFluxBlocking() {
List<Integer> list = Flux.just(1, 2, 3, 4)
.filter(n -> n % 2 == 0)
.map(n -> n * 2).log()
.collectList()
.block();
assertThat(list, contains(4, 8));
}
- 리액티브 데이터 유형을 반환하도록 API를 변경한다.
- 서비스 구현을 변경해 블로킹 코드를 제거한다.
- 리액티브 서비스를 테스트할 수 있도록 테스트를 변경한다.
- 논블로킹 코드와 계속 블로킹 방식을 사용해야 하는 코드를 분리한다.
implementation('org.springframework.boot:spring-boot-starter-data-mongodb-reactive') // 꼭 넣기
- 이제 영속성 메소드가 Mono나 Flux 객체를 반환하므로, 메서드는 반환된 리액티브 객체에서 결과를 받을 때까지 기다려야 한다.
- block() 메소드를 직접 호출해서 결과를 받을 때 까지 기다리거나,
- StepVerifier 헬퍼 클래스를 사용하여 검증
public interface RecommendationRepository extends ReactiveCrudRepository<RecommendationEntity, String>{
Flux<RecommendationEntity> findByProductId(int productId);
}
@Override
public Flux<Recommendation> getRecommendations(int productId) {
if (productId < 1)
throw new InvalidInputException("Invalid productId: " + productId);
return repository.findByProductId(productId)
.log()
.map(e -> mapper.entityToApi(e))
.map(e -> {
e.setServiceAddress(serviceUtil.getServiceAddress());
return e;
});
}
@BeforeEach
public void setupDb() {
StepVerifier.create(repository.deleteAll()).verifyComplete();
ProductEntity entity = new ProductEntity(1, "n", 1);
StepVerifier.create(repository.save(entity))
.expectNextMatches(createdEntity -> {
savedEntity = createdEntity;
return areProductEqual(entity, savedEntity);
}).verifyComplete();
}
- JPA를 사용해 관계형 데이터베이스의 데이터에 접근하는 review 서비스는 논블로킹 프로그래밍 모델을 사용하지 않고 Scheduler를 사용해 블로킹 코드를 실행한다.
- 스케쥴러는 일정 수의 스레드를 보유한 전용 스레드 풀의 스레드에서 블로킹 코드를 실행한다.
- 스레드 풀을 사용해 블로킹 코드를 처리하면 마이크로서비스에서 사용할 스레드의 고갈을 방지하므로 마이크로서비스의 논블로킹 처리에 영향을 주지 않는다.
@Autowired
public ReviewServiceApplication(
@Value("${spring.datasource.maximum-pool-size:10}") Integer connectionPoolSize) {
this.connectionPoolSize = connectionPoolSize;
}
@Bean
public Scheduler jdbcScheduler() {
LOG.info("Creates a jdbcScheduler with connectionPoolSize = " + connectionPoolSize);
return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
}
private <T> Flux<T> asyncFlux(Iterable<T> iterable) {
return Flux.fromIterable(iterable).publishOn(scheduler);
}
protected List<Review> getByProductId(int productId) {
List<ReviewEntity> entityList = repository.findByProductId(productId);
List<Review> list = mapper.entityListToApiList(entityList);
list.forEach(e -> e.setServiceAddress(serviceUtil.getServiceAddress()));
return list;
}
@Override
public Flux<Review> getReviews(int productId) {
if ( productId < 1 ) throw new InvalidInputException("Invalid productId: " + productId);
return asyncFlux(getByProductId(productId)).log();
}
@Override
public Mono<Product> getProduct(int productId) {
String url = productServiceUrl + "/product/" + productId;
return webClient.get().uri(url).retrieve().bodyToMono(Product.class).log().onErrorMap(WebClientResponseException.class, ex -> handleException(ex));
}
@Override
public Flux<Recommendation> getRecommendations(int productId) {
String url = recommendationServiceUrl + "/recommendation?productId=" + productId;
return webClient.get().uri(url).retrieve().bodyToFlux(Recommendation.class).log().onErrorResume(error -> empty());
}
@Override
public Flux<Recommendation> getRecommendations(int productId) {
String url = recommendationServiceUrl + "/recommendation?productId=" + productId;
return webClient.get().uri(url).retrieve().bodyToFlux(Recommendation.class).log().onErrorResume(error -> empty());
}
@Override
public Product createProduct(Product body) {
if (body.getProductId() < 1) throw new InvalidInputException("Invalid productId: " + body.getProductId());
ProductEntity entity = mapper.apiToEntity(body);
Mono<Product> newEntity = repository.save(entity)
.log()
.onErrorMap(DuplicateKeyException.class,
ex -> new InvalidInputException("Duplicated key, Product Id: " + body.getProductId()))
.map(e -> mapper.entityToApi(e));
return newEntity.block();
}
@Override
public void deleteProduct(int productId) {
if (productId < 1) throw new InvalidInputException("Invalid productId: " + productId);
repository.findByProductId(productId).log().map(e -> repository.delete(e)).flatMap(e -> e).block();
}
public Mono<ProductAggregate> getCompositeProduct(int productId) {
return Mono.zip(values -> createProductAggregate((Product) values[0], (List<Recommendation>) values[1], (List<Review>) values[2], serviceUtil.getServiceAddress()),
integration.getProduct(productId),
integration.getRecommendations(productId).collectList(),
integration.getReviews(productId).collectList())
.doOnError(ex -> LOG.warn("getCompositeProduct failed: {}", ex.toString()))
.log();
}
- 복합 서비스는 생성 및 삭제 이벤트를 각 핵심 서비스의 토픽에 게시한 수 핵심 서비스의 처리를 기다리지 않고 호출자에게 OK 응답을 반환한다.
- 스프링 클라우드 스트림 구성
- 토픽 및 이벤트 정의
- 그래들 빌드 파일 변경
- 복합 서비스에서 이벤트 게시
- 핵심 서비스에서 이벤트 소비
- Example
// 게시
mysource.output().send(MessageBuilder.withPlayload(message).build);
// 소비
@StreamListener(target = Sink.INPUT)
public void receive(MyMessage message) {
LOG.info("Received: {}", message);
}
- 소비자 그룹
- 재시도 및 데드 레터 대기열
- 순서 보장 및 파티션
- 마이크로 서비스의 모든 인스턴스가 같은 메시지를 소비하는 문제를 해결
spring.cloud.stream:
binding.input:
destination: products
group: productsGroup
- 스트림은 group 필드를 사용해 product 마이크로 서비스의 모든 인스턴스를 productsGroup이라는 소비자 그룹으로 묶는다
- 소비자가 메시지 처리에 실패하면 메시지는 실패한 소비자가 성공적으로 처리할 때까지 대기열로 다시 보내지거나 사라진다.
- 내용이 잘못된 메시지 (poison message)인 경우엔 수동으로 메시지를 제거할 때까지 다른 메시지를 처리하지 못하도록 소비자를 차단한다.
- 일시적인 문제로 실패한 경우에는 여러번의 재시도로 처리가 성공할 수 있다.
- 소비자( e.g. product ) 측에 스프링 클라우드 스트림을 구성한다.
spring.cloud.stream.bindings.input.consumer:
maxAttempts: 3 # 3번 시도
backOffInitialInterval: 500 # 첫번째 시도
backOffMaxInterval: 1000 # 두번째 시도
backOffMultiplier: 2.0
# RabbitMQ
spring.cloud.stream.rabbit.bindings.input.consumer:
autoBindDlq: true
republishToDlq: true
# kafka
spring.cloud.stream.kafka.bindings.input.consumer:
enableDlq: true
- 파티션을 사용하면 성능과 확장성을 잃지 않으면서도 전송됐을 때의 순서 그대로 메시지를 전달할 수 있다.
- 메시징 시스템이 같은 키를 가진 메시지 사이의 순서를 보장하고자 사용할 키를 각 메시지에 지정한다.
- 같은 키를 가진 메시지는 언제나 같은 파티션에 배치되며, 하나의 동일 파티션에 속한 메시지만 전달 순서가 보장된다.
- 메시지 순서 보장을 위해 소비자 그룹 안의 각 파티션마다 하나의 소비자 인스턴스가 배정된다.
- 게시자 ( e.g. product-composite ) 에서 키 및 파티션 수를 지정해야 한다.
spring.cloud.stream.binding.output:
destination: products
producer:
partition-key-expression: payload.key # 메시지 페이로드의 key 필드
partition-count: 2
- 소비자( e.g. product ) 에서 소비할 파티션을 지정할 수 있다.
spring.cloud.stream.binding.input:
destination: products
group: productsGroup
consumer:
partitioned: true
instance-index: 0
소비자가 첫 번째 파티션 ( 0 )의 메시지만 소비한다는 것을 스프링 클라우드 스트림에 알린다.
- 스프링 클라우드 스트림은 게시 및 구독 패턴을 기반으로 한다.
- 즉, 게시자는 토픽에 메시지를 게시하고 구독자는 관심 있는 토픽을 구독해 메시지를 수신한다.
- 엔티티 유현별로 "products, recommendations, reviews"등의 토픽 사용
- 이벤트란 "어떤 상황이 발생했다는 것을 설명하는 메시지"
이벤트 정의
- type ( 생성, 삭제 )
- key ( productId )
- data ( 실제 이벤트 데이터 )
- timestamp ( 이벤트 발생 시간 )
// api.event.Event
public class Event<K, T> {
public enum Type { CREATE, DELETE }
private Event.Type eventType;
private K key;
private T data;
private LocalDateTime eventCreatedAt;
public Event() {
this.eventType = null;
this.key = null;
this.data = null;
this.eventCreatedAt = null;
}
public Event(Type eventType, K key, T data) {
this.eventType = eventType;
this.key = key;
this.data = data;
this.eventCreatedAt = now();
}
public Type getEventType() {
return eventType;
}
public K getKey() {
return key;
}
public T getData() {
return data;
}
public LocalDateTime getEventCreatedAt() {
return eventCreatedAt;
}
}
// product-composite
ext {
springCloudVersion = 'Greenwich.RELEASE'
}
dependencies {
implementation('org.springframework.cloud:spring-cloud-starter-stream-rabbit')
implementation('org.springframework.cloud:spring-cloud-starter-stream-kafka')
testImplementation('org.springframework.cloud:spring-cloud-stream-test-support')
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
- 메시지 소스를 선언하고 통합 계층에서 이벤트를 게시한다.
- 이벤트 게시를 위한 구성을 추가한다.
- 이벤트 게시를 테스트할 수 있도록 테스트를 변경한다.
- 메시지 소스 선언
private MessageSources messageSources;
public interface MessageSources {
String OUTPUT_PRODUCTS = "output-products";
String OUTPUT_RECOMMENDATIONS = "output-recommendations";
String OUTPUT_REVIEWS = "output-reviews";
@Output(OUTPUT_PRODUCTS)
MessageChannel outputProducts();
@Output(OUTPUT_RECOMMENDATIONS)
MessageChannel outputRecommendations();
@Output(OUTPUT_REVIEWS)
MessageChannel outputReviews();
}
- 통합계층에서 이벤트 게시
@Override
public Review createReview(Review body) {
Event<Integer, Review> event = new Event<Integer, Review>(CREATE, body.getProductId(), body);
messageSources.outputProducts().send(MessageBuilder.withPayload(event).build());
return body;
}
@Override
public void deleteReviews(int productId) {
Event<Integer, Review> event = new Event<Integer, Review>(DELETE, productId, null);
messageSources.outputProducts().send(MessageBuilder.withPayload(event).build());
}
spring.cloud.stream:
defaultBinder: rabbit
default.contentType: application/json
bindings:
output-products:
destination: products
producer:
required-groups: auditGroup
output-recommendations:
destination: recommendations
producer:
required-groups: auditGroup
output-reviews:
destination: reviews
producer:
required-groups: auditGroup
public class IsSameEvent extends TypeSafeMatcher<String> {
@Override
protected boolean matchesSafely(String item) {
// TODO Auto-generated method stub
return false;
}
@Override
public void describeTo(Description description) {
// TODO Auto-generated method stub
}
}
- 토픽의 이벤트 수신을 위해 메시지 프로세스를 선언한다.
- 리액티브 영속성 계층을 올바르게 사용하도록 서비스 구현을 변경한다.
- 이벤트 소비를 위한 구성을 추가한다.
- 이벤트의 비동기 처리를 테스트할 수 있도록 테스트를 변경한다.
@EnableBinding(Sink.class)
public class MessageProcessor {
private static final Logger LOG = LoggerFactory.getLogger(MessageProcessor.class);
@Autowired
private final ReviewService reviewService;
@StreamListener(target = Sink.INPUT)
public void process(Event<Integer, Review> event) {
switch (event.getEventType()) {
case CREATE:
Review review = event.getData();
reviewService.createReview(review);
break;
case DELETE:
int productId = event.getKey();
reviewService.deleteReviews(productId);
break;
default:
String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
throw new EventProcessingException(errorMessage);
}
}
}