Skip to content

Support reactive repositories in ResourceReaderRepositoryPopulator #2616

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-commons</artifactId>
<version>2.7.0-SNAPSHOT</version>
<version>2.7.0-GH-2558-SNAPSHOT</version>

<name>Spring Data Core</name>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,48 @@
*/
package org.springframework.data.repository.init;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.data.repository.support.DefaultRepositoryInvokerFactory;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.core.CrudMethods;
import org.springframework.data.repository.core.RepositoryInformation;
import org.springframework.data.repository.core.RepositoryMetadata;
import org.springframework.data.repository.core.support.DefaultCrudMethods;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.data.repository.support.Repositories;
import org.springframework.data.repository.support.RepositoryInvoker;
import org.springframework.data.repository.support.RepositoryInvokerFactory;
import org.springframework.data.repository.util.ReactiveWrapperConverters;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

/**
* A {@link RepositoryPopulator} using a {@link ResourceReader} to read objects from the configured {@link Resource}s.
*
* @author Oliver Gierke
* @author Christoph Strobl
* @author Mark Paluch
* @since 1.4
*/
public class ResourceReaderRepositoryPopulator implements RepositoryPopulator, ApplicationEventPublisherAware {

private static final Log logger = LogFactory.getLog(ResourceReaderRepositoryPopulator.class);
private static final Log logger = LogFactory.getLog(ResourceReaderRepositoryPopulator.class);

private final ResourceReader reader;
private final @Nullable ClassLoader classLoader;
Expand Down Expand Up @@ -114,7 +127,7 @@ public void populate(Repositories repositories) {

Assert.notNull(repositories, "Repositories must not be null!");

RepositoryInvokerFactory invokerFactory = new DefaultRepositoryInvokerFactory(repositories);
RepositoryPersisterFactory persisterFactory = new RepositoryPersisterFactory(repositories);

for (Resource resource : resources) {

Expand All @@ -125,13 +138,13 @@ public void populate(Repositories repositories) {
if (result instanceof Collection) {
for (Object element : (Collection<?>) result) {
if (element != null) {
persist(element, invokerFactory);
persist(element, persisterFactory);
} else {
logger.info("Skipping null element found in unmarshal result!");
}
}
} else {
persist(result, invokerFactory);
persist(result, persisterFactory);
}
}

Expand All @@ -158,12 +171,172 @@ private Object readObjectFrom(Resource resource) {
* Persists the given {@link Object} using a suitable repository.
*
* @param object must not be {@literal null}.
* @param invokerFactory must not be {@literal null}.
* @param persisterFactory must not be {@literal null}.
*/
private void persist(Object object, RepositoryPersisterFactory persisterFactory) {

RepositoryPersister persister = persisterFactory.getPersisterFor(object.getClass());
logger.debug(String.format("Persisting %s using repository %s", object, persister));
persister.save(object);
}

/**
* Factory to create {@link RepositoryPersister} instances.
*/
static class RepositoryPersisterFactory {

private final Map<Class<?>, RepositoryPersister> persisters = new HashMap<>();
private final Repositories repositories;

public RepositoryPersisterFactory(Repositories repositories) {
this.repositories = repositories;
}

/**
* Obtain a {@link RepositoryPersister}.
*
* @param domainType
* @return
*/
public RepositoryPersister getPersisterFor(Class<?> domainType) {
return persisters.computeIfAbsent(domainType, this::createPersisterFor);
}

private RepositoryPersister createPersisterFor(Class<?> domainType) {

RepositoryInformation repositoryInformation = repositories.getRequiredRepositoryInformation(domainType);
Object repository = repositories.getRepositoryFor(domainType).orElseThrow(
() -> new IllegalStateException(String.format("No repository found for domain type: %s", domainType)));

if (repositoryInformation.isReactiveRepository()) {
return repository instanceof ReactiveCrudRepository ? new ReactiveCrudRepositoryPersister(repository)
: new ReflectiveReactivePersister(repositoryInformation, repository);
}

if (repository instanceof CrudRepository) {
return new CrudRepositoryPersister(repository);
}

return new ReflectivePersister(repositoryInformation, repository);
}
}

/**
* Interface defining a save method to persist an object within a repository.
*/
interface RepositoryPersister {

/**
* Saves the {@code object} in an appropriate repository.
*
* @param object
*/
void save(Object object);
}

/**
* Reflection variant of a {@link RepositoryPersister}.
*/
private void persist(Object object, RepositoryInvokerFactory invokerFactory) {
private static class ReflectivePersister implements RepositoryPersister {

private final CrudMethods methods;
private final Object repository;

RepositoryInvoker invoker = invokerFactory.getInvokerFor(object.getClass());
logger.debug(String.format("Persisting %s using repository %s", object, invoker));
invoker.invokeSave(object);
public ReflectivePersister(RepositoryMetadata metadata, Object repository) {

this.methods = metadata.getCrudMethods();
this.repository = repository;
}

@Override
public void save(Object object) {
doPersist(object);
}

Object doPersist(Object object) {
Method method = methods.getSaveMethod()//
.orElseThrow(() -> new IllegalStateException("Repository doesn't have a save-method declared!"));

return ReflectionUtils.invokeMethod(method, repository, object);
}

@Override
public String toString() {
return repository.toString();
}
}

/**
* Reactive extension to save objects in a reactive repository.
*/
private static class ReflectiveReactivePersister extends ReflectivePersister {

public ReflectiveReactivePersister(RepositoryMetadata metadata, Object repository) {
super(metadata, repository);
}

@Override
public void save(Object object) {

Object wrapper = doPersist(object);

Publisher<?> publisher = ReactiveWrapperConverters.toWrapper(wrapper, Publisher.class);

if (!(publisher instanceof Mono)) {
publisher = Flux.from(publisher).collectList();
}

Mono.from(publisher).block();
}
}

/**
* {@link RepositoryPersister} to operate with {@link CrudRepository}.
*/
private static class CrudRepositoryPersister implements RepositoryPersister {

private final CrudRepository<Object, Object> repository;

@SuppressWarnings("unchecked")
public CrudRepositoryPersister(Object repository) {

Assert.isInstanceOf(CrudRepository.class, repository);
this.repository = (CrudRepository<Object, Object>) repository;
}

@Override
public void save(Object object) {
repository.save(object);
}

@Override
public String toString() {
return repository.toString();
}
}

/**
* {@link RepositoryPersister} to operate with {@link ReactiveCrudRepository}.
*/
private static class ReactiveCrudRepositoryPersister implements RepositoryPersister {

private final ReactiveCrudRepository<Object, Object> repository;

@SuppressWarnings("unchecked")
public ReactiveCrudRepositoryPersister(Object repository) {

Assert.isInstanceOf(ReactiveCrudRepository.class, repository);
this.repository = (ReactiveCrudRepository<Object, Object>) repository;
}

@Override
public void save(Object object) {
repository.save(object).block();
}

@Override
public String toString() {
return repository.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

import static org.mockito.Mockito.*;

import java.io.Serializable;

import org.springframework.data.mapping.context.SampleMappingContext;
import org.springframework.data.repository.Repository;

/**
* @author Oliver Gierke
*/
public class DummyRepositoryFactoryBean<T extends Repository<S, ID>, S, ID extends Serializable>
public class DummyRepositoryFactoryBean<T extends Repository<S, ID>, S, ID>
extends RepositoryFactoryBeanSupport<T, S, ID> {

private final T repository;
Expand Down
Loading