diff --git a/pom.xml b/pom.xml index 0122aba..d8c9244 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,7 @@ 1.3 v1.4.2 2.7.4 + 3.13.6 @@ -78,6 +79,11 @@ jackson-databind ${version.jaxon} + + com.konghq + unirest-java + ${version.unirest} + diff --git a/src/main/java/io/ipfs/api/IPFS.java b/src/main/java/io/ipfs/api/IPFS.java index d022409..6426070 100755 --- a/src/main/java/io/ipfs/api/IPFS.java +++ b/src/main/java/io/ipfs/api/IPFS.java @@ -1,21 +1,30 @@ package io.ipfs.api; -import io.ipfs.cid.*; -import io.ipfs.multihash.Multihash; +import io.ipfs.cid.Cid; import io.ipfs.multiaddr.MultiAddress; - -import java.io.*; -import java.net.*; -import java.nio.file.*; +import io.ipfs.multihash.Multihash; +import kong.unirest.RawResponse; +import kong.unirest.Unirest; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.file.Paths; import java.util.*; import java.util.concurrent.*; -import java.util.function.*; -import java.util.stream.*; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class IPFS { public static final Version MIN_VERSION = Version.parse("0.4.11"); + public enum PinType {all, direct, indirect, recursive} + public List ObjectTemplates = Arrays.asList("unixfs-dir"); public List ObjectPatchTypes = Arrays.asList("add-link", "rm-link", "set-data", "append-data"); private static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 10_000; @@ -85,9 +94,10 @@ public IPFS(String host, int port, String version, int connectTimeoutMillis, int throw new RuntimeException(e); } } - + /** * Configure a HTTP client timeout + * * @param timeout (default 0: infinite timeout) * @return current IPFS object with configured timeout */ @@ -108,14 +118,15 @@ public List add(NamedStreamable file, boolean wrap, boolean hashOnly } public List add(List files, boolean wrap, boolean hashOnly) throws IOException { - Multipart m = new Multipart(protocol + "://" + host + ":" + port + version + "add?stream-channels=true&w="+wrap + "&n="+hashOnly, "UTF-8"); - for (NamedStreamable file: files) { + Multipart multipart = new Multipart(protocol + "://" + host + ":" + port + version + "add?stream-channels=true&w=" + wrap + "&n=" + hashOnly, "UTF-8"); + for (NamedStreamable file : files) { if (file.isDirectory()) { - m.addSubtree(Paths.get(""), file); + multipart.addSubtree(Paths.get(""), file); } else - m.addFilePart("file", Paths.get(""), file); - }; - String res = m.finish(); + multipart.addFilePart("file", Paths.get(""), file); + } + + String res = multipart.finish(); return JSONParser.parseStream(res).stream() .map(x -> MerkleNode.fromJSON((Map) x)) .collect(Collectors.toList()); @@ -125,7 +136,7 @@ public List ls(Multihash hash) throws IOException { Map reply = retrieveMap("ls?arg=" + hash); return ((List) reply.get("Objects")) .stream() - .flatMap(x -> ((List)((Map) x).get("Links")) + .flatMap(x -> ((List) ((Map) x).get("Links")) .stream() .map(MerkleNode::fromJSON)) .collect(Collectors.toList()); @@ -156,13 +167,13 @@ public List refs(Multihash hash, boolean recursive) throws IOExceptio } public Map resolve(String scheme, Multihash hash, boolean recursive) throws IOException { - return retrieveMap("resolve?arg=/" + scheme+"/"+hash +"&r="+recursive); + return retrieveMap("resolve?arg=/" + scheme + "/" + hash + "&r=" + recursive); } public String dns(String domain, boolean recursive) throws IOException { Map res = retrieveMap("dns?arg=" + domain + "&r=" + recursive); - return (String)res.get("Path"); + return (String) res.get("Path"); } public Map mount(java.io.File ipfsRoot, java.io.File ipnsRoot) throws IOException { @@ -170,8 +181,8 @@ public Map mount(java.io.File ipfsRoot, java.io.File ipnsRoot) throws IOExceptio ipfsRoot.mkdirs(); if (ipnsRoot != null && !ipnsRoot.exists()) ipnsRoot.mkdirs(); - return (Map)retrieveAndParse("mount?arg=" + (ipfsRoot != null ? ipfsRoot.getPath() : "/ipfs" ) + "&arg=" + - (ipnsRoot != null ? ipnsRoot.getPath() : "/ipns" )); + return (Map) retrieveAndParse("mount?arg=" + (ipfsRoot != null ? ipfsRoot.getPath() : "/ipfs") + "&arg=" + + (ipnsRoot != null ? ipnsRoot.getPath() : "/ipns")); } // level 2 commands @@ -189,7 +200,7 @@ public List local() throws IOException { */ public class Pin { public List add(Multihash hash) throws IOException { - return ((List)((Map)retrieveAndParse("pin/add?stream-channels=true&arg=" + hash)).get("Pins")) + return ((List) ((Map) retrieveAndParse("pin/add?stream-channels=true&arg=" + hash)).get("Pins")) .stream() .map(x -> Cid.decode((String) x)) .collect(Collectors.toList()); @@ -200,9 +211,9 @@ public Map ls() throws IOException { } public Map ls(PinType type) throws IOException { - return ((Map)(((Map)retrieveAndParse("pin/ls?stream-channels=true&t="+type.name())).get("Keys"))).entrySet() + return ((Map) (((Map) retrieveAndParse("pin/ls?stream-channels=true&t=" + type.name())).get("Keys"))).entrySet() .stream() - .collect(Collectors.toMap(x -> Cid.decode(x.getKey()), x-> x.getValue())); + .collect(Collectors.toMap(x -> Cid.decode(x.getKey()), x -> x.getValue())); } public List rm(Multihash hash) throws IOException { @@ -215,7 +226,9 @@ public List rm(Multihash hash, boolean recursive) throws IOException } public List update(Multihash existing, Multihash modified, boolean unpin) throws IOException { - return ((List)((Map)retrieveAndParse("pin/update?stream-channels=true&arg=" + existing + "&arg=" + modified + "&unpin=" + unpin)).get("Pins")) + return ((List) ((Map) retrieveAndParse( + "pin/update?stream-channels=true&arg=" + existing + "&arg=" + modified + "&unpin=" + unpin)) + .getOrDefault("Pins", new ArrayList<>())) .stream() .map(x -> Cid.decode((String) x)) .collect(Collectors.toList()); @@ -230,18 +243,18 @@ public KeyInfo gen(String name, Optional type, Optional size) th } public List list() throws IOException { - return ((List)((Map)retrieveAndParse("key/list")).get("Keys")) + return ((List) ((Map) retrieveAndParse("key/list")).get("Keys")) .stream() .map(KeyInfo::fromJson) .collect(Collectors.toList()); } public Object rename(String name, String newName) throws IOException { - return retrieveAndParse("key/rename?arg="+name + "&arg=" + newName); + return retrieveAndParse("key/rename?arg=" + name + "&arg=" + newName); } public List rm(String name) throws IOException { - return ((List)((Map)retrieveAndParse("key/rm?arg=" + name)).get("Keys")) + return ((List) ((Map) retrieveAndParse("key/rm?arg=" + name)).get("Keys")) .stream() .map(KeyInfo::fromJson) .collect(Collectors.toList()); @@ -266,18 +279,17 @@ public Object peers() throws IOException { } public Object peers(String topic) throws IOException { - return retrieveAndParse("pubsub/peers?arg="+topic); + return retrieveAndParse("pubsub/peers?arg=" + topic); } /** - * * @param topic - * @param data url encoded data to be published + * @param data url encoded data to be published * @return * @throws IOException */ public Object pub(String topic, String data) throws Exception { - return retrieveAndParse("pubsub/pub?arg="+topic + "&arg=" + data); + return retrieveAndParse("pubsub/pub?arg=" + topic + "&arg=" + data); } public Stream> sub(String topic) throws Exception { @@ -285,17 +297,18 @@ public Stream> sub(String topic) throws Exception { } public Stream> sub(String topic, ForkJoinPool threadSupplier) throws Exception { - return retrieveAndParseStream("pubsub/sub?arg=" + topic, threadSupplier).map(obj -> (Map)obj); + return retrieveAndParseStream("pubsub/sub?arg=" + topic, threadSupplier).map(obj -> (Map) obj); } /** * A synchronous method to subscribe which consumes the calling thread + * * @param topic * @param results * @throws IOException */ public void sub(String topic, Consumer> results, Consumer error) throws IOException { - retrieveAndParseStream("pubsub/sub?arg="+topic, res -> results.accept((Map)res), error); + retrieveAndParseStream("pubsub/sub?arg=" + topic, res -> results.accept((Map) res), error); } @@ -327,7 +340,7 @@ public List put(List data, Optional format) throws I public MerkleNode put(byte[] data, Optional format) throws IOException { String fmt = format.map(f -> "&format=" + f).orElse(""); - Multipart m = new Multipart(protocol +"://" + host + ":" + port + version+"block/put?stream-channels=true" + fmt, "UTF-8"); + Multipart m = new Multipart(protocol + "://" + host + ":" + port + version + "block/put?stream-channels=true" + fmt, "UTF-8"); try { m.addFilePart("file", Paths.get(""), new NamedStreamable.ByteArrayWrapper(data)); String res = m.finish(); @@ -346,7 +359,7 @@ public Map stat(Multihash hash) throws IOException { */ public class IPFSObject { public List put(List data) throws IOException { - Multipart m = new Multipart(protocol +"://" + host + ":" + port + version+"object/put?stream-channels=true", "UTF-8"); + Multipart m = new Multipart(protocol + "://" + host + ":" + port + version + "object/put?stream-channels=true", "UTF-8"); for (byte[] f : data) m.addFilePart("file", Paths.get(""), new NamedStreamable.ByteArrayWrapper(f)); String res = m.finish(); @@ -356,7 +369,7 @@ public List put(List data) throws IOException { public List put(String encoding, List data) throws IOException { if (!"json".equals(encoding) && !"protobuf".equals(encoding)) throw new IllegalArgumentException("Encoding must be json or protobuf"); - Multipart m = new Multipart(protocol +"://" + host + ":" + port + version+"object/put?stream-channels=true&encoding="+encoding, "UTF-8"); + Multipart m = new Multipart(protocol + "://" + host + ":" + port + version + "object/put?stream-channels=true&encoding=" + encoding, "UTF-8"); for (byte[] f : data) m.addFilePart("file", Paths.get(""), new NamedStreamable.ByteArrayWrapper(f)); String res = m.finish(); @@ -384,15 +397,15 @@ public byte[] data(Multihash hash) throws IOException { public MerkleNode _new(Optional template) throws IOException { if (template.isPresent() && !ObjectTemplates.contains(template.get())) - throw new IllegalStateException("Unrecognised template: "+template.get()); - Map json = retrieveMap("object/new?stream-channels=true"+(template.isPresent() ? "&arg=" + template.get() : "")); + throw new IllegalStateException("Unrecognised template: " + template.get()); + Map json = retrieveMap("object/new?stream-channels=true" + (template.isPresent() ? "&arg=" + template.get() : "")); return MerkleNode.fromJSON(json); } public MerkleNode patch(Multihash base, String command, Optional data, Optional name, Optional target) throws IOException { if (!ObjectPatchTypes.contains(command)) - throw new IllegalStateException("Illegal Object.patch command type: "+command); - String targetPath = "object/patch/"+command+"?arg=" + base.toBase58(); + throw new IllegalStateException("Illegal Object.patch command type: " + command); + String targetPath = "object/patch/" + command + "?arg=" + base.toBase58(); if (name.isPresent()) targetPath += "&arg=" + name.get(); if (target.isPresent()) @@ -410,7 +423,7 @@ public MerkleNode patch(Multihash base, String command, Optional data, O case "append-data": if (!data.isPresent()) throw new IllegalStateException("set-data requires data!"); - Multipart m = new Multipart(protocol +"://" + host + ":" + port + version+"object/patch/"+command+"?arg="+base.toBase58()+"&stream-channels=true", "UTF-8"); + Multipart m = new Multipart(protocol + "://" + host + ":" + port + version + "object/patch/" + command + "?arg=" + base.toBase58() + "&stream-channels=true", "UTF-8"); m.addFilePart("file", Paths.get(""), new NamedStreamable.ByteArrayWrapper(data.get())); String res = m.finish(); return MerkleNode.fromJSON(JSONParser.parse(res)); @@ -432,9 +445,9 @@ public Map publish(Multihash hash, Optional id) throws IOException { public String resolve(String ipns) throws IOException { Map res = (Map) retrieveAndParse("name/resolve?arg=" + ipns); - return (String)res.get("Path"); + return (String) res.get("Path"); } - + } public class DHT { @@ -457,7 +470,7 @@ public Map get(Multihash hash) throws IOException { } public Map put(String key, String value) throws IOException { - return retrieveMap("dht/put?arg=" + key + "&arg="+value); + return retrieveMap("dht/put?arg=" + key + "&arg=" + value); } } @@ -470,7 +483,7 @@ public Map ls(Multihash path) throws IOException { // Network commands public List bootstrap() throws IOException { - return ((List)retrieveMap("bootstrap/").get("Peers")) + return ((List) retrieveMap("bootstrap/").get("Peers")) .stream() .flatMap(x -> { try { @@ -487,7 +500,7 @@ public List list() throws IOException { } public List add(MultiAddress addr) throws IOException { - return ((List)retrieveMap("bootstrap/add?arg="+addr).get("Peers")).stream().map(x -> new MultiAddress(x)).collect(Collectors.toList()); + return ((List) retrieveMap("bootstrap/add?arg=" + addr).get("Peers")).stream().map(x -> new MultiAddress(x)).collect(Collectors.toList()); } public List rm(MultiAddress addr) throws IOException { @@ -495,7 +508,7 @@ public List rm(MultiAddress addr) throws IOException { } public List rm(MultiAddress addr, boolean all) throws IOException { - return ((List)retrieveMap("bootstrap/rm?"+(all ? "all=true&":"")+"arg="+addr).get("Peers")).stream().map(x -> new MultiAddress(x)).collect(Collectors.toList()); + return ((List) retrieveMap("bootstrap/rm?" + (all ? "all=true&" : "") + "arg=" + addr).get("Peers")).stream().map(x -> new MultiAddress(x)).collect(Collectors.toList()); } } @@ -506,7 +519,7 @@ public List rm(MultiAddress addr, boolean all) throws IOException public class Swarm { public List peers() throws IOException { Map m = retrieveMap("swarm/peers?stream-channels=true"); - return ((List)m.get("Peers")).stream() + return ((List) m.get("Peers")).stream() .flatMap(json -> { try { return Stream.of(Peer.fromJSON(json)); @@ -518,23 +531,23 @@ public List peers() throws IOException { public Map> addrs() throws IOException { Map m = retrieveMap("swarm/addrs?stream-channels=true"); - return ((Map)m.get("Addrs")).entrySet() + return ((Map) m.get("Addrs")).entrySet() .stream() .collect(Collectors.toMap( e -> Multihash.fromBase58(e.getKey()), - e -> ((List)e.getValue()) + e -> ((List) e.getValue()) .stream() .map(MultiAddress::new) .collect(Collectors.toList()))); } public Map connect(MultiAddress multiAddr) throws IOException { - Map m = retrieveMap("swarm/connect?arg="+multiAddr); + Map m = retrieveMap("swarm/connect?arg=" + multiAddr); return m; } public Map disconnect(MultiAddress multiAddr) throws IOException { - Map m = retrieveMap("swarm/disconnect?arg="+multiAddr); + Map m = retrieveMap("swarm/disconnect?arg=" + multiAddr); return m; } } @@ -595,8 +608,8 @@ public Map bw() throws IOException { // Tools public String version() throws IOException { - Map m = (Map)retrieveAndParse("version"); - return (String)m.get("Version"); + Map m = (Map) retrieveAndParse("version"); + return (String) m.get("Version"); } public Map commands() throws IOException { @@ -609,17 +622,17 @@ public Map log() throws IOException { public class Config { public Map show() throws IOException { - return (Map)retrieveAndParse("config/show"); + return (Map) retrieveAndParse("config/show"); } public void replace(NamedStreamable file) throws IOException { - Multipart m = new Multipart(protocol +"://" + host + ":" + port + version+"config/replace?stream-channels=true", "UTF-8"); + Multipart m = new Multipart(protocol + "://" + host + ":" + port + version + "config/replace?stream-channels=true", "UTF-8"); m.addFilePart("file", Paths.get(""), file); String res = m.finish(); } public Object get(String key) throws IOException { - Map m = (Map)retrieveAndParse("config?arg="+key); + Map m = (Map) retrieveAndParse("config?arg=" + key); return m.get("Value"); } @@ -643,7 +656,7 @@ public Object log() throws IOException { } private Map retrieveMap(String path) throws IOException { - return (Map)retrieveAndParse(path); + return (Map) retrieveAndParse(path); } private Object retrieveAndParse(String path) throws IOException { @@ -652,21 +665,20 @@ private Object retrieveAndParse(String path) throws IOException { } private Stream retrieveAndParseStream(String path, ForkJoinPool executor) throws IOException { - BlockingQueue> results = new LinkedBlockingQueue<>(); - InputStream in = retrieveStream(path); - executor.submit(() -> getObjectStream(in, - res -> { - results.add(CompletableFuture.completedFuture(res)); - }, - err -> { - CompletableFuture fut = new CompletableFuture<>(); - fut.completeExceptionally(err); - results.add(fut); - }) - ); +// BlockingQueue> results = new LinkedBlockingQueue<>(); +// InputStream stream = +// executor.submit(() -> getObjectStream(stream, +// res -> results.add(CompletableFuture.completedFuture(res)), +// err -> { +// CompletableFuture fut = new CompletableFuture<>(); +// fut.completeExceptionally(err); +// results.add(fut); +// }) +// ); return Stream.generate(() -> { try { - return JSONParser.parse(new String(results.take().get())); + String json = new String(retrieve(path)); + return JSONParser.parse(json); } catch (Exception e) { throw new RuntimeException(e); } @@ -675,6 +687,7 @@ private Stream retrieveAndParseStream(String path, ForkJoinPool executor /** * A synchronous stream retriever that consumes the calling thread + * * @param path * @param results * @throws IOException @@ -689,87 +702,48 @@ private byte[] retrieve(String path) throws IOException { } private static byte[] get(URL target, int connectTimeoutMillis, int readTimeoutMillis) throws IOException { - HttpURLConnection conn = configureConnection(target, "POST", connectTimeoutMillis, readTimeoutMillis); - conn.setDoOutput(true); - /* See IPFS commit for why this is a POST and not a GET https://github.com/ipfs/go-ipfs/pull/7097 - This commit upgrades go-ipfs-cmds and configures the commands HTTP API Handler - to only allow POST/OPTIONS, disallowing GET and others in the handling of - command requests in the IPFS HTTP API (where before every type of request - method was handled, with GET/POST/PUT/PATCH being equivalent). - - The Read-Only commands that the HTTP API attaches to the gateway endpoint will - additional handled GET as they did before (but stop handling PUT,DELETEs). - - By limiting the request types we address the possibility that a website - accessed by a browser abuses the IPFS API by issuing GET requests to it which - have no Origin or Referrer set, and are thus bypass CORS and CSRF protections. - - This is a breaking change for clients that relay on GET requests against the - HTTP endpoint (usually :5001). Applications integrating on top of the - gateway-read-only API should still work (including cross-domain access). - */ - conn.setRequestMethod("POST"); - conn.setRequestProperty("Content-Type", "application/json"); - try { - OutputStream out = conn.getOutputStream(); - out.write(new byte[0]); - out.flush(); - out.close(); - InputStream in = conn.getInputStream(); - ByteArrayOutputStream resp = new ByteArrayOutputStream(); - - byte[] buf = new byte[4096]; - int r; - while ((r = in.read(buf)) >= 0) - resp.write(buf, 0, r); - return resp.toByteArray(); - } catch (ConnectException e) { - throw new RuntimeException("Couldn't connect to IPFS daemon at "+target+"\n Is IPFS running?"); - } catch (IOException e) { - InputStream errorStream = conn.getErrorStream(); - String err = errorStream == null ? e.getMessage() : new String(readFully(errorStream)); - throw new RuntimeException("IOException contacting IPFS daemon.\n"+err+"\nTrailer: " + conn.getHeaderFields().get("Trailer"), e); - } + return Unirest.post(target.toString()) + .header("accept", "application/json") + .contentType("application/json") + .connectTimeout(connectTimeoutMillis) + .socketTimeout(readTimeoutMillis) + .asBytes() + .ifFailure(response -> { + response.getParsingError().ifPresent(e -> { + throw new RuntimeException("IO Exception:" + e + " " + e.getOriginalBody()); + }); + }) + .getBody(); + } private void getObjectStream(InputStream in, Consumer processor, Consumer error) { - byte LINE_FEED = (byte)10; + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); try { - ByteArrayOutputStream resp = new ByteArrayOutputStream(); - - byte[] buf = new byte[4096]; - int r; - while ((r = in.read(buf)) >= 0) { - resp.write(buf, 0, r); - if (buf[r - 1] == LINE_FEED) { - processor.accept(resp.toByteArray()); - resp.reset(); - } + while (reader.ready()) { + String line = reader.readLine(); + processor.accept(line.getBytes()); } - } catch (IOException e) { - error.accept(e); + } catch (IOException ex) { + error.accept(ex); } + } private List getAndParseStream(String path) throws IOException { - InputStream in = retrieveStream(path); - byte LINE_FEED = (byte)10; - - ByteArrayOutputStream resp = new ByteArrayOutputStream(); - byte[] buf = new byte[4096]; - int r; List res = new ArrayList<>(); - while ((r = in.read(buf)) >= 0) { - resp.write(buf, 0, r); - if (buf[r - 1] == LINE_FEED) { - res.add(JSONParser.parse(new String(resp.toByteArray()))); - resp.reset(); - } + InputStream in = retrieveStream(path); + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + + while (reader.ready()) { + String line = reader.readLine(); + res.add(JSONParser.parse(line)); } return res; + } private InputStream retrieveStream(String path) throws IOException { @@ -778,53 +752,44 @@ private InputStream retrieveStream(String path) throws IOException { } private static InputStream getStream(URL target, int connectTimeoutMillis, int readTimeoutMillis) throws IOException { - HttpURLConnection conn = configureConnection(target, "POST", connectTimeoutMillis, readTimeoutMillis); - return conn.getInputStream(); + try { + return Unirest.post(target.toString()) + .header("accept", "application/json") + .contentType("application/json") + .connectTimeout(connectTimeoutMillis) + .socketTimeout(readTimeoutMillis) + .asObjectAsync(RawResponse::getContent) + .get() + .getBody(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } } - private Map postMap(String path, byte[] body, Map headers) throws IOException { + private Map postMap(String path, byte[] body, Map headers) throws IOException { URL target = new URL(protocol, host, port, version + path); - return (Map) JSONParser.parse(new String(post(target, body, headers, connectTimeoutMillis, readTimeoutMillis))); + return (Map) JSONParser.parse(new String(post(target, body, headers, connectTimeoutMillis, readTimeoutMillis))); } private static byte[] post(URL target, byte[] body, Map headers, int connectTimeoutMillis, int readTimeoutMillis) throws IOException { - HttpURLConnection conn = configureConnection(target, "POST", connectTimeoutMillis, readTimeoutMillis); - for (String key: headers.keySet()) - conn.setRequestProperty(key, headers.get(key)); - conn.setDoOutput(true); - OutputStream out = conn.getOutputStream(); - out.write(body); - out.flush(); - out.close(); - - InputStream in = conn.getInputStream(); - return readFully(in); - } - - private static final byte[] readFully(InputStream in) { - try { - ByteArrayOutputStream resp = new ByteArrayOutputStream(); - byte[] buf = new byte[4096]; - int r; - while ((r=in.read(buf)) >= 0) - resp.write(buf, 0, r); - return resp.toByteArray(); - - } catch(IOException ex) { - throw new RuntimeException("Error reading InputStrean", ex); - } + return Unirest.post(target.toString()) + .header("accept", "application/json") + .headers(headers) + .body(body) + .contentType("application/json") + .connectTimeout(connectTimeoutMillis) + .socketTimeout(readTimeoutMillis) + .asBytes() + .ifFailure(response -> { + response.getParsingError().ifPresent(e -> { + throw new RuntimeException("IO Exception:" + e + " " + e.getOriginalBody()); + }); + }) + .getBody(); } private static boolean detectSSL(MultiAddress multiaddress) { return multiaddress.toString().contains("/https"); } - - private static HttpURLConnection configureConnection(URL target, String method, int connectTimeoutMillis, int readTimeoutMillis) throws IOException { - HttpURLConnection conn = (HttpURLConnection) target.openConnection(); - conn.setRequestMethod(method); - conn.setRequestProperty("Content-Type", "application/json"); - conn.setConnectTimeout(connectTimeoutMillis); - conn.setReadTimeout(readTimeoutMillis); - return conn; - } + } diff --git a/src/main/java/io/ipfs/api/JSONParser.java b/src/main/java/io/ipfs/api/JSONParser.java index 08bc0a4..f01dfe3 100644 --- a/src/main/java/io/ipfs/api/JSONParser.java +++ b/src/main/java/io/ipfs/api/JSONParser.java @@ -5,10 +5,7 @@ import com.fasterxml.jackson.databind.ObjectWriter; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; +import java.util.*; import java.util.stream.Collectors; public class JSONParser { @@ -16,7 +13,7 @@ public class JSONParser { private static ObjectWriter printer; private static final ObjectMapper mapper = new ObjectMapper(); - public static Object parse(String json) { + public static Map parse(String json) { return parse(json, HashMap.class); } diff --git a/src/test/java/io/ipfs/api/APITest.java b/src/test/java/io/ipfs/api/APITest.java index 82af176..295553b 100755 --- a/src/test/java/io/ipfs/api/APITest.java +++ b/src/test/java/io/ipfs/api/APITest.java @@ -1,17 +1,20 @@ package io.ipfs.api; -import io.ipfs.api.cbor.*; -import io.ipfs.cid.*; -import io.ipfs.multihash.Multihash; +import io.ipfs.api.cbor.CborObject; +import io.ipfs.cid.Cid; import io.ipfs.multiaddr.MultiAddress; -import org.junit.*; +import io.ipfs.multihash.Multihash; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; import java.io.*; -import java.nio.file.*; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.*; -import java.util.concurrent.*; -import java.util.function.*; -import java.util.stream.*; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.Assert.assertTrue; @@ -47,7 +50,7 @@ public void dagCbor() throws IOException { Cid cid = (Cid) put.hash; byte[] get = ipfs.dag.get(cid); - Assert.assertTrue("Raw data equal", ((Map)JSONParser.parse(new String(get))).get("data").equals(value)); + Assert.assertTrue("Raw data equal", ((Map) JSONParser.parse(new String(get))).get("data").equals(value)); Cid expected = Cid.decode("zdpuApemz4XMURSCkBr9W5y974MXkSbeDfLeZmiQTPpvkatFF"); Assert.assertTrue("Correct cid returned", cid.equals(expected)); @@ -141,28 +144,28 @@ public void directoryTest() throws IOException { List lsResult = ipfs.ls(addResult.hash); if (lsResult.size() != 2) throw new IllegalStateException("Incorrect number of objects in ls!"); - if (! lsResult.stream().map(x -> x.name.get()).collect(Collectors.toSet()).equals(new HashSet<>(Arrays.asList(subdirName, fileName)))) + if (!lsResult.stream().map(x -> x.name.get()).collect(Collectors.toSet()).equals(new HashSet<>(Arrays.asList(subdirName, fileName)))) throw new IllegalStateException("Dir not returned in ls!"); byte[] catResult = ipfs.cat(addResult.hash, "/" + fileName); - if (! Arrays.equals(catResult, fileContents)) + if (!Arrays.equals(catResult, fileContents)) throw new IllegalStateException("Different contents!"); byte[] catResult2 = ipfs.cat(addResult.hash, "/" + subdirName + "/" + subfileName); - if (! Arrays.equals(catResult2, file2Contents)) + if (!Arrays.equals(catResult2, file2Contents)) throw new IllegalStateException("Different contents!"); } -// @Test + // @Test public void largeFileTest() throws IOException { - byte[] largerData = new byte[100*1024*1024]; + byte[] largerData = new byte[100 * 1024 * 1024]; new Random(1).nextBytes(largerData); NamedStreamable.ByteArrayWrapper largeFile = new NamedStreamable.ByteArrayWrapper("nontrivial.txt", largerData); fileTest(largeFile); } -// @Test + // @Test public void hugeFileStreamTest() throws IOException { - byte[] hugeData = new byte[1000*1024*1024]; + byte[] hugeData = new byte[1000 * 1024 * 1024]; new Random(1).nextBytes(hugeData); NamedStreamable.ByteArrayWrapper largeFile = new NamedStreamable.ByteArrayWrapper("massive.txt", hugeData); MerkleNode addResult = ipfs.add(largeFile).get(0); @@ -176,7 +179,7 @@ public void hugeFileStreamTest() throws IOException { try { System.arraycopy(buf, 0, res, offset, r); offset += r; - }catch (Exception e){ + } catch (Exception e) { e.printStackTrace(); } } @@ -205,7 +208,7 @@ public void hashOnly() throws IOException { throw new IllegalStateException("Object shouldn't be present!"); } - public void fileTest(NamedStreamable file) throws IOException{ + public void fileTest(NamedStreamable file) throws IOException { MerkleNode addResult = ipfs.add(file).get(0); byte[] catResult = ipfs.cat(addResult.hash); byte[] getResult = ipfs.get(addResult.hash); @@ -312,7 +315,7 @@ public void indirectPinTest() throws IOException { boolean childPresentAfterGC = ls2.containsKey(child); if (!childPresentAfterGC) throw new IllegalStateException("Child not present!"); -} + } @Test public void objectPatch() throws IOException { @@ -339,9 +342,9 @@ public void objectPatch() throws IOException { MerkleNode twicePatched = ipfs.object.patch(patched.hash, "append-data", Optional.of(data), Optional.empty(), Optional.empty()); byte[] twicePatchedResult = ipfs.object.data(twicePatched.hash); - byte[] twice = new byte[2*data.length]; - for (int i=0; i < 2; i++) - System.arraycopy(data, 0, twice, i*data.length, data.length); + byte[] twice = new byte[2 * data.length]; + for (int i = 0; i < 2; i++) + System.arraycopy(data, 0, twice, i * data.length, data.length); if (!Arrays.equals(twicePatchedResult, twice)) throw new RuntimeException("object.patch: returned data after append != stored data!"); @@ -350,7 +353,7 @@ public void objectPatch() throws IOException { @Test public void refsTest() throws IOException { List local = ipfs.refs.local(); - for (Multihash ref: local) { + for (Multihash ref : local) { Object refs = ipfs.refs(ref, false); } } @@ -393,7 +396,7 @@ public void publish() throws Exception { // Add a DAG node to IPFS MerkleNode merkleNode = ipfs.dag.put("json", json.getBytes()); - Assert.assertEquals("expected to be zdpuAknRh1Kro2r2xBDKiXyTiwA3Nu5XcmvjRPA1VNjH41NF7" , "zdpuAknRh1Kro2r2xBDKiXyTiwA3Nu5XcmvjRPA1VNjH41NF7", merkleNode.hash.toString()); + Assert.assertEquals("expected to be zdpuAknRh1Kro2r2xBDKiXyTiwA3Nu5XcmvjRPA1VNjH41NF7", "zdpuAknRh1Kro2r2xBDKiXyTiwA3Nu5XcmvjRPA1VNjH41NF7", merkleNode.hash.toString()); // Get a DAG node byte[] res = ipfs.dag.get((Cid) merkleNode.hash); @@ -413,16 +416,23 @@ public void pubsubSynchronous() throws Exception { List> res = Collections.synchronizedList(new ArrayList<>()); new Thread(() -> { try { - ipfs.pubsub.sub(topic, res::add, t -> t.printStackTrace()); + ipfs.pubsub.sub(topic, res::add, Throwable::printStackTrace); } catch (IOException e) { - throw new RuntimeException(e);} + throw new RuntimeException(e); + } }).start(); int nMessages = 100; + int maxCount = 100; for (int i = 1; i < nMessages; ) { ipfs.pubsub.pub(topic, "Hello!"); if (res.size() >= i) { i++; + } else { + maxCount--; + } + if (maxCount <= 0) { + break; } } Assert.assertTrue(res.size() > nMessages - 5); // pubsub is not reliable so it loses messages @@ -436,7 +446,7 @@ public void pubsub() throws Exception { Object pub = ipfs.pubsub.pub(topic, data); Object pub2 = ipfs.pubsub.pub(topic, "G'day"); List results = sub.limit(2).collect(Collectors.toList()); - Assert.assertTrue( ! results.get(0).equals(Collections.emptyMap())); + Assert.assertTrue(!results.get(0).equals(Collections.emptyMap())); } private static String toEscapedHex(byte[] in) throws IOException { @@ -449,7 +459,7 @@ private static String toEscapedHex(byte[] in) throws IOException { } /** - * Test that merkle links in values of a cbor map are followed during recursive pins + * Test that merkle links in values of a cbor map are followed during recursive pins */ @Test public void merkleLinkInMap() throws IOException { @@ -518,7 +528,7 @@ public void recursiveRefs() throws IOException { } /** - * Test that merkle links as a root object are followed during recursive pins + * Test that merkle links as a root object are followed during recursive pins */ @Test public void rootMerkleLink() throws IOException { @@ -549,7 +559,7 @@ public void rootMerkleLink() throws IOException { } /** - * Test that a cbor null is allowed as an object root + * Test that a cbor null is allowed as an object root */ @Test public void rootNull() throws IOException { @@ -569,7 +579,7 @@ public void rootNull() throws IOException { } /** - * Test that merkle links in a cbor list are followed during recursive pins + * Test that merkle links in a cbor list are followed during recursive pins */ @Test public void merkleLinkInList() throws IOException { @@ -598,11 +608,12 @@ public void merkleLinkInList() throws IOException { public void fileContentsTest() throws IOException { ipfs.repo.gc(); List local = ipfs.refs.local(); - for (Multihash hash: local) { + for (Multihash hash : local) { try { Map ls = ipfs.file.ls(hash); return; - } catch (Exception e) {} // non unixfs files will throw an exception here + } catch (Exception e) { + } // non unixfs files will throw an exception here } }