package org.redisson.reactive;

import java.util.concurrent.Callable;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncService;
import org.redisson.connection.ConnectionManager;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/redisson-3.10.7.jar:org/redisson/reactive/CommandReactiveService.class */
public class CommandReactiveService extends CommandAsyncService implements CommandReactiveExecutor {
    public CommandReactiveService(ConnectionManager connectionManager) {
        super(connectionManager);
    }

    public <R> Mono<R> reactive(Callable<RFuture<R>> callable) {
        return Flux.create(fluxSink -> {
            fluxSink.onRequest(j -> {
                try {
                    RFuture rFuture = (RFuture) callable.call();
                    fluxSink.onDispose(() -> {
                        rFuture.cancel(true);
                    });
                    rFuture.onComplete((obj, th) -> {
                        if (th != null) {
                            fluxSink.error(th);
                            return;
                        }
                        if (obj != null) {
                            fluxSink.next(obj);
                        }
                        fluxSink.complete();
                    });
                } catch (Exception e) {
                    fluxSink.error(e);
                }
            });
        }).next();
    }
}
