
Build a Set of Blockchains Using Java

Trust via peer-to-peer networking


In the first part of this series, we built a single blockchain. Now we're going to make a set of them and get them talking to one another. The real point of a blockchain is distributed verification: you can add a block from any node, and it eventually propagates to the peer nodes so that everyone agrees on what the chain looks like. This is important because you don't want a single authority owning the source of truth in a distributed system.

One problem comes up right away: each node is two services, plus a MongoDB instance and a Kafka message bus, all of which need to talk to one another. But I want to test and demonstrate multiple nodes on a single host (my laptop). I've been running with Docker Compose, so I'll make one compose file per node to help keep the ports straight.

We'll be working on a node service that lets the nodes work with one another. It gets input from two places: a RESTful interface that allows you to add and list connected nodes, and a Kafka message bus that notifies the node service of changes in the local blockchain that need to be broadcast to the peer nodes.

To package the services as images, I'll use Google's Jib Maven plugin. It's the simplest way to create an image from a Maven build. We add the following to the pom.xml of each module that needs an image:

<build>
  <plugins>
    <plugin>
      <groupId>com.google.cloud.tools</groupId>
      <artifactId>jib-maven-plugin</artifactId>
      <version>2.7.1</version>
    </plugin>
  </plugins>
</build>

In our case, the default configuration values are fine. Next, run mvn clean install jib:build and it will create an image you can use in your Docker Compose file.
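If the plugin defaults don't point at the registry you want, Jib also accepts the target image on the command line via its -Dimage property. A quick sketch, assuming the same image name the compose file below expects:

# target image given on the command line (assumes a Docker Hub account named rlkamradt)
mvn clean install jib:build -Dimage=rlkamradt/blockchain:1.0-SNAPSHOT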

Let's take what we have so far and start it all up with an all-inclusive Docker Compose file called docker-compose-node1.yaml:

version: '3.1'
services:
  mongo:
    image: mongo
    restart: always
    ports:
      - 27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: example
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - 2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - 9092
      - 29092
    links:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  blockchain:
    image: rlkamradt/blockchain:1.0-SNAPSHOT
    ports:
      - 8080:8080
    environment:
      MONGO_HOST: mongo
      SPRING_KAFKA_BOOTSTRAP-SERVERS: kafka:29092

You'll notice a few things have changed. First, I removed the external port from Kafka so it is only accessible inside the compose network. Then I added the blockchain app, using the image created by the build. Finally, I overrode a few Spring properties with environment variables so that the app reaches Mongo and Kafka from inside the compose network. Compose creates DNS entries so that services started from the same compose file can reach each other by name. Run it with docker compose -f docker-compose-node1.yaml up -d and make sure you can still hit the basic blockchain API.
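A quick check from the command line looks something like this sketch, assuming the block-listing endpoint from the first article is GET /block/blocks:

# list the blocks on the local node; right after startup this should show only the genesis block
curl http://localhost:8080/block/blocks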

Now our application is up and running and has created the genesis block.

There is already code in the blockchain service that sends a message to Kafka whenever we add a block or a transaction. We need to create a new service that reads those events and broadcasts them to a list of peers. Let's bring down what we have running for now and add a simple node service that logs each message it receives. We'll need a new module in the project: another Spring Boot service, and this one will be able to talk to external nodes.
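Bringing the current stack down is the standard compose teardown:

docker compose -f docker-compose-node1.yaml down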

First, we will need to define a node, which is just a URL. Here’s the Node:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@Document
public class Node {
    @Id
    private String id;
    private String url;
}

We are storing it in a collection in the MongoDB, so we need a repository interface:

@Repository
public interface NodeRepository extends ReactiveMongoRepository<Node, String> {
    Mono<Node> findByUrl(String url);
}

Next, we need to have a controller to be able to see and add Node objects:

@Slf4j
@RestController
@RequestMapping("/node")
public class SimpleNodeController {
    private final Blockchain blockchain;
    private final SimpleNodeService simpleNodeService;

    public SimpleNodeController(Blockchain blockchain,
                                SimpleNodeService simpleNodeService) {
        this.simpleNodeService = simpleNodeService;
        this.blockchain = blockchain;
    }

    @GetMapping(path = "peers", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Node> getNodes() {
        return simpleNodeService.getPeers();
    }

    @PostMapping(path = "peers", produces = MediaType.APPLICATION_JSON_VALUE)
    Mono<Node> addNode(@RequestBody Node node) {
        return simpleNodeService.connectToPeer(node);
    }
}

Then, we need a simple service class that will interact with MongoDB and Kafka:

@Component
@Slf4j
public class SimpleNodeService {
    private static Node myself;
    final private String host;
    final private int port;
    final private NodeRepository peers;
    final private Blockchain blockchain;
    final private ReactiveKafkaConsumerTemplate<String, Message> emitter;

    public SimpleNodeService(@Value("${server.host}") String host,
                             @Value("${server.port}") String port,
                             Blockchain blockchain,
                             NodeRepository peers,
                             ReactiveKafkaConsumerTemplate<String, Message> emitter) {
        this.host = host;
        this.port = Integer.parseInt(port);
        this.blockchain = blockchain;
        this.emitter = emitter;
        this.peers = peers;
        myself = Node.builder()
                .url("http://" + host + ":" + port)
                .build();
        emitter
            .receiveAutoAck()
            .doOnNext(consumerRecord -> log.info(
                "received key={}, value={} from topic={}, offset={}",
                consumerRecord.key(),
                consumerRecord.value(),
                consumerRecord.topic(),
                consumerRecord.offset())
            )
            .map(ConsumerRecord::value)
            .subscribe(
                m -> log.info("received message {}", m),
                e -> log.error("error receiving Message", e));
    }

    public Flux<Node> getPeers() {
        return peers.findAll();
    }

    public Mono<Node> connectToPeer(Node node) {
        return peers.save(node);
    }
}

Any message received from Kafka will simply be logged and we won’t actually do anything with the nodes, aside from store and list them.
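The Message class comes from the shared blockchain module and isn't shown in this article. Based on how it's used here (a builder, a type, a message string, and optional block or transaction payloads), it presumably looks roughly like this sketch:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class Message {
    private String type;             // e.g. "empty"
    private String message;          // e.g. "addedBlock", "addedTransaction", "getBlocks"
    private Block block;             // populated when a block was added
    private Transaction transaction; // populated when a transaction was added
}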

Finally, we need to tell Spring Boot where it can find shared components and repositories. We can annotate the main class:

@Slf4j
@SpringBootApplication
@ComponentScan(basePackageClasses = {
        net.kamradtfamily.blockchain.api.Blockchain.class,
        net.kamradtfamily.blockchainnode.Application.class})
@EnableReactiveMongoRepositories(basePackageClasses = {
        net.kamradtfamily.blockchain.api.BlockRepository.class,
        net.kamradtfamily.blockchainnode.NodeRepository.class})
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
        try {
            Properties gitProps = new Properties();
            gitProps.load(Application.class
                    .getResourceAsStream("/git.properties"));
            log.info("Git Properties:");
            gitProps.entrySet().stream()
                    .forEach(es -> log.info("{}: {}",
                            es.getKey(),
                            es.getValue()));
        } catch (Exception e) {
            log.error("Error reading Git Properties");
        }
    }
}

Spring needs to be told where to scan for components and repositories. It usually looks in the package of the main class, but in our case we want to share components from net.kamradtfamily.blockchain.api, so I added the ComponentScan and EnableReactiveMongoRepositories annotations. I also added some logging so that whenever it starts up, we know which Git commit hash we're running.
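The git.properties file read above isn't produced by Spring Boot on its own; it's usually generated at build time by the git-commit-id plugin. A sketch of the pom.xml addition, using the commonly published plugin coordinates (the article's repository may configure this differently):

<plugin>
  <groupId>pl.project13.maven</groupId>
  <artifactId>git-commit-id-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>revision</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <generateGitPropertiesFile>true</generateGitPropertiesFile>
  </configuration>
</plugin>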

To run all of this, we need to move some ports around. The new service and the existing service each need a unique external port, so let's add the new service to docker-compose-node1.yaml:

blockchainnode:
  image: rlkamradt/blockchainnode:1.0-SNAPSHOT
  ports:
    - 8082:8080
  environment:
    MONGO_HOST: mongo
    SPRING_KAFKA_BOOTSTRAP-SERVERS: kafka:29092

The mongo-express service is already using host port 8081, so we'll expose the node service on 8082 (host port 8082 mapped to the container's 8080). Now build and push the new images, then pull and run them all:

mvn clean install jib:build
docker compose -f docker-compose-node1.yaml pull
docker compose -f docker-compose-node1.yaml up

Then, when you create a transaction with the blockchain service, you'll see in the logs of the blockchainnode service that a message was received. You can also hit the endpoint http://localhost:8082/node/peers to create and list peers.
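As a quick sanity check, you can exercise those endpoints with curl; a sketch, with an example peer URL (the JSON field matches the Node class above):

# register a peer
curl -X POST http://localhost:8082/node/peers \
     -H "Content-Type: application/json" \
     -d '{"url": "http://some-other-node:8080"}'

# list the registered peers (returned as a server-sent event stream)
curl http://localhost:8082/node/peers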

Here's where things get complicated. We need more than one node running, and we need the nodes to respond to messages after a transaction or block is added. We also need the nodes to talk to each other during start-up or when adding nodes. I'm going to copy SimpleNodeService and SimpleNodeController to NodeService and NodeController. I'll leave the old classes in place in case you're looking at the code on GitHub and want to follow along, but I'll comment out the Component and RestController annotations so they aren't instantiated at runtime.

I’m going to add an extra endpoint to the NodeController to allow confirmation that a transaction has made it into a block in all the nodes:

@GetMapping(path = "transactions/:transactionId/confirmations",
produces = MediaType.ALL_VALUE)
Mono<String> getTransactionFromNode(
@RequestParam("transactionId") String transactionId) {
return nodeService
.getConfirmations(Long.valueOf(transactionId))
.map(b -> b.toString());
}

This means I need a new set of methods in the NodeService to get the confirmations from all the nodes:

public Mono<Block> getConfirmation(Node peer, long transactionId) {
    String URL = peer.getUrl()
            + "/block/blocks/transaction/"
            + transactionId;
    log.info("Getting transactions from: {}", URL);
    return client
            .get()
            .uri(URL)
            .retrieve()
            .bodyToMono(Block.class)
            .onErrorResume(t -> Mono.empty());
}

Mono<Long> getConfirmations(long transactionId) {
    // Get a count of peers that confirm the transaction exists in a block
    return blockchain
            .findTransactionInChain(transactionId, blockchain.getAllBlocks())
            .zipWith(peers.findAll()
                    .flatMap(peer -> getConfirmation(peer, transactionId)))
            .count();
}

This will return a count of the nodes that have this transaction in a block. But first, I need to create a new endpoint in the BlockController, one that will report if the transaction is in a block in the blockchain.

@GetMapping(path = "blocks/transaction/{transactionId}", 
produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<Block> getTransaction(
@PathVariable("transactionId") Long transactionId) {
return blockchain
.findTransactionInChain(transactionId,
blockchain.getAllBlocks())
.last() // assume there's only one
.switchIfEmpty(Mono.error(new ResponseStatusException(
HttpStatus.NOT_FOUND,
"Transaction Not Found in Blockchain")));

Fortunately, we already have a method findTransactionInChain that will return the block that a transaction is found in.
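That method was written in the first article, so it isn't repeated here; from the way it's called, its shape is roughly the following sketch (assuming Block exposes its transactions and Transaction has a numeric id):

// Sketch only: keep the blocks whose transactions include the given id
public Flux<Block> findTransactionInChain(long transactionId, Flux<Block> chain) {
    return chain
            .filter(block -> block.getTransactions().stream()
                    .anyMatch(t -> t.getId() == transactionId));
}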

Next, we have to respond to messages from Kafka. We’ll add a messageHandler method that will broadcast messages to all peer nodes:

public Mono<? extends Object> messageHandler(Message m) {
    if ("addedBlock".equals(m.getMessage())) {
        return peers.findAll()
                .flatMap(p -> sendLatestBlock(p, m.getBlock()))
                .switchIfEmpty(Mono.just(m.getBlock()))
                .last();
    } else if ("addedTransaction".equals(m.getMessage())) {
        return peers.findAll()
                .flatMap(p -> sendTransaction(p, m.getTransaction()))
                .switchIfEmpty(Mono.just(m.getTransaction()))
                .last();
    } else if ("getBlocks".equals(m.getMessage())) {
        return peers.findAll()
                .flatMap(p -> getBlocks(p))
                .switchIfEmpty(blockchain.getLastBlock())
                .last();
    } else {
        log.error("unknown message {}", m);
        return Mono.empty();
    }
}

This requires two new methods to make requests to other nodes:

public Mono<ClientResponse> sendLatestBlock(Node peer, Block block) {
    String URL = peer.getUrl() + "/block/blocks/latest";
    log.info("Posting latest block to: {}", URL);
    return client
            .put()
            .uri(URL)
            .bodyValue(block)
            .exchange();
}

public Mono<ClientResponse> sendTransaction(Node peer, Transaction transaction) {
    String URL = peer.getUrl() + "/transaction";
    log.info("Sending transaction '{}' to: {}", transaction, URL);
    return client
            .post()
            .uri(URL)
            .bodyValue(transaction)
            .exchange();
}

We already have the POST to the /transaction endpoint, but we need to add the PUT to the /block/blocks/latest endpoint.

@PutMapping(path = "/block/blocks/latest", 
produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<Block> checkReceivedBlock(
@RequestBody Block receivedBlock) {
return blockchain.checkReceivedBlock(receivedBlock);
}

This requires a new method in the Blockchain service.

public Mono<Block> checkReceivedBlock(Block receivedBlock) {
    return getLastBlock()
            .filter(b -> b.getIndex() < receivedBlock.getIndex())
            .flatMap(b -> {
                log.info(
                    "Blockchain possibly behind. We got: {}, Peer got: {}",
                    b.getIndex(),
                    receivedBlock.getIndex());
                if (b.getHash().equals(receivedBlock.getPreviousHash())) {
                    log.info("Appending received block to our chain");
                    return addBlock(receivedBlock, true);
                } else {
                    log.info("Querying chain from our peers");
                    emitter.send(TOPIC, Message.builder()
                                    .type("empty")
                                    .message("getBlocks")
                                    .build())
                            .subscribe();
                    return Mono.empty();
                }
            });
}

You can see how the node layer talks to other nodes via the public API. There is one problem, though: each node is represented by a single URL, but we have to reach two separate services, the block service and the node service. I'm going to make a simple ingress with an Nginx instance, so that we can reach both services (and more later) through a single URL. You can look at the code on GitHub for the details of the Nginx configuration and of adding all the services to docker-compose-node1.yaml.
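The real nginx.conf is in the repository; a minimal sketch of the idea is below, routing by path prefix to the two services on the compose network (the upstream names match the compose service names, everything else is illustrative):

events {}
http {
  server {
    listen 80;

    # blocks and transactions go to the blockchain service
    location /block {
      proxy_pass http://blockchain:8080;
    }
    location /transaction {
      proxy_pass http://blockchain:8080;
    }

    # peer management goes to the node service
    location /node {
      proxy_pass http://blockchainnode:8080;
    }
  }
}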

With everything started up, all of the endpoints still work, and in the logs I can see the blockchain service and the node service communicating over the Kafka bus. Now it's time to make a second node. Copy docker-compose-node1.yaml to docker-compose-node2.yaml and change the external port of the Nginx service from 8080 to 8081, so that node 1 is reachable on port 8080 and node 2 on port 8081. I'm also going to create a small script called startnode1 that starts each service in order and tails the logs of the node service:

docker compose -p node1 -f docker-compose-node1.yaml up -d mongo
docker compose -p node1 -f docker-compose-node1.yaml up -d zookeeper
docker compose -p node1 -f docker-compose-node1.yaml up -d mongo-express
docker compose -p node1 -f docker-compose-node1.yaml up -d kafka
docker compose -p node1 -f docker-compose-node1.yaml up -d blockchain
docker compose -p node1 -f docker-compose-node1.yaml up blockchainnode

Because the last line doesn't have a -d flag, it displays the logs until Ctrl-C stops it. I use the -p node1 flag so that I can create separate instances of the services. Next, copy the script to a file called startnode2, replacing the compose file with docker-compose-node2.yaml and the -p value with node2. Don't forget to set the executable flag on each:

chmod +x startnode1
chmod +x startnode2

There is one final change. The myself member of the node service needs to hold the URL as seen by other services, so localhost won't do. I set up a Spring property in application.properties:

server.myself: http://localhost:8080
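The NodeService then builds myself from that single property instead of the separate host and port values, roughly like this sketch (not the repository's exact code):

public NodeService(@Value("${server.myself}") String myselfUrl,
                   Blockchain blockchain,
                   NodeRepository peers,
                   ReactiveKafkaConsumerTemplate<String, Message> emitter) {
    // other fields are assigned as before
    myself = Node.builder()
            .url(myselfUrl)
            .build();
}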

Then I override that property in docker-compose-node1.yaml, which now looks like this:

version: '3.1'
services:
  mongo:
    image: mongo
    restart: always
    ports:
      - 27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - 2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - 9092
      - 29092
    links:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  blockchain:
    image: rlkamradt/blockchain:1.0-SNAPSHOT
    ports:
      - 8080
    environment:
      SERVER_MYSELF: http://192.168.0.174:8080
      MONGO_HOST: mongo
      SPRING_KAFKA_BOOTSTRAP-SERVERS: kafka:29092
  blockchainnode:
    image: rlkamradt/blockchainnode:1.0-SNAPSHOT
    ports:
      - 8080
    environment:
      SERVER_MYSELF: http://192.168.0.174:8080
      MONGO_HOST: mongo
      SPRING_KAFKA_BOOTSTRAP-SERVERS: kafka:29092
  nginx:
    image: nginx:latest
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    ports:
      - 8080:80
The docker-compose-node2.yaml also changes the port to 8081, both in the SERVER_MYSELF value and in the nginx ports mapping.

Start up both instances. When they're both running, you can connect one to the other.
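For example, with curl (assuming your host's LAN address is the one used in the compose files, 192.168.0.174 in my case):

# tell node 1 (port 8080) about node 2 (port 8081)
curl -X POST http://localhost:8080/node/peers \
     -H "Content-Type: application/json" \
     -d '{"url": "http://192.168.0.174:8081"}'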

Now you can create transactions and mine blocks, as demonstrated in the previous article, but you can list the transactions and blocks in both nodes. The peer-to-peer protocol ensures that both nodes have the same data.

I won’t say it’s perfect at this point. There are many different sequences that need to be tested to ensure that no matter how things are done, the blockchain in the different instances remains the same. But this article is already long enough and I’m sure you don’t want to read about me debugging this web!

Thank you for reading this rather long article. I've tried to condense it to make it as concise as possible, but it's a very complicated subject.

I think the next article in this series will be a little simpler. We will be discussing the final part of the puzzle: miners and users.

All of the code for this article can be found here:

The previous article in this series:
