Commit ae17d5f3 authored by zeroleak's avatar zeroleak
Browse files

use Observable for HttpClient

parent c10ea081
......@@ -10,6 +10,8 @@ import io.reactivex.Observable;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Callable;
import java8.util.Optional;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
......@@ -63,33 +65,26 @@ public class JavaHttpClient implements IHttpClient {
}
@Override
public synchronized <T> Observable<T> postJsonOverTor(
String urlStr, Class<T> responseType, Map<String, String> headers, Object bodyObj)
throws HttpException {
final boolean isRegOut = true;
try {
Request request = computeHttpRequest(isRegOut, urlStr, HttpMethod.POST, headers);
String jsonBody = objectMapper.writeValueAsString(bodyObj);
request.content(
new StringContentProvider(
MediaType.APPLICATION_JSON_VALUE, jsonBody, StandardCharsets.UTF_8));
return Observable.fromCallable(
() -> {
public synchronized <T> Observable<Optional<T>> postJsonOverTor(
String urlStr, Class<T> responseType, Map<String, String> headers, Object bodyObj) {
return httpObservable(
() -> {
final boolean isRegOut = true;
try {
Request request = computeHttpRequest(isRegOut, urlStr, HttpMethod.POST, headers);
String jsonBody = objectMapper.writeValueAsString(bodyObj);
request.content(
new StringContentProvider(
MediaType.APPLICATION_JSON_VALUE, jsonBody, StandardCharsets.UTF_8));
ContentResponse response = request.send();
T result = parseResponse(response, responseType);
return result;
});
} catch (Exception e) {
clearHttpClient(isRegOut);
if (log.isDebugEnabled()) {
log.error("postJsonOverTor failed: " + urlStr, e);
}
if (!(e instanceof HttpException)) {
e = new HttpException(e, null);
}
throw (HttpException) e;
}
} catch (Exception e) {
clearHttpClient(isRegOut);
throw e;
}
});
}
@Override
......@@ -215,4 +210,18 @@ public class JavaHttpClient implements IHttpClient {
}
}
}
private <T> Observable<Optional<T>> httpObservable(Callable<T> supplier) {
return Observable.fromCallable(
() -> {
try {
return Optional.ofNullable(supplier.call());
} catch (Exception e) {
if (!(e instanceof HttpException)) {
e = new HttpException(e, null);
}
throw (HttpException) e;
}
});
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment