聊一聊gRPC 的四種通信模式

2023.02.14

聊一聊gRPC 的四種通信模式


一元RPC 是一種比較簡單的RPC 模式,其實說白了我們上篇文章和大家介紹的就是一種一元RPC,也就是客戶端發起一個請求,服務端給出一個響應,然後請求結束。

前面一篇文章松哥和大家聊了gRPC 的基本用法,今天我們再來稍微深入一點點,來看下gRPC 中四種不同的通信模式。

gRPC 中四種不同的通信模式分別是:

  1. 一元RPC
  2. 服務端流RPC
  3. 客戶端流RPC
  4. 雙向流RPC

接下來松哥就通過四個完整的案例,來分別和向夥伴們演示這四種不同的通信模式。

1. 準備工作

關於gRPC 的基礎知識我們就不囉嗦了,咱們直接來看我今天的proto 文件,如下:

這次我新建了一個名為book.proto 的文件,這裡主要定義了一些圖書相關的方法,如下:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.javaboy.grpc.demo";
option java_outer_classname = "BookServiceProto";
import "google/protobuf/wrappers.proto";

package book;

service BookService {
  rpc addBook(Book) returns (google.protobuf.StringValue);
  rpc getBook(google.protobuf.StringValue) returns (Book);
  rpc searchBooks(google.protobuf.StringValue) returns (stream Book);
  rpc updateBooks(stream Book) returns (google.protobuf.StringValue);
  rpc processBooks(stream google.protobuf.StringValue) returns (stream BookSet);
}

message Book {
  string id = 1;
  repeated string tags = 2;
  string name = 3;
  float price = 4;
  string author = 5;
}

message BookSet {
  string id = 1;
  repeated Book bookList = 3;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.

這個文件中,有一些內容我們在上篇文章中都講過了,講過的我就不再重複了,我說一些上篇文章沒有涉及到的東西:

  1. 由於我們在這個文件中,引用了Google 提供的StringValue(google.protobuf.StringValue),所以這個文件上面我們首先用import 導入相關的文件,導入之後,才可以使用。
  2. 在方法參數和返回值中出現的stream,就表示這個方法的參數或者返回值是流的形式(其實就是數據可以多次傳輸)。
  3. message 中出現了一個上篇文章沒有的關鍵字repeated,這個表示這個字段可以重複,可以簡單理解為這就是我們Java 中的數組。

好了,和上篇文章相比,本文主要就是這幾個地方不一樣。

proto 文件寫好之後,按照上篇文章介紹的方法進行編譯,生成對應的代碼,這裡就不再重複了。

2. 一元RPC

一元RPC 是一種比較簡單的RPC 模式,其實說白了我們上篇文章和大家介紹的就是一種一元RPC,也就是客戶端發起一個請求,服務端給出一個響應,然後請求結束。

上面我們定義的五個方法中,addBook 和getBook 都算是一種一元RPC。

2.1 addBook

先來看addBook 方法,這個方法的邏輯很簡單,我們提前在服務端準備一個Map 用來保存Book,addBook 調用的時候,就把book 對象存入到Map 中,並且將book 的ID 返回,大家就這樣一件事,來看看服務端的代碼:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
    private Map<String, Book> bookMap = new HashMap<>();

    public BookServiceImpl() {
        Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
        Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
        Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
        bookMap.put("1", b1);
        bookMap.put("2", b2);
        bookMap.put("3", b3);
    }

    @Override
    public void addBook(Book request, StreamObserver<StringValue> responseObserver) {
        bookMap.put(request.getId(), request);
        responseObserver.onNext(StringValue.newBuilder().setValue(request.getId()).build());
        responseObserver.onCompleted();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.

看過上篇文章的小伙伴,我覺得這段代碼應該很好理解。

客戶端調用方式如下:

public class BookServiceClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
        addBook(stub);
    }

    private static void addBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        stub.addBook(Book.newBuilder().setPrice(99).setId("100").setName("java").setAuthor("javaboy").build(), new StreamObserver<StringValue>() {
            @Override
            public void onNext(StringValue stringValue) {
                System.out.println("stringValue.getValue() = " + stringValue.getValue());
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                countDownLatch.countDown();
                System.out.println("添加完毕");
            }
        });
        countDownLatch.await();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.

這裡我使用了CountDownLatch 來實現線程等待,等服務端給出響應之後,客戶端再結束。這裡在回調的onNext 方法中,我們就可以拿到服務端的返回值。

2.2 getBook

getBook 跟上面的addBook 類似,先來看服務端代碼,如下:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
    private Map<String, Book> bookMap = new HashMap<>();

    public BookServiceImpl() {
        Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
        Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
        Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
        bookMap.put("1", b1);
        bookMap.put("2", b2);
        bookMap.put("3", b3);
    }

    @Override
    public void getBook(StringValue request, StreamObserver<Book> responseObserver) {
        String id = request.getValue();
        Book book = bookMap.get(id);
        if (book != null) {
            responseObserver.onNext(book);
            responseObserver.onCompleted();
        } else {
            responseObserver.onCompleted();
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.

這個getBook 就是根據客戶端傳來的id,從Map 中查詢到一個Book 並返回。

客戶端調用代碼如下:

public class BookServiceClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
        getBook(stub);
    }

    private static void getBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        stub.getBook(StringValue.newBuilder().setValue("2").build(), new StreamObserver<Book>() {
            @Override
            public void onNext(Book book) {
                System.out.println("book = " + book);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                countDownLatch.countDown();
                System.out.println("查询完毕");
            }
        });
        countDownLatch.await();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.

小伙伴們大概也能看出來,addBook 和getBook 基本上操作套路是一模一樣的。

3. 服務端流RPC

前面的一元RPC,客戶端發起一個請求,服務端給出一個響應,請求就結束了。服務端流則是客戶端發起一個請求,服務端給一個響應序列,這個響應序列組成一個流。

上面我們給出的searchBook 就是這樣一個例子,searchBook 是傳遞圖書的tags 參數,然後在服務端查詢哪些書的tags 滿足條件,將滿足條件的書全部都返回去。

我們來看下服務端的代碼:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
    private Map<String, Book> bookMap = new HashMap<>();

    public BookServiceImpl() {
        Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
        Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
        Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
        bookMap.put("1", b1);
        bookMap.put("2", b2);
        bookMap.put("3", b3);
    }
    @Override
    public void searchBooks(StringValue request, StreamObserver<Book> responseObserver) {
        Set<String> keySet = bookMap.keySet();
        String tags = request.getValue();
        for (String key : keySet) {
            Book book = bookMap.get(key);
            int tagsCount = book.getTagsCount();
            for (int i = 0; i < tagsCount; i++) {
                String t = book.getTags(i);
                if (t.equals(tags)) {
                    responseObserver.onNext(book);
                    break;
                }
            }
        }
        responseObserver.onCompleted();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.

小伙伴們看下,這段Java 代碼應該很好理解:

  1. 首先從request 中提取客戶端傳來的tags 參數。
  2. 遍歷bookMap,查看每一本書的tags 是否等於客戶端傳來的tags,如果相等,說明添加匹配,則通過 responseObserver.onNext(book); 將這本書寫回到客戶端。
  3. 等所有操作都完成後,執行 responseObserver.onCompleted();,表示服務端的響應序列結束了,這樣客戶端也就知道請求結束了。

我們來看看客戶端的代碼,如下:

public class BookServiceClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
        searchBook(stub);
    }
    private static void searchBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        stub.searchBooks(StringValue.newBuilder().setValue("明清小说").build(), new StreamObserver<Book>() {
            @Override
            public void onNext(Book book) {
                System.out.println(book);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                countDownLatch.countDown();
                System.out.println("查询完毕!");
            }
        });
        countDownLatch.await();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.

客戶端的代碼好理解,搜索的關鍵字是 明清小說,每當服務端返回一次數據的時候,客戶端回調的onNext 方法就會被觸發一次,當服務端之行了 responseObserver.onCompleted(); 之後,客戶端的onCompleted 方法也會被觸發。

這個就是服務端流,客戶端發起一個請求,服務端通過onNext 可以多次寫回數據。

4. 客戶端流RPC

客戶端流則是客戶端發起多個請求,服務端只給出一個響應。

上面的updateBooks 就是一個客戶端流的案例,客戶端想要修改圖書,可以發起多個請求修改多本書,服務端則收集多次修改的結果,將之匯總然後一次性返回給客戶端。

我們先來看看服務端的代碼:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
    private Map<String, Book> bookMap = new HashMap<>();

    public BookServiceImpl() {
        Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
        Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
        Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
        bookMap.put("1", b1);
        bookMap.put("2", b2);
        bookMap.put("3", b3);
    }
    
    @Override
    public StreamObserver<Book> updateBooks(StreamObserver<StringValue> responseObserver) {
        StringBuilder sb = new StringBuilder("更新的图书 ID 为:");
        return new StreamObserver<Book>() {
            @Override
            public void onNext(Book book) {
                bookMap.put(book.getId(), book);
                sb.append(book.getId())
                        .append(",");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                responseObserver.onNext(StringValue.newBuilder().setValue(sb.toString()).build());
                responseObserver.onCompleted();
            }
        };
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.

客戶端每發送一本書來,就會觸發服務端的onNext 方法,然後我們在這方法中進行圖書的更新操作,並記錄更新結果。最後,我們在onCompleted 方法中,將更新結果匯總返回給客戶端,基本上就是這樣一個流程。

我們再來看看客戶端的代碼:

public class BookServiceClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
        updateBook(stub);
    }

    private static void updateBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        StreamObserver<Book> request = stub.updateBooks(new StreamObserver<StringValue>() {
            @Override
            public void onNext(StringValue stringValue) {
                System.out.println("stringValue.getValue() = " + stringValue.getValue());
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                System.out.println("更新完毕");
                countDownLatch.countDown();
            }
        });
        request.onNext(Book.newBuilder().setId("1").setName("a").setAuthor("b").build());
        request.onNext(Book.newBuilder().setId("2").setName("c").setAuthor("d").build());
        request.onCompleted();
        countDownLatch.await();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.

在客戶端這塊,updateBooks 方法會返回一個StreamObserver對象,調用該對象的onNext 方法就是給服務端傳遞數據了,可以傳遞多個數據,調用該對象的onCompleted 方法就是告訴服務端數據傳遞結束了,此時也會觸發服務端的onCompleted 方法,服務端的onCompleted 方法執行之後,進而觸發了客戶端的onCompleted 方法。

5. 雙向流RPC

雙向流其實就是3、4 小節的合體。即客戶端多次發送數據,服務端也多次響應數據。

我們先來看下服務端的代碼:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
    private Map<String, Book> bookMap = new HashMap<>();
    private List<Book> books = new ArrayList<>();

    public BookServiceImpl() {
        Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
        Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
        Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
        bookMap.put("1", b1);
        bookMap.put("2", b2);
        bookMap.put("3", b3);
    }
    @Override
    public StreamObserver<StringValue> processBooks(StreamObserver<BookSet> responseObserver) {
        return new StreamObserver<StringValue>() {
            @Override
            public void onNext(StringValue stringValue) {
                Book b = Book.newBuilder().setId(stringValue.getValue()).build();
                books.add(b);
                if (books.size() == 3) {
                    BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();
                    responseObserver.onNext(bookSet);
                    books.clear();
                }
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();
                responseObserver.onNext(bookSet);
                books.clear();
                responseObserver.onCompleted();
            }
        };
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.

這段代碼沒有實際意義,單純為了給小伙伴們演示雙向流,我的操作邏輯是客戶端傳遞多個ID 到服務端,然後服務端根據這些ID 構建對應的Book 對象,然後三個三個一組,再返回給客戶端。客戶端每次發送一個請求,都會觸發服務端的onNext 方法,我們在這個方法中對請求分組返回。最後如果還有剩餘的請求,我們在onCompleted() 方法中返回。

再來看看客戶端的代碼:

public class BookServiceClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
        processBook(stub);
    }

    private static void processBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        StreamObserver<StringValue> request = stub.processBooks(new StreamObserver<BookSet>() {
            @Override
            public void onNext(BookSet bookSet) {
                System.out.println("bookSet = " + bookSet);
                System.out.println("=============");
            }

            @Override
            public void onError(Throwable throwable) {
            }

            @Override
            public void onCompleted() {
                System.out.println("处理完毕!");
                countDownLatch.countDown();
            }
        });
        request.onNext(StringValue.newBuilder().setValue("a").build());
        request.onNext(StringValue.newBuilder().setValue("b").build());
        request.onNext(StringValue.newBuilder().setValue("c").build());
        request.onNext(StringValue.newBuilder().setValue("d").build());
        request.onCompleted();
        countDownLatch.await();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.

這個客戶端的代碼跟第四小節一模一樣,不再贅述了。

好啦,這就是松哥和小伙伴們介紹的gRPC 的四種不同的通信模式,文章中只給出了一些關鍵代碼,如果小伙伴們沒看明白,建議結合上篇文章一起閱讀就懂啦~