Commit bd93c945 authored by zeroleak's avatar zeroleak
Browse files

move IHttpClient + AbstractOrchestrator + rxjava to extlibj

parent 7dee1ebf
......@@ -25,7 +25,7 @@
<dependency>
<groupId>io.samourai.code.wallet</groupId>
<artifactId>extlibj</artifactId>
<version>0.0.13</version>
<version>develop-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.samourai.code.wallet</groupId>
......@@ -53,11 +53,6 @@
<artifactId>throwing-supplier</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.15</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
......
package com.samourai.http.client;
import com.samourai.wallet.api.backend.IBackendClient;
import io.reactivex.Observable;
import java.util.Map;
import java8.util.Optional;
public interface IHttpClient extends IBackendClient {
void connect() throws Exception;
<T> Observable<Optional<T>> postJson(
String url, Class<T> responseType, Map<String, String> headers, Object body);
}
package com.samourai.http.client;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.samourai.wallet.api.backend.beans.HttpException;
import io.reactivex.Observable;
import java.util.Map;
import java.util.concurrent.Callable;
import java8.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class JacksonHttpClient implements IHttpClient {
private static final Logger log = LoggerFactory.getLogger(JacksonHttpClient.class);
private ObjectMapper objectMapper;
public JacksonHttpClient() {
this.objectMapper = new ObjectMapper();
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
}
protected abstract String requestJsonGet(String urlStr, Map<String, String> headers)
throws Exception;
protected abstract String requestJsonPost(
String urlStr, Map<String, String> headers, String jsonBody) throws Exception;
protected abstract String requestJsonPostUrlEncoded(
String urlStr, Map<String, String> headers, Map<String, String> body) throws Exception;
protected void onRequestError(Exception e) {}
@Override
public <T> T getJson(String urlStr, Class<T> responseType, Map<String, String> headers)
throws HttpException {
try {
String responseContent = requestJsonGet(urlStr, headers);
T result = parseJson(responseContent, responseType);
return result;
} catch (Exception e) {
onRequestError(e);
if (log.isDebugEnabled()) {
log.error("getJson failed: " + urlStr + ":" + e.getMessage());
}
if (!(e instanceof HttpException)) {
e = new HttpException(e, null);
}
throw (HttpException) e;
}
}
@Override
public <T> Observable<Optional<T>> postJson(
final String urlStr,
final Class<T> responseType,
final Map<String, String> headers,
final Object bodyObj) {
return httpObservable(
new Callable<T>() {
@Override
public T call() throws Exception {
try {
String jsonBody = objectMapper.writeValueAsString(bodyObj);
String responseContent = requestJsonPost(urlStr, headers, jsonBody);
T result = parseJson(responseContent, responseType);
return result;
} catch (Exception e) {
onRequestError(e);
if (log.isDebugEnabled()) {
log.error("postJson failed: " + urlStr, e);
}
throw e;
}
}
});
}
@Override
public <T> T postUrlEncoded(
String urlStr, Class<T> responseType, Map<String, String> headers, Map<String, String> body)
throws HttpException {
try {
String responseContent = requestJsonPostUrlEncoded(urlStr, headers, body);
T result = parseJson(responseContent, responseType);
return result;
} catch (Exception e) {
onRequestError(e);
if (log.isDebugEnabled()) {
log.error("postUrlEncoded failed: " + urlStr, e);
}
if (!(e instanceof HttpException)) {
e = new HttpException(e, null);
}
throw (HttpException) e;
}
}
private <T> T parseJson(String responseContent, Class<T> responseType) throws Exception {
T result;
if (log.isTraceEnabled()) {
String responseStr =
(responseContent != null
? responseContent.substring(0, Math.min(responseContent.length(), 50))
: "null");
log.trace(
"response["
+ (responseType != null ? responseType.getCanonicalName() : "null")
+ "]: "
+ responseStr);
}
if (String.class.equals(responseType)) {
result = (T) responseContent;
} else {
result = objectMapper.readValue(responseContent, responseType);
}
return result;
}
protected <T> Observable<Optional<T>> httpObservable(final Callable<T> supplier) {
return Observable.fromCallable(
new Callable<Optional<T>>() {
@Override
public Optional<T> call() throws Exception {
try {
return Optional.ofNullable(supplier.call());
} catch (Exception e) {
if (!(e instanceof HttpException)) {
e = new HttpException(e, null);
}
throw e;
}
}
});
}
protected ObjectMapper getObjectMapper() {
return objectMapper;
}
}
package com.samourai.whirlpool.client.wallet.orchestrator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractOrchestrator {
private Logger log;
private final int LOOP_DELAY;
private final int START_DELAY;
private final Integer LAST_RUN_DELAY;
private boolean started;
protected Thread myThread;
private boolean dontDisturb;
private long lastRun;
private boolean lastRunSetInLoop;
public AbstractOrchestrator(int loopDelay) {
this(loopDelay, 0, null);
}
public AbstractOrchestrator(int loopDelayMs, int startDelayMs, Integer lastRunDelaySeconds) {
this.log = LoggerFactory.getLogger(getClass().getName());
this.LOOP_DELAY = loopDelayMs;
this.START_DELAY = startDelayMs;
this.LAST_RUN_DELAY = lastRunDelaySeconds;
resetOrchestrator();
}
protected void resetOrchestrator() {
this.dontDisturb = false;
this.lastRun = 0;
}
public synchronized void start(boolean daemon) {
if (isStarted()) {
log.error("Cannot start: already started");
return;
}
if (log.isDebugEnabled()) {
log.debug(
"Starting... loopDelay="
+ LOOP_DELAY
+ ", lastRunDelay="
+ (LAST_RUN_DELAY != null ? LAST_RUN_DELAY : "null"));
}
this.started = true;
this.myThread =
new Thread(
new Runnable() {
@Override
public void run() {
if (START_DELAY > 0) {
doSleep(START_DELAY);
}
while (started) {
lastRunSetInLoop = false;
runOrchestrator();
// orchestrator may have been stopped in the meantime, as function is not
// synchronized
if (lastRunSetInLoop && LAST_RUN_DELAY != null) {
// wait for lastRunDelay if we did run in this loop
waitForLastRunDelay(LAST_RUN_DELAY);
} else {
doSleep(LOOP_DELAY);
}
lastRunSetInLoop = false;
}
// thread exiting
myThread = null;
if (log.isDebugEnabled()) {
log.debug("Ended. started=" + started);
}
resetOrchestrator();
}
},
getClass().getSimpleName());
this.myThread.setDaemon(daemon);
this.myThread.start();
}
protected abstract void runOrchestrator();
public void quickStop() {
this.started = false;
}
public synchronized void stop() {
if (!isStarted()) {
log.error("Cannot stop: not started");
return;
}
if (log.isDebugEnabled()) {
log.debug("Ending...");
}
this.started = false;
synchronized (myThread) {
myThread.notify();
}
}
protected synchronized void notifyOrchestrator() {
if (isStarted() && !isDontDisturb()) {
if (log.isTraceEnabled()) {
log.trace("notifying");
}
synchronized (myThread) {
myThread.notify();
}
} else {
if (log.isTraceEnabled()) {
log.trace("NOT notifying (dontDisturb)");
}
}
}
protected void sleepOrchestrator(long timeToWait, boolean withDontDisturb) {
if (withDontDisturb) {
dontDisturb = true;
}
doSleep(timeToWait);
if (withDontDisturb) {
dontDisturb = false;
}
}
private void doSleep(long timeToWait) {
try {
synchronized (myThread) {
myThread.wait(timeToWait);
}
} catch (InterruptedException e) {
}
}
private long computeWaitForLastRunDelay(int delay) {
long elapsedTimeSinceLastRun = System.currentTimeMillis() - lastRun;
long timeToWait = (delay * 1000) - elapsedTimeSinceLastRun;
return timeToWait;
}
private boolean waitForLastRunDelay(int delay) {
long timeToWait = computeWaitForLastRunDelay(delay);
if (timeToWait > 0) {
if (log.isDebugEnabled()) {
log.debug("Sleeping for lastRunDelay (" + (timeToWait / 1000) + "s to wait)");
}
sleepOrchestrator(timeToWait, true);
return true;
}
return false;
}
protected void setLastRun() {
this.lastRun = System.currentTimeMillis();
this.lastRunSetInLoop = true;
}
public boolean isStarted() {
return started;
}
public boolean isDontDisturb() {
return dontDisturb;
}
}
package com.samourai.whirlpool.client.wallet.orchestrator;
import com.samourai.wallet.api.backend.beans.UnspentResponse.UnspentOutput;
import com.samourai.wallet.util.AbstractOrchestrator;
import com.samourai.whirlpool.client.exception.EmptyWalletException;
import com.samourai.whirlpool.client.exception.UnconfirmedUtxoException;
import com.samourai.whirlpool.client.wallet.WhirlpoolWallet;
......
package com.samourai.whirlpool.client.wallet.orchestrator;
import com.samourai.wallet.util.AbstractOrchestrator;
import com.samourai.whirlpool.client.wallet.data.AbstractSupplier;
import java.util.Collection;
import org.slf4j.Logger;
......
package com.samourai.whirlpool.client.wallet.orchestrator;
import com.samourai.wallet.util.AbstractOrchestrator;
import com.samourai.whirlpool.client.WhirlpoolClient;
import com.samourai.whirlpool.client.exception.NotifiableException;
import com.samourai.whirlpool.client.mix.listener.MixFailReason;
......
package com.samourai.whirlpool.client.wallet.orchestrator;
import com.samourai.wallet.util.AbstractOrchestrator;
import com.samourai.whirlpool.client.wallet.data.AbstractPersistableSupplier;
import com.samourai.whirlpool.client.wallet.data.AbstractSupplier;
import java.util.Collection;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment