Talk about the four communication modes of gRPC
In the previous article, Brother Song talked with you about the basic usage of gRPC. Today we will go a little deeper and look at the four different communication modes in gRPC.
The four different communication modes in gRPC are:
- Unary RPC
- Server streaming RPC
- Client streaming RPC
- Bidirectional streaming RPC
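One way to keep the four modes apart is to see each of them as a combination of two yes/no questions: does the client send a stream, and does the server send a stream? The following is a minimal sketch of that idea in plain Java. The class RpcModes, its Mode enum and the of method are hypothetical names made up for illustration and are not part of grpc-java; the method names in main are the ones defined in book.proto later in this article.

```java
final class RpcModes {
    /** The four modes; a hypothetical enum for illustration, not a grpc-java type. */
    enum Mode { UNARY, SERVER_STREAMING, CLIENT_STREAMING, BIDIRECTIONAL_STREAMING }

    private RpcModes() {}

    /** Derives the mode from whether the request and the response are declared with "stream". */
    static Mode of(boolean requestIsStream, boolean responseIsStream) {
        if (requestIsStream && responseIsStream) {
            return Mode.BIDIRECTIONAL_STREAMING;
        }
        if (requestIsStream) {
            return Mode.CLIENT_STREAMING;
        }
        if (responseIsStream) {
            return Mode.SERVER_STREAMING;
        }
        return Mode.UNARY;
    }

    public static void main(String[] args) {
        System.out.println("getBook:      " + of(false, false));
        System.out.println("searchBooks:  " + of(false, true));
        System.out.println("updateBooks:  " + of(true, false));
        System.out.println("processBooks: " + of(true, true));
    }
}
```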
Next, Brother Song will walk you through these four communication modes with four complete examples.
1. Preparations
We will skip the gRPC basics and go straight to today's proto file.
This time I created a new file named book.proto, which mainly defines some book-related methods, as follows:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.javaboy.grpc.demo";
option java_outer_classname = "BookServiceProto";
import "google/protobuf/wrappers.proto";
package book;
service BookService {
rpc addBook(Book) returns (google.protobuf.StringValue);
rpc getBook(google.protobuf.StringValue) returns (Book);
rpc searchBooks(google.protobuf.StringValue) returns (stream Book);
rpc updateBooks(stream Book) returns (google.protobuf.StringValue);
rpc processBooks(stream google.protobuf.StringValue) returns (stream BookSet);
}
message Book {
string id = 1;
repeated string tags = 2;
string name = 3;
float price = 4;
string author = 5;
}
message BookSet {
string id = 1;
repeated Book bookList = 3;
}
Part of this file was already covered in the previous article, so I will not repeat it. Here are the parts that are new:
- Since this file references the StringValue (google.protobuf.StringValue) type provided by Google, we first import google/protobuf/wrappers.proto; the type can be used once it has been imported.
- The stream keyword on a method parameter or return value means that the parameter or return value is a stream, that is, multiple messages can be sent over the course of a single call.
- The message definitions use the keyword repeated, which did not appear in the previous article. It means the field can occur multiple times; in Java you can simply think of it as a list.
Those are the main differences from the previous article.
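To make the repeated keyword more concrete, here is a hand-written stand-in for a message with a repeated string tags field. It is only a sketch, not the code that protoc generates: TaggedBook is a hypothetical class, and the tag values are ASCII placeholders. The accessor names are modeled on the shape of the accessors generated for a repeated field, some of which (addTags, getTagsCount, getTags) are used on Book later in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hand-written stand-in for a message with "repeated string tags" (not generated code). */
final class TaggedBook {
    private final List<String> tags;

    private TaggedBook(List<String> tags) {
        // Messages are immutable once built, so keep a read-only copy.
        this.tags = Collections.unmodifiableList(new ArrayList<>(tags));
    }

    List<String> getTagsList() { return tags; }

    int getTagsCount() { return tags.size(); }

    String getTags(int index) { return tags.get(index); }

    static Builder newBuilder() { return new Builder(); }

    static final class Builder {
        private final List<String> tags = new ArrayList<>();

        /** Each call appends one more value to the repeated field. */
        Builder addTags(String tag) {
            tags.add(tag);
            return this;
        }

        TaggedBook build() { return new TaggedBook(tags); }
    }

    public static void main(String[] args) {
        TaggedBook book = TaggedBook.newBuilder().addTags("classic").addTags("popular").build();
        System.out.println(book.getTagsCount() + " tags: " + book.getTagsList());
    }
}
```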
After the proto file is written, compile it according to the method introduced in the previous article to generate the corresponding code, which will not be repeated here.
2. Unary RPC
Unary RPC is the simplest RPC mode. What we covered in the previous article was in fact unary RPC: the client sends one request, the server returns one response, and the call ends.
Among the five methods we defined above, addBook and getBook are both unary RPCs.
2.1 addBook
Let's first look at the addBook method. Its logic is very simple: we prepare a Map on the server in advance to hold the books, and when addBook is called, the Book object is stored in the Map and the ID of the book is returned. Here is the server-side code:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
private Map<String, Book> bookMap = new HashMap<>();
public BookServiceImpl() {
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public void addBook(Book request, StreamObserver<StringValue> responseObserver) {
bookMap.put(request.getId(), request);
responseObserver.onNext(StringValue.newBuilder().setValue(request.getId()).build());
responseObserver.onCompleted();
}
}
If you have read the previous article, this code should be easy to understand.
The client call method is as follows:
public class BookServiceClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
addBook(stub);
}
private static void addBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
stub.addBook(Book.newBuilder().setPrice(99).setId("100").setName("java").setAuthor("javaboy").build(), new StreamObserver<StringValue>() {
@Override
public void onNext(StringValue stringValue) {
System.out.println("stringValue.getValue() = " + stringValue.getValue());
}
@Override
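public void onError(Throwable throwable) {
// Release the latch on failure too, otherwise main() would wait forever.
throwable.printStackTrace();
countDownLatch.countDown();
}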
@Override
public void onCompleted() {
countDownLatch.countDown();
System.out.println("添加完毕");
}
});
countDownLatch.await();
}
}
Because the stub created with newStub is asynchronous, I use a CountDownLatch to keep the main thread waiting, so the client only exits after the server has responded. The server's return value is available in the onNext method of the callback.
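The waiting pattern itself can be tried without gRPC. The sketch below is a minimal simulation under stated assumptions: a single-thread executor plays the role of the thread that delivers the server's response, and LatchDemo and callAndWait are hypothetical names. Unlike the client above, it waits with a timeout, which is one possible way to avoid blocking forever if no response ever arrives.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class LatchDemo {
    private LatchDemo() {}

    /** Starts a simulated asynchronous call and blocks until its callback has run. */
    static String callAndWait(String bookId, long timeoutMillis) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> response = new AtomicReference<>();
        try {
            worker.execute(() -> {
                response.set("added " + bookId); // plays the role of onNext
                done.countDown();                // plays the role of onCompleted
            });
            // Bounded wait, so a lost response cannot block the caller forever.
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("no response within " + timeoutMillis + " ms");
            }
            return response.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callAndWait("100", 1000));
    }
}
```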
2.2 getBook
getBook is similar to addBook above. Let's look at the server code first:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
private Map<String, Book> bookMap = new HashMap<>();
public BookServiceImpl() {
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public void getBook(StringValue request, StreamObserver<Book> responseObserver) {
String id = request.getValue();
Book book = bookMap.get(id);
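if (book != null) {
responseObserver.onNext(book);
responseObserver.onCompleted();
} else {
// A unary call must not complete without a response, so report the missing book as NOT_FOUND.
responseObserver.onError(io.grpc.Status.NOT_FOUND.withDescription("No book with id " + id).asRuntimeException());
}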
}
}
getBook looks up a Book in the Map by the id sent by the client and returns it.
The client calling code is as follows:
public class BookServiceClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
getBook(stub);
}
private static void getBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
stub.getBook(StringValue.newBuilder().setValue("2").build(), new StreamObserver<Book>() {
@Override
public void onNext(Book book) {
System.out.println("book = " + book);
}
@Override
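public void onError(Throwable throwable) {
// Release the latch on failure too, otherwise main() would wait forever.
throwable.printStackTrace();
countDownLatch.countDown();
}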
@Override
public void onCompleted() {
countDownLatch.countDown();
System.out.println("查询完毕");
}
});
countDownLatch.await();
}
}
As you can probably see, addBook and getBook follow exactly the same pattern.
3. Server streaming RPC
In unary RPC, the client sends a request, the server gives one response, and the call ends. With server streaming, the client sends one request and the server gives a sequence of responses, and this sequence of responses forms a stream.
The searchBooks method defined above is such an example. searchBooks takes a book tag as its parameter; the server checks which books carry that tag and returns all the books that match.
Let's look at the server code:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
private Map<String, Book> bookMap = new HashMap<>();
public BookServiceImpl() {
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public void searchBooks(StringValue request, StreamObserver<Book> responseObserver) {
Set<String> keySet = bookMap.keySet();
String tags = request.getValue();
for (String key : keySet) {
Book book = bookMap.get(key);
int tagsCount = book.getTagsCount();
for (int i = 0; i < tagsCount; i++) {
String t = book.getTags(i);
if (t.equals(tags)) {
responseObserver.onNext(book);
break;
}
}
}
responseObserver.onCompleted();
}
}
This Java code should be easy to follow:
- First, extract the tag sent by the client from the request.
- Traverse the bookMap and check whether any tag of each book equals the tag sent by the client. If so, the book is a match, and it is written back to the client through responseObserver.onNext(book);
- After all books have been checked, call responseObserver.onCompleted(); to indicate that the server's response sequence is over, so that the client knows the call has finished.
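The steps above can be sketched without gRPC as follows. This is only an illustration under assumptions: TagSearchSketch and its nested Book class are hypothetical stand-ins for the generated code, a Consumer and a Runnable take the place of the onNext and onCompleted callbacks, and the tags are ASCII placeholders. It also replaces the inner index loop with List.contains, which performs the same equality check on each tag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class TagSearchSketch {
    /** Minimal stand-in for the generated Book message. */
    static final class Book {
        final String id;
        final List<String> tags;

        Book(String id, String... tags) {
            this.id = id;
            this.tags = Arrays.asList(tags);
        }
    }

    private TagSearchSketch() {}

    /** Emits every book that carries the tag, then signals completion exactly once. */
    static void searchBooks(Map<String, Book> bookMap, String tag,
                            Consumer<Book> onNext, Runnable onCompleted) {
        for (Book book : bookMap.values()) {
            if (book.tags.contains(tag)) {
                onNext.accept(book); // one response message per matching book
            }
        }
        onCompleted.run(); // end of the response stream
    }

    /** Records the ids of the streamed books followed by a completion marker. */
    static List<String> matchingIds(Map<String, Book> bookMap, String tag) {
        List<String> events = new ArrayList<>();
        searchBooks(bookMap, tag, book -> events.add(book.id), () -> events.add("<completed>"));
        return events;
    }

    static Map<String, Book> sampleBooks() {
        Map<String, Book> books = new LinkedHashMap<>();
        books.put("1", new Book("1", "classic", "popular"));
        books.put("2", new Book("2", "fantasy", "popular"));
        books.put("3", new Book("3", "classic", "popular"));
        return books;
    }

    public static void main(String[] args) {
        System.out.println(matchingIds(sampleBooks(), "classic"));
    }
}
```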
Let's take a look at the client code, as follows:
public class BookServiceClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
searchBook(stub);
}
private static void searchBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
stub.searchBooks(StringValue.newBuilder().setValue("明清小说").build(), new StreamObserver<Book>() {
@Override
public void onNext(Book book) {
System.out.println(book);
}
@Override
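public void onError(Throwable throwable) {
// Release the latch on failure too, otherwise main() would wait forever.
throwable.printStackTrace();
countDownLatch.countDown();
}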
@Override
public void onCompleted() {
countDownLatch.countDown();
System.out.println("查询完毕!");
}
});
countDownLatch.await();
}
}
The client code is easy to understand. The search keyword is the tag 明清小说 (Ming and Qing novels). Every time the server returns a book, the onNext method of the client callback is triggered once. When the server calls responseObserver.onCompleted();, the client's onCompleted method is triggered as well.
This is server streaming: the client sends one request, and the server can write data back multiple times through onNext.
4. Client streaming RPC
With client streaming, the client sends multiple messages within one call, and the server gives only one response.
The updateBooks method above is an example of client streaming. A client that wants to modify books can send several Book messages to modify several books. The server collects the results of these modifications, summarizes them, and returns them to the client in one response.
Let's take a look at the server code first:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
private Map<String, Book> bookMap = new HashMap<>();
public BookServiceImpl() {
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public StreamObserver<Book> updateBooks(StreamObserver<StringValue> responseObserver) {
StringBuilder sb = new StringBuilder("更新的图书 ID 为:");
return new StreamObserver<Book>() {
@Override
public void onNext(Book book) {
bookMap.put(book.getId(), book);
sb.append(book.getId())
.append(",");
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
responseObserver.onNext(StringValue.newBuilder().setValue(sb.toString()).build());
responseObserver.onCompleted();
}
};
}
}
Every time the client sends a book, the server's onNext method is triggered; in this method we update the book and record its ID. Finally, in the onCompleted method, we return the summary of the updates to the client. That is the whole process.
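The accumulate-then-reply pattern can also be tried in-process, without a network. The sketch below makes several assumptions: Observer is a hand-written interface with the same three callbacks as StreamObserver, Book is a stand-in for the generated message, and UpdateBooksSketch and run are hypothetical names. It uses a StringJoiner instead of a StringBuilder, which is one way to avoid the trailing comma after the last ID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

final class UpdateBooksSketch {
    /** Stand-in with the same three callbacks as gRPC's StreamObserver. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Stand-in for the generated Book message. */
    static final class Book {
        final String id;
        final String name;

        Book(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    private final Map<String, Book> bookMap = new HashMap<>();

    /** Returns the observer that receives the client's books and replies once at the end. */
    Observer<Book> updateBooks(Observer<String> responseObserver) {
        StringJoiner updatedIds = new StringJoiner(",", "updated ids: ", "");
        return new Observer<Book>() {
            @Override public void onNext(Book book) {
                bookMap.put(book.id, book); // apply the update
                updatedIds.add(book.id);    // remember it for the summary
            }
            @Override public void onError(Throwable error) {
                // The client aborted the stream; this sketch sends nothing back.
            }
            @Override public void onCompleted() {
                responseObserver.onNext(updatedIds.toString()); // the single response
                responseObserver.onCompleted();
            }
        };
    }

    /** Drives one call in-process and records what the "client" receives. */
    static List<String> run(List<Book> books) {
        List<String> received = new ArrayList<>();
        Observer<Book> request = new UpdateBooksSketch().updateBooks(new Observer<String>() {
            @Override public void onNext(String value) { received.add(value); }
            @Override public void onError(Throwable error) { received.add("error: " + error); }
            @Override public void onCompleted() { received.add("<completed>"); }
        });
        books.forEach(request::onNext);
        request.onCompleted();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(new Book("1", "a"), new Book("2", "c"))));
    }
}
```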
Let's look at the client code again:
public class BookServiceClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
updateBook(stub);
}
private static void updateBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
StreamObserver<Book> request = stub.updateBooks(new StreamObserver<StringValue>() {
@Override
public void onNext(StringValue stringValue) {
System.out.println("stringValue.getValue() = " + stringValue.getValue());
}
@Override
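public void onError(Throwable throwable) {
// Release the latch on failure too, otherwise main() would wait forever.
throwable.printStackTrace();
countDownLatch.countDown();
}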
@Override
public void onCompleted() {
System.out.println("更新完毕");
countDownLatch.countDown();
}
});
request.onNext(Book.newBuilder().setId("1").setName("a").setAuthor("b").build());
request.onNext(Book.newBuilder().setId("2").setName("c").setAuthor("d").build());
request.onCompleted();
countDownLatch.await();
}
}
On the client side, the updateBooks method returns a StreamObserver object. Calling onNext on this object sends data to the server, and it can be called multiple times. Calling onCompleted on this object tells the server that the client has finished sending. This triggers the server's onCompleted method, and once the server in turn calls responseObserver.onCompleted(), the client's onCompleted method is triggered.
5. Bidirectional streaming RPC
Bidirectional streaming is essentially a combination of sections 3 and 4: the client sends data multiple times, and the server responds multiple times.
Let's look at the server code first:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
private Map<String, Book> bookMap = new HashMap<>();
public BookServiceImpl() {
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public StreamObserver<StringValue> processBooks(StreamObserver<BookSet> responseObserver) {
// Keep the pending books per call; a shared field would mix up books from concurrent streams.
List<Book> books = new ArrayList<>();
return new StreamObserver<StringValue>() {
@Override
public void onNext(StringValue stringValue) {
Book b = Book.newBuilder().setId(stringValue.getValue()).build();
books.add(b);
if (books.size() == 3) {
BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();
responseObserver.onNext(bookSet);
books.clear();
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();
responseObserver.onNext(bookSet);
books.clear();
responseObserver.onCompleted();
}
};
}
}
This code has no practical significance; it is purely a demonstration of bidirectional streaming. The logic is this: the client passes multiple IDs to the server, the server builds a Book object for each ID, groups the books in threes, and returns each group to the client. Every message the client sends triggers the server's onNext method, and a group is returned from this method as soon as it holds three books. Finally, any books left over are returned in the onCompleted() method.
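The grouping logic can be isolated from gRPC and tried on its own. The following is a minimal sketch, assuming plain strings instead of Book messages and a Consumer in place of responseObserver.onNext; BatchingSketch, Batcher and batch are hypothetical names. One deliberate difference from the server code above: this sketch skips the final group when nothing is left over, whereas the server above always sends a last BookSet, even an empty one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class BatchingSketch {
    /** Collects incoming ids and emits them in groups of batchSize. */
    static final class Batcher {
        private final int batchSize;
        private final Consumer<List<String>> emit;
        private final List<String> pending = new ArrayList<>(); // state of one call only

        Batcher(int batchSize, Consumer<List<String>> emit) {
            this.batchSize = batchSize;
            this.emit = emit;
        }

        /** Called once per incoming message. */
        void onNext(String id) {
            pending.add(id);
            if (pending.size() == batchSize) {
                flush();
            }
        }

        /** Called when the sender is done; emits the remainder, if any. */
        void onCompleted() {
            if (!pending.isEmpty()) {
                flush();
            }
        }

        private void flush() {
            emit.accept(new ArrayList<>(pending)); // copy, because pending is reused
            pending.clear();
        }
    }

    private BatchingSketch() {}

    /** Feeds all ids through a Batcher and returns the emitted groups in order. */
    static List<List<String>> batch(List<String> ids, int batchSize) {
        List<List<String>> groups = new ArrayList<>();
        Batcher batcher = new Batcher(batchSize, groups::add);
        ids.forEach(batcher::onNext);
        batcher.onCompleted();
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(batch(Arrays.asList("a", "b", "c", "d"), 3));
    }
}
```

With the four IDs a, b, c and d that the client below sends, this yields one group of three followed by a group containing only d.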
Let's take a look at the client code:
public class BookServiceClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
processBook(stub);
}
private static void processBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
StreamObserver<StringValue> request = stub.processBooks(new StreamObserver<BookSet>() {
@Override
public void onNext(BookSet bookSet) {
System.out.println("bookSet = " + bookSet);
System.out.println("=============");
}
@Override
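public void onError(Throwable throwable) {
// Release the latch on failure too, otherwise main() would wait forever.
throwable.printStackTrace();
countDownLatch.countDown();
}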
@Override
public void onCompleted() {
System.out.println("处理完毕!");
countDownLatch.countDown();
}
});
request.onNext(StringValue.newBuilder().setValue("a").build());
request.onNext(StringValue.newBuilder().setValue("b").build());
request.onNext(StringValue.newBuilder().setValue("c").build());
request.onNext(StringValue.newBuilder().setValue("d").build());
request.onCompleted();
countDownLatch.await();
}
}
This client follows the same pattern as the one in section 4, so I won't go over it again.
Well, those are the four communication modes of gRPC that Brother Song wanted to share with you. Only the key code is shown in this article. If anything is unclear, I recommend reading it together with the previous article.